Wednesday, June 15, 2022

Compare an array column with another DataFrame column

If you have an array column in a DataFrame and need to check whether it contains the value of another column of the same DataFrame, you can do this with the "expr" function. For details, see the code below.

from pyspark.sql import functions as sf

# Split the description column into an array of words, then check whether the
# array contains the value held in another column of the same row
df_desc_split = df_trx.withColumn('split_desc', sf.split(sf.col('description'), ' '))
df_name_flg = df_desc_split.withColumn("first_name_flag", sf.expr("array_contains(split_desc, FIRSTNAME)")) \
                           .withColumn("middle_name_flag", sf.expr("array_contains(split_desc, MIDDLE)"))

Explanation: I have a description field containing a string like "My name is dheerendra", which I split and keep in an array column "split_desc", e.g. ['My','name','is','dheerendra'].

Now suppose the same DataFrame has another column, e.g. "name", which contains "dheerendra":

|name       |split_desc                     |
|dheerendra |['My','name','is','dheerendra']|

To check whether the value of "name" (here 'dheerendra') exists in the split_desc array, use the "expr" function together with array_contains, as in the line below and the sketch that follows:

df_desc_split.withColumn("first_name_flag", sf.expr("array_contains(split_desc, FIRSTNAME)"))
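For reference, here is a minimal self-contained sketch of the same idea on made-up sample data (the name/description values are illustrative). Inside expr() the second argument is resolved as a column reference, so each row is checked against its own name value:

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.getOrCreate()

# Illustrative sample data with a free-text description and a name column
df = spark.createDataFrame(
    [("dheerendra", "My name is dheerendra"),
     ("peter", "My name is john")],
    ["name", "description"])

# Split the description into an array of words
df_split = df.withColumn("split_desc", sf.split(sf.col("description"), " "))

# array_contains(split_desc, name): here "name" is treated as a column
# reference, not a string literal, because it sits inside a SQL expression
df_flag = df_split.withColumn("name_flag", sf.expr("array_contains(split_desc, name)"))
df_flag.show(truncate=False)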

Sunday, June 6, 2021

Kafka PySpark Streaming with Mysql

Use case: Data is continuously updated in a MySQL table. We need to read that data through Kafka with Spark Streaming (PySpark) and write it back to another MySQL table.

 

Solution :

Step 1: Make a connection between Python and MySQL. Connect using the mysql-connector package and insert a record into the MySQL table every 5 seconds.

File name - Data_insert_in_mysql.py


from threading import Thread
from time import sleep
import mysql.connector

def mysqlconn():
    # Connect to the local MySQL instance and insert one row
    mydb = mysql.connector.connect(
        host="localhost",
        user="root",
        password=""
    )
    mycursor = mydb.cursor()
    sql = "INSERT INTO kafkadb.emp (name, age) VALUES (%s, %s)"
    val = ['Peter', 10]
    mycursor.execute(sql, val)
    mydb.commit()
    mycursor.close()
    # Return the inserted values so the Kafka producer (Step 2) can publish them
    return val


if __name__ == '__main__':
    # Insert a new record every 5 seconds in a separate thread.
    # Pass the function itself (mysqlconn), not mysqlconn(), as the Thread target.
    while True:
        obj_producer = Thread(target=mysqlconn)
        obj_producer.start()
        sleep(5)

Step 2: Write a Kafka producer that publishes the MySQL data to a topic. (Kafka_producer.py)
from threading import Thread
from time import sleep

import python_mysql_conn
from kafka import KafkaProducer


def produce():
    topic = 'orcdb2'
    bootstrap_servers = 'localhost:9092'
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    # Insert a row in MySQL and get the inserted values back
    val = python_mysql_conn.mysqlconn()
    msg = f'{val}'
    print(msg)
    future = producer.send(topic, msg.encode('utf-8'))
    # result = future.get(timeout=30)


# Publish a new message every 5 seconds.
# Pass the function itself (produce), not produce(), as the Thread target.
while True:
    obj_producer = Thread(target=produce)
    obj_producer.start()
    sleep(5)

Step 3: Write a consumer to read the streaming data and write it back to a MySQL table.
Note: Download mysql-connector-java-8.0.25.jar and copy it into the Spark jars folder (e.g. /hadoop/spark/jars), or see the configuration sketch after the code.
(consumer.py)
 
from pyspark.sql.functions import substring
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()


def consumer1():
    db_target_url = "jdbc:mysql://localhost/kafkadb"
    db_target_properties = {"user": "root", "password": "", "driver": "com.mysql.jdbc.Driver"}

    bootstrap_servers = 'localhost:9092'
    topic_name = 'orcdb2'

    # Expected layout of the records in the topic
    sch = StructType([StructField("Name", StringType(), True),
                      StructField("Age", IntegerType(), True)])

    # Read the topic as a streaming DataFrame
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribe", topic_name) \
        .load()
    df2 = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

    # Crude extraction of name and age from the message text
    df3 = df2.select(substring(df2.value, 4, 5).alias("name"),
                     substring(df2.value, -4, 2).alias("age"),
                     df2.timestamp.alias("time"))

    # Write each micro-batch back to MySQL over JDBC
    def process_row(batch_df, epoch_id):
        batch_df.write.jdbc(url=db_target_url, table="ins_emp", mode="append",
                            properties=db_target_properties)

    q = df3.writeStream.foreachBatch(process_row).start()
    q.awaitTermination()


consumer1()
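As an alternative to copying the jar into the Spark jars folder, the dependencies can be pulled in through the session configuration. This is only a sketch: the Maven coordinates shown (the Kafka source package for Spark 3.0.2 / Scala 2.12 and mysql-connector-java 8.0.25) are assumptions and must be adjusted to your own Spark and connector versions.

from pyspark.sql import SparkSession

# Pull the Kafka source and the MySQL JDBC driver from Maven at startup
# instead of copying jars into the Spark jars folder. Adjust the versions
# to match your installation.
spark = SparkSession.builder \
    .appName("kafka_mysql_consumer") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2,"
            "mysql:mysql-connector-java:8.0.25") \
    .getOrCreate()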

 



Saturday, May 15, 2021

Pyspark Error - pyspark.sql.utils.AnalysisException: Failed to find data source

from pyspark.sql.session import SparkSession


def consumer1():
    spark = SparkSession.builder.getOrCreate()

    bootstrap_servers = 'localhost:9092'
    topic_name = 'orcdb'

    df = spark.readStream.format('kafka') \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribe", topic_name) \
        .option("startingOffsets", "earliest") \
        .load()

    # A streaming DataFrame cannot be displayed with show(); write it to the
    # console sink instead.
    df.selectExpr("CAST(value AS STRING)") \
        .writeStream.format("console").start().awaitTermination()


consumer1()


===========================================================

If you get an error like:

(1) pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide"

My Spark version is 3.0.2, so adding the package below resolved the error.

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 (here 3.0.2 is the Spark version and 2.12 is the Scala build)

Pay close attention to this version: the package must match your Spark version. If it does not, you will keep swapping jars in the lib directory and nothing will work, and you will also hit failures at runtime when the program reaches df.writeStream().
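To confirm which Spark version you are actually running (and therefore which version to put in the package coordinate), a quick check like the one below can help; it only assumes a working pyspark installation.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Both of these report the Spark version; the version in
# org.apache.spark:spark-sql-kafka-0-10_2.12:<version> should match it.
print(pyspark.__version__)
print(spark.version)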


Example:--

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 /home/dheerendra/PycharmProjects/pythonProject/python_consumer_df.py

(The --class option is only for Java/Scala jobs; for a PySpark script just pass the .py file, and keep the package version in line with the Spark version you are submitting against, 3.1.1 in this example.)


(2) If you are getting an error like:

java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.streaming.StreamingRelationV2

Then remove the additional jars you added to Spark's jars folder.

Location: spark-2.4.7-bin-hadoop2.7\jars

If you use the right package for your Spark version, you will not face error (2).






Sunday, May 9, 2021

Python version issue with pyspark

If you have installed Spark and it conflicts with the version of Python installed on your system:

Example: 

          I installed Kafka on Ubuntu using python3 and it works fine, but the newly installed Spark does not work with python3, and even after a lot of hard work on the settings you keep getting the error below:


(1) env: 'python' No such file or directory


Solution: use the command below to resolve the problem; it makes the 'python' command point to python3:

                sudo ln -s /usr/bin/python3 /usr/bin/python
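Alternatively, instead of creating the symlink, you can tell Spark explicitly which interpreter to use through the PYSPARK_PYTHON environment variable. A minimal sketch, assuming python3 is the interpreter you want and the variables are set before the SparkSession is created:

import os
import sys

# Point the Spark driver and workers at the current python3 interpreter
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark.version)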

        

Write your own Kafka producer and consumer

Step 1: Start Zookeeper

Step 2: Start the Kafka server

Step 3: Install python3-kafka

Step 4: Write the Python producer and consumer files (below)

Consumer.py

from kafka import KafkaConsumer


def consumer1():
    bootstrap_servers = 'localhost:9092'
    topic_name = 'mytest2'
    # Read the topic from the beginning and append every message to a file
    consumer = KafkaConsumer(topic_name, bootstrap_servers=bootstrap_servers,
                             auto_offset_reset='earliest')
    with open('/home/dheerendra/kafka_files/process/kafka_data.txt', 'a') as wfile:
        for i in consumer:
            print(i.value.decode('utf-8'))
            wfile.writelines(i.value.decode('utf-8'))


consumer1()
Producer.py
from threading import Thread
from time import sleep
from kafka import KafkaProducer

import consumer_multithread


class kafka_work:
    def producer1(self):
        topic = 'mytest2'
        bootstrap_servers = 'localhost:9092'
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        val = 'hellow Amit Sir'
        msg = f'{val}'
        # send() is asynchronous; get() blocks until the broker acknowledges
        future = producer.send(topic, msg.encode('utf-8'))
        result = future.get(timeout=60)


obj = kafka_work()

# Publish a message every 3 seconds (pass the bound method itself as the target)
while True:
    obj_producer = Thread(target=obj.producer1)
    obj_producer.start()
    sleep(3)

# Note: the loop above runs forever, so this call is only reached if the loop
# is interrupted; in practice the consumer is usually run separately.
consumer_multithread.consumer1()
 

Saturday, May 8, 2021

Kafka setup

Step 1 : Download kafka_2.11-2.4.1.tgz, uncompress it, and rename kafka_2.11-2.4.1 to kafka

Step 2 : Install Java and set up the environment variables for Kafka

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export HADOOP_HOME=/home/dheerendra/hadoop/hadoop-3.2.1
export KAFKA_HOME=/home/dheerendra/hadoop/kafka
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$JAVA_HOME/bin:$SQOOP_HOME/bin:$KAFKA_HOME/bin
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"

Step 3 : Create 2 folders at any location

            (a) /home/dheerendra/kafka_files/kafka/kafka-logs

            (b) /home/dheerendra/kafka_files/kafka/zookeeper-data

Step 4: Modify the properties files in Kafka's ./config folder

            (a) zookeeper.properties

                (1) dataDir=/home/dheerendra/kafka_files/kafka/zookeeper-data

                (2) admin.enableServer=true

            (b) server.properties

                (1) log.dirs=/home/dheerendra/kafka_files/kafka/kafka-logs


Step 5: Start the services

          (1) start zookeeper
                    $kafka/bin/zookeeper-server-start.sh ./config/zookeeper.properties

          (2) start kafka server
                    $kafka/bin/kafka-server-start.sh ./config/server.properties

          (3) create testtopic
                    $kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

          (4) start producer
                    $kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

          (5) start consumer
                    $kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning

                      






Tuesday, April 13, 2021

JDBC connection with Spark using pyspark

from pyspark.sql.session import SparkSession


def orcl_conn():
    spark = SparkSession.builder.getOrCreate()

    # Read the emp table from Oracle over JDBC
    dbconn = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:oracle:thin:scott/tiger@//LAPTOP-K8VAJ401:1521/xe") \
        .option("dbtable", "emp") \
        .option("user", "scott") \
        .option("password", "tiger") \
        .option("driver", "oracle.jdbc.driver.OracleDriver") \
        .load()

    # Load a CSV file and write it to Oracle as table empdf
    empdf = spark.read.format('csv').option('header', 'True').load('D:/projects/data/test/emp.csv')

    empdf.write \
        .format("jdbc") \
        .option("url", "jdbc:oracle:thin:scott/tiger@//LAPTOP-K8VAJ401:1521/xe") \
        .option("dbtable", "empdf") \
        .option("user", "scott") \
        .option("password", "tiger") \
        .option("driver", "oracle.jdbc.driver.OracleDriver") \
        .save()

    empdf.show()
    # dbconn.printSchema()


orcl_conn()
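Note that this only works if the Oracle JDBC driver jar is on Spark's classpath. If you do not want to copy it into the Spark jars folder, it can be supplied through the session configuration; the jar path below is a hypothetical placeholder, point it at wherever your ojdbc jar actually lives.

from pyspark.sql import SparkSession

# Hypothetical location of the Oracle JDBC driver jar; adjust for your machine
ORACLE_JDBC_JAR = "D:/projects/jars/ojdbc8.jar"

spark = SparkSession.builder \
    .appName("oracle_jdbc_example") \
    .config("spark.jars", ORACLE_JDBC_JAR) \
    .getOrCreate()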

Sunday, March 28, 2021

Salting in Spark

Salting is a way to handle data skew in a join: append a random suffix (0-9) to the skewed join key on one side, explode the other side so that every key appears once per possible suffix, and then join on the salted key so the hot key is spread across several partitions.

from pyspark.sql import SparkSession


def salt():
    try:
        spark = SparkSession.builder.getOrCreate()

        df1 = spark.read.format('csv').option('header', True).load('D:/projects/data/emp2.csv')
        df2 = spark.read.format('csv').option('header', True).load('D:/projects/data/dept2.csv')

        df1.createOrReplaceTempView("emp_tab")
        df2.createOrReplaceTempView("dept_tab")

        # Left side: append a random digit 0-9 to the join key (name)
        df3 = spark.sql("select *, concat(name, '_', floor(rand(123456)*10)) as salt_key from emp_tab")

        # Right side: explode each row into 10 copies, one per possible salt value
        df4 = spark.sql("select name, dept, explode(array(0,1,2,3,4,5,6,7,8,9)) as salt_key from dept_tab")

        df3.createOrReplaceTempView("emp_tab2")
        df4.createOrReplaceTempView("dept_tab2")

        df3.show()
        df4.show()

        # Join on the salted key, then strip the salt back off the emp key
        df5 = spark.sql("select split(t1.salt_key,'_')[0] as key1, t2.dept "
                        "from emp_tab2 t1, dept_tab2 t2 "
                        "where t1.salt_key = concat(t2.name, '_', t2.salt_key)")
        df5.show()

    except Exception:
        print('Error in program')


# Calling the function
salt()

Saturday, January 23, 2021

Install Pyspark on eclipse Error - pyspark: The system cannot find the path specified

 

Solution :-

1. Copy the winutils folder into the location where you keep your Spark software (example: \spark\spark-3.0.1-bin-hadoop2.7\winutils).

2. Set up the environment variable in Eclipse; the sketch below shows the equivalent done directly in code.
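If you would rather not dig through Eclipse settings, a common workaround is to set HADOOP_HOME from the script itself before the SparkSession is created. This is only a sketch and assumes the winutils folder copied in step 1 contains bin\winutils.exe:

import os

# Assumed path: wherever you copied the winutils folder in step 1
# (it must contain bin\winutils.exe).
os.environ["HADOOP_HOME"] = r"C:\spark\spark-3.0.1-bin-hadoop2.7\winutils"
os.environ["PATH"] = os.environ["PATH"] + os.pathsep + os.path.join(os.environ["HADOOP_HOME"], "bin")

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()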







Thursday, October 1, 2020

Pyspark calculate No of Days

from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
import datetime

spark = SparkSession.builder.getOrCreate()

df2 = spark.read.format('csv').options(header='true', delimiter='|').load("/tmp/dataframe_sample.csv")


@udf(returnType='int')
def date_dif(st_date, end_date):
    # Number of whole days between two 'YYYY-MM-DD HH:MM:SS' strings
    datetimeFormat = '%Y-%m-%d %H:%M:%S'
    diff = datetime.datetime.strptime(end_date, datetimeFormat) - datetime.datetime.strptime(st_date, datetimeFormat)
    return diff.days


res = df2.withColumn('Noofdays', date_dif(df2.start_date, df2.end_date))
res.show()
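For this particular case a UDF is not strictly needed: Spark's built-in datediff function does the same day count natively. A small sketch, assuming the same start_date/end_date columns:

from pyspark.sql.functions import datediff, to_timestamp

# datediff(end, start) returns the number of days between the two dates;
# to_timestamp parses the 'YYYY-MM-DD HH:MM:SS' strings first.
res2 = df2.withColumn(
    'Noofdays',
    datediff(to_timestamp(df2.end_date), to_timestamp(df2.start_date)))
res2.show()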