CLOUD/OpenSource

[Kafka/Logstash] 메세지 전송

alsruds 2023. 3. 24. 03:44

☆ 개요  

▷ Python Code (producer) Kafka (broker) Logstash (consumer) Elasticsearch Kibana

 

● Python

from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['200.200.200.145:9092'],     #kafka broker ip
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

start = time.time()

data = {'message': 'result'}	# 보낼 메세지
producer.send('test', value=data)	# topic
producer.flush()

print("elapsed :", time.time() - start)

 

Logstash

》 /etc/logstash/conf.d/kafka.conf  파일 생성

input {
    kafka {
        bootstrap_servers =>  "200.200.200.145:9092"	#kafka ip
        group_id => "logstash"
        topics => ["test"]	# topic
        consumer_threads => 1
    }
}

output {
  elasticsearch {
    hosts => ["http://200.200.200.152:9200"]	#elasticsearch ip
    index => "kafka-test-%{+YYYY-MM-dd}"	#index
  }
}

 

Elasticsearch 확인

Postman에서 인덱스 확인

 

Kibana 확인

http://[kibana ip]:5601

》 좌측 상단 햄버거 버튼 클릭 》 Stack Management 》 Index Patterns 》 Create Index Pattern

생성된 인덱스 확인~

 

》 좌측 상단 햄버거 버튼 클릭 Discover

전송한 데이터도 확인~