
I have the following data frame:

+---+---+------+
| id| ts|days_r|
+---+---+------+
|123|  T|    32|
|342|  I|     3|
|349|  L|    10|
+---+---+------+

I want to create a new column and fill in its values depending on whether certain conditions are met in the "ts" and "days_r" columns.

This is my desired data frame:

+---+---+------+----------+
| id| ts|days_r|0to2_count|
+---+---+------+----------+
|123|  T|    32|         1|
|342|  I|     3|         0|
|349|  L|    10|         0|
+---+---+------+----------+

I tried the following code in pyspark:

df = df.withColumn('0to2_count', F.when((F.col("ts") == 'I') & (F.col('days_r') >=0) & (F.col('days_r') <= 2), 1) \
    .otherwise(F.when((F.col("ts") == 'T') & (F.col('days_r') >=0) & (F.col('days_r') <= 48), 1) \
    .otherwise(F.when((F.col("ts") == 'L') & (F.col('days_r') >=0 & F.col('days_r') <= 7), 1) \
    .otherwise(0))))

I get the error below:

Traceback (most recent call last):
  File "perc_0to2", line 1, in <module>
  File "perc_0to2", line 9, in perc_0to2
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/column.py", line 115, in _
    njc = getattr(self._jc, name)(jc)
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
Py4JError: An error occurred while calling o826.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
    at com.sun.proxy.$Proxy94.execute(Unknown Source)
    at com.palantir.arrow.module.compute.DelegatedComputeService.lambda$execute$0(DelegatedComputeService.java:63)
    at com.palantir.foundry.spark.api.SparkAuthorization.runAsUserInternal(SparkAuthorization.java:164)
    at com.palantir.foundry.spark.api.SparkAuthorization.runAsUser(SparkAuthorization.java:105)
    at com.palantir.arrow.module.compute.DelegatedComputeService.execute(DelegatedComputeService.java:62)
    at com.palantir.arrow.module.ArrowSparkModuleResource.lambda$executeAsync$0(ArrowSparkModuleResource.java:106)
    at com.palantir.remoting3.tracing.DeferredTracer.withTrace(DeferredTracer.java:43)
    at com.palantir.remoting3.tracing.Tracers$TracingAwareCallable.call(Tracers.java:219)
    at com.codahale.metrics.InstrumentedExecutorService$InstrumentedCallable.call(InstrumentedExecutorService.java:197)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
  • Try changing that last `otherwise(0)` to `.otherwise(F.lit(0))`. Also you don't need to keep doing `.otherwise(when().otherwise(when()))` - you can chain together multiple `when`s like shown [here](https://stackoverflow.com/a/39048475/5858851) – pault Jul 27 '18 at 20:55
  • appreciate quick response; changed to F.lit(0) unfortunately still got same error – PineNuts0 Jul 27 '18 at 20:58
  • Found the bug: you have missing parentheses on the line: `(F.col('days_r') >=0 & F.col('days_r') <= 7)` - it should be: `(F.col('days_r') >=0) & (F.col('days_r') <= 7)` – pault Jul 27 '18 at 21:32
  • Maybe keep Palantir out of the stack trace... – floatingice Oct 11 '22 at 20:35

1 Answer


Your code has a bug: you are missing a set of parentheses on the third line. Here is a way to fix it, chaining when() calls instead of nesting multiple otherwise() statements:

df = df.withColumn(
    '0to2_count',
    F.when((F.col('ts') == 'I') & (F.col('days_r') >= 0) & (F.col('days_r') <= 2), 1)
    .when((F.col('ts') == 'T') & (F.col('days_r') >= 0) & (F.col('days_r') <= 48), 1)
    .when((F.col('ts') == 'L') & (F.col('days_r') >= 0) & (F.col('days_r') <= 7), 1)
    .otherwise(0)
)
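
As an aside, the reason the unparenthesized version fails with `Method and([class java.lang.Integer]) does not exist` is Python operator precedence: `&` binds more tightly than comparison operators like `>=`, so `F.col('days_r') >= 0 & F.col('days_r') <= 7` is parsed as a chained comparison around the subexpression `0 & F.col('days_r')`, which makes pyspark call the underlying Java Column's `and` method with a plain integer. A minimal, pyspark-free sketch of the parse difference using the standard `ast` module (the names `a` and `b` are placeholders):

```python
import ast

# Without parentheses, the expression parses as one chained comparison:
#   a >= (0 & b) <= 7
# because & binds more tightly than >= and <=.
buggy = ast.parse("a >= 0 & b <= 7", mode="eval").body
assert isinstance(buggy, ast.Compare)               # chained comparison
assert isinstance(buggy.comparators[0], ast.BinOp)  # the `0 & b` subexpression

# With parentheses, it parses as & applied to two separate comparisons:
fixed = ast.parse("(a >= 0) & (b <= 7)", mode="eval").body
assert isinstance(fixed, ast.BinOp)
assert isinstance(fixed.op, ast.BitAnd)
```

This is why the parentheses around each comparison are mandatory when combining Column conditions with `&` and `|`.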

An even better way to write this logic is to use pyspark.sql.Column.between():

df = df.withColumn(
    '0to2_count',
    F.when((F.col('ts') == 'I') & F.col('days_r').between(0, 2), 1)
    .when((F.col('ts') == 'T') & F.col('days_r').between(0, 48), 1)
    .when((F.col('ts') == 'L') & F.col('days_r').between(0, 7), 1)
    .otherwise(0)
)
df.show()
#+---+---+------+----------+
#| id| ts|days_r|0to2_count|
#+---+---+------+----------+
#|123|  T|    32|         1|
#|342|  I|     3|         0|
#|349|  L|    10|         0|
#+---+---+------+----------+

Of course, since all three conditions return the same value, you could simplify this further into a single Boolean condition by combining the branches with `|`.
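
For example, the collapsed condition could look like `F.when(((F.col('ts') == 'I') & F.col('days_r').between(0, 2)) | ((F.col('ts') == 'T') & F.col('days_r').between(0, 48)) | ((F.col('ts') == 'L') & F.col('days_r').between(0, 7)), 1).otherwise(0)`. A plain-Python sanity check (the `BOUNDS` dict is just an illustration, not part of the Spark code) that this single OR-ed predicate agrees with the three-branch logic on the sample rows:

```python
# Plain-Python mirror of the Spark logic, to check that one OR-ed
# condition is equivalent to the three separate when() branches.
BOUNDS = {'I': 2, 'T': 48, 'L': 7}  # upper bound of days_r for each ts value

def three_branches(ts, days_r):
    if ts == 'I' and 0 <= days_r <= 2:
        return 1
    if ts == 'T' and 0 <= days_r <= 48:
        return 1
    if ts == 'L' and 0 <= days_r <= 7:
        return 1
    return 0

def one_condition(ts, days_r):
    return int(ts in BOUNDS and 0 <= days_r <= BOUNDS[ts])

rows = [(123, 'T', 32), (342, 'I', 3), (349, 'L', 10)]
for _id, ts, days_r in rows:
    assert three_branches(ts, days_r) == one_condition(ts, days_r)

print([one_condition(ts, d) for _, ts, d in rows])  # [1, 0, 0]
```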

pault