CLOUD/OpenSource

[Kafka] Broker 서버를 이용한 Producer&Consumer 메세지 전송

alsruds 2023. 3. 20. 15:08

[ 가상머신 준비 ]

◎ CentOS 8 : 3대

→ Producer(200.200.200.150)

→ Consumer(200.200.200.151)

→ Broker(200.200.200.152)

 

● 설치

# jdk 설치
yum -y install java-1.8.0-openjdk-devel.x86_64

# kafka 설치
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz

# 압축 해제
tar -xzvf kafka_2.13-3.4.0.tgz

# 파일 이동
mv kafka_2.13-3.4.0 /opt/kafka

 

● 호스트 이름 설정

vi /etc/hostname

》 각 컴퓨터에 producer / consumer / broker 입력

#명령어
vi /etc/hosts

#입력
[Producer IP] producer
[Consumer IP] consumer
[Broker IP] broker

 

Broker 설정

① 방화벽 해제

systemctl stop firewalld

② Zookeeper 서버 연결

/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties

③ Kafka 서버 연결

# 파일 설정
vi /opt/kafka/config/server.properties		
	#38번 라인 : advertised.listeners 주석 해제 후 브로커 ip 입력

# 서버 연결
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

 

Producer 설정

Broker 서버 연결

/opt/kafka/bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server [broker IP]:9092

 

Consumer 설정

 Broker 서버 연결

/opt/kafka/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server [broker IP]:9092

 

 

☞ 확인하기 ~ ☜

Producer 컴퓨터에서 내용 입력해보기!

producer 에서 메세지 전송!

 

consumer 에서 확인 가능~

 

 

[ Python을 이용한 Producer&Consumer 메세지 전송 ]

PyCharm Project 2개 준비

→ Producer

→ Consumer

→ Broker 서버 유지

 

Producer 설정

라이브러리 설치

pip install kafka-python

② 코드

from kafka import KafkaProducer
import time

producer = KafkaProducer(
    bootstrap_servers=['[Broker IP]:9092']
)
start = time.time()

for i in range(100):
    producer.send('test', value="test".encode("utf-8"))		# consumer 랑 맞춰주어야 하는 topic명
    producer.flush()

print("elapsed :", time.time() - start)

 

 Consumer 설정

 라이브러리 설치

pip install kafka-python

② 코드

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'test',		# topic이름 producer랑 잘 맞춰주기
    bootstrap_servers=['[Broker IP]:9092']
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value.decode('utf-8') ))

print('[end] get consumer list')

 

 

☞ 확인하기 ~ ☜

producer terminal

 

consumer terminal

 

 

[ JSON 형식으로 주고받기 ]

 Producer 설정

코드 설정

from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['[Broker IP]:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

start = time.time()

for i in range(100):
    data = {'str' : 'result'+str(i)}
    producer.send('test', value=data)		# consumer 랑 맞춰주어야 하는 topic 명
    producer.flush()

print("elapsed :", time.time() - start)

 

 Consumer 설정

 코드 설정

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'test',		# producer 랑 맞춰주어야 하는 topic 명
    bootstrap_servers=['[Broker IP]:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=10000
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
    message.topic, message.partition, message.offset, message.key, message.value))

print('[end] get consumer list')

 

 

☞ 확인하기 ~ ☜

producer terminal

 

consumer terminal