
Given the dataset

foo   bar
1     2
null  null
3     4

How can I replace foo and bar at once with something else, like (5, 6), if one of them or both are null?

foo   bar
1     2
5     6
3     4

This applies to geo datasets, where lat/lng are not known and have to be obtained from somewhere else. The UDF doing that is time consuming, and I'd like to be sure it is called only for the necessary rows (where foo and/or bar is null).
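For a purely static replacement like (5, 6) I could probably just do something along these lines (a minimal sketch, assuming df holds the dataset above; the single select keeps both conditions evaluated against the original columns):

from pyspark.sql import functions as F

# replace both columns with the static pair (5, 6) whenever foo or bar is null
cond = F.col("foo").isNull() | F.col("bar").isNull()
df = df.select(
    F.when(cond, F.lit(5)).otherwise(F.col("foo")).alias("foo"),
    F.when(cond, F.lit(6)).otherwise(F.col("bar")).alias("bar"),
)

But in my real case the replacement values come from a UDF, which is where it gets tricky.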

The following code

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, udf

spark = (SparkSession.builder.master("local")
         .appName("SimpleApp")
         .getOrCreate()
         )


# stand-in for the expensive lookup; returns a (foo, bar) pair
def my_udf(): return 0, 0


df = spark.createDataFrame([[1, 2], [None, None], [3, 4]], schema=['foo', 'bar'])
df = df.withColumn("result", when(df['foo'].isNull() | df['bar'].isNull(), udf(my_udf)()))

df.show()

is a bad try:

foo   bar   result
1     2     null
null  null  [Ljava.lang.Objec...
3     4     null

So it's necessary to somehow unpack that array into columns.

According to Apache Spark -- Assign the result of UDF to multiple dataframe columns, this cannot be done in one step.

But even if I return a struct and unpack it, how do I leave the unaffected columns alone?
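(By unpacking I mean something along the lines of the following, assuming result were returned as a proper struct column; it flattens the struct into top-level columns but by itself does not answer how to keep the already-good foo/bar values:)

# flatten the struct column into top-level columns
df.select("result.*").show()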

The other approach I have tried (considering I need further processing of foo and bar):

from pyspark.sql.functions import array, udf


def some(baz): return 'some'
def something_else(foo, bar): return 'something'


def my_udf(cols):
    # the whole (foo, bar, baz) array is passed in and unpacked inside the udf
    foo, bar, baz = cols
    return some(baz) if foo is None and bar is None else something_else(foo, bar)


df = spark.createDataFrame([[1, 2, 3], [None, None, 4], [3, 4, 5]], schema=['foo', 'bar', 'baz'])
df = df.withColumn("result", udf(my_udf)(array('foo', 'bar', 'baz')))

df.show()

But I feel it is not optimal, because even if we don't need baz for most rows, we still pass it to the UDF, and I think this will prevent query optimization.

Of course I can apply different UDFs to different columns one by one, but that also seems suboptimal.
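By "one by one" I mean roughly the following (a sketch only; lookup_foo / lookup_bar are hypothetical wrappers around the external call, faked here with baz / 2), where the external service ends up being queried once per column:

from pyspark.sql.functions import udf, when
from pyspark.sql.types import FloatType

# hypothetical per-column lookups, each one hitting the external service separately
def lookup_foo(baz): return baz / 2
def lookup_bar(baz): return baz / 2

udf_foo = udf(lookup_foo, FloatType())
udf_bar = udf(lookup_bar, FloatType())

# one select so both conditions are evaluated against the original columns
cond = df["foo"].isNull() | df["bar"].isNull()
df = df.select(
    when(cond, udf_foo(df["baz"])).otherwise(df["foo"]).alias("foo"),
    when(cond, udf_bar(df["baz"])).otherwise(df["bar"]).alias("bar"),
    df["baz"],
)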

So is there any way to replace the values in both columns at once?

Anton Ovsyannikov
  • If one of them is NULL do you want to replace both or only null value column? The value being replaced is it static or should it increment for each row? Is it only 2 columns or unknown number of columns? – Bala Aug 11 '21 at 12:07
  • If foo or bar is null I need to replace both at once with values from external service (based on other columns values). Consider 2 columns only for now. – Anton Ovsyannikov Aug 11 '21 at 12:34

2 Answers


I would stick to your first idea and then use coalesce to fill the empty rows from the result of the udf:

from pyspark.sql import functions as F
from pyspark.sql import types as T

# declare a struct return type so the (foo, bar) pair can be accessed field by field later
@F.udf(returnType=T.StructType([T.StructField("foo", T.DoubleType(), True),
                                T.StructField("bar", T.DoubleType(), True)]))
def my_udf(foo, bar):
    return (0.0, 1.0)


df = spark.createDataFrame([[1, 2], [None, None], [3, 4]], schema=['foo', 'bar'])
df.withColumn("result", F.when(df['foo'].isNull() | df['bar'].isNull(), 
                                                    my_udf("foo", "bar"))) \
    .withColumn("foo", F.coalesce("foo", "result.foo")) \
    .withColumn("bar", F.coalesce("bar", "result.bar")) \
    .show()

Output:

+---+---+----------+                                                            
|foo|bar|    result|
+---+---+----------+
|1.0|2.0|      null|
|0.0|1.0|{0.0, 1.0}|
|3.0|4.0|      null|
+---+---+----------+

Using coalesce would not be a performance issue as this function is a narrow transformation and therefore causes no shuffles.
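If the helper result column should not remain in the final output, it can simply be dropped at the end of the same chain, for example:

df.withColumn("result", F.when(df['foo'].isNull() | df['bar'].isNull(),
                               my_udf("foo", "bar"))) \
    .withColumn("foo", F.coalesce("foo", "result.foo")) \
    .withColumn("bar", F.coalesce("bar", "result.bar")) \
    .drop("result") \
    .show()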

werner

Finally I came to the following solution using an intermediate struct column. Not sure it is optimal and optimizable by Spark, but it seems to be.

The task is to replace the foo/bar nulls with something time-consuming based on baz (in this case just baz / 2, but it could be a REST request, e.g.).

original
+----+----+---+
| foo| bar|baz|
+----+----+---+
| 1.0| 2.0|3.0|
|null|null|4.0|
| 3.0| 4.0|5.0|
+----+----+---+

transformed
+---+---+---+
|foo|bar|baz|
+---+---+---+
|1.0|2.0|3.0|
|2.0|2.0|4.0|
|3.0|4.0|5.0|
+---+---+---+

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, struct, when
from pyspark.sql.types import StructType, StructField, FloatType

spark = (SparkSession.builder.master("local")
         .appName("SimpleApp")
         .getOrCreate()
         )

df = spark.createDataFrame([[1.0, 2.0, 3.0], [None, None, 4.0], [3.0, 4.0, 5.0]], schema=['foo', 'bar', 'baz'])

df.show()

schema = StructType([
    StructField("foo", FloatType(), True),
    StructField("bar", FloatType(), True)
])


def f(foo, bar, baz):
    # avoid the time-consuming op in case the optimizer calls this udf on unnecessary rows
    # https://stackoverflow.com/questions/49634651/using-udf-ignores-condition-in-when
    if foo is not None and bar is not None: return foo, bar

    return baz / 2, baz / 2  # imagine this is time consuming, e.g. a REST GET


udf_f = udf(f, schema)

df = df.withColumn(
    "result",
    when(
        df.foo.isNull() | df.bar.isNull(),
        udf_f(df.foo, df.bar, df.baz)
    ).otherwise(
        struct(df.foo, df.bar)
    )
).select("result.*", df.baz)

df.show()
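As a sanity check, the physical plan can be inspected to see where the Python UDF ends up relative to the CASE WHEN; whether the optimizer may still evaluate it on non-null rows depends on the Spark version, which is why the guard inside f stays in anyway:

# print the physical plan; the guard in f protects against the udf being evaluated on non-null rows
df.explain()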


Anton Ovsyannikov