Once Kafka has started successfully, you can integrate it with PySpark.
The word count program below shows how.
Open the PySpark terminal and run the following program.
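from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# pyspark.streaming.kafka ships with Spark 2.x (Kafka 0.8 integration); it was removed in Spark 3.0
from pyspark.streaming.kafka import KafkaUtils
# sc is the SparkContext that the PySpark shell creates; the batch interval is 60 seconds
ssc = StreamingContext(sc, 60)
# Read the 'test12' topic directly from the broker at localhost:9092
kvs = KafkaUtils.createDirectStream(ssc, ['test12'], {"metadata.broker.list": 'localhost:9092'})
# Each record is a (key, value) pair; keep only the message value
lines = kvs.map(lambda x: x[1])
# Split each line into words, pair every word with 1, and sum the counts per word
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()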
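To check the counting logic without a running Kafka broker, the same three steps can be sketched in plain Python. This is only an illustration: word_count and batch are hypothetical names that are not part of the program above, and the list of strings stands in for the message values of one 60-second batch.

```python
from collections import defaultdict

def word_count(lines):
    """Count words in one batch of lines, mirroring flatMap -> map -> reduceByKey."""
    # flatMap step: split every line on single spaces and flatten the result
    words = [word for line in lines for word in line.split(" ")]
    # map step: pair every word with an initial count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey step: add up the counts that share the same word
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)

# Hypothetical sample batch standing in for Kafka message values
batch = ["hello kafka", "hello spark"]
print(word_count(batch))  # prints {'hello': 2, 'kafka': 1, 'spark': 1}
```

Note that splitting on a single space, as the program above does, turns consecutive spaces into empty-string "words", which are then counted like any other word.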