from pyspark import SparkContext

sc = SparkContext()
class RDDTransformations:
    """Small demos of common PySpark RDD transformations."""

    def map1(self):
        # map(): apply a function to every element of the RDD.
        a = "Hi My name is Hi"
        b = a.split(' ')
        rd1 = sc.parallelize(b)
        # rd2 = rd1.map(lambda x: (x, 1)).groupByKey().mapValues(sum)
        rd2 = rd1.map(lambda x: x.upper())
        print(rd2.collect())
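        # map() preserves element order, so this should print:
        # ['HI', 'MY', 'NAME', 'IS', 'HI']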
    def flatmap(self):
        # flatMap(): map each element to an iterable, then flatten one level.
        c = [(2, 3), {"a": 4}, 2]
        rd2 = sc.parallelize(c)
        rd3 = rd2.flatMap(lambda x: (x, 2))
        print(rd3.collect())
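        # Each element x becomes the pair (x, 2), which is then flattened,
        # so this should print [(2, 3), 2, {'a': 4}, 2, 2, 2].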
    def filter(self):
        # filter(): keep only the elements for which the predicate is True.
        c = [(2, 3), {"a": 4}, 2]
        rd2 = sc.parallelize(c)
        rd3 = rd2.filter(lambda x: x == 2)
        print(rd3.collect())
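        # Only the bare integer matches the predicate, so this should print [2].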
    def mappartition(self):
        # mapPartitions(): run a function once per partition; here each
        # partition's iterator is reduced to its sum.
        def fun1(val):
            yield sum(val)
        lst = [1, 2, 3, 4, 5, 5, 6, 7, 8, 9]
        rdd1 = sc.parallelize(lst, 3)
        rd = rdd1.mapPartitions(fun1).collect()
        print(rd)
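        # parallelize() typically slices this 10-element list into
        # [1, 2, 3], [4, 5, 5] and [6, 7, 8, 9], so this should print
        # [6, 14, 30]; the exact split depends on Spark's slicing.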
    def mappartitionwithindx(self):
        # mapPartitionsWithIndex(): like mapPartitions(), but the function
        # also receives the partition index.
        def func(index, val):
            yield str(index) + ' ' + str(list(val))
        lst = [11, 10.9, 8, 7, 6.6, 5, 4, .4]
        rd1 = sc.parallelize(lst, 3)
        rd2 = rd1.mapPartitionsWithIndex(func).collect()
        print('partition contents---->', rd2)
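        # Should print something like
        # ['0 [11, 10.9]', '1 [8, 7, 6.6]', '2 [5, 4, 0.4]'],
        # again depending on how the list is sliced into partitions.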
    def groupByKey1(self):
        # groupByKey(): gather all values for each key, then sum them.
        st = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("d", 5)]
        rd = sc.parallelize(st)
        rd2 = rd.groupByKey().map(lambda x: (x[0], sum(x[1])))
        print(rd2.collect())
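        # Key order may vary across runs, but the result should be
        # [('a', 4), ('b', 2), ('c', 4), ('d', 5)].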
    def reducebykey(self):
        # reduceByKey(): combine values per key with an associative function;
        # unlike groupByKey() it pre-aggregates on the map side.
        st = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("d", 5)]
        rd = sc.parallelize(st)
        rd2 = rd.reduceByKey(lambda x, y: x + y)
        print(rd2.collect())
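        # Same result as groupByKey1 above:
        # [('a', 4), ('b', 2), ('c', 4), ('d', 5)] in some key order.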
    def sample_trns(self):
        # sample(withReplacement, fraction): take a random sample of the RDD.
        val = [2, 3, 4, 5, 5, 66, 7, 6, 7]
        rd = sc.parallelize(val)
        rd2 = rd.sample(True, .2)
        print(rd2.collect())
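        # Output is random: with fraction 0.2 over 9 elements, expect
        # roughly two elements, possibly repeated since withReplacement=True.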
    def union_trns(self):
        # union(): concatenate two RDDs (duplicates are kept).
        a = [1, 2, 3, 4]
        b = [343, 44, 55]
        rd1 = sc.parallelize(a)
        rd2 = sc.parallelize(b)
        uni_rd = rd1.union(rd2)
        print(uni_rd.collect())
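        # Should print [1, 2, 3, 4, 343, 44, 55].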
    def intersection_trns(self):
        # intersection(): keep only elements present in both RDDs.
        a = [1, 2, 3, 4]
        b = [2, 3]
        rd1 = sc.parallelize(a)
        rd2 = sc.parallelize(b)
        int_rd = rd1.intersection(rd2)
        print(int_rd.collect())
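        # Should print [2, 3] (element order may vary).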
    def aggrbykey(self):
        # aggregateByKey(zero, seqOp, combOp): per key, seqOp folds each value
        # into a (count, sum) accumulator and combOp merges accumulators
        # across partitions.
        val = [('a', 1), ('a', 2), ('b', 3), ('a', 4),
               ('order1', 23.3), ('order1', 24.5), ('order2', 29.3)]
        rd = sc.parallelize(val)
        rdd1 = rd.aggregateByKey((0, 0),
                                 lambda acc, v: (acc[0] + 1, acc[1] + v),
                                 lambda a1, a2: (a1[0] + a2[0], a1[1] + a2[1]))
        print(rdd1.collect())
        print(rdd1.sortByKey().collect())
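        # Each key maps to (count, sum); sortByKey() orders the keys, so the
        # second print should be roughly [('a', (3, 7)), ('b', (1, 3)),
        # ('order1', (2, 47.8)), ('order2', (1, 29.3))], modulo float rounding.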
    def join_fun(self):
        # join(): inner join of two pair RDDs on the key.
        d1 = [('a', 1), ('b', 2), ('a', 4)]
        d2 = [('a', 300), ('b', 56)]
        d1_rd = sc.parallelize(d1)
        d2_rd = sc.parallelize(d2)
        d1jnd2 = d1_rd.join(d2_rd)
        print(d1jnd2.collect())
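        # Every ('a', x) pairs with ('a', 300), so expect something like
        # [('a', (1, 300)), ('a', (4, 300)), ('b', (2, 56))], order may vary.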
if __name__ == "__main__":
    obj1 = RDDTransformations()
    obj1.map1()
    obj1.flatmap()
    obj1.filter()
    obj1.mappartition()
    obj1.mappartitionwithindx()
    obj1.groupByKey1()
    obj1.reducebykey()
    obj1.sample_trns()
    obj1.union_trns()
    obj1.intersection_trns()
    obj1.aggrbykey()
    obj1.join_fun()
    sc.stop()