Wednesday, June 5, 2019

Issue writing PySpark output to HDFS

Python file:

import sys
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

fdata = spark.read.option("inferSchema", "true").option("header", "true").csv("/python/data1/flight_data")
df1 = fdata.groupBy("ORIGIN_COUNTRY_NAME").sum("count").withColumnRenamed("sum(count)", "destination_total").sort("destination_total").limit(5)
df1.write.csv("hdfs://home/dheerendra/Working/data4")
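
As a side note, the aggregation this script performs can be sketched in plain Python on a few toy rows. The rows below are illustrative only (the first three totals are chosen to mirror the real output shown further down); the flight data schema is assumed to have ORIGIN_COUNTRY_NAME and count columns:

```python
from collections import defaultdict

# Toy rows with the assumed schema (ORIGIN_COUNTRY_NAME, count).
rows = [
    ("Croatia", 1), ("Romania", 10), ("Romania", 5),
    ("Ireland", 344), ("India", 900), ("United States", 2000),
]

# groupBy("ORIGIN_COUNTRY_NAME").sum("count")
totals = defaultdict(int)
for country, count in rows:
    totals[country] += count

# sort("destination_total").limit(5)
smallest_five = sorted(totals.items(), key=lambda kv: kv[1])[:5]
print(smallest_five)
```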

Problem:

Traceback (most recent call last):
  File "/home/dheerendra/Working/Spark_Testing/test.py", line 8, in <module>
    df1.write.csv("hdfs://home/dheerendra/Working/data4")
AttributeError: 'NoneType' object has no attribute 'write'
Exception in thread Thread-1 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/home/dheerendra/anaconda2/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/home/dheerendra/anaconda2/lib/python2.7/threading.py", line 754, in run
  File "/home/dheerendra/anaconda2/lib/python2.7/SocketServer.py", line 236, in serve_forever
  File "/home/dheerendra/anaconda2/lib/python2.7/threading.py", line 585, in set
  File "/home/dheerendra/anaconda2/lib/python2.7/threading.py", line 407, in notifyAll
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
19/06/05 19:52:50 INFO spark.SparkContext: Invoking stop() from shutdown hook
19/06/05 19:52:50 INFO server.AbstractConnector: Stopped Spark@3e200ee7{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
19/06/05 19:52:50 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.0.103:4041
19/06/05 19:52:50 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/06/05 19:52:51 INFO memory.MemoryStore: MemoryStore cleared
19/06/05 19:52:51 INFO storage.BlockManager: BlockManager stopped
19/06/05 19:52:51 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
19/06/05 19:52:51 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/06/05 19:52:51 INFO spark.SparkContext: Successfully stopped SparkContext
19/06/05 19:52:51 INFO util.ShutdownHookManager: Shutdown hook called
19/06/05 19:52:51 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-9ab8d10b-a1bf-443c-ae41-8cdb73255321
19/06/05 19:52:51 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-9ab8d10b-a1bf-443c-ae41-8cdb73255321/pyspark-0ec565f1-5fa5-4d19-a959-1411df247250
19/06/05 19:52:51 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-bcb4c0f9-8aa4-468e-a29e-d3b167972557

Solution:

Write the path with three slashes ('hdfs:///') instead of two ('hdfs://'). In a URI of the form scheme://authority/path, whatever follows the two slashes is the authority (the namenode host:port), so "hdfs://home/dheerendra/Working/data4" asks Spark to contact a namenode called "home". With an empty authority ("hdfs:///"), the path is resolved against the default filesystem configured in fs.defaultFS:
df1.write.csv("hdfs:///home/dheerendra/Working/data4")
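
The difference between the two forms is easiest to see by parsing the URI. This is a standalone illustration using Python's urllib.parse; Hadoop's own Path parsing follows the same scheme://authority/path convention:

```python
from urllib.parse import urlparse

# With two slashes, "home" is parsed as the authority (the namenode
# host), not as part of the path -- HDFS looks for a namenode "home".
bad = urlparse("hdfs://home/dheerendra/Working/data4")
# bad.netloc == 'home', bad.path == '/dheerendra/Working/data4'

# With three slashes the authority is empty, so Hadoop falls back to
# the default filesystem configured in fs.defaultFS (core-site.xml).
good = urlparse("hdfs:///home/dheerendra/Working/data4")
# good.netloc == '', good.path == '/home/dheerendra/Working/data4'
```

If the namenode address is known, the fully qualified form hdfs://&lt;host&gt;:&lt;port&gt;/path also works; the three-slash form simply defers to the cluster's default filesystem.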

Now execute the script again:


dheerendra@dheerendra-PC ~/Software/spark $ bin/spark-submit --master local /home/dheerendra/Working/Spark_Testing/test.py

You will get the result in the output directory, on HDFS or on the local filesystem depending on which default filesystem is configured.
In my case it is on the cluster:

dheerendra@dheerendra-PC ~/Software/hadoop-2.9 $ hadoop fs -ls /home/dheerendra/Working/data4/
Found 2 items
-rw-r--r--   1 dheerendra supergroup          0 2019-06-05 20:33 /home/dheerendra/Working/data4/_SUCCESS
-rw-r--r--   1 dheerendra supergroup         33 2019-06-05 20:33 /home/dheerendra/Working/data4/part-00000-f173a056-3ead-494a-b872-62d4df37ee30-c000.csv
dheerendra@dheerendra-PC ~/Software/hadoop-2.9 $ hadoop fs -cat /home/dheerendra/Working/data4/part-00000-f173a056-3ead-494a-b872-62d4df37ee30-c000.csv
Croatia,1
Romania,15
Ireland,344
