
I am applying a function to all the rows in a Dask dataframe. In PySpark I'm able to return a pyspark.sql.Row object with named parameters in order to create a structured row for the resulting DataFrame. How do I return a similarly structured row (with columns and types) when applying a function to the rows of a Dask dataframe?

I am looking for something along these lines:

# df is a dask.dataframe with a JSON blob in the `data` column

def process(row):
    json_data = json.loads(row.data)
    return Row(a=json_data["a"], b=json_data["b"])

result = df.apply(
    process,
    axis=1,
).compute()

result

I see that the rows are themselves pd.Series, so I tried having process return a Series, but I get this error:

AttributeError: 'Series' object has no attribute 'columns'

The documentation suggests I can use the meta parameter in apply:

meta: An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output ... [Inputs like] iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns)

However, when I use an iterable of meta tuples, as suggested,

result = df.apply(
    process,
    axis=1,
    meta=[("a", "int")]
).compute()

dask then expects a DataFrame object and raises this error:

AttributeError: 'DataFrame' object has no attribute 'name'

1 Answer


This is a Dask wrapper (via map_partitions) around a pandas function adapted from the answer linked in the comment:

# see unutbu's answer here: https://stackoverflow.com/a/25512372/10693596
import json

import pandas as pd

def json_to_series(text):
    # json.loads(text) is expected to yield a list of dicts, as in the linked answer;
    # flatten all (key, value) pairs into a single Series
    keys, values = zip(*[item for dct in json.loads(text) for item in dct.items()])
    return pd.Series(values, index=keys)


def process_chunk(df):
    # expand the JSON column into new columns and append them to the partition
    _tmp = df['data'].apply(json_to_series)
    return pd.concat([df, _tmp], axis=1)

result = df.map_partitions(process_chunk).compute()
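
For completeness, the per-row apply from the question can also be made to return structured rows, as long as process returns a pd.Series and meta describes the output columns, for example as a mapping of column name to dtype. The following is a minimal sketch; the toy data, column names, and dtypes are assumptions standing in for the real schema:

import json

import dask.dataframe as dd
import pandas as pd

# toy frame standing in for the real one; each row holds a JSON blob in `data`
pdf = pd.DataFrame({"data": ['{"a": 1, "b": "x"}', '{"a": 2, "b": "y"}']})
df = dd.from_pandas(pdf, npartitions=2)

def process(row):
    json_data = json.loads(row["data"])
    # returning a pd.Series with named entries yields one output column per key
    return pd.Series({"a": json_data["a"], "b": json_data["b"]})

# meta as a mapping of column name -> dtype describes a DataFrame output,
# so dask does not have to infer the schema by running process on a sample
result = df.apply(
    process,
    axis=1,
    meta={"a": "int64", "b": "object"},
).compute()

print(result)

A similar meta mapping (which would also need to include the original data column) can be passed to map_partitions above to avoid dask's schema-inference warning.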