Is it possible to perform a set of operations on a DataFrame (adding new columns, replacing some existing values, etc.) without failing fast on the first bad row, and instead run the full transformation and separately return the rows that were processed with errors?

Example (it's more like pseudocode, but the idea should be clear):

df.withColumn('PRICE_AS_NUM', to_num(df["PRICE_AS_STR"]))

to_num is my custom function that converts a string to a number.

Assuming I have some records where the price can't be cast to a number, I want to get those records in a separate DataFrame.

I see one approach, but it would make the code a little ugly (and not very efficient): do a filter with try/catch, and if an exception happens, filter those records into a separate DataFrame. But what if I have many such transformations? Is there a better way?

Alper t. Turker
vvg
  • Could you provide a small example with some sample inputs, operations, why they would fail, and desired outputs? Please see [this post](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples). – pault Jan 29 '18 at 16:58
  • you can do a native python try/except – pratiklodha Jan 29 '18 at 17:00
  • @pault, added example. – vvg Jan 29 '18 at 17:08
  • @pratiklodha I can't wrap the whole process: it would fail on the first record. I want all of them. – vvg Jan 29 '18 at 17:10

2 Answers

2

I think one approach would be to wrap your transformation in a try/except function that returns a boolean, then use when() and otherwise() to branch on that boolean. For example:

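from pyspark.sql.functions import udf, when
from pyspark.sql.types import BooleanType, DoubleType

# Assumes to_num is a plain Python function that raises on bad input.
# Called directly on a Column, the wrapper would run once on the driver,
# so the check and the conversion are both registered as UDFs.
@udf(BooleanType())
def to_num_wrapper(value):
    try:
        to_num(value)
        return True
    except Exception:
        return False

to_num_udf = udf(to_num, DoubleType())  # return type is an assumption

# Caveat: Spark does not short-circuit Python UDFs inside when(), so
# to_num_udf may still be evaluated on rows that fail the check.
df.withColumn('PRICE_AS_NUM',
              when(
                    to_num_wrapper(df["PRICE_AS_STR"]),
                    to_num_udf(df["PRICE_AS_STR"])
              ).otherwise('FAILED')
)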

Then you can filter on the rows where the value is 'FAILED'.

pault
  • Yeah, thanks, I had something like that in mind! Unfortunately, I'll need to do it for many columns; also, the derived data type of those columns won't make much sense... – vvg Jan 29 '18 at 17:17
1

Preferred option

Always prefer built-in SQL functions over UDFs. They are safe to execute and much faster than a Python UDF. As a bonus, they follow SQL semantics: if a value cannot be processed, the output is NULL (undefined).

If you go with UDF

Follow the same approach as built-in functions.

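from pyspark.sql.functions import udf

def safe_udf(f, dtype):
    # Wrap f so that any exception yields None (NULL in Spark)
    # instead of failing the whole job.
    def _(*args):
        try:
            return f(*args)
        except Exception:
            return None
    return udf(_, dtype)

to_num_wrapper = safe_udf(lambda x: float(x), "float")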

df = spark.createDataFrame([("1.123", ), ("foo", )], ["str"])

df.withColumn("num", to_num_wrapper("str")).show()
# +-----+-----+
# |  str|  num|
# +-----+-----+
# |1.123|1.123|
# |  foo| null|
# +-----+-----+

While swallowing exceptions might seem counter-intuitive, it is just a matter of following SQL conventions.

No matter which one you choose:

Once you have adopted one of the approaches above, handling malformed data is just a matter of applying DataFrameNaFunctions (.na.drop, .na.replace).

Alper t. Turker