Saturday, July 20, 2019

RDD in PySpark

#Create dataframe from RDD

from pyspark.sql import Row

a=[1,2,3,4,5]
rdd=sc.parallelize(a)

row_of_rdd=rdd.map(Row)

df=row_of_rdd.toDF(['column_name'])
df.show()

========================================================================
#Word Count with RDD with .txt file and filter data

Example :- testfile.txt
this is this is a where is the master


f1=sc.textFile('/Documents/data/data4/testfile.txt')

f1.map(lambda x:x.split(' ')).flatMap(lambda x:x).map(lambda x:(x,1)).reduceByKey(lambda x,y:(x+y)).filter(lambda x:x[1]==1).collect()




=========================Examples of RDD==========================
st ="Spark The Definitive Guide : Big Data Processing Made Simple : Spark Guide Big data Performance too".split(" ")
words = sc.parallelize(st, 2)


statement ="Please do hard and smart work to achieve the goal of your life".split(" ")
words2 = sc.parallelize(statement, 2)

=======================with map()===========================
statement ="Spark The Definitive Guide : Big Data Processing Made Simple : Spark Guide Big data Performance too".split(" ")
words = sc.parallelize(statement, 2).map(lambda x:(x,1))


statement ="Please do hard and smart work to achieve the goal of your life".split(" ")
words2 = sc.parallelize(statement, 2).map(lambda x:(x,1))

======map================
words.map(lambda words:(words.lower(),1)).collect()
>>> words.map(lambda words:(words.lower(),1)[0]).collect()
['spark', 'the', 'definitive', 'guide', ':', 'big', 'data', 'processing', 'made', 'simple']

======create keys===========
>>> keys=words.keyBy(lambda words:words.lower())
>>> keys.collect()
[('spark', 'Spark'), ('the', 'The'), ('definitive', 'Definitive'), ('guide', 'Guide'), (':', ':'), ('big', 'Big'), ('data', 'Data'), ('processing', 'Processing'), ('made', 'Made'), ('simple', 'Simple')]

====================map Values with key=============
>>> keys.mapValues(lambda words:words.lower()).collect()
[('s', 'spark'), ('t', 'the'), ('d', 'definitive'), ('g', 'guide'), (':', ':'), ('b', 'big'), ('d', 'data'), ('p', 'processing'), ('m', 'made'), ('s', 'simple')]


==================flatMapvalues===========
>>> keys.flatMapValues(lambda words:words.lower()).collect()
[('s', 's'), ('s', 'p'), ('s', 'a'), ('s', 'r'), ('s', 'k'), ('t', 't'), ('t', 'h'), ('t', 'e'), ('d', 'd'), ('d', 'e'), ('d', 'f'), ('d', 'i'), ('d', 'n'), ('d', 'i'), ('d', 't'), ('d', '...............

==================collect keys and values separately===================
>>> keys.keys().collect()
['s', 't', 'd', 'g', ':', 'b', 'd', 'p', 'm', 's']
>>> keys.values().collect()
['Spark', 'The', 'Definitive', 'Guide', ':', 'Big', 'Data', 'Processing', 'Made', 'Simple']

====================groupBYKey()========================
statement ="Spark The Definitive Guide : Big Data Processing Made Simple : Spark Guide Big data Performance too".split(" ")
words = sc.parallelize(statement)
words.map(lambda x:(x,1)).groupByKey().mapValues(sum).collect()

[('Simple', 1), ('data', 1), ('Data', 1), ('Performance', 1), ('Big', 2), ('too', 1), ('Guide', 2), ('Definitive', 1), ('Spark', 2), ('The', 1), (':', 2), ('Processing', 1), ('Made', 1)]

====================reducebykey=====================
words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collect()

========================join RDD===================

r1=sc.parallelize([1,2,3]).map(lambda x:(x,1))
r2=sc.parallelize([2]).map(lambda x:(x,1))
r1.join(r2).collect()
[(2, (1, 1))]

========================zip================
>>> r1=sc.parallelize(range(10))
>>> r2=sc.parallelize(range(10))
>>> r1.zip(r2).collect()
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]


=======================repartitionAndSortWithinPartitions===========================
>>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
>>> rdd2=rdd.repartitionAndSortWithinPartitions(2,lambda x:x,1)
>>> rdd2.collect()
[(0, 5), (0, 8), (2, 6), (1, 3), (3, 8), (3, 8)]


=======================broadcast variable=================

statement ="Spark The Definitive Guide : Big Data Processing Made Simple : Spark Guide Big data Performance too".split(" ")
bv=sc.broadcast(statement)
bv.value
['Spark', 'The', 'Definitive', 'Guide', ':', 'Big', 'Data', 'Processing', 'Made', 'Simple', ':', 'Spark', 'Guide', 'Big', 'data', 'Performance', 'too']

b = [(1,(1, 4)), (2, (2, 5)), (3, (4,5))]
rdd_b = sc.broadcast(b)
rdd_b.value[0]

words.map(lambda x:(x,bv.value)).collect()   # attach the broadcast value to every element (the original "rd" was never defined)

========================accumulator=================

v = sc.accumulator(1)
v.add(1)      # correct usage: add() updates the accumulator; "v = v.value + 1" would replace it with a plain int
v.value       # -> 2


=========================aggregateBYkey==========================
>>> data = sc.parallelize([('a',1),('b',2),('a',2),('b',3),('c',4),('c',5)])
a=data.aggregateByKey(0,lambda acc,val:acc+val,lambda acc1,val1:acc1+val1)
>>> a.collect()
[('a', 3), ('c', 9), ('b', 5)]


No comments:

Post a Comment