
I am trying to port some code from Pandas to Koalas to take advantage of Spark's distributed processing. I am taking a dataframe and grouping it on A and B and then applying a series of functions to populate the columns of the new dataframe. Here is the code that I was using in Pandas:

new = old.groupby(['A', 'B']) \
  .apply(lambda x: pd.Series({
    'v1': x['v1'].sum(),
    'v2': x['v2'].sum(),
    'v3': (x['v1'].sum() / x['v2'].sum()),
    'v4': x['v4'].min()
    })
)

It seems to be working, and the resulting dataframe appears to be correct value-wise.

I just have a few questions:

  1. Does this warning mean that my method will be deprecated in the future?

     /databricks/spark/python/pyspark/sql/pandas/group_ops.py:76: UserWarning: It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.

  2. How can I rename the group-by columns to 'A' and 'B' instead of "__groupkey_0__ __groupkey_1__"?

  3. As you noticed, I had to call pd.Series -- is there a way to do this natively in Koalas? Calling ks.Series instead gives me the following error, which I am unsure how to resolve:

     PandasNotImplementedError: The method `pd.Series.__iter__()` is not implemented. If you want to collect your data as an NumPy array, use 'to_numpy()' instead.

Thanks for any help that you can provide!

nineseven

1 Answer

  1. I'm not sure about the warning. I am using koalas==1.2.0 and pandas==1.0.5 and I don't get it, so I wouldn't worry about it.
  2. The group-by columns are already called A and B when I run the code. This again may have been a bug that has since been patched.
  3. For this you have 3 options:
    1. Keep utilising pd.Series. As long as your original DataFrame is a koalas DataFrame, your output will also be a koalas DataFrame (with the pd.Series automatically converted to ks.Series).
    2. Keep the function and the data exactly the same, and just convert the final pandas dataframe to koalas using the from_pandas function.
    3. Do the whole thing in koalas. This is slightly more tricky because you are computing an aggregate column based on two GroupBy columns and koalas doesn't support lambda functions as a valid aggregation. One way we can get around this is by computing the other aggregations together and adding the multi-column aggregation afterwards:
import databricks.koalas as ks

# Allow operations that combine results from different frames
ks.set_option('compute.ops_on_diff_frames', True)

# Dummy data
old = ks.DataFrame({"A": [1, 2, 3, 1, 2, 3], "B": [1, 2, 3, 3, 2, 3],
                    "v1": [10, 20, 30, 40, 50, 60], "v2": [4, 5, 6, 7, 8, 9],
                    "v4": [0, 0, 1, 1, 2, 2]})

# Compute the single-column aggregations together...
new = old.groupby(['A', 'B']).agg({'v1': 'sum', 'v2': 'sum', 'v4': 'min'})
# ...then add the multi-column ratio as a separate groupby-apply
new['v3'] = old.groupby(['A', 'B']).apply(lambda x: x['v1'].sum() / x['v2'].sum())
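Since koalas deliberately mirrors the pandas API, this two-step approach can be sanity-checked in plain pandas before running it on Spark (a sketch using the same dummy values; only the import changes):

```python
import pandas as pd

# Same dummy data, in plain pandas
old = pd.DataFrame({"A": [1, 2, 3, 1, 2, 3], "B": [1, 2, 3, 3, 2, 3],
                    "v1": [10, 20, 30, 40, 50, 60], "v2": [4, 5, 6, 7, 8, 9],
                    "v4": [0, 0, 1, 1, 2, 2]})

# Step 1: the aggregations that agg() supports directly
new = old.groupby(['A', 'B']).agg({'v1': 'sum', 'v2': 'sum', 'v4': 'min'})

# Step 2: the multi-column ratio; the apply returns a Series indexed by
# the same (A, B) group keys, so the assignment aligns automatically
new['v3'] = old.groupby(['A', 'B']).apply(lambda x: x['v1'].sum() / x['v2'].sum())
```

The index alignment in step 2 is what makes the split safe: both groupbys produce the same (A, B) MultiIndex, so no explicit join is needed.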
amin_nejad