Tuesday, July 30, 2019

Working with Dataframe at beginner level

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext,HiveContext
sqlContext = SQLContext(sc)
from pyspark.sql.functions import window, column, desc, col
from pyspark.sql import Row
from pyspark.sql.functions import expr
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField,LongType
=====================================
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
fdata=spark.read.option("inferSchema","true").option("header","true").text("/python/data1/flight_data.txt")
>>> fdata.take(3)
[Row(value=u'DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count'), Row(value=u'United States,Romania,15'), Row(value=u'United States,Croatia,1')]

fdata=spark.read.option("inferSchema","true").option("header","true").csv("/python/data1/flight_data")

>>> fdata.collect()
[Row(value=u'DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count'), Row(value=u'United States,Romania,15'), Row(value=u'United States,Croatia,1'), Row(value=u'United States,Ireland,344')]
>>> fdata.count()
4
>>> ftab=fdata.createOrReplaceTempView("ftable")
>>> sql_q1=spark.sql("select DEST_COUNTRY_NAME,count(1) from ftable group by DEST_COUNTRY_NAME")
>>>sql_q1.show()

>>> sql_q1=spark.sql("select DEST_COUNTRY_NAME,count(1) from ftable group by DEST_COUNTRY_NAME")
>>> sql_q1.show()
+-----------------+--------+                                                   
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
|    United States|       3|
+-----------------+--------+

>>>withdf_data=fdata.groupBy("DEST_COUNTRY_NAME").count()100
>>> withdf_data.show()
+-----------------+-----+                                                     
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|    3|
+-----------------+-----+

maxSql = spark.sql("SELECT ORIGIN_COUNTRY_NAMe, sum(count) as destination_total FROM data_v GROUP BY ORIGIN_COUNTRY_NAMe ORDER BY sum(count) DESC LIMIT 5")

==Same above query With DF==
df1=fdata.groupBy("ORIGIN_COUNTRY_NAMe").sum("count").withColumnRenamed("sum(count)", "destination_total").sort("destination_total").limit(5).show()

=================================

sdf=spark.read.format("csv").option("header","true").load("/home/data/retail")
sdf.select("InvoiceNo","CustomerID","Description").show()

========Show records in Row format===============
myrow=sdf.select("InvoiceNo","CustomerID","Description").collect()
myrow[1]
myrow[2]



sdf_data=sdf.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").groupBy(col("CustomerID"),window(col("InvoiceDate"),"1 day")).sum("total_price").limit(2)

sdf2_data=sdf2.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").where("CustomerID=940" and "InvoiceDate>='2010-12-01'")

When more conditionin where.

>>> sdf2_data=sdf2.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").where("CustomerID=940").where( "InvoiceDate>='2010-12-01'").where( "InvoiceDate<='2010-12-02'")

=====================================json file======================================
jfile=spark.read.format("json").load("/home/data/ex1.json")
jfile.printSchema()
root
 |-- fruit: string (nullable = true)
 |-- size: string (nullable = true)

===============with tempview====================
jfile.createOrReplaceTempView("jtable")
spark.sql("select fruit,size from jtable").show()

=============with DF==========
jfile.select("fruit","size")
>>> jfile.select("fruit","size")
DataFrame[fruit: string, size: string]
>>> jfile.select("fruit","size").show()'
+-----+-----+
|fruit| size|
+-----+-----+
|Apple|Large|
+-----+-----+

jfile2=spark.read.format("json").option("multiLine", true).option("mode", "PERMISSIVE").load("/home/data/ex.json")
jfile3=spark.read.format("json").load("/home/data/ex.json",multiLine=True)
jfile3.printSchema()

==================with DF===========
jfile3.select("quiz.sport.q1.answer").show()

===============with Temp View table==========

jfile3.createOrReplaceTempView("jtable")
spark.sql("select quiz.sport.q1.answer from jtable").show()


====================explode=============
>>> from pyspark.sql.functions import window, column, desc, col,explode
>>> jfile3.select(explode("options").alias("more_options"),"question").show()


======filter======
>>> sdf.filter("CustomerID==17").show()

col1=col("UnitPrice") > 2
col2=col("StockCode")==71053
sdf.where(sdf.CustomerID.isin("1890")).where(col1|col2).show()


=======================withColumn==================Return df with new column name along with other columsn of same df==========
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).show()
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).filter("Quantity").show()
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).filter("Quantity").select("InvoiceNo").show()

======alias==
sdf.selectExpr("InvoiceNo  as inv").show()

=====groupBy==========
dfile.groupBy("InvoiceNo").count().show()
sdf.groupBy("InvoiceNo").agg(sum("UnitPrice")).sort("InvoiceNo").show()

==========Window function========
from pyspark.sql.window import Window
win=Window.partitionBy("CustomerID").orderBy("InvoiceNo")
win=Window.partitionBy("CustomerID").orderBy("InvoiceNo").rowsBetween(-1,1)
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(win)
sdf.select("StockCode",maxPurchaseQuantity).show()


>>> from pyspark.sql.types import *
>>>
>>> from pyspark.sql.types import Row
>>> fields=[StructField(field_name,StringType(),True) for field_name in schema1.split()]
>>> sch=StructType(fields)
>>> df=spark.createDataFrame(file,sch)
>>> df.describe()
DataFrame[summary: string, id: string, dcntry: string, ocntry: string, cnt: string]


==============
When json is complex and after loading json file you get schema like

>>> df.printSchema()
root
 |-- _corrupt_record: string (nullable = true)

Then in that case use option("multiLine","true") to get complete schema.

>>> df=spark.read.format("json").option("multiline","true").load("/dhiru/input/zipcode.json")
19/07/28 17:54:17 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> df.printSchema()
root
 |-- data: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- meta: struct (nullable = true)
 |    |-- view: struct (nullable = true)
 |    |    |-- attribution: string (nullable = true)
 |    |    |-- averageRating: long (nullable = true)
 |    |    |-- category: string (nullable = true)
 |    |    |-- columns: array (nullable = true)


No comments:

Post a Comment