My goal is to replace all negative values in a column of a PySpark DataFrame with zero.

Input data:

+------+
| col1 |
+------+
|  -2  |
|   1  |
|   3  |
|   0  |
|   2  |
|  -7  |
|  -14 |
|   3  |
+------+

Desired output data:

+------+
| col1 |
+------+
|   0  |
|   1  |
|   3  |
|   0  |
|   2  |
|   0  |
|   0  |
|   3  |
+------+

I can do this with `when`/`otherwise`:

import pyspark.sql.functions as F
df = df.withColumn('col1', F.when(F.col('col1') < 0, 0).otherwise(F.col('col1')))

or a UDF can be defined as

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
smooth = F.udf(lambda x: x if x > 0 else 0, IntegerType())
df = df.withColumn('col1', smooth(F.col('col1')))

or

df = df.withColumn('col1', (F.col('col1') + F.abs('col1')) / 2)

or

df = df.withColumn('col1', F.greatest(F.col('col1'), F.lit(0)))
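
As a quick sanity check that the three non-UDF expressions agree on the sample data, here is a plain-Python sketch of the same arithmetic (no Spark involved; the variable names are only for illustration). It also shows that the `(x + |x|) / 2` form produces floating-point values; in Spark, `/` likewise returns a double column, so that variant changes the column type.

```python
# Sample column and desired result from the tables above.
col1 = [-2, 1, 3, 0, 2, -7, -14, 3]
expected = [0, 1, 3, 0, 2, 0, 0, 3]

# Plain-Python analogues of the three built-in expressions.
when_otherwise = [0 if x < 0 else x for x in col1]   # F.when(...).otherwise(...)
arithmetic = [(x + abs(x)) / 2 for x in col1]        # (col + abs(col)) / 2
greatest = [max(x, 0) for x in col1]                 # F.greatest(col, 0)

# All three agree numerically with the desired output.
print(when_otherwise == expected, arithmetic == expected, greatest == expected)

# The arithmetic form yields floats rather than ints because of the division.
print({type(v).__name__ for v in arithmetic})
```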

My question is: which of these is the most efficient? A UDF has known optimization issues, so it is clearly not the right way to do this. But I don't know how to compare the other approaches. One answer is certainly to run experiments and compare mean running times, but I want to compare these approaches (and any new ones) theoretically.

Thanks in advance...

  • `spark.sql('''select if(col1 < 0, 0, col1) as col1''')` – Bala Nov 05 '19 at 14:33
  • what is the difference (in terms of complexity) between F.when and if condition in sql query? – proof_sandwich Nov 05 '19 at 14:37
  • Possible duplicate of [How to measure the execution time of a query on Spark](https://stackoverflow.com/questions/34629313/how-to-measure-the-execution-time-of-a-query-on-spark) and as shown here: [Spark functions vs UDF performance?](https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance), don't use a `udf` in place of simple spark functions. – pault Nov 05 '19 at 16:42

1 Answer

You can simply make a column where you say, if x > 0: x else 0. This would be the best approach.

The UDF part of the question has already been addressed, theoretically: [Spark functions vs UDF performance?](https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance)

import pyspark.sql.functions as F

df = df.withColumn("only_positive", F.when(F.col("col1") > 0, F.col("col1")).otherwise(0))

You can overwrite col1 in the original DataFrame by passing "col1" as the column name to withColumn().

pissall
  • I already gave that answer in my question, and the UDF comparison is fairly straightforward. The question is actually how to compare the one you mentioned with `df = df.withColumn('col1', (F.col('col1') + F.abs('col1')) / 2)` – proof_sandwich Nov 05 '19 at 16:42