Use case : Data is continuously being inserted into a MySQL table. We need to stream that data through Kafka with Spark Structured Streaming (PySpark) and write it back into another MySQL table.
Solution :
Step 1: Connect Python to MySQL. Using the mysql-connector-python package, insert a record into a MySQL table every 5 seconds.
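The script below assumes a kafkadb database with an emp table holding name and age columns; that table is not created anywhere in this post. A minimal one-time setup sketch (the column types and the auto-increment id are assumptions, adjust them to your data):

import mysql.connector

# One-time setup for the database and table used throughout this post.
# Column types and the id column are assumptions.
mydb = mysql.connector.connect(host="localhost", user="root", password="")
cur = mydb.cursor()
cur.execute("CREATE DATABASE IF NOT EXISTS kafkadb")
cur.execute("""
    CREATE TABLE IF NOT EXISTS kafkadb.emp (
        id   INT AUTO_INCREMENT PRIMARY KEY,
        name VARCHAR(100),
        age  INT
    )
""")
mydb.commit()
cur.close()
mydb.close()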
File name - Data_insert_in_mysql.py
from threading import Thread
from time import sleep
import mysql.connector

def mysqlconn():
    # Open a connection to the local MySQL server and insert one record.
    mydb = mysql.connector.connect(
        host="localhost",
        user="root",
        password=""
    )
    mycursor = mydb.cursor()
    sql = "INSERT INTO kafkadb.emp (name, age) VALUES (%s, %s)"
    val = ('Peter', 10)
    mycursor.execute(sql, val)
    mydb.commit()
    mycursor.close()
    mydb.close()

while True:
    # Pass the function itself to Thread; calling mysqlconn() here would run it
    # in the main thread and hand Thread its return value (None) as the target.
    obj_producer = Thread(target=mysqlconn)
    obj_producer.start()
    sleep(5)
Step 2: Write a Kafka producer that reads the latest MySQL record and publishes it to a Kafka topic. (Kafka_producer.py)
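The producer publishes to a topic named orcdb2. If topic auto-creation is disabled on your broker, the topic has to be created first; a minimal sketch using kafka-python's admin client (partition and replication counts are assumptions):

from kafka.admin import KafkaAdminClient, NewTopic

# Create the topic the producer will write to (skip if it already exists
# or if the broker auto-creates topics).
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='orcdb2', num_partitions=1, replication_factor=1)])
admin.close()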
from threading import Thread
from time import sleep
import python_mysql_conn
from kafka import KafkaProducer

def produce():
    topic = 'orcdb2'
    bootstrap_servers = 'localhost:9092'
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    # python_mysql_conn.mysqlconn() is expected to return the latest record
    # from the MySQL table (see the sketch after this script).
    val = python_mysql_conn.mysqlconn()
    # val = 'Amit hello how are you'   # static test value
    msg = f'{val}'
    print(msg)
    future = producer.send(topic, msg.encode('utf-8'))
    # result = future.get(timeout=30)  # optionally block until the send is acknowledged
    producer.flush()

while True:
    # Pass the function itself to Thread (calling produce() here would be a bug).
    obj_producer = Thread(target=produce)
    obj_producer.start()
    sleep(5)
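The helper module python_mysql_conn is not shown in the post, and the producer expects its mysqlconn() to return a record rather than insert one (the mysqlconn() from Step 1 returns nothing). A minimal sketch of such a module, assuming it should fetch the most recently inserted row from kafkadb.emp using the id column from the setup sketch above:

import mysql.connector

def mysqlconn():
    # Return the most recently inserted (name, age) row from kafkadb.emp.
    # The ORDER BY assumes an auto-increment id column; adjust to your schema.
    mydb = mysql.connector.connect(host="localhost", user="root", password="")
    cur = mydb.cursor()
    cur.execute("SELECT name, age FROM kafkadb.emp ORDER BY id DESC LIMIT 1")
    row = cur.fetchone()
    cur.close()
    mydb.close()
    return list(row) if row else None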
Step 3: Write a consumer that reads the streaming data from the topic and writes it back to a MySQL table.
Note :- Download mysql-connector-java-8.0.25.jar and copy it into the Spark jars folder (e.g. /hadoop/spark/jars).
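The Kafka source for Structured Streaming is also shipped as a separate package. One way to pull it in is to declare it when building the SparkSession (the coordinates below are an assumption for Spark 3.1 with Scala 2.12; match them to your own Spark build), or equivalently pass the same coordinates to spark-submit with --packages:

from pyspark.sql.session import SparkSession

# Assumed package coordinates; change the versions to match your Spark/Scala build.
spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()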
(consumer.py)
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import substring

spark = SparkSession.builder.getOrCreate()

def consumer1():
    db_target_url = "jdbc:mysql://localhost/kafkadb"
    db_target_properties = {"user": "root", "password": "", "driver": "com.mysql.cj.jdbc.Driver"}
    bootstrap_servers = 'localhost:9092'
    topic_name = 'orcdb2'
    sch = StructType([StructField("Name", StringType(), True),
                      StructField("Age", IntegerType(), True)])

    # Read the topic as a streaming DataFrame.
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribe", topic_name) \
        .load()

    # The Kafka value arrives as bytes; cast it to a string along with the event timestamp.
    df2 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

    # Crude positional parsing of the stringified record, e.g. "['Peter', 10]".
    df3 = df2.select(substring(df2.value, 4, 5).alias("name"),
                     substring(df2.value, -4, 2).alias("age"),
                     df2.timestamp.alias("time"))

    def process_row(batch_df, epoch_id):
        # Each micro-batch is appended to the ins_emp table over JDBC.
        batch_df.write.jdbc(url=db_target_url, table="ins_emp", mode="append",
                            properties=db_target_properties)

    q = df3.writeStream.foreachBatch(process_row).start()
    q.awaitTermination()

consumer1()
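Once the stream is running, rows should start accumulating in the target table. A quick way to check that records are landing in kafkadb.ins_emp:

import mysql.connector

# Verify that the streaming job is appending rows to the target table.
mydb = mysql.connector.connect(host="localhost", user="root", password="", database="kafkadb")
cur = mydb.cursor()
cur.execute("SELECT name, age, time FROM ins_emp ORDER BY time DESC LIMIT 5")
for row in cur.fetchall():
    print(row)
cur.close()
mydb.close()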