CLOUD/OpenSource
[Kafka] Broker 서버를 이용한 Producer&Consumer 메세지 전송
alsruds
2023. 3. 20. 15:08
[ 가상머신 준비 ]
◎ CentOS 8 : 3대
→ Producer(200.200.200.150)
→ Consumer(200.200.200.151)
→ Broker(200.200.200.152)
● 설치
# jdk 설치
yum -y install java-1.8.0-openjdk-devel.x86_64
# kafka 설치
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
# 압축 해제
tar -xzvf kafka_2.13-3.4.0.tgz
# 파일 이동
mv kafka_2.13-3.4.0 /opt/kafka
● 호스트 이름 설정
①
vi /etc/hostname
》 각 컴퓨터에 producer / consumer / broker 입력
②
#명령어
vi /etc/hosts
#입력
[Producer IP] producer
[Consumer IP] consumer
[Broker IP] broker
● Broker 설정
① 방화벽 해제
systemctl stop firewalld
② Zookeeper 서버 연결
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
③ Kafka 서버 연결
# 파일 설정
vi /opt/kafka/config/server.properties
#38번 라인 : advertised.listeners 주석 해제 후 브로커 ip 입력
# 서버 연결
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
● Producer 설정
》 Broker 서버 연결
/opt/kafka/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server [broker IP]:9092
● Consumer 설정
》 Broker 서버 연결
/opt/kafka/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server [broker IP]:9092
☞ 확인하기 ~ ☜
》 Producer 컴퓨터에서 내용 입력해보기!
[ Python을 이용한 Producer&Consumer 메세지 전송 ]
◎ PyCharm Project 2개 준비
→ Producer
→ Consumer
→ Broker 서버 유지
● Producer 설정
① 라이브러리 설치
pip install kafka-python
② 코드
from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers=['[Broker IP]:9092']
)
start = time.time()
for i in range(100):
producer.send('test', value="test".encode("utf-8")) # consumer 랑 맞춰주어야 하는 topic명
producer.flush()
print("elapsed :", time.time() - start)
● Consumer 설정
① 라이브러리 설치
pip install kafka-python
② 코드
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'test', # topic이름 producer랑 잘 맞춰주기
bootstrap_servers=['[Broker IP]:9092']
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value.decode('utf-8') ))
print('[end] get consumer list')
☞ 확인하기 ~ ☜
[ JSON 형식으로 주고받기 ]
● Producer 설정
》 코드 설정
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['[Broker IP]:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
start = time.time()
for i in range(100):
data = {'str' : 'result'+str(i)}
producer.send('test', value=data) # consumer 랑 맞춰주어야 하는 topic 명
producer.flush()
print("elapsed :", time.time() - start)
● Consumer 설정
》 코드 설정
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'test', # producer 랑 맞춰주어야 하는 topic 명
bootstrap_servers=['[Broker IP]:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
consumer_timeout_ms=10000
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
message.topic, message.partition, message.offset, message.key, message.value))
print('[end] get consumer list')
☞ 확인하기 ~ ☜