Sunday, March 28, 2021

Salting in Spark

from pyspark.sql import SparkSession

from  pyspark.sql.functions import lit,floor,rand,col,concat,explode,array,split

from pyspark.sql import Window




def salt():
    """Demonstrate salting to mitigate data skew in a Spark join.

    The employee (skewed) side gets a random salt digit 0-9 appended to
    ``name``; the department (small) side is replicated once per possible
    salt value via ``explode``, so matching rows spread evenly across
    partitions. Reads two local CSV files and shows the intermediate and
    joined results.

    NOTE(review): paths are hard-coded Windows locations — confirm they
    exist before running.
    """
    try:
        spark = SparkSession.builder.getOrCreate()

        # Raw strings so backslashes in Windows paths are not treated
        # as escape sequences (\p, \d, \e are invalid escapes otherwise).
        emp_df = spark.read.format('csv').option('header', True) \
            .load(r'D:\projects\data\emp2.csv')
        dept_df = spark.read.format('csv').option('header', True) \
            .load(r'D:\projects\data\dept2.csv')

        emp_df.createOrReplaceTempView("emp_tab")
        dept_df.createOrReplaceTempView("dept_tab")

        # Salt the skewed side: append a deterministic-seeded random
        # digit to name. (Fixed seed 123456 keeps the demo repeatable.)
        salted_emp = spark.sql(
            "select *, concat(name, '_', floor(rand(123456)*10)) as salt_key "
            "from emp_tab")

        # Replicate the small side once per possible salt value 0-9.
        salted_dept = spark.sql(
            "select name, dept, explode(array(0,1,2,3,4,5,6,7,8,9)) as salt_key "
            "from dept_tab")

        salted_emp.createOrReplaceTempView("emp_tab2")
        salted_dept.createOrReplaceTempView("dept_tab2")

        salted_emp.show()
        salted_dept.show()

        # Join on the salted key; strip the salt back off for display.
        joined = spark.sql(
            "select split(t1.salt_key, '_')[0] as key1, t2.dept "
            "from emp_tab2 t1, dept_tab2 t2 "
            "where t1.salt_key = concat(t2.name, '_', t2.salt_key)")

        joined.show()

    except Exception as exc:
        # Spark failures surface as AnalysisException / Py4JJavaError,
        # none of which are SystemError — catch Exception and report
        # the actual cause instead of silently printing a fixed string.
        print(f'Error in program: {exc}')


# Calling the function

# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    salt()

No comments:

Post a Comment