Sunday, May 9, 2021

Write your own Kafka producer and consumer

Step 1: Start ZooKeeper

Step 2: Start the Kafka server

Step 3: Install python3-kafka

Step 4: Write a Python class file with the producer and consumer together

consumer_multithread.py

from kafka import KafkaConsumer

def consumer1():
    """Print every message on the ``mytest2`` topic and append it to a file.

    Connects to the broker at ``localhost:9092`` with
    ``auto_offset_reset='earliest'`` and iterates over the consumer. Each
    message value is decoded as UTF-8, printed, and appended to
    ``/home/dheerendra/kafka_files/process/kafka_data.txt``.

    The call only returns when iteration over the consumer ends or an
    exception is raised; the consumer is closed in either case.
    """
    bootstrap_servers = 'localhost:9092'
    topic_name = 'mytest2'
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
    )
    try:
        # Open the file as UTF-8 explicitly: the payload is decoded as UTF-8,
        # so relying on the locale's default encoding could fail on write.
        with open('/home/dheerendra/kafka_files/process/kafka_data.txt',
                  'a', encoding='utf-8') as wfile:
            for record in consumer:
                # Decode once and reuse the text for both outputs.
                text = record.value.decode('utf-8')
                print(text)
                # NOTE(review): messages are appended back to back with no
                # separator, exactly as before; confirm whether a newline
                # between messages is wanted.
                wfile.write(text)
    finally:
        # Release the broker connection even if the loop is interrupted.
        consumer.close()

# Run the consumer only when this file is executed directly. The producer
# script imports this module (``import consumer_multithread``); an unguarded
# call here would start the consume loop during that import and stall the
# importer before it could start producing.
if __name__ == "__main__":
    consumer1()
Producer.py
from threading import *
from time import *
from kafka import KafkaProducer

import consumer_multithread
from consumer_multithread import consumer1
class kafka_work:
    """Holder for the demo Kafka producer routine."""

    @staticmethod
    def producer1():
        """Send one greeting message to the ``mytest2`` topic.

        Creates a ``KafkaProducer`` for ``localhost:9092``, sends the UTF-8
        encoded text, waits up to 60 seconds for the send to complete, and
        closes the producer afterwards.

        Declared as a static method because it takes no ``self``: this lets
        it be called both as ``kafka_work.producer1()`` and through an
        instance (``obj.producer1()``), where a plain function would raise
        ``TypeError`` for the unexpected instance argument.

        Raises:
            Whatever ``future.get`` raises when the send fails or does not
            complete within the timeout.
        """
        topic = 'mytest2'
        bootstrap_servers = 'localhost:9092'
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        try:
            message = 'hellow Amit Sir'
            future = producer.send(topic, message.encode('utf-8'))
            # Wait for the send to finish so failures surface here rather
            # than being silently dropped.
            future.get(timeout=60)
        finally:
            # A new producer is created on every call; close it so repeated
            # calls do not accumulate open connections.
            producer.close()

obj = kafka_work()

# Start the consumer in a background thread BEFORE entering the producer
# loop. The loop below never exits, so a consumer call placed after it would
# never be reached. The thread is a daemon so that interrupting the main
# thread ends the process.
obj_consumer = Thread(target=consumer_multithread.consumer1, daemon=True)
obj_consumer.start()

# Publish one message every 3 seconds, each from its own short-lived thread.
while True:
    # producer1 takes no arguments, so it is looked up on the class to make
    # sure no instance is passed to it when the thread invokes it.
    obj_producer = Thread(target=kafka_work.producer1)
    obj_producer.start()
    sleep(3)
 

No comments:

Post a Comment