CLOUD/OpenSource

[Kafka] REST Proxy 서버를 이용한 메세지 전송

alsruds 2023. 3. 21. 02:20

[ Broker 서버 준비 ]

2023.03.20 - [분류 전체보기] - [Kafka] Broker 서버를 이용한 Producer&Consumer 메세지 전송

 

[Kafka] Broker 서버를 이용한 Producer&Consumer 메세지 전송

[ 가상머신 준비 ] ◎ CentOS 8 : 3대 → Producer(200.200.200.150) → Consumer(200.200.200.151) → Broker(200.200.200.152) ● 설치 # jdk 설치 yum -y install java-1.8.0-openjdk-devel.x86_64 # kafka 설치 wget https://downloads.apache.org/kafk

alsrudalsrudalsrud.tistory.com

 

[ REST Proxy 서버 준비 ]

● 설치

# 패키지 다운로드
wget http://packages.confluent.io/archive/5.5/confluent-community-5.5.0-2.12.zip

# 압축 풀기
unzip confluent-community-5.5.0-2.12.zip

 

/etc/kafka-rest/kafka-rest.properties  파일 설정

#IP 설정
zookeeper.connect=[zookeeper IP]:2181
bootstrap.servers=PLAINTEXT://[kafka IP]:9092

#CORS 설정
access.control.allow.origin=*
access.control.allow.methos=GET,POST,PUT,DELETE
access.control.allow.headers=origin,content-type,accept,authorization

 

● 실행

bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties
#bin이 있는 폴더에서 실행

 

● 테스트

》 proxy 서버 터미널 하나 더 실행

# Consumer에서 토픽 가져오기 ( group_id & instance )
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "test_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/test
	# group_id = test
	# instance = test_consumer_instance

# 해당 group_id & instance 로 구독
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["person"]}' \
http://localhost:8082/consumers/test/instances/test_consumer_instance/subscription

# 데이터 조회
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/test/instances/test_consumer_instance/records

테스트 성공~

현재 들어있는 데이터가 없어서 빈 리스트 출력 시 테스트 성공

 

[ 웹 페이지에서 실시간 조회하기 ]

Producer 코드

from kafka import KafkaProducer
from json import dumps

producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['[Broker IP]:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

data = {'message' : 'result'}			#전송 데이터
producer.send('person', value=data)		#topic
producer.flush()

 

Consumer 코드

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'person',							#topic
    bootstrap_servers=['[Broker IP]:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    print("Topic: %s, Offset: %d, Value: %s" % (
    message.topic, message.offset, message.value))

 

 

웹페이지 html

<!DOCTYPE html>
<html>
    <head lang="en"><meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    
        <title>kafEEne</title>
        <script type="text/javascript" src="./pulling_files/jquery-2.1.4.min.js.다운로드"></script>
        <script type="text/javascript" src="./pulling_files/bootstrap.min.js.다운로드"></script>
        <link rel="stylesheet" type="text/css" href="./pulling_files/bootstrap.min.css">
        <style type="text/css">
            h1 {
                text-align: center;
                padding-bottom: 1em;
            }
            #content {
                width: 30%;
                margin: 0 auto;
            }
            #topTable {
            }
        </style>
    </head>

    <body>
        <div id="txt" align="center"></div>

        <div id="content"   >
            <h3>Kafka REST</h3>
            <table id="topTable" class="table">
                <thead>
                    <tr>
                        <th width="150">Topic</th>
                        <th width="100">Offset</th>
                        <th width="100">Message</th>    
                    </tr>
                </thead>
                <tbody>

                </tbody>
            </table>
        </div>

        <script type="text/javascript">

            function sendRequest() {
                var httpRequest = new XMLHttpRequest();
                httpRequest.onreadystatechange = function() {
                    if (httpRequest.readyState == XMLHttpRequest.DONE && httpRequest.status == 200 ) {		
                        var result = JSON.parse(httpRequest.responseText);
                        if (result.length != 0) {	
                            for(var i=0; i<result.length; i++) {
                                var tableBody = $("#topTable").find("tbody");
						
                                var row = $("<tr>").append(
                                        $("<td>").text(result[i]['topic']),
                                        $("<td>").text(result[i]['offset']),
                                        $("<td>").text(result[i]['value']['message'])
                                        );
                                tableBody.append(row);
                            }
                        }
                    }
                };
                // GET 방식 요청 & 데이터 전달
                httpRequest.open("GET", "http://[Proxy IP]:8082/consumers/test/instances/test_consumer_instance/records", true);
                httpRequest.setRequestHeader("Accept", "application/vnd.kafka.json.v2+json");
                httpRequest.send();
            }

            function consume() {
                sendRequest();
                setTimeout(consume, 5000);
            };

            consume();
        </script>
    </body>
</html>

 

☞ 확인하기 ☜

Consumer 코드 실행 》 Producer 코드 실행 》 웹페이지 확인하기

※ Proxy 서버 방화벽 해제 : systemctl stop firewalld

성공~