
I have a PySpark dataframe with millions of records and hundreds of columns (example below):

clm1,clm2,clm3
code1,xyz,123
code2,abc,345
code1,qwe,456

I want to divide it into multiple dataframes based on clm1, i.e. a separate dataframe for clm1=code1, a separate dataframe for clm1=code2, and so on, then process them and write the results to separate files. I want to perform this operation in parallel to speed up the process. I am using the code below:

import multiprocessing

S1 = myclass("code1")
S2 = myclass("code2")

t1 = multiprocessing.Process(target=S1.processdata, args=(df,))
t2 = multiprocessing.Process(target=S2.processdata, args=(df,))
t1.start()
t2.start()

t1.join()
t2.join()

but I am getting the error below:

Method __getstate__([]) does not exist

If I use threading.Thread instead of multiprocessing.Process it works fine, but that doesn't seem to reduce the overall time.


1 Answer


About the error

Method __getstate__([]) does not exist

It's a py4j.Py4JException. You get this error with multiprocessing.Process because that module uses separate processes: it must pickle every argument to send it to the child process, and a PySpark DataFrame is only a reference to an object living in the JVM, which cannot be pickled. threading.Thread, on the other hand, uses threads that share the same memory, so they can share the dataframe object.
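You can reproduce the failure directly (a minimal sketch, assuming an active SparkSession named spark; the sample row is made up): multiprocessing pickles every argument it passes to a child process, and pickling a DataFrame raises a py4j error along exactly these lines.

import pickle

df = spark.createDataFrame([('code1', 'xyz', 123)], ['clm1', 'clm2', 'clm3'])

try:
    pickle.dumps(df)  # multiprocessing does this implicitly for args=(df,)
except Exception as e:
    # Py4JError: ... Method __getstate__([]) does not exist
    print(type(e).__name__, e)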

Also take a look at this SO question and answer: Multiprocessing vs Threading Python


General advice

I understand that you may be new to the Spark world, so let me suggest a solution for your problem. You asked how to do multiprocessing, but if you have Spark, that is probably not the best practice.

You have Spark, a framework for parallel processing; you don't need to parallelize your task manually.

Spark is designed for parallel computing in a cluster, but it also works extremely well on a large single node. The multiprocessing library is useful for pure-Python computation tasks; in Spark/PySpark all the computations already run in parallel in the JVM.
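A quick way to see this (a tiny sketch, assuming df already exists): each partition of the DataFrame becomes a task, and under local[*] the tasks of a stage already run concurrently on all cores.

# Number of partitions = number of parallel tasks per stage;
# no Python-level multiprocessing is needed on top of this.
print(df.rdd.getNumPartitions())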

In python_code.py

import pyspark.sql.functions as f

# JOB 1
df1 = df.filter(f.col('clm1') == 'code1')
# ... many transformations
df1.write.format('..')..

# JOB 2
df2 = df.filter(f.col('clm1') == 'code2')
# ... many transformations
df2.write.format('..')..

Then run this code with spark-submit, using all your cores (* = all cores):

# Run application locally on all cores
./bin/spark-submit --master local[*] python_code.py

With this approach, you use the power of Spark. The jobs will be executed sequentially, BUT you will have: full CPU utilization all the time <=> parallel processing <=> lower computation time
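As a side note: if the per-code transformations are identical, a single partitioned write can replace the per-code jobs entirely. A hedged sketch (the output path and format are illustrative assumptions, not from the question):

# Spark writes one sub-directory per distinct clm1 value
# (clm1=code1/, clm1=code2/, ...) in a single parallel job.
(df
 .write
 .partitionBy('clm1')
 .mode('overwrite')
 .parquet('s3://my-bucket/output/'))  # hypothetical bucket/path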

  • I am getting the error below after 1 hour of processing, when it starts writing files: `No space left on device ERROR TaskMemoryManager: error while calling spill()` – user0204 Jan 31 '20 at 09:54
  • OK, this ERROR is a Linux error: you don't have space on your machine. You can run `df -h` in your shell to see the available space. Also, this error is independent of the question. The answer was about the design; this error, and other errors that may occur, are implementation details that can be answered in separate questions if they don't already exist on Stack Overflow – ggeop Jan 31 '20 at 10:17
  • @user0204 About the design, are you OK? Have you understood the concept behind PySpark (Spark in general)? – ggeop Jan 31 '20 at 10:21
  • But I am writing files to S3, which has unlimited space, not to the machine on which I am running my script, so why am I receiving this error? I understood the concept, but I had to try this before accepting your answer. – user0204 Feb 05 '20 at 07:04
  • @user0204 You're right, I didn't explain why you have this error :-) Because it was not the best practice, I targeted my answer at Spark. I have now updated my answer about your error. – ggeop Feb 05 '20 at 08:01
  • Thanks for explaining that. Now can you please tell me why I am getting 'no space left on device' on Linux? I am not writing files on Linux; I am writing files to S3, which has unlimited space. Sorry if that question sounds dumb, but I am trying to understand it. – user0204 Feb 05 '20 at 08:06
  • @user0204 Take a look at this question; it is the same as yours: https://stackoverflow.com/questions/25707784/why-does-a-job-fail-with-no-space-left-on-device-but-df-says-otherwise (see also the sketch after this comment thread) – ggeop Feb 05 '20 at 08:09
  • I tried running your solution on a small dataset but it seems to run sequentially: the second job started when the first finished. – user0204 Feb 05 '20 at 08:32
  • You can ask another question about this problem, because it's a different case – ggeop Feb 05 '20 at 08:53
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/207253/discussion-between-user0204-and-ggeop). – user0204 Feb 05 '20 at 09:14
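A note on the "No space left on device" error discussed in these comments (a hedged sketch, not part of the original thread): Spark spills shuffle and temporary data to the local disk configured by spark.local.dir (/tmp by default), even when the final output goes to S3, so pointing it at a larger volume is a common fix. The app name and path below are hypothetical.

from pyspark.sql import SparkSession

# spark.local.dir controls where Spark spills shuffle/temp data;
# it must be set before the SparkContext starts (or passed via
# --conf spark.local.dir=... on spark-submit).
spark = (SparkSession.builder
         .appName('split-by-clm1')                             # hypothetical app name
         .config('spark.local.dir', '/mnt/bigdisk/spark-tmp')  # hypothetical path
         .getOrCreate())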