from pyspark.sql.session import SparkSession

def consumer1():
    spark = SparkSession.builder.getOrCreate()
    bootstrap_servers = 'localhost:9092'
    topic_name = 'orcdb'
    df = spark.readStream.format('kafka') \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribe", topic_name) \
        .option("startingOffsets", "earliest") \
        .load()
    # df.show() does not work on a streaming DataFrame; write it to the console sink instead
    query = df.writeStream.format("console").start()
    query.awaitTermination()

consumer1()
===========================================================
(1) If you get an error like:
pyspark.sql.utils.
I am using Kafka 3.0.2, so I used the package below to resolve the error:
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2
In this coordinate, the _2.12 suffix is the Scala version of your Spark build, and the 3.0.2 at the end must match your Spark version. Please pay close attention to these versions: if they are wrong you will face many issues, keep swapping jars in the lib directory, and nothing will work. You will even hit errors at run time when the program reaches df.writeStream().
Example:
bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 /home/dheerendra/PycharmProjects/pythonProject/python_consumer_df.py
(The --class option applies only to JVM applications; it is not used when submitting a Python file. The package version here should match the Spark installation you are submitting to.)
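As a sketch of the version rule above: the --packages coordinate can be assembled from your Spark version (available as spark.version) and the Scala version of your Spark build. The helper name below is hypothetical, for illustration only.

```python
def kafka_package(spark_version, scala_version="2.12"):
    """Build the --packages coordinate for the Spark-Kafka connector.

    The artifact version must match the Spark version you run,
    and the _<scala> suffix must match Spark's Scala build.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

# e.g. for Spark 3.0.2 built with Scala 2.12:
print(kafka_package("3.0.2"))
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2
```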
(2) If you are getting an error like:
java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
then remove the additional jars you have added to Spark's jars directory.
Location: spark-2.4.7-bin-hadoop2.7\jars
If you use the right package according to your Spark version, you will not face error (2).
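To spot which connector jars in that directory clash with your Spark version, a small check like the following can help. The helper and the example filenames are illustrative, not part of Spark.

```python
import re

def mismatched_kafka_jars(jar_names, spark_version):
    """Return spark-sql-kafka connector jars whose version differs from spark_version."""
    pattern = re.compile(r"spark-sql-kafka-0-10_[\d.]+-(.+)\.jar$")
    bad = []
    for name in jar_names:
        m = pattern.match(name)
        # group(1) is the artifact version embedded in the jar filename
        if m and m.group(1) != spark_version:
            bad.append(name)
    return bad

# Example: a jars directory from a Spark 2.4.7 install with a stray 3.1.1 connector
jars = ["spark-sql-kafka-0-10_2.11-3.1.1.jar", "spark-core_2.11-2.4.7.jar"]
print(mismatched_kafka_jars(jars, "2.4.7"))
# ['spark-sql-kafka-0-10_2.11-3.1.1.jar']
```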