
I have a simple pandas function for handling time series. My problem: the number of time series I want to apply it to is very large, so I want to use PySpark to scale it out.

I face 2 problems:

  • The schema must be passed explicitly. Can this be made somewhat smoother, i.e. passed implicitly?
  • The code fails with: `Number of columns of the returned pandas.DataFrame doesn't match specified schema. Expected: 2 Actual: 3` -- how can I ensure that the schema is matched automatically? Note: ideally I do not want to specify it manually. In the worst case, could I apply my function to a single time series and have the dtypes of the resulting pandas DataFrame converted to Spark's expected schema?

pandas function

import pandas as pd
from pandas import Timestamp

df = pd.DataFrame({'time':['2020-01-01 00:00', '2020-01-01 03:00', '2020-01-01 04:00', '2020-01-06 00:00'], 'category':['1','1','1','1'], 'value':[5, 8, 7, 2]})
df['time'] = pd.to_datetime(df['time'])
display(df)
print(df.shape)

def my_complex_function(pdf):
    # fill missing hours
    pdf = pdf.set_index('time').resample('H').mean().fillna(0)
    
    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf

print(df.groupby(['category']).apply(my_complex_function).reset_index().shape)

pyspark function

NOTICE: the Spark version is 3.0.1.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("anomlydetection").master("local[4]").config("spark.driver.memory", "2G").getOrCreate()
sdf = spark.createDataFrame(df)
sdf.printSchema()

def my_complex_function_spark(pdf: pd.DataFrame)-> pd.DataFrame:
    # fill missing hours
    pdf = pdf.set_index('time').resample('H').mean().fillna(0)
    
    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf

# 1) can I somehow implicitly get a reference to the schema?
# 2) this function fails due to schema mismatch
sdf.groupby("category").applyInPandas(my_complex_function_spark, schema=df.schema).show()
Georg Heiler
  • https://stackoverflow.com/questions/54770485/implicit-schema-for-pandas-udf-in-pyspark seems to very related – Georg Heiler Nov 20 '20 at 19:42
  • so, you just want to add more columns to the existing schema? – jxc Nov 20 '20 at 20:01
  • Well for now adding is probably fine. In case I drop some - I could also perform this later using a regular drop command. But ideally, also nested structs work just fine. – Georg Heiler Nov 20 '20 at 20:04
  • The problem is that Nested structs are only supported as of Arrow 2.x (which spark is not fully supporting yet as far as I know). – Georg Heiler Nov 20 '20 at 20:06
  • It seems to fail for nested structs. I have a list of a 3 tuple (timestamp_start, timestamp_end, value_a, value_b). The message is: `RuntimeError: ('Exception thrown when converting pandas.Series (object) to Arrow Array (string). It can be caused by overflows or other unsafe conversions warned by Arrow. Arrow safe type check can be disabled by using SQL config `spark.sql.execution.pandas.convertToArrowArraySafely`.', ArrowTypeError("Expected bytes, got a 'datetime.date' object"))` – Georg Heiler Nov 20 '20 at 20:23
  • I will write a separate question for this datetime topic though. – Georg Heiler Nov 20 '20 at 20:30
  • 1
    array of timestamps and nested structs are not supported with pandas_udf, see: https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types. If you have to pass them to pandas_udf functions, convert them into string using `to_json()` and then convert them back later. – jxc Nov 20 '20 at 20:53

2 Answers


To define the return_schema for applyInPandas, a more robust way is to use the pyspark.sql.types.StructType.fromJson method together with the df.schema.jsonValue method, which keeps all existing column attributes (nullable, metadata etc.) from the original columns. (Note: a mismatched nullable setting can often lead to bugs that are not easy to detect.)

Some examples:

  1. Append new columns using the StructType.add method; this is the most common use case (example):

    from pyspark.sql.types import StructType
    
    return_schema = StructType.fromJson(df.schema.jsonValue()) \
        .add('foo', 'string', False, "dummy string field") \
        .add('bar', 'integer')
    
  2. Drop existing columns and append new columns:

    return_schema = StructType.fromJson(df.drop("col1", "col2").schema.jsonValue()) \
        .add('foo', 'string')
    
  3. Set up an arbitrary order of columns mixed with new columns (for primitive data types only; be cautious with array, map, struct and other nested data types):

    return_schema = df.selectExpr("col1", "cast('1' as string) as foo", "nested_col", "int(NULL) as bar").schema
    

    Notice that for new columns, nullable can be inferred by specifying a NULL or non-NULL placeholder value; see the examples below, and the quick check right after them:

    cast('1' as string) as foo -> nullable is False
    string(NULL) as foo        -> nullable is True
    
    int(123) as bar            -> nullable is False
    cast(NULL as int) as bar   -> nullable is True
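
    These examples can be verified directly; a minimal check (sketch, assuming the SparkSession `spark` created in the question, and only inspecting the inferred nullability):

    # inspect the nullability inferred for each expression
    print(spark.range(1).selectExpr(
        "cast('1' as string) as foo",
        "string(NULL) as foo_null",
        "int(123) as bar",
        "cast(NULL as int) as bar_null"
    ).schema)
    # expected: foo nullable=false, foo_null nullable=true,
    #           bar nullable=false, bar_null nullable=true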
    

    So if the metadata attribute is not required for the new columns, this method is fine and provides flexibility. A special case is when the return_schema is unchanged, in which case we can simply use return_schema = df.schema. The schema of the first example above, without the metadata, can be written as:

    return_schema = df.selectExpr("*", "string('') as foo", "int(NULL) as bar").schema
    

    For complex data types, below is an example of a new StructType column with nullable setting:

    from pyspark.sql import functions as F

    # specify the `nullable` attribute of the individual StructFields
    my_structs = StructType().add('f1', 'int', False).add('f2', 'string')

    # specify `nullable=False` for the StructType column
    return_schema = df.select("*", F.struct(F.lit(0), F.lit('1')).cast(my_structs).alias('s1')).schema
    # specify `nullable=True` for the StructType column
    return_schema = df.select("*", F.lit(None).cast(my_structs).alias('s2')).schema
    

My suggestion: there is no need to manually specify the full return_schema, which can be tedious and error-prone. Use as much information as possible from the existing schema and do NOT rely completely on dynamic inference.
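
Applied to the question's data, a minimal end-to-end sketch of this suggestion could look like the code below. It assumes the `sdf` from the question; `my_complex_function_schema_safe` is a hypothetical variant of the question's function with `reset_index()` added (so `time` comes back as a regular column) and only the numeric column resampled:

    import pandas as pd
    from pyspark.sql.types import StructType

    # keep the `time` attributes from the original schema, drop the columns the
    # function does not return unchanged, and declare the new/changed columns explicitly
    return_schema = StructType.fromJson(sdf.drop("category", "value").schema.jsonValue()) \
        .add("value", "double") \
        .add("foo", "string") \
        .add("baz", "integer")

    def my_complex_function_schema_safe(pdf: pd.DataFrame) -> pd.DataFrame:
        # resample only the numeric column; mean() turns the long `value` into a
        # double, which is why `value` is declared as double above
        out = pdf.set_index("time")[["value"]].resample("H").mean().fillna(0).reset_index()
        out["foo"] = "bar"
        out["baz"] = 123
        return out

    sdf.groupby("category").applyInPandas(my_complex_function_schema_safe, schema=return_schema).show()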

jxc

Implicit schema for pandas_udf in PySpark? (linked in the comments above) gives a great hint for the solution.

That approach translates here to the code below. I also need to support nested structs for my real data, and I need to perform some more testing on how to get these to work as well.

def my_complex_function_spark(pdf: pd.DataFrame)-> pd.DataFrame:
    # fill missing hours
    
    pdf = pdf.set_index('time').resample('H').mean().fillna(0).reset_index()
    
    # some complex computation/business logic which is adding columns/simplified here:
    pdf['foo'] = 'bar'
    pdf['baz'] = 123
    return pdf

from pyspark.sql.types import *

mapping = {"float64": DoubleType,
           "object":StringType,
           "datetime64[ns]":TimestampType,
           "int64":IntegerType} # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
  column_types  = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
  schema = StructType(column_types)
  return schema


# certainly use some kind of limit here for real data
df_pd = sdf.toPandas()
df_return = my_complex_function_spark(df_pd)
schema = createUDFSchemaFromPandas(df_return)
sdf.groupby("category").applyInPandas(my_complex_function_spark, schema=schema).show()
Georg Heiler
  • the main problem with this method is that the new schema will lose the properties like metadata, nullable defined in the original dataframe columns which makes it less portable, and you also have to actually run some pandas code to find schema, that's kind of overkill. – jxc Nov 20 '20 at 20:21
  • Do you know a better method? It also fails for nested structs ... – Georg Heiler Nov 20 '20 at 20:22