Saturday, March 14, 2020

Transformations of pyspark

from pyspark import SparkContext
from _ctypes import Union
sc=SparkContext()
class c:
 
    def map1(self):
        a="Hi My name is Hi"
        b=a.split(' ')
        rd1=sc.parallelize(b)
        #rd2=rd1.map(lambda  x:(x,1)).groupByKey().mapValues(sum)
        rd2=rd1.map(lambda  x:(x.upper()))
        print(rd2.collect())
       
    def flatmap(self):
        c=[(2,3),{"a":4},2]
        rd2=sc.parallelize(c)
        rd3=rd2.flatMap(lambda x:(x,2))
        print(rd3.collect())
       
    def filter(self):
        c=[(2,3),{"a":4},2]
        rd2=sc.parallelize(c)
        rd3=rd2.filter(lambda x:(x==2))
        print(rd3.collect()) 
       
    def mappartition(self):
        def fun1(val):
             yield sum(val)       
        lst=[1,2,3,4,5,5,6,7,8,9]
        rdd1=sc.parallelize(lst,3)
        rd=rdd1.mapPartitions(fun1).collect()
        print(rd)
   
    def mappartitionwithindx(self):
        def func(index,val):
            yield str(index)+' '+str(list(val))
           
        lst=[11,10.9,8,7,6.6,5,4,.4]
        rd1=sc.parallelize(lst,3)
       
        rd2=rd1.mapPartitionsWithIndex(func).collect()
        print('I am in if cond---->',rd2)
       
    def groupByKey1(self):
        st=[("a",1),("b",2),("a",3),("c",4),("d",5)]
        rd=sc.parallelize(st)
        rd2=rd.groupByKey().map(lambda x:(x[0],sum(list(x[1]))))
        print(rd2.collect())
       
    def reducebykey(self):
            st=[("a",1),("b",2),("a",3),("c",4),("d",5)]
            rd=sc.parallelize(st)
            rd2=rd.reduceByKey(lambda x,y:x+y)
            print(rd2.collect())
           
    def sample_trns(self):
        val=[2,3,4,5,5,66,7,6,7,]
        rd=sc.parallelize(val)
        rd2=rd.sample(True,.2)
        print(rd2.collect())
       
    def union_trns(self):
        a=[1,2,3,4]
        b=[343,44,55]
        rd1=sc.parallelize(a)
        rd2=sc.parallelize(b)
        uni_rd=rd1.union(rd2)
        print(uni_rd.collect())
       
    def interestion_trns(self):
        a=[1,2,3,4]
        b=[2,3]
        rd1=sc.parallelize(a)
        rd2=sc.parallelize(b)
        uni_rd=rd1.intersection(rd2)
        print(uni_rd.collect()) 
       
    def aggrbykey(self):
        val=[('a',1),('a',2),('b',3),('a',4),('order1',23.3),('order1',24.5),('order2',29.3)]
        rd=sc.parallelize(val)
        rdd1=rd.aggregateByKey((0,0),lambda x,y:(x[0]+1,x[1]+y),
                               lambda x1,y1:(x1[0]+y1[0],x1[1]+y1[1]))
        print(rdd1.collect())
        print(rdd1.sortByKey().collect())
       
    def join_fun(self):
        d1=[('a',1),('b',2),('a',4)]
        d2=[('a',300),('b',56)]
       
        d1_rd=sc.parallelize(d1)
        d2_rd=sc.parallelize(d2)
        d1jnd2=d1_rd.join(d2_rd)
        print(d1jnd2.collect())
       
         
if __name__=="__main__":
    obj1=c()
    obj1.flatmap()
    obj1.filter()
    obj1.mappartition()
    obj1.mappartitionwithindx()
    obj1.groupByKey1()
    obj1.reducebykey()
    obj1.sample_trns()
    obj1.union_trns()
    obj1.interestion_trns()
    obj1.aggrbykey()
    obj1.join_fun()
    

No comments:

Post a Comment