
I'm following the answer from this question: pandas multiprocessing apply

Usually when I run a function on rows in pandas, I do something like this:

dataframe.apply(lambda row: process(row.attr1, row.attr2, ...), axis=1)

...

def process(attr1, attr2, ...):
    ...

But I want to run this function in parallel. So I implemented `parallelize_on_rows` from the question linked above. However, that solution only works when the function passed in takes no extra parameters. For functions with parameters I tried to use partials, but I can't figure out how to build a partial that still receives the row attributes the lambda would normally pass in.
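In the single-process case, the only way I know to combine `apply` with extra arguments is to bind them with `functools.partial` and let `apply` supply the row. A minimal sketch with made-up column names (`cache` here plays the role of my `processed_data`):

```python
from functools import partial

import pandas as pd

def process(row, cache, custom_option=False):
    # row is supplied by apply; cache and custom_option are bound via partial
    return f"{row['attr1']}-{row['attr2']}-{custom_option}"

df = pd.DataFrame({'attr1': [1, 2], 'attr2': ['x', 'y']})
bound = partial(process, cache={}, custom_option=True)
result = df.apply(bound, axis=1)  # Series: '1-x-True', '2-y-True'
```

This works serially; the problem is carrying the same pattern over to the multiprocessing version below.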

Here is my code:

from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd

def parallelize_function_on_df(self, data, func, num_of_processes=5):
    # split the DataFrame into num_of_processes chunks
    data_split = np.array_split(data, num_of_processes)

    # map func over the chunks in parallel, then stitch the results back together
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))

    # close() must be called before join()
    pool.close()
    pool.join()
    return data

def run_on_df_subset(self, func, data_subset):
    return data_subset.apply(func, axis=1)

def parallelize_on_rows(self, data, func, num_of_processes=5):
    return self.parallelize_function_on_df(data, partial(self.run_on_df_subset, func), num_of_processes)



def mass_download(self, some_sql):
    download_table_df = pd.read_sql(some_sql, con=MYSQL.CONNECTION)
    processed_data = {}
    custom_option = True
    process_row_partial = partial(self.process_candidate_row_parallel, processed_data, custom_option)

    self.parallelize_on_rows(download_table_df, process_row_partial)


def process_candidate_row_parallel(self, row, processed_data, custom_option=False):
    if custom_option and processed_data.get(row['some_attr']) == 'download_successful':
        do_some_other_processing()

    download_single_file(row['some_attr1'], row['some_attr2'], processed_data)



So this doesn't work because, like I said, the row attributes aren't actually being passed to the worker: my partial only binds the extra arguments, and there's nothing left to receive the row. How can I achieve this?
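For concreteness, here is a runnable sketch of the behavior I'm after, with made-up column names. It uses a thread pool (`multiprocessing.dummy`) instead of a process pool, so the partial doesn't have to be pickled, and slices the DataFrame by position instead of `np.array_split`:

```python
from functools import partial
from multiprocessing.dummy import Pool  # thread pool: no pickling required

import numpy as np
import pandas as pd

def download_row(row, processed_data, custom_option=False):
    # row comes from apply; processed_data and custom_option are bound via partial
    processed_data[row['link']] = 'download_successful'
    return row['link']

def run_on_subset(func, subset):
    # apply func to every row of this chunk
    return subset.apply(func, axis=1)

def parallelize_on_rows(df, func, num_of_processes=2):
    # split the DataFrame into contiguous chunks, one per worker
    bounds = np.linspace(0, len(df), num_of_processes + 1, dtype=int)
    chunks = [df.iloc[a:b] for a, b in zip(bounds[:-1], bounds[1:])]
    with Pool(num_of_processes) as pool:
        pieces = pool.map(partial(run_on_subset, func), chunks)
    return pd.concat(pieces)

df = pd.DataFrame({'link': ['u1', 'u2', 'u3', 'u4']})
seen = {}
out = parallelize_on_rows(df, partial(download_row, processed_data=seen, custom_option=True))
```

The real version would do network IO inside `download_row`; the point is that `apply` supplies the row while the partial carries the shared state.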

    Where is your code? _So I implemented the parallelize on rows._ Can you share the code for that, too? _However, this solution works because the function doesn't take parameters._ Why does it need to be applied to the rows of the DataFrame if it doesn't take any parameters? – AMC Jan 22 '20 at 02:31
  • to clarify, `parallelize_on_rows` refers to the solution in the question linked. perhaps my question is better thought of as "how do i apply multithreading to `dataframe.apply(lambda row: process(row.attr1, row.attr2))` " – frei Jan 22 '20 at 03:25
  • I checked out the linked answer, I remember it now. _perhaps my question is better thought of as "how do i apply multithreading to `dataframe.apply(lambda row: process(row.attr1, row.attr2))`"_ Is it? Can you clarify this part of your question: _However, the aforementioned solution works because the function passed in doesn't take parameters. For functions with parameters I tried to use partials. However, I can't figure out how to create a partial that takes arguments from the row that require the lambda function to access_. – AMC Jan 22 '20 at 04:03
  • I saw your comment in the linked post. It isn't clear to me why you would need to use partial in the first place, the example you gave certainly doesn't need partial. – AMC Jan 22 '20 at 04:04
  • well basically, if i do something like `process_row_partial = partial(process_specific_row)` then I don't really know how to get the function to call `process_specific_row` on attributes of the row. I don't want to raw access the row attributes inside of the function, i want them to be passed in as parameters, but if i were to do the former I would have no issue I'm pretty sure. – frei Jan 22 '20 at 05:24
  • I think I might understand where you’re struggling. I’m not at my computer though, so it’ll have to wait until tomorrow when I can test some stuff. – AMC Jan 22 '20 at 05:37
  • I now realize that I should have asked for more context for this. What’s the function like, why does it need to be parallelized? – AMC Jan 22 '20 at 05:38
  • i have dl links in a dataframe. i want to batch download them. i want to try to use multithreads to partition the df and download sections of them in parallel. the dataframe also contains other attributes like [id, name, score, download_link] and i need the name id and score to properly download. so something like `process_row_partial = partial(download_from_link)` and then `parallelize_on_rows(dataframe, process_row_partial)` won't allow me to access row attributes. only way to do that is to do `row.score, row.name` inside of the function `download_from_link` which i don't want to do. – frei Jan 22 '20 at 05:47
  • Alright, that sounds easy! Can you share what your program actually looks like, so I can try to whip up a solution? – AMC Jan 22 '20 at 07:57
  • I will update my question with more specifics when I get a chance. – frei Jan 23 '20 at 02:56
  • @AMC updated my answer. let me know if anything you want clarified – frei Jan 23 '20 at 08:32
  • Great, I’ll take a look :) – AMC Jan 23 '20 at 18:04
  • @AMC any idea? the basic idea is that I want to have function parameters in a lambda expression, but since that isn't possible I need a way to refactor. `process_row_partial` needs some way to get row attributes and pass them to a lambda function. – frei Feb 03 '20 at 01:49
  • _I want to have function parameters in a lambda expression_ I’m still not sure I understand the whole lambda issue, sorry. – AMC Feb 03 '20 at 23:04
  • On second thought, I think we should take a different approach here, since this almost feels like a case of the [XY Problem](https://meta.stackexchange.com/questions/66377/what-is-the-xy-problem). What is the larger issue? What do you need to do with that function you're trying to apply? – AMC Feb 04 '20 at 00:15
  • I have a dataframe with many rows. Each row has a number of attributes and a download link. I want to download in parallel by breaking the dataframe into chunks. So 100 rows with 5 threads means each thread downloads 20 each (after breaking apart the dataframe). I have a `download` function but it takes attributes from the rows in the dataframe like I explained. So `for every row in dataframe_chunk: download(row.a, row.b, row.download_link)`. Something like that. I assumed I should use `dataframe.apply` function but maybe `iterroows` or `itertuples` is better. – frei Feb 05 '20 at 02:06
  • Each row contains all the necessary data to call the function? – AMC Feb 05 '20 at 02:36
  • The fact that you’re doing a lot of network IO **and** processing makes for quite a cool problem. There are many different ways we could approach/organize this. – AMC Feb 05 '20 at 02:39
  • By the way, is your code on GitHub, or publicly available anywhere else? – AMC Feb 05 '20 at 02:43
  • yes, every row has the necessary data. code isn't available right now (since it is going to be production code) but I can figure out some way to share it beyond SO if that's interesting to you. – frei Feb 05 '20 at 04:17
  • _yes, every row has the necessary data._ Aah, well this should be easy then, no? _I can figure out some way to share it beyond SO if that's interesting to you._ Sure, sounds good. – AMC Feb 05 '20 at 04:22

0 Answers