I'm starting to work with pandas UDFs in a PySpark Jupyter notebook running on an EMR cluster. With this 'identity' pandas UDF, I'm getting the following error:
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def pudf(pdf):
    # Input/output are both a pandas.DataFrame
    return pdf

df.filter(df.corp_cust == 'LO').groupby('corp_cust').apply(pudf).show()
An error occurred while calling o388.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 113.0 failed 4 times, most recent failure: Lost task 0.3 in stage 113.0 (TID 1666, ip-10-23-226-64.us.scottsco.com, executor 1): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
I can run df.filter(df.corp_cust == 'LO').show() successfully, which makes me think things are 'breaking' somewhere in the translation from the pandas DataFrame back to a PySpark DataFrame.
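To narrow it down, one test I plan to try is the same identity UDF on a tiny, self-contained DataFrame (toy data, not my real table; spark is the notebook's session), to check whether the grouped-map/Arrow path works at all in this environment:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Toy DataFrame with made-up data, just to exercise the grouped-map path
toy = spark.createDataFrame(pd.DataFrame({'corp_cust': ['LO', 'LO'], 'val': [1.0, 2.0]}))

@pandas_udf(toy.schema, PandasUDFType.GROUPED_MAP)
def identity(pdf):
    return pdf

toy.groupby('corp_cust').apply(identity).show()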
This DataFrame has a couple of StringType and DecimalType columns. I've also tried encoding the string columns to 'utf-8' within the UDF and get the same error.
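For reference, the encoding attempt looked roughly like this (a sketch from memory; it loops over whatever columns pandas has typed as object):

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def pudf(pdf):
    # attempted workaround: encode each string (object) column to utf-8
    # bytes before handing the frame back to Spark
    for col in pdf.select_dtypes(include='object').columns:
        pdf[col] = pdf[col].str.encode('utf-8')
    return pdf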
Any suggestions on how to fix this?