1

First off, thanks to anyone that responds. SO has saved so much time in my programming projects. I appreciate it!

My code iterates through a huge dataframe. Here is an example code:

#len(df)=1,000,000
for i in range(1,len(df))
    df.iloc[i,1]=df.iloc[i,1]*40

NOTE*: my code is doing something far more complicated. The question is the same but I was afraid to post lines and lines of code. Essentially, I want to know how to multi-process over portions of a dataframe using this as an example.

I want to split up the processes using multiprocessing. I want a worker to do tasks 1-500,000 and the next worker to do 500,001-1,000,000. Here is my thought:

from multiprocessing import Pool

def jobs(jobstart,jobend):
#len(df)=1,000,000
    for i in range(jobstart,jobend):
        df.iloc[i,1]=df.iloc[i,1]*40


if __name__ == '__main__':
    p= multiprocessing.Pool(processes=2)
    results=p.starmap(jobs, [(1,500000),(500001,1000000)])
    p.close()

print(results)

How come this doesn't work? Error:

  File "C:/Files for Local/fisher_scrapper/frappy.py.py", line 238, in <module>
    results=p.starmap(jobs, [(0,50),(51,100)])

  File "C:\Users\TLCLA\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 298, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()

  File "C:\Users\TLCLA\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 683, in get
    raise self._value

JSONDecodeError: Expecting value
Tanner Clark
  • 631
  • 1
  • 8
  • 19
  • I think using multiprocessing would not be the right use case for this kind of task because your calculation depends on the previous row's value. Have you taken a look at the [`rolling`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.rolling.html) function btw? It seems like your task can be solved using `df['b'] = df['b'].rolling(window=2).sum()` if `b` is your second column of your `df`... And its much faster than using a for loop. – Scratch'N'Purr Feb 19 '19 at 15:03
  • Oh I am sorry. The task is way different than that. I was giving an example but I definitely see that problem. Check now. – Tanner Clark Feb 19 '19 at 15:04
  • 1
    Please include the full trace of the error. – noxdafox Feb 19 '19 at 15:23
  • 1
    You might need to wrap the code that's raising the exception in a try/except block. See [this SO answer](https://stackoverflow.com/a/29081637/) for example. – user1071847 Feb 19 '19 at 15:32
  • I will try it. Thanks – Tanner Clark Feb 19 '19 at 15:34
  • 1
    Gotcha, in that case, to answer why you're getting the error, the reason is that each thread needs to be able to access the df. While you have defined the `jobs` function to use the `df`, you also have to pass the `df` into the function or the thread will not know what `df` is. I can't say for certain but give the following a try. Change the `jobs` signature to `def jobs(jobstart, jobend, df):` and change starmap line to `results = p.starmap(jobs, [(1, 500000, df), (500001, 1000000, df)])`. Since both threads will need a separate copy of the df, you are incurring memory overhead. – Scratch'N'Purr Feb 19 '19 at 15:52
  • I don't think that worked. It still gave me a JSONDecodeError. – Tanner Clark Feb 19 '19 at 15:59
  • 1
    I also want to add that the point of using multiple threads is so that for loops are avoided. However in your function, you're still using for loops, which is kind of counterintuitive. What I would suggest is re-frame your problem as "how can I apply parallelism to each individual row of the df instead of 2 partitions of the df"? Your example actually just applies row-by-row calculations so re-framing your problem won't be difficult. You would then be able to reduce your problem to simply using `map`. An alternative is taking a look at [dask](https://docs.dask.org/en/latest/) – Scratch'N'Purr Feb 19 '19 at 16:06
  • Okay. That makes sense. The only issue is that each row runs to an API. 1000000 calls at once is a bad idea, correct? – Tanner Clark Feb 19 '19 at 16:10
  • 1
    Yea, that's a bad idea. Are you just making a single API call for each of the partitions when using `starmap`? – Scratch'N'Purr Feb 19 '19 at 16:17
  • No. API call for each row. So, each partition would run through a section of the dataframe and call each row when it got to it. I could put four computers next to eachother and run the code... haha – Tanner Clark Feb 19 '19 at 16:34
  • 1
    Can you have a look if this could be helpful: https://stackoverflow.com/questions/54796244/multiprocessing-on-a-model-with-data-frame-as-input/54810340#54810340 ? – Lukasz Tracewski Feb 21 '19 at 19:34
  • I actually found my answer. My IDLE was acting up. It worked perfectly normal when I ran it on my cmd. Thanks for everyone's input! – Tanner Clark Feb 21 '19 at 19:52

0 Answers0