
I am using PySpark to process data and generate some metrics (around 25-30). The generation of each metric is independent of the others. Due to company constraints I am not able to paste the code, but my code flow is outlined below:

def metric1_job(df1, df2, df3, df4, df5):
    some operations
    write data from the above df
def metric2_job(df1, df2, df3, df4, df5):
    some operations
    write data from the above df
def metric3_job(df1, df2, df3, df4, df5):
.
.
.
def metric25_job(df1, df2, df3, df4, df5):
    some operations
    write data from the above df

if __name__ == "__main__":
    read df1
    read df2
    read df3
    read df4
    read df5

    some operations on the above dfs
    metric1_job(df1, df2, df3, df4, df5)
    metric2_job(df1, df2, df3, df4, df5)
    metric3_job(df1, df2, df3, df4, df5)
    .
    .
    .
    metric25_job(df1, df2, df3, df4, df5)

Now PySpark stops at the write inside each function and only starts processing the DAG of the next function once that write finishes. Each of these functions builds its own DAG, and they do not depend on each other. One obvious solution is to split them into separate files and run them as separate jobs, but that option is not available to me. Can someone tell me how I can make Spark run these DAGs in parallel, and write in parallel as well?
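
To illustrate, each job ends with a write along these lines (result_df, the format and the output path here are just placeholders, not my real code), and the driver blocks there until that metric's whole DAG has finished:

    # illustrative only: the write is an action, so the driver waits here
    result_df.write.format("com.databricks.spark.avro").save(output_location)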

I deeply appreciate any help. Because of this serial processing, the job above is taking too much time.

Thanks in advance

Manish

  • These are not real jobs. Are you caching things? Parallelism is inherent to Spark, and if I do a UNION of DFs I can clearly see in the DAG output that the 4 DFs are used in parallel. I think you are going to source every time. Please confirm. – thebluephantom Aug 13 '18 at 07:43
  • No, my dfs are cached. Please understand I am not doing any union. I know parallelism is there in Spark. In my case every function is a separate DAG which saves data to a different location, but it waits when we save data with df.write.format("com.databricks.spark.avro").save(output_location) – MANISH ZOPE Aug 13 '18 at 10:14
  • The UNION was just an example. Interesting. How did you start the job? Is the link provided useful? I ask as I am more of a Scala person. Not sure what to make of it; it would help if you showed code, but that is not possible. Success – thebluephantom Aug 13 '18 at 10:18
  • No, it was not that useful. With Scala I am not facing the issue; I am facing it with PySpark, which is a hard requirement due to external factors. Does calling these functions via multiprocessing help? – MANISH ZOPE Aug 13 '18 at 10:21
  • But how do you submit the job? I have only ever used pyspark in the spark-shell, not with YARN. – thebluephantom Aug 13 '18 at 10:25

1 Answer


Most Spark actions are synchronous in nature: if we perform two actions one after the other, they always execute sequentially. There are, however, a few actions that can be executed asynchronously.

In certain scenarios we can execute different asynchronous actions on different RDDs simultaneously when the resources of a Spark cluster are not being utilized completely.

There are async actions like: countAsync, collectAsync, takeAsync, foreachAsync and foreachPartitionAsync.

In your case, you can implement your logic in foreachPartitionAsync.
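
Note that these async methods are part of the Scala/Java RDD API (AsyncRDDActions) and are not exposed directly in PySpark's Python RDD API. From PySpark, a common way to get the same effect is to submit each independent job from its own driver thread, so Spark can schedule their stages concurrently. Below is a minimal sketch of that idea (not the asker's actual code): it assumes the metric*_job functions and df1..df5 from the question, and that the FAIR scheduler is enabled at submit time if you want concurrently submitted jobs to share executors more evenly.

    # Sketch only: run the independent metric jobs from separate driver threads.
    # Assumes metric1_job ... metric25_job and df1..df5 exist as in the question.
    # Optionally submit with --conf spark.scheduler.mode=FAIR so concurrent jobs
    # share the executors instead of queueing FIFO.
    from concurrent.futures import ThreadPoolExecutor, wait

    jobs = [metric1_job, metric2_job, metric3_job]  # ... up to metric25_job

    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(job, df1, df2, df3, df4, df5) for job in jobs]
        wait(futures)            # block until every write has finished
        for f in futures:
            f.result()           # re-raise any exception from a failed job

Whether the writes actually overlap then depends on how many free executor cores the cluster has; the thread pool only removes the driver-side serialization between the jobs.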

Reference: https://forums.databricks.com/questions/2119/how-do-i-process-several-rdds-all-at-once.html

and

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.AsyncRDDActions

Lakshman Battini