Saturday, July 20, 2019

Pyspark Joins

# --- Spark session / context setup for the join examples below ---
from pyspark.sql import SparkSession
# Entry point to the DataFrame API; reuses an already-running session if any.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext,HiveContext
# NOTE(review): SQLContext is deprecated since Spark 2.0 — SparkSession covers
# the same functionality; kept as-is since this is a tutorial transcript.
sqlContext = SQLContext(sc)
from pyspark.sql.functions import window, column, desc, col
from pyspark.sql import Row
from pyspark.sql.functions import expr
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField,LongType


# Load the retail CSV; the header row supplies column names. No schema is
# inferred or supplied, so every column is read as a string.
sdf=spark.read.format("csv").option("header","true").load("/home/data/retail")

========================InnerJoin==============
# Inner join (the default `how`): keep only rows whose key matches in both frames.
# Fixed from the original: the first line carried a `>>>` REPL prompt (a syntax
# error in a script), and the second referenced `join1` before it was defined.
sdf.join(sdf2, sdf.InvoiceNo == sdf2.InvoiceNo) \
    .select(sdf2.InvoiceNo, sdf2.Country, sdf2.Quantity, sdf.Country).show()
# Same pattern on the flight data: join on ID, project one column from each side.
# Assign the DataFrame first — .show() returns None.
df2 = fdata.join(fdata2, fdata.ID == fdata2.ID).select(fdata2.ID, fdata.DEST_COUNTRY_NAME)
df2.show()


========================leftJoin==============
# Left outer join: every row of fdata is kept; fdata2 columns are NULL when
# there is no matching ID.
join1 = 'left_outer'
joinon = fdata["ID"] == fdata2["ID"]
# .show() returns None, so bind the DataFrame first and display it separately
# (the original assigned None to df2).
df2 = fdata.join(fdata2, joinon, join1).select(fdata.ID, fdata2.DEST_COUNTRY_NAME)
df2.show()

========================rightJoin==============
# Right outer join: every row of fdata2 is kept; fdata columns are NULL when
# there is no matching ID.
join1 = 'right_outer'
joinon = fdata["ID"] == fdata2["ID"]
# Bind the DataFrame, then show() — show() returns None, so the original df2 was None.
df2 = fdata.join(fdata2, joinon, join1).select(fdata.ID, fdata2.DEST_COUNTRY_NAME)
df2.show()
# Equivalent one-liner passing the join type by keyword.
fdata.join(fdata2, fdata.ID == fdata2.ID, how='right_outer').select(fdata.ID, fdata2.DEST_COUNTRY_NAME).show()

========================leftSemiJoin==============
# Left semi join: rows of fdata that HAVE a match in fdata2. The result carries
# ONLY fdata's columns (like SQL `WHERE EXISTS`), so no fdata2 column can be
# selected afterwards.
join1 = 'left_semi'
joinon = fdata["ID"] == fdata2["ID"]
# Bind the DataFrame first — .show() returns None (the original assigned None to df2).
df2 = fdata.join(fdata2, joinon, join1)
df2.show()

=====================left anti join ===========
# Left anti join: rows of fdata with NO match in fdata2 (like SQL `WHERE NOT
# EXISTS`).
join1 = 'left_anti'
joinon = fdata["ID"] == fdata2["ID"]
# BUG FIX: as with a semi join, the result contains only the LEFT side's
# columns — the original selected fdata2.DEST_COUNTRY_NAME, which raises an
# AnalysisException. Select fdata's column instead, and bind before show()
# (show() returns None).
df2 = fdata.join(fdata2, joinon, join1).select(fdata.ID, fdata.DEST_COUNTRY_NAME)
df2.show()
fdata.join(fdata2, fdata.ID == fdata2.ID, join1).show()

=====================union===========
df2=fdata.union(fdata2).show()


# BUG FIX: 'natural' is NOT a valid `how` value for DataFrame.join — it raises
# IllegalArgumentException at runtime. The DataFrame-API equivalent of a SQL
# NATURAL JOIN is passing the shared column NAME via `on=`, which also keeps a
# single, unambiguous copy of the join column in the result.
df2 = fdata.join(fdata2, on='ID').select('ID', fdata2.DEST_COUNTRY_NAME)
df2.show()
fdata.join(fdata2, on='ID').show()

================== join with broadcast (MAPJOIN / BROADCAST / BROADCASTJOIN hints) ==================
from pyspark.sql.functions import broadcast
# Broadcast (map-side) join: hint Spark to ship the small frame to every
# executor so the large frame never shuffles. The same hint appears in SQL as
# MAPJOIN / BROADCAST / BROADCASTJOIN.
small_side = broadcast(sdf1)
match_on_id = sdf.ID == sdf1.ID
sdf.join(small_side, match_on_id).select(sdf.ID, sdf.DEST_COUNTRY_NAME).show()

No comments:

Post a Comment