4

I just got introduced to the wonderful world of Big Data and cloud technology, using GCP (Dataproc) and PySpark. I have a ~5 GB JSON file (zipped, .gz) containing ~5 million records. I need to read each row and process only those rows that satisfy a certain condition. I have working code, and I issued a spark-submit with --num-executors 5, but still only one worker is used to carry out the work.

This is the spark-submit command I am using:

spark-submit --num-executors 5 --py-files /home/user/code/dist/package-0.1-py3.6.egg job.py

job.py:

path = "gs://dataproc-bucket/json-files/data_5M.json.gz"
mi = spark.read.json(path)
inf_rel = mi.select(mi.client_id,
                    mi.user_id,
                    mi.first_date,
                    F.hour(mi.first_date).alias('hour'),
                    mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

Dataproc config (I am using the free account for now; once I get a working solution I will add more cores and executors):

Image: Debian 9, Hadoop 2.9, Spark 2.4
Master node: 2 vCPU, 7.50 GB memory, 32 GB primary disk
Worker nodes: 5 × 1 vCPU, 3.75 GB memory, 32 GB primary disk

After spark-submit, I can see in the web UI that 5 executors are added, but then only 1 executor remains active and performs all the tasks; the other 4 are released.

I did my research, and most of the similar questions I found talk about accessing data via JDBC.

Please suggest what I am missing here.

P.S. Eventually I will be reading 64 JSON files of ~5 GB each, so I might use 8 cores × 100 workers.

ramd
  • How many partitions do you have on your DataFrame? Check ```df.rdd.getNumPartitions()``` ... I have not used Google Dataproc, but when reading data from JDBC it defaults to 1 partition because the read is single-threaded. I bet your ```dataframe``` is 1 partition = 1 task = 1 core being used, and in this case only on a single machine, because nothing is being parallelized. – thePurplePython Jun 13 '19 at 01:33
  • Thanks, I have now increased the CPU count to 2 × 3 workers and also repartitioned to 10, but still no improvement. One more important thing I forgot to mention: this JSON file is in zipped (.gz) format. Not sure if that is causing any issues. – ramd Jun 14 '19 at 12:17
  • Yes, that is a problem ... the gzip format is non-splittable ... possibly a duplicate; a solution to your question is here => https://stackoverflow.com/questions/40492967/dealing-with-a-large-gzipped-file-in-spark (a quick check illustrating this is sketched below) – thePurplePython Jun 14 '19 at 18:00
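
As a quick way to confirm what these comments describe, here is a minimal sketch (the bucket path is taken from the question; everything else is illustrative and not part of the original post) that prints the partition count before and after a repartition:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partition-check").getOrCreate()

# gzip is not splittable, so a single .gz file is read and parsed by one task,
# regardless of how many executors the cluster has.
mi = spark.read.json("gs://dataproc-bucket/json-files/data_5M.json.gz")
print(mi.rdd.getNumPartitions())   # typically 1 for a single gzipped file

# Repartitioning afterwards spreads downstream work across executors,
# but the initial read/parse above has already happened on a single core.
mi = mi.repartition(10)
print(mi.rdd.getNumPartitions())   # now 10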

2 Answers

1

Your best bet is to preprocess the input. Given a single input file, spark.read.json(...) will create a single task to read and parse the JSON data, as Spark cannot know ahead of time how to parallelize it. If your data is in line-delimited JSON format (http://jsonlines.org/), the best course of action is to read it as plain text, repartition it into manageable chunks, and only then parse it as JSON:

path = "gs://dataproc-bucket/json-files/data_5M.json"
# read monolithic JSON as text to avoid parsing, repartition and *then* parse JSON
mi = spark.read.json(spark.read.text(path).repartition(1000).rdd)
inf_rel = mi.select(mi.client_id,
                   mi.user_id,
                   mi.first_date,
                   F.hour(mi.first_date).alias('hour'),
                   mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

Your initial step here (spark.read.text(...)) will still bottleneck in a single task. If your data isn't line-delimited, or (especially!) if you anticipate you will need to work with this data more than once, you should figure out a way to turn your 5 GB JSON file into 1000 5 MB JSON files before getting Spark involved.
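
If you go that route, here is a rough single-machine sketch of the pre-splitting step (the helper name, chunk size, and output paths are illustrative assumptions, not part of this answer):

import gzip
import itertools
import os

def split_jsonl_gz(src, dst_pattern, lines_per_chunk=5000):
    # Split one large gzipped JSON-lines file into many smaller gzipped chunks.
    # Each output file is still gzipped (so still one task per file in Spark),
    # but ~1000 small files give Spark ~1000 tasks instead of 1.
    with gzip.open(src, "rt") as f:
        for i in itertools.count():
            chunk = list(itertools.islice(f, lines_per_chunk))
            if not chunk:
                break
            with gzip.open(dst_pattern.format(i), "wt") as out:
                out.writelines(chunk)

os.makedirs("chunks", exist_ok=True)
# ~5M records / 5000 lines per chunk ≈ 1000 output files of roughly 5 MB each
split_jsonl_gz("data_5M.json.gz", "chunks/data_5M_part{:04d}.json.gz")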

Charlie Flowers
  • The file is a gzipped file, and in production I would have to read 64 such zipped JSON files, each around 5 GB in size. Do you think chopping them up further would help here? – ramd Jun 14 '19 at 12:43
  • Yes. More files = more parallelism = less parsing overhead. – Charlie Flowers Jun 14 '19 at 14:59
0

.gz files are not splittable, so they're read by one core and placed onto a single partition.

See Dealing with a large gzipped file in Spark for reference.
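
To make this concrete, here is a minimal sketch (the glob path is illustrative, not from the original posts) showing that non-splittable inputs still parallelize at the file level, which is relevant to the 64 files mentioned in the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("gz-read").getOrCreate()

# Each gzipped file is still parsed by a single task, but reading many files
# at once gives Spark at least one task per large file, so the work spreads
# across executors.
mi = spark.read.json("gs://dataproc-bucket/json-files/*.json.gz")
print(mi.rdd.getNumPartitions())   # roughly one partition per input file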

Mansweet