from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext,HiveContext
sqlContext = SQLContext(sc)
from pyspark.sql.functions import window, column, desc, col
from pyspark.sql import Row
from pyspark.sql.functions import expr
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField,LongType
=====================================
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
fdata=spark.read.option("inferSchema","true").option("header","true").text("/python/data1/flight_data.txt")
>>> fdata.take(3)
[Row(value=u'DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count'), Row(value=u'United States,Romania,15'), Row(value=u'United States,Croatia,1')]
fdata=spark.read.option("inferSchema","true").option("header","true").csv("/python/data1/flight_data")
>>> fdata.collect()
[Row(value=u'DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count'), Row(value=u'United States,Romania,15'), Row(value=u'United States,Croatia,1'), Row(value=u'United States,Ireland,344')]
>>> fdata.count()
4
>>> ftab=fdata.createOrReplaceTempView("ftable")
>>> sql_q1=spark.sql("select DEST_COUNTRY_NAME,count(1) from ftable group by DEST_COUNTRY_NAME")
>>>sql_q1.show()
>>> sql_q1=spark.sql("select DEST_COUNTRY_NAME,count(1) from ftable group by DEST_COUNTRY_NAME")
>>> sql_q1.show()
+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
| United States| 3|
+-----------------+--------+
>>> withdf_data=fdata.groupBy("DEST_COUNTRY_NAME").count()
>>> withdf_data.show()
+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
| United States| 3|
+-----------------+-----+
maxSql = spark.sql("SELECT ORIGIN_COUNTRY_NAMe, sum(count) as destination_total FROM data_v GROUP BY ORIGIN_COUNTRY_NAMe ORDER BY sum(count) DESC LIMIT 5")
==Same above query With DF==
# DataFrame equivalent of the SQL query above. Sort descending to match
# "ORDER BY sum(count) DESC LIMIT 5". show() returns None, so keep the
# DataFrame in df1 and display it in a separate call.
df1=fdata.groupBy("ORIGIN_COUNTRY_NAMe").sum("count").withColumnRenamed("sum(count)", "destination_total").sort("destination_total", ascending=False).limit(5)
df1.show()
=================================
sdf=spark.read.format("csv").option("header","true").load("/home/data/retail")
sdf.select("InvoiceNo","CustomerID","Description").show()
========Show records in Row format===============
myrow=sdf.select("InvoiceNo","CustomerID","Description").collect()
myrow[1]
myrow[2]
sdf_data=sdf.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").groupBy(col("CustomerID"),window(col("InvoiceDate"),"1 day")).sum("total_price").limit(2)
# Both predicates must be in ONE SQL expression string. Python's `and`
# between two non-empty strings evaluates to the second string, which
# would silently drop the CustomerID filter.
sdf2_data=sdf2.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").where("CustomerID=940 and InvoiceDate>='2010-12-01'")
When there are more conditions, chain several where() calls:
>>> sdf2_data=sdf2.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").where("CustomerID=940").where( "InvoiceDate>='2010-12-01'").where( "InvoiceDate<='2010-12-02'")
=====================================json file======================================
jfile=spark.read.format("json").load("/home/data/ex1.json")
jfile.printSchema()
root
|-- fruit: string (nullable = true)
|-- size: string (nullable = true)
===============with tempview====================
jfile.createOrReplaceTempView("jtable")
spark.sql("select fruit,size from jtable").show()
=============with DF==========
jfile.select("fruit","size")
>>> jfile.select("fruit","size")
DataFrame[fruit: string, size: string]
>>> jfile.select("fruit","size").show()
+-----+-----+
|fruit| size|
+-----+-----+
|Apple|Large|
+-----+-----+
# multiLine lets the JSON reader parse a record that spans several lines.
# The Python boolean is True (a bare `true` raises NameError).
jfile2=spark.read.format("json").option("multiLine", True).option("mode", "PERMISSIVE").load("/home/data/ex.json")
jfile3=spark.read.format("json").load("/home/data/ex.json",multiLine=True)
jfile3.printSchema()
==================with DF===========
jfile3.select("quiz.sport.q1.answer").show()
===============with Temp View table==========
jfile3.createOrReplaceTempView("jtable")
spark.sql("select quiz.sport.q1.answer from jtable").show()
====================explode=============
>>> from pyspark.sql.functions import window, column, desc, col,explode
>>> jfile3.select(explode("options").alias("more_options"),"question").show()
======filter======
>>> sdf.filter("CustomerID==17").show()
col1=col("UnitPrice") > 2
col2=col("StockCode")==71053
sdf.where(sdf.CustomerID.isin("1890")).where(col1|col2).show()
=======================withColumn==================Returns a DataFrame with the new column along with the other columns of the same DataFrame==========
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).show()
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).filter("Quantity").show()
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).filter("Quantity").select("InvoiceNo").show()
======alias==
sdf.selectExpr("InvoiceNo as inv").show()
=====groupBy==========
# Count rows per invoice. NOTE(review): the original used `dfile`, which is
# never defined in these notes; the retail DataFrame `sdf` is assumed.
sdf.groupBy("InvoiceNo").count().show()
# Total UnitPrice per invoice. Spark's aggregate sum is required here: the
# Python builtin sum("UnitPrice") raises TypeError before Spark is called.
from pyspark.sql import functions as F
sdf.groupBy("InvoiceNo").agg(F.sum("UnitPrice")).sort("InvoiceNo").show()
==========Window function========
from pyspark.sql.window import Window
win=Window.partitionBy("CustomerID").orderBy("InvoiceNo")
win=Window.partitionBy("CustomerID").orderBy("InvoiceNo").rowsBetween(-1,1)
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(win)
sdf.select("StockCode",maxPurchaseQuantity).show()
>>> from pyspark.sql.types import *
>>>
>>> from pyspark.sql.types import Row
>>> fields=[StructField(field_name,StringType(),True) for field_name in schema1.split()]
>>> sch=StructType(fields)
>>> df=spark.createDataFrame(file,sch)
>>> df.describe()
DataFrame[summary: string, id: string, dcntry: string, ocntry: string, cnt: string]
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField,LongType
==================csv===========
sdf.write.format("csv").option("path","/home/data").save()
===append===
sdf.write.format("csv").mode("append").save("/home/data1")
sdf.write.format("csv").mode("append").save("/home/data1/")
sdf.write.format("csv").mode("append").option("sep","\t").save("/home/data1/file_test")
sdf.write.format("csv").mode("overwrite").option("sep","\t").option("codec","gzip").save("/home/data1/file_test")
===json==========
sdf.write.format("json").option("path","/home/data1/json").save()
===parquet====
sdf.write.format("parquet").option("path","/home/data1").save()
sdf=spark.read.format("csv").option("header","true").load('/home/data/flight_data')
spark.read.format("parquet").load('/home/data/data2/part*').show()
====================jdbc=================
# Read a PostgreSQL table over JDBC. The chain is wrapped in parentheses so
# it can span several lines; without them the second line (starting with
# ".option") is a SyntaxError and load() is never called on the reader.
pgDF = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://database_server")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "my-secret-password")
    .load()
)
===================Text file=======================
sdf=spark.read.format("csv").option("header","true").load('/home/data/flight_data')
sdf.select("ID").write.text("/home/data1/4.text")
================================partition======================
sdf.select("DEST_COUNTRY_NAME","count").write.partitionBy("count").text("/home/data1/par/")
sdf.limit(3).select("DEST_COUNTRY_NAME","count").write.mode("overwrite").partitionBy("count").text("/home/data1/par/")
===========================repartition========================
sdf.repartition(5).write.format("csv").save("/home/data1/repart")
==================create table ========================
create table temp(a string,b int) using csv options(header true,path '/user/hive/warehouse');
insert into temp values('ABCD',4545);
create table temp3 using csv (select * from flight);
create external table flight(ID string,DEST_COUNTRY_NAME string,ORIGIN_COUNTRY_NAME string,count string) row format delimited fields terminated by ',' location '/home/data/flight';
============struct type=====================
create or replace view temp2_v as select (ID,DEST_COUNTRY_NAME,count) as cnt from temp2;
cnt struct<ID:string,DEST_COUNTRY_NAME:string,count:string> NULL
>>> from pyspark.sql.types import *
>>>
>>> from pyspark.sql.types import Row
>>> fields=[StructField(field_name,StringType(),True) for field_name in schema1.split()]
>>> sch=StructType(fields)
>>> df=spark.createDataFrame(file,sch)
>>> df.describe()
DataFrame[summary: string, id: string, dcntry: string, ocntry: string, cnt: string]
select cnt.count,cnt.id from temp2_v
select cnt.* from temp2_v;
=============List==============
create or replace view temp3_v as select ID,collect_list(count) as cnt from temp2 group by ID;
select explode(cnt),ID from temp3_v;
=====================================
from pyspark.sql.types import *
file=sc.textFile('/python/data1/flight_data3').map(lambda x:x.split(','))
fields="ids d_cntry o_cntry cnt"
sch=StructType([StructField(field_name,StringType(),True) for field_name in fields.split()])
df=spark.createDataFrame(file,sch)
df.select("ids","d_cntry","o_cntry","cnt").show()
=============================schema load========================
>>> schema_uscrunch=StructType([
... StructField('permalink',StringType(),True),\
... StructField('company',StringType(),True),\
... StructField('numEmps',IntegerType(),True),\
... StructField('category',StringType(),True),\
... StructField('state',StringType(),True),\
... StructField('fundedDate',DateType(),True),\
... StructField('raisedAmt',DecimalType(),True),\
... StructField('raisedCurrency',StringType(),True),\
... StructField('round',StringType(),True)])
>>> file=spark.read.format("csv").load('/dhiru/TechCrunchcontinentalUSA.csv',schema=schema_uscrunch)
>>> file.printSchema()
root
|-- permalink: string (nullable = true)
|-- company: string (nullable = true)
|-- numEmps: integer (nullable = true)
|-- category: string (nullable = true)
|-- state: string (nullable = true)
|-- fundedDate: date (nullable = true)
|-- raisedAmt: decimal(10,0) (nullable = true)
|-- raisedCurrency: string (nullable = true)
|-- round: string (nullable = true)
##Write file on Local directory in Parquet format
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext,HiveContext
sqlContext = SQLContext(sc)
from pyspark.sql.functions import window, column, desc, col
from pyspark.sql import Row
from pyspark.sql.functions import expr
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField,LongType
=====================================
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
fdata=spark.read.option("inferSchema","true").option("header","true").text("/python/data1/flight_data.txt")
>>> fdata.take(3)
[Row(value=u'DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count'), Row(value=u'United States,Romania,15'), Row(value=u'United States,Croatia,1')]
fdata=spark.read.option("inferSchema","true").option("header","true").csv("/python/data1/flight_data")
>>> fdata.collect()
[Row(value=u'DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count'), Row(value=u'United States,Romania,15'), Row(value=u'United States,Croatia,1'), Row(value=u'United States,Ireland,344')]
>>> fdata.count()
4
>>> ftab=fdata.createOrReplaceTempView("ftable")
>>> sql_q1=spark.sql("select DEST_COUNTRY_NAME,count(1) from ftable group by DEST_COUNTRY_NAME")
>>>sql_q1.show()
>>> sql_q1=spark.sql("select DEST_COUNTRY_NAME,count(1) from ftable group by DEST_COUNTRY_NAME")
>>> sql_q1.show()
+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
| United States| 3|
+-----------------+--------+
>>> withdf_data=fdata.groupBy("DEST_COUNTRY_NAME").count()
>>> withdf_data.show()
+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
| United States| 3|
+-----------------+-----+
maxSql = spark.sql("SELECT ORIGIN_COUNTRY_NAMe, sum(count) as destination_total FROM data_v GROUP BY ORIGIN_COUNTRY_NAMe ORDER BY sum(count) DESC LIMIT 5")
==Same above query With DF==
# DataFrame equivalent of the SQL query above. Sort descending to match
# "ORDER BY sum(count) DESC LIMIT 5". show() returns None, so keep the
# DataFrame in df1 and display it in a separate call.
df1=fdata.groupBy("ORIGIN_COUNTRY_NAMe").sum("count").withColumnRenamed("sum(count)", "destination_total").sort("destination_total", ascending=False).limit(5)
df1.show()
=================================
sdf=spark.read.format("csv").option("header","true").load("/home/data/retail")
sdf.select("InvoiceNo","CustomerID","Description").show()
========Show records in Row format===============
myrow=sdf.select("InvoiceNo","CustomerID","Description").collect()
myrow[1]
myrow[2]
sdf_data=sdf.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").groupBy(col("CustomerID"),window(col("InvoiceDate"),"1 day")).sum("total_price").limit(2)
# Both predicates must be in ONE SQL expression string. Python's `and`
# between two non-empty strings evaluates to the second string, which
# would silently drop the CustomerID filter.
sdf2_data=sdf2.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").where("CustomerID=940 and InvoiceDate>='2010-12-01'")
When there are more conditions, chain several where() calls:
>>> sdf2_data=sdf2.selectExpr("CustomerID","(UnitPrice*Quantity) as total_price","InvoiceDate").where("CustomerID=940").where( "InvoiceDate>='2010-12-01'").where( "InvoiceDate<='2010-12-02'")
=====================================json file======================================
jfile=spark.read.format("json").load("/home/data/ex1.json")
jfile.printSchema()
root
|-- fruit: string (nullable = true)
|-- size: string (nullable = true)
===============with tempview====================
jfile.createOrReplaceTempView("jtable")
spark.sql("select fruit,size from jtable").show()
=============with DF==========
jfile.select("fruit","size")
>>> jfile.select("fruit","size")
DataFrame[fruit: string, size: string]
>>> jfile.select("fruit","size").show()
+-----+-----+
|fruit| size|
+-----+-----+
|Apple|Large|
+-----+-----+
# multiLine lets the JSON reader parse a record that spans several lines.
# The Python boolean is True (a bare `true` raises NameError).
jfile2=spark.read.format("json").option("multiLine", True).option("mode", "PERMISSIVE").load("/home/data/ex.json")
jfile3=spark.read.format("json").load("/home/data/ex.json",multiLine=True)
jfile3.printSchema()
==================with DF===========
jfile3.select("quiz.sport.q1.answer").show()
===============with Temp View table==========
jfile3.createOrReplaceTempView("jtable")
spark.sql("select quiz.sport.q1.answer from jtable").show()
====================explode=============
>>> from pyspark.sql.functions import window, column, desc, col,explode
>>> jfile3.select(explode("options").alias("more_options"),"question").show()
======filter======
>>> sdf.filter("CustomerID==17").show()
col1=col("UnitPrice") > 2
col2=col("StockCode")==71053
sdf.where(sdf.CustomerID.isin("1890")).where(col1|col2).show()
=======================withColumn==================Returns a DataFrame with the new column along with the other columns of the same DataFrame==========
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).show()
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).filter("Quantity").show()
>>> sdf.withColumn("Quantity",expr("UnitPrice==2.55")).filter("Quantity").select("InvoiceNo").show()
======alias==
sdf.selectExpr("InvoiceNo as inv").show()
=====groupBy==========
# Count rows per invoice. NOTE(review): the original used `dfile`, which is
# never defined in these notes; the retail DataFrame `sdf` is assumed.
sdf.groupBy("InvoiceNo").count().show()
# Total UnitPrice per invoice. Spark's aggregate sum is required here: the
# Python builtin sum("UnitPrice") raises TypeError before Spark is called.
from pyspark.sql import functions as F
sdf.groupBy("InvoiceNo").agg(F.sum("UnitPrice")).sort("InvoiceNo").show()
==========Window function========
from pyspark.sql.window import Window
win=Window.partitionBy("CustomerID").orderBy("InvoiceNo")
win=Window.partitionBy("CustomerID").orderBy("InvoiceNo").rowsBetween(-1,1)
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(win)
sdf.select("StockCode",maxPurchaseQuantity).show()
>>> from pyspark.sql.types import *
>>>
>>> from pyspark.sql.types import Row
>>> fields=[StructField(field_name,StringType(),True) for field_name in schema1.split()]
>>> sch=StructType(fields)
>>> df=spark.createDataFrame(file,sch)
>>> df.describe()
DataFrame[summary: string, id: string, dcntry: string, ocntry: string, cnt: string]
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField,LongType
==================csv===========
sdf.write.format("csv").option("path","/home/data").save()
===append===
sdf.write.format("csv").mode("append").save("/home/data1")
sdf.write.format("csv").mode("append").save("/home/data1/")
sdf.write.format("csv").mode("append").option("sep","\t").save("/home/data1/file_test")
sdf.write.format("csv").mode("overwrite").option("sep","\t").option("codec","gzip").save("/home/data1/file_test")
===json==========
sdf.write.format("json").option("path","/home/data1/json").save()
===parquet====
sdf.write.format("parquet").option("path","/home/data1").save()
sdf=spark.read.format("csv").option("header","true").load('/home/data/flight_data')
spark.read.format("parquet").load('/home/data/data2/part*').show()
====================jdbc=================
# Read a PostgreSQL table over JDBC. The chain is wrapped in parentheses so
# it can span several lines; without them the second line (starting with
# ".option") is a SyntaxError and load() is never called on the reader.
pgDF = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://database_server")
    .option("dbtable", "schema.tablename")
    .option("user", "username")
    .option("password", "my-secret-password")
    .load()
)
===================Text file=======================
sdf=spark.read.format("csv").option("header","true").load('/home/data/flight_data')
sdf.select("ID").write.text("/home/data1/4.text")
================================partition======================
sdf.select("DEST_COUNTRY_NAME","count").write.partitionBy("count").text("/home/data1/par/")
sdf.limit(3).select("DEST_COUNTRY_NAME","count").write.mode("overwrite").partitionBy("count").text("/home/data1/par/")
===========================repartition========================
sdf.repartition(5).write.format("csv").save("/home/data1/repart")
==================create table ========================
create table temp(a string,b int) using csv options(header true,path '/user/hive/warehouse');
insert into temp values('ABCD',4545);
create table temp3 using csv (select * from flight);
create external table flight(ID string,DEST_COUNTRY_NAME string,ORIGIN_COUNTRY_NAME string,count string) row format delimited fields terminated by ',' location '/home/data/flight';
============struct type=====================
create or replace view temp2_v as select (ID,DEST_COUNTRY_NAME,count) as cnt from temp2;
cnt struct<ID:string,DEST_COUNTRY_NAME:string,count:string> NULL
>>> from pyspark.sql.types import *
>>>
>>> from pyspark.sql.types import Row
>>> fields=[StructField(field_name,StringType(),True) for field_name in schema1.split()]
>>> sch=StructType(fields)
>>> df=spark.createDataFrame(file,sch)
>>> df.describe()
DataFrame[summary: string, id: string, dcntry: string, ocntry: string, cnt: string]
select cnt.count,cnt.id from temp2_v
select cnt.* from temp2_v;
=============List==============
create or replace view temp3_v as select ID,collect_list(count) as cnt from temp2 group by ID;
select explode(cnt),ID from temp3_v;
=====================================
from pyspark.sql.types import *
file=sc.textFile('/python/data1/flight_data3').map(lambda x:x.split(','))
fields="ids d_cntry o_cntry cnt"
sch=StructType([StructField(field_name,StringType(),True) for field_name in fields.split()])
df=spark.createDataFrame(file,sch)
df.select("ids","d_cntry","o_cntry","cnt").show()
=============================schema load========================
>>> schema_uscrunch=StructType([
... StructField('permalink',StringType(),True),\
... StructField('company',StringType(),True),\
... StructField('numEmps',IntegerType(),True),\
... StructField('category',StringType(),True),\
... StructField('state',StringType(),True),\
... StructField('fundedDate',DateType(),True),\
... StructField('raisedAmt',DecimalType(),True),\
... StructField('raisedCurrency',StringType(),True),\
... StructField('round',StringType(),True)])
>>> file=spark.read.format("csv").load('/dhiru/TechCrunchcontinentalUSA.csv',schema=schema_uscrunch)
>>> file.printSchema()
root
|-- permalink: string (nullable = true)
|-- company: string (nullable = true)
|-- numEmps: integer (nullable = true)
|-- category: string (nullable = true)
|-- state: string (nullable = true)
|-- fundedDate: date (nullable = true)
|-- raisedAmt: decimal(10,0) (nullable = true)
|-- raisedCurrency: string (nullable = true)
|-- round: string (nullable = true)
##Write file on Local directory in Parquet format
from pyspark.sql import SparkSession
from compiler.pycodegen import EXCEPT
def alimatrik():
    """Load the emp and dept CSV files, write emp as Parquet, and show dept.

    Both CSV files are read with a header row. Any failure is reported on
    stdout rather than raised, so the caller is never interrupted by a
    Spark or I/O error.
    """
    try:
        spark = SparkSession.builder.appName('Altimatic_test').getOrCreate()
        reader = spark.read.format('csv').option('header', 'True')
        empdf = reader.load('D:/projects/data/test/emp.csv')
        deptdf = reader.load('D:/projects/data/test/dept.csv')
        empdf.write.parquet('/projects/data/test/parquet2/')
        deptdf.show()
    except Exception as exc:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt/SystemExit still propagate, and include the
        # cause so the failure can be diagnosed.
        print('error: %s' % exc)


alimatrik()
No comments:
Post a Comment