
I wonder if it's possible to turn dask objects into generators. Specifically, can the following pandas-based generator be replicated using dask dataframe, turning each partition into a generator:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(range(10), columns=['a'])
ddf = dd.from_pandas(df, npartitions=3)

def gen_pandas(df):
    # this is a sample function, its content is not
    # important as long as it yields values
    for d in df.iterrows():
        yield d

g = gen_pandas(df)
next(g), next(g)

As another example (not specific to dask dataframes), suppose I wanted to have generators on workers, such that each worker reads a file and returns contents line by line (just as an example function).

Apologies if the question doesn't make sense or is an anti-pattern.

SultanOrazbayev

1 Answer


First, I'm assuming you mean iterator here, not generator (see Difference between Python's Generators and Iterators).
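To make the distinction concrete, here is a minimal sketch in plain Python (no Dask involved). The helper name `chars` is made up for illustration. It shows that a generator object is one kind of iterator, while an ordinary iterator such as the one returned by `iter("abc")` is not a generator:

```python
from collections.abc import Generator, Iterator


def chars(text):
    """Generator function: calling it returns a generator object."""
    for ch in text:
        yield ch


string_iter = iter("abc")  # a plain iterator over the string
char_gen = chars("abc")    # a generator object, which is also an iterator

first_from_iter = next(string_iter)
first_from_gen = next(char_gen)

# A plain iterator lacks send/throw/close, so it is not a Generator.
is_iter_generator = isinstance(string_iter, Generator)
# Every generator object satisfies the Iterator protocol.
is_gen_iterator = isinstance(char_gen, Iterator)
```

Both objects are consumed with `next()` in the same way; the difference is only in how they are created and which extra methods they support.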

So, you can use iterrows on a dask DataFrame, but you shouldn't, because it'll be very slow and inefficient. (Also as a sidenote, itertuples is usually preferred over iterrows.)

Dask's iterrows method has to compute each partition, then iterate over its rows, which eliminates a lot of the benefits of Dask. But your gen_pandas function will behave the same on ddf as on df. (You also don't need the function at all; gen_pandas(df) is exactly the same as just df.iterrows())

turning each partition into a generator

If you meant that you want to iterate over partitions, not rows, then you can use ddf.partitions to access each partition as its own dask DataFrame:

>>> list(ddf.partitions)
[Dask DataFrame Structure:
                    a
 npartitions=1       
 0              int64
 4                ...
 Dask Name: blocks, 4 tasks,
 Dask DataFrame Structure:
                    a
 npartitions=1       
 4              int64
 8                ...
 Dask Name: blocks, 4 tasks,
 Dask DataFrame Structure:
                    a
 npartitions=1       
 8              int64
 9                ...
 Dask Name: blocks, 4 tasks]

However, in either case, you should consider not trying to iterate over a dask DataFrame. Instead, try mapping your operation using apply or map_partitions. This will allow dask to run the operation in parallel over every partition at once.

For example, instead of:

ids = []
counts = []
# iterrows() yields (index, row) pairs
for _, row in ddf.iterrows():
    page = download_wikipedia_page(row.wikipedia_name)
    link_count = count_num_links(page)
    ids.append(row.id)
    counts.append(link_count)
counts = pd.Series(counts, index=ids)

You'd do this with Dask:

def get_wikipedia_page_count(row: pd.Series) -> int:
    page = download_wikipedia_page(row.wikipedia_name)
    return count_num_links(page)

counts = ddf.apply(get_wikipedia_page_count, axis=1, meta=(None, "int64"))

Iterators are fundamentally serial. Dask is fundamentally parallel. By forcing Dask to operate serially, you lose a lot of the benefits of Dask. To get good performance, you'll need to refactor iterator-based code to map that operation over every partition in parallel.

Alternatively, if the iterator approach makes more sense for your code, you might be interested in the streamz library, which is for making streaming pipelines and integrates with Dask.

  • Thank you for the answer, but I am specifically interested in generators: `gen_pandas` is only an example function and its content is not important (for the question) as long as it has a yield statement. – SultanOrazbayev Oct 08 '21 at 01:32
  • @SultanOrazbayev can you clarify your question then? "turn `dask` objects into generators" doesn't really make sense—you can never turn any Python object into a generator function. Instead, are you trying to write a generator function which _iterates_ through the dask object in some way? I think my answer will help you do that (though as I've said, you should also ideally just not do that). – Gabe Joseph Oct 08 '21 at 16:34
  • Or I wonder if [streaming results back from dask](https://github.com/dask/distributed/issues/4754) is something you're interested in? – Gabe Joseph Oct 08 '21 at 16:35
  • Hmmm, thanks, I might be wrong in phrasing this, but isn't it possible to turn say a string into a generator (e.g. some sort of char-by-char processing)? The streaming link is also interesting, let me update the question with another example. – SultanOrazbayev Oct 08 '21 at 16:41
  • You don't "turn a string into a generator"; you'd turn a string into an _iterator_, or _iterate_ over a string. (See [Difference between Python's Generators and Iterators](https://stackoverflow.com/questions/2776829/difference-between-pythons-generators-and-iterators) that I linked.) You could write a generator function that iterates over something internally, but the key part is still iteration. If you're trying to _iterate_ over the contents of a Dask DataFrame then this answer applies, for both how to do it and why you shouldn't do it. – Gabe Joseph Oct 12 '21 at 16:21