0

I want to speed up a task on a Dataframe that, for each row, saves an image to a local folder. So, it doesn't return anything. I tried to run that function using Dask, but Dask seems to require that the function return something; I cannot make .apply work...

Is there any other way to make this work?

Update: Minimally reproducible example

import dask.dataframe as dd
import pandas as pd

doc = pd.DataFrame({'file_name': ['Bob', 'Jane', 'Alice','Allan'], 
               'text': ['text1','text2', 'text3','text4']})

def func(row):
    with open(row['file_name']+'.txt', 'w') as f:
        f.write(row['text'])

ddf = dd.from_pandas(doc, npartitions=2)
k = ddf.apply(func,axis=1,meta=(None,'object'))
k.compute()

The only reason meta is (None,'object') is because that's what Dask itself suggested when I ran similar code without a meta argument.

This doesn't produce any errors, and it correctly runs.. I am now not able to reproduce my own mistake since I corrected my original mistake yesterday with Michael Delgados answer..

Avatrin
  • 171
  • 1
  • 15
  • 1
    Can you provide your code, ideally as an [mre]? – Michael Delgado Oct 18 '22 at 23:05
  • Oh I don’t recommend writing to the same file this way. If you do do this, definitely use lock objects to make sure only one workers ids writing to the file at a time – Michael Delgado Oct 19 '22 at 15:26
  • @MichaelDelgado Oh.. Well, yeah, that was just my pathetic attempt at a minimal reproducible example... I am actually creating images.. I have edited it to look more like my actual code.. – Avatrin Oct 19 '22 at 17:26
  • The piece that’s missing is the dataset. Can you create the data from code? A truly reproducible example should be able to be run straight through in a new interpreter session. See the guide to creating a [reproducible pandas example](https://stackoverflow.com/questions/20109391/how-to-make-good-reproducible-pandas-examples) – Michael Delgado Oct 19 '22 at 18:12

1 Answers1

1

One way to do this would be to make use of the function dask.dataframe.DataFrame.map_partitions. This calls the passed function on each partition, each of which is itself a pandas.DataFrame. Within that, you can apply a function if you'd like.

For example, the following defines a dataframe with the columns i and n, then maps a function across each row which writes a file based on the row's values:

import os
import tempfile

def do_something_with_side_effects(row, dirname):
    fp = os.path.join(dirname, f"{row.i}_{row.n}.txt")
    with open(fp, 'w+') as f:
        f.write("file contents!")

def do_something_in_partitions(df, dirname):
    df.apply(do_something_with_side_effects, axis=1, dirname=dirname)

df = dask.dataframe.from_pandas(
    pd.DataFrame({'i': ['A'] * 10 + ['B'] * 5, 'n': np.arange(15)}),
    chunksize=5,
)

The trick with getting dask.dataframe.DataFrame.map_partitions right is providing the meta argument, which is needed for dask to understand how to schedule the operation. If you're not returning anything, you can simply provide an empty dictionary:


In [18]: tempdir = str(tempfile.TemporaryDirectory())
    ...: os.makedirs(tempdir, exist_ok=True)
    ...: f = df.map_partitions(do_something_in_partitions, meta={}, dirname=tempdir)
    ...: f.compute()
    ...: os.listdir(tempdir)
Out[18]:
['A_7.txt',
 'A_6.txt',
 'A_4.txt',
 'A_5.txt',
 'A_1.txt',
 'A_0.txt',
 'A_2.txt',
 'A_3.txt',
 'B_12.txt',
 'B_13.txt',
 'B_11.txt',
 'B_10.txt',
 'A_8.txt',
 'B_14.txt',
 'A_9.txt']
Michael Delgado
  • 13,789
  • 3
  • 29
  • 54
  • That's interesting. I couldn't make map_partitions work, but the empty dictionary trick worked with apply. Thank you! – Avatrin Oct 19 '22 at 10:43