[ Broker 서버 준비 ]
2023.03.20 - [분류 전체보기] - [Kafka] Broker 서버를 이용한 Producer&Consumer 메세지 전송
[ REST Proxy 서버 준비 ]
● 설치
# 패키지 다운로드
wget http://packages.confluent.io/archive/5.5/confluent-community-5.5.0-2.12.zip
# 압축 풀기
unzip confluent-community-5.5.0-2.12.zip
● /etc/kafka-rest/kafka-rest.properties 파일 설정
#IP 설정
zookeeper.connect=[zookeeper IP]:2181
bootstrap.servers=PLAINTEXT://[kafka IP]:9092
#CORS 설정
access.control.allow.origin=*
access.control.allow.methos=GET,POST,PUT,DELETE
access.control.allow.headers=origin,content-type,accept,authorization
● 실행
bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties
#bin이 있는 폴더에서 실행
● 테스트
》 proxy 서버 터미널 하나 더 실행
# Consumer에서 토픽 가져오기 ( group_id & instance )
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "test_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/test
# group_id = test
# instance = test_consumer_instance
# 해당 group_id & instance 로 구독
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["person"]}' \
http://localhost:8082/consumers/test/instances/test_consumer_instance/subscription
# 데이터 조회
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/test/instances/test_consumer_instance/records
》 현재 들어있는 데이터가 없어서 빈 리스트 출력 시 테스트 성공
[ 웹 페이지에서 실시간 조회하기 ]
● Producer 코드
from kafka import KafkaProducer
from json import dumps
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['[Broker IP]:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
data = {'message' : 'result'} #전송 데이터
producer.send('person', value=data) #topic
producer.flush()
● Consumer 코드
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'person', #topic
bootstrap_servers=['[Broker IP]:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)
for message in consumer:
print("Topic: %s, Offset: %d, Value: %s" % (
message.topic, message.offset, message.value))
● 웹페이지 html
<!DOCTYPE html>
<html>
<head lang="en"><meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>kafEEne</title>
<script type="text/javascript" src="./pulling_files/jquery-2.1.4.min.js.다운로드"></script>
<script type="text/javascript" src="./pulling_files/bootstrap.min.js.다운로드"></script>
<link rel="stylesheet" type="text/css" href="./pulling_files/bootstrap.min.css">
<style type="text/css">
h1 {
text-align: center;
padding-bottom: 1em;
}
#content {
width: 30%;
margin: 0 auto;
}
#topTable {
}
</style>
</head>
<body>
<div id="txt" align="center"></div>
<div id="content" >
<h3>Kafka REST</h3>
<table id="topTable" class="table">
<thead>
<tr>
<th width="150">Topic</th>
<th width="100">Offset</th>
<th width="100">Message</th>
</tr>
</thead>
<tbody>
</tbody>
</table>
</div>
<script type="text/javascript">
function sendRequest() {
var httpRequest = new XMLHttpRequest();
httpRequest.onreadystatechange = function() {
if (httpRequest.readyState == XMLHttpRequest.DONE && httpRequest.status == 200 ) {
var result = JSON.parse(httpRequest.responseText);
if (result.length != 0) {
for(var i=0; i<result.length; i++) {
var tableBody = $("#topTable").find("tbody");
var row = $("<tr>").append(
$("<td>").text(result[i]['topic']),
$("<td>").text(result[i]['offset']),
$("<td>").text(result[i]['value']['message'])
);
tableBody.append(row);
}
}
}
};
// GET 방식 요청 & 데이터 전달
httpRequest.open("GET", "http://[Proxy IP]:8082/consumers/test/instances/test_consumer_instance/records", true);
httpRequest.setRequestHeader("Accept", "application/vnd.kafka.json.v2+json");
httpRequest.send();
}
function consume() {
sendRequest();
setTimeout(consume, 5000);
};
consume();
</script>
</body>
</html>
☞ 확인하기 ☜
》 Consumer 코드 실행 》 Producer 코드 실행 》 웹페이지 확인하기
※ Proxy 서버 방화벽 해제 : systemctl stop firewalld
'CLOUD > OpenSource' 카테고리의 다른 글
[Kafka] Zookeeper & Kafka Clustering (0) | 2023.03.22 |
---|---|
[Kafka] 게시글 등록 현황 실시간으로 확인 하기 (0) | 2023.03.21 |
[Kafka] 웹 브라우저 동작 시 Consumer 에 토픽 메세지 전송 (2) | 2023.03.20 |
[Kafka] Broker 서버를 이용한 Producer&Consumer 메세지 전송 (2) | 2023.03.20 |
[Prometheus&Grafana] 모니터링 시스템 2 (0) | 2023.02.28 |