
My requirement is to read data from HDFS using PySpark, keep only the required columns, remove the NULL values, and then write the processed data back to HDFS. Once these steps are completed, we need to delete the raw dirty data from HDFS. Here is my script for each operation.

Import the Libraries and dependencies

# Spark version => 2.4.0-cdh6.3.1

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()

Read the Data from HDFS

df_load_1 = sparkSession.read.csv('hdfs:///cdrs/file_path/*.csv', sep = ";") 

Select only the required columns

cols = ['_c0', '_c1', '_c2', '_c3', '_c5', '_c7', '_c8', '_c9', '_c10', '_c11',
        '_c12', '_c13', '_c22', '_c32', '_c34', '_c38', '_c40', '_c43', '_c46',
        '_c47', '_c50', '_c52', '_c53', '_c54', '_c56', '_c57', '_c59', '_c62',
        '_c63', '_c77', '_c81', '_c83']

df1 = df_load_1.select(*cols)

Check for NULL values and, if there are any, remove them

# Count the NULL values in each selected column
df_agg_1 = df1.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df1.columns])

df_agg_1.show()

# Drop every row that contains at least one NULL
df1 = df1.na.drop()

Writing the pre-processed data to HDFS, same cluster but different directory

df1.write.csv("hdfs://nm/pyspark_cleaned_data/py_in_gateway.csv")

Deleting the original raw data from HDFS

def delete_path(spark, path):
    """Recursively delete an HDFS path via the JVM Hadoop FileSystem API."""
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    # The second argument (True) makes the delete recursive
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

Execute it by passing the HDFS absolute path

delete_path(spark, '/cdrs//cdrs/file_path/')

PySpark and HDFS commands

I am able to do all of these operations successfully from the pyspark prompt.

Now I want to develop the application and submit the job using spark-submit.

For example

spark-submit --master yarn --deploy-mode client project.py for local 

spark-submit --master yarn --deploy-mode cluster project.py for cluster

At this point I am stuck. I am not sure what parameter I am supposed to pass in place of yarn in spark-submit, and I am not sure whether simply copying and pasting all of the above commands into a .py file will work. I am very new to this technology.

  • Where do you want to run your code? – pissall Nov 30 '19 at 05:51
  • Since I am completely new, I am not sure whether I should run in client/cluster/local mode. We have a cluster configuration [1 namenode, 2 data nodes] and the data is on the namenode. I also need to understand which mode would be advisable for this requirement – Maverick Nov 30 '19 at 05:57

1 Answer


Basically your Spark job will run on a cluster. Spark 2.4.4 supports YARN, Kubernetes, Mesos, and Spark standalone clusters (see the cluster overview documentation).

--master yarn specifies that you are submitting your Spark job to a YARN cluster.

--deploy-mode specifies whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client); the default is client.

spark-submit --master yarn --deploy-mode client project.py for client mode 

spark-submit --master yarn --deploy-mode cluster project.py for cluster mode

spark-submit --master local project.py for local mode

You can provide other arguments while submitting your Spark job, like --driver-memory, --executor-memory, --num-executors, etc.; check the spark-submit documentation.
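For illustration only (the resource values below are placeholders, not recommendations for your particular cluster), such a submission might look like:

spark-submit --master yarn --deploy-mode cluster \
  --driver-memory 2g \
  --executor-memory 2g \
  --num-executors 2 \
  project.py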

  • Thank you wypul, but I am struggling with writing the project.py script. I need help with that; if you could help me write the project.py script, it would be very helpful – Maverick Nov 30 '19 at 05:54
  • @Maverick just copy-paste all the steps that you have mentioned above. It will work. For example, take a look at this sample script: https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py – wypul Nov 30 '19 at 05:57
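To make that suggestion concrete, here is a minimal sketch of what project.py could look like, assembled from the steps already shown in the question. The paths and column list are taken verbatim from the question; the only additions are building the SparkSession inside the script and calling spark.stop() at the end.

# project.py -- minimal sketch assembled from the steps in the question
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


def delete_path(spark, path):
    """Recursively delete an HDFS path via the JVM Hadoop FileSystem API."""
    sc = spark.sparkContext
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)


if __name__ == "__main__":
    spark = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()

    # Read the raw CSV files from HDFS
    df = spark.read.csv('hdfs:///cdrs/file_path/*.csv', sep=";")

    # Keep only the required columns (same list as in the question)
    cols = ['_c0', '_c1', '_c2', '_c3', '_c5', '_c7', '_c8', '_c9', '_c10', '_c11',
            '_c12', '_c13', '_c22', '_c32', '_c34', '_c38', '_c40', '_c43', '_c46',
            '_c47', '_c50', '_c52', '_c53', '_c54', '_c56', '_c57', '_c59', '_c62',
            '_c63', '_c77', '_c81', '_c83']
    df = df.select(*cols)

    # Count NULLs per column (diagnostic output, as in the question)
    df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

    # Drop every row that contains at least one NULL
    df = df.na.drop()

    # Write the cleaned data back to HDFS
    df.write.csv("hdfs://nm/pyspark_cleaned_data/py_in_gateway.csv")

    # Delete the raw input directory (path kept as in the question)
    delete_path(spark, '/cdrs//cdrs/file_path/')

    spark.stop()

This script can then be submitted with any of the spark-submit commands shown in the answer above.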