from pyspark.sql.types import StructField, StructType, StringType

lst = ["col_1, col_2, col_3",
       "1, ABC, Foo1",
       "2, ABCD, Foo2",
       "3, ABCDE, Foo3",
       "4, ABCDEF, Foo4",
       "5, DEF, Foo5",
       "6, DEFGHI, Foo6",
       "7, GHI, Foo7",
       "8, GHIJKL, Foo8",
       "9, JKLMNO, Foo9",
       "10, MNO, Foo10"]
full_csv = sc.parallelize(lst)

# zipWithIndex() pairs each row with its index; the header is the row at index 0
headers = full_csv.zipWithIndex().filter(lambda x: x[1] == 0).map(lambda x: x[0])
header_cols = headers.take(1)

# One StructField per column name; every column starts out as a string
fields = [StructField(field_name, StringType(), True) for field_name in header_cols[0].split(', ')]
file_schema = StructType(fields)

# Keep only the data rows (index > 0) and split each one into its fields
lst_values = full_csv.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
rdd_val = lst_values.map(lambda x: x.split(', '))

df = spark.createDataFrame(rdd_val, file_schema)
df.show()
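Note: as a shorthand (not used above), the schema-building steps can be collapsed with toDF() once a SparkSession is active. A minimal sketch reusing header_cols and rdd_val from the code above:

# toDF() with a list of column names lets Spark infer the types (all strings here)
col_names = header_cols[0].split(', ')
df2 = rdd_val.toDF(col_names)
df2.show()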
===================================================
Case 2: We have a text file, shown below, and we have to create a DataFrame from it.
File name - test4.txt
id name age
1 ram1 12
2 ram2 23
3 ram3 24
rdd_file = sc.textFile('D:/Documents/data/test4.txt')
Now we need to find the header of the file by using zipWithIndex(), which assigns an index to each row:
>>> rdd_file.zipWithIndex().collect()
[('id name age', 0), ('1 ram1 12', 1), ('2 ram2 23', 2), ('3 ram3 24', 3)]
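As an aside, a common shortcut for pulling out the header (not used in this post) is first() plus filter(). Note that it drops every row equal to the header line, so zipWithIndex() is the safer choice when a data row could repeat it:

header_line = rdd_file.first()                        # 'id name age'
data_rows = rdd_file.filter(lambda row: row != header_line)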
Now we need to separate the header (first) row from the remaining data rows using the filter function:
headers = rdd_file.zipWithIndex().filter(lambda x: x[1] == 0)
data = rdd_file.zipWithIndex().filter(lambda x: x[1] != 0)
The header is the row with index 0:
headers.collect()
[('id name age', 0)]
Now we need to keep only the header string and drop the index:
>>> headers_val=headers.map(lambda x:x[0])
>>> headers_val.collect()
['id name age']
Similarly for the data, splitting each row on spaces:
data_val = data.map(lambda x: x[0].split(' '))
data_val.collect()
[['1', 'ram1', '12'], ['2', 'ram2', '23'], ['3', 'ram3', '24']]
Now we need to attach datatypes to these headers. We use flatMap to split the header string on spaces (' ') and flatten the result into individual column names, and collect() to bring them back to the driver as a plain list that the comprehension below can iterate over.
Note: to split the data we used map, because each row must stay a separate list (one inner list per data row), whereas for the header we use flatMap: there is only one header string, and we need a flat sequence of column names so that each one can be given its own StructField and datatype.
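For illustration, running the two transformations side by side on headers_val makes the difference visible:

headers_val.map(lambda x: x.split(' ')).collect()      # [['id', 'name', 'age']] - one nested list
headers_val.flatMap(lambda x: x.split(' ')).collect()  # ['id', 'name', 'age']   - flattened elements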
fields = [StructField(hds_fld, StringType(), True) for hds_fld in headers_val.flatMap(lambda x: x.split(' ')).collect()]
Now create the schema and build the DataFrame:
schema = StructType(fields)
df = spark.createDataFrame(data_val, schema)
>>> df.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 1|ram1| 12|
| 2|ram2| 23|
| 3|ram3| 24|
+---+----+---+
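Note: since every field was declared StringType, id and age come out as strings in the DataFrame above. A minimal follow-up sketch (not part of the original steps) that casts them afterwards with the standard cast API:

from pyspark.sql.functions import col
# Cast the string columns to integers after the DataFrame is built
df_typed = df.withColumn('id', col('id').cast('int')) \
             .withColumn('age', col('age').cast('int'))
df_typed.printSchema()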