
I am not able to assign a Series as a new column to a koalas DataFrame. Below is the code I am using:

from databricks import koalas

dft=koalas.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft.assign(c=koalas.Series([1,2,3]))

output:

AnalysisException                         Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    392                         if cls is not object \
    393                                 and callable(cls.__dict__.get('__repr__')):
--> 394                             return _repr_pprint(obj, self, cycle)
    395 
    396             return _default_pprint(obj, self, cycle)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698     """A pprint that just redirects to the normal repr function."""
    699     # Find newlines and replace them with p.break_()
--> 700     output = repr(obj)
    701     lines = output.splitlines()
    702     with p.group():

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in __repr__(self)
  11661             return self._to_internal_pandas().to_string()
  11662 
> 11663         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  11664         pdf_length = len(pdf)
  11665         pdf = pdf.iloc[:max_display_count]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  11652         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
  11653             object.__setattr__(
> 11654                 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  11655             )
  11656         return self._repr_pandas_cache[n]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
   5748             return DataFrame(self._internal.with_filter(F.lit(False)))
   5749         else:
-> 5750             sdf = self._internal.resolved_copy.spark_frame
   5751             if get_option("compute.ordered_head"):
   5752                 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    576     def wrapped_lazy_property(self):
    577         if not hasattr(self, attr_name):
--> 578             setattr(self, attr_name, fn(self))
    579         return getattr(self, attr_name)
    580 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
   1066     def resolved_copy(self) -> "InternalFrame":
   1067         """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068         sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
   1069         return self.copy(
   1070             spark_frame=sdf,

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
   1683         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1684         """
-> 1685         jdf = self._jdf.select(self._jcols(*cols))
   1686         return DataFrame(jdf, self.sql_ctx)
   1687 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false



Can you help me understand what is going wrong with my approach, and how do I assign a new column to a koalas DataFrame?

figs_and_nuts

2 Answers


I honestly don't know why you get that error with assign, but one way to add a new column to a koalas.DataFrame is to use standard item assignment (`df['col'] = ...`) as shown below.
It is important to set the option compute.ops_on_diff_frames to True to allow operations on different Series/DataFrames.

import databricks.koalas as ks
ks.set_option('compute.ops_on_diff_frames', True)

dft = ks.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft['c'] = ks.Series([1,2,3])

dft
#    a  b  c
# 0  1  3  1
# 1  2  4  2
# 2  3  5  3
Ric S
  • That is true. I am able to do it with compute.ops_on_diff_frames. I just wanted to do it with the assign functionality, since compute.ops_on_diff_frames is False by default and I didn't want to deal with the side effects of changing it, especially when koalas itself suggests a canonical way to add a new column: assign – figs_and_nuts Dec 15 '21 at 10:23
  • I understand your concern, but I'm afraid you cannot add newly created columns with `assign`: see the *Notes* section in the method [docs](https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.assign.html) – Ric S Dec 15 '21 at 10:28
  • I think it is saying that I cannot refer to a column that I am assigning when computing another column in the same .assign call: "Assigning multiple columns within the same assign is possible but you cannot refer to newly created or modified columns. This feature is supported in pandas for Python 3.6 and later but not in Koalas. In Koalas, all items are computed first, and then assigned." – figs_and_nuts Dec 15 '21 at 10:31
  • Yeah you're probably right, I misinterpreted it. Sorry for that – Ric S Dec 15 '21 at 10:35

Unfortunately, in the assign method you can only use expressions over the existing columns of your dataframe.

Explanation

The important part of your error stack is the Spark execution plan:

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false

In a Spark execution plan, Project can be read as SQL's SELECT. You can see that the plan fails at the second Project (Spark execution plans are read from bottom to top) because it couldn't find column 0#991184L (the Series you want to add to your dft dataframe) among the columns present in your dft dataframe, which are __index_level_0__#991164L, a#991165L, b#991166L and __natural_order__#991170L.

Indeed, column 0#991184L comes from a Series you've created out of the blue, not from a Series derived from your dft dataframe. For Spark, that means the column comes from another dataframe, so you obviously can't retrieve it from your dft dataframe with a SELECT, which is precisely what Spark is trying to do.
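
If you want to see for yourself that the two objects are backed by two unrelated Spark DataFrames, here is a minimal sketch using the standard to_spark() and explain() APIs (the attribute ids such as 0#991184L will differ in your session):

from databricks import koalas

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
s = koalas.Series([1, 2, 3])

# Each koalas object is backed by its own Spark DataFrame;
# the Series' column simply does not exist in dft's plan.
dft.to_spark().explain(True)             # plan built only from dft's columns (a, b)
s.to_frame().to_spark().explain(True)    # a separate, unrelated plan for the Series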

To link the pandas and Spark APIs, a Spark equivalent of assign would be the withColumn Spark DataFrame method, whose documentation states:

The column expression must be an expression over this DataFrame; attempting to add a column from some other DataFrame will raise an error.

Note: strictly speaking, the Spark equivalent of assign is closer to the select function than to withColumn, which is limited to adding a single column, but the same limitation applies to select as well.
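
To illustrate the analogy, here is a small PySpark sketch of that restriction (the names sdf and other are purely illustrative): an expression built from the DataFrame's own columns is accepted, while a column pulled from another DataFrame fails with the same kind of AnalysisException as assign does:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame([(1, 3), (2, 4), (3, 5)], ['a', 'b'])
other = spark.createDataFrame([(1,), (2,), (3,)], ['c'])

# Expression over sdf's own columns: works, like assign(d=dft['a'] * 2)
sdf.withColumn('d', F.col('a') * 2).show()

# Column taken from a different DataFrame: raises an AnalysisException,
# just like assign(c=koalas.Series([1, 2, 3]))
sdf.withColumn('c', other['c']).show()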

So assign will work for the following cases:

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
# dft['a'] and dft['b'] are koalas Series derived from dft
dft.assign(c=dft['a'])
dft.assign(d=dft['a'] * 2)
dft.assign(e=dft['a'] * dft['b'])

but not in the following cases:

import pandas as pd

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})

dft.assign(c=koalas.Series([1, 2, 3]))

dft2 = pd.DataFrame({'d': [1, 2, 3]})
# dft2['d'] is a Series, but it is not derived from dft
dft.assign(d=dft2['d'])

Workaround

Here the workaround is to do as explained in Ric S's answer and assign the column using dft['c'] = koalas.Series([1,2,3]).

This works because in this case Spark will join the two dataframes instead of merely selecting columns from the first one. Since a join, here hidden by the koalas API, can be a very expensive operation in Spark, there is a guardrail that you need to override by setting compute.ops_on_diff_frames to True.

Setting compute.ops_on_diff_frames to True just tells koalas "I acknowledge that this operation is a join and may lead to poor performance". You can reset the option to its previous value after performing your operation with koalas.reset_option('compute.ops_on_diff_frames').
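
If you prefer not to flip the option globally, a minimal sketch is to scope it with option_context, which koalas exposes alongside set_option/reset_option:

from databricks import koalas as ks

dft = ks.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})

# Temporarily allow the implicit join; the previous value (False by default) is restored on exit
with ks.option_context('compute.ops_on_diff_frames', True):
    dft['c'] = ks.Series([1, 2, 3])

dft
#    a  b  c
# 0  1  3  1
# 1  2  4  2
# 2  3  5  3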

Vincent Doba
  • Beautifully explained, thank you @Vincent, I learned a lot from your answer. One thing though: how does one go about creating a new column when all one has is a new column of the same size, with the assumption that it is already ordered as per the index of the parent dataframe? The pyspark equivalent of ```df['c'] = np.arange(df.shape[0])``` – figs_and_nuts Dec 17 '21 at 07:31
  • I'm not sure I understand what you want to know: how to add an index column to your dataframe, or what happens when you assign a value to a column using `df['c'] = ...`? – Vincent Doba Dec 17 '21 at 14:49
  • My use case is this: df1 is a dataframe of 100 rows. df2 is a dataframe of 30 rows. 'subset' is a column in df1 that is True at exactly 30 places. I want to fill values from a column 'c1' in df2 into a column 'c2' in df1 at exactly the places marked by the 'subset' column. In pandas you would do this with ```df1.loc[df1['subset'], 'c2'] = df2['c1'].values``` – figs_and_nuts Dec 19 '21 at 09:58