
Whenever I read a dataset with PySpark and try to convert it to a pandas DataFrame for modeling, I get the error java.io.StreamCorruptedException: invalid stream header: 204356EC at the toPandas() step.

I am not a Java coder (hence PySpark), so these errors can be pretty cryptic to me. I tried the following, but I still have the issue:

The logging in the test script below verifies that the Spark and PySpark versions are aligned.

test.py:

import logging

# findspark.init() must run before pyspark is imported so that the local
# Spark installation is on sys.path
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

sc = SparkContext('local[*]', 'test')
spark = SparkSession(sc)
logging.info('Spark location: {}'.format(findspark.find()))
logging.info('PySpark version: {}'.format(spark.sparkContext.version))

logging.info('Reading spark input dataframe')
test_df = spark.read.csv('./data', header=True, sep='|', inferSchema=True)

logging.info('Converting spark DF to pandas DF')
pandas_df = test_df.toPandas()
logging.info('DF record count: {}'.format(len(pandas_df)))
sc.stop()

Output:

$ python ./test.py   
21/05/13 11:54:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-05-13 11:54:34 INFO     Spark location: /Users/username/server/spark-3.1.1-bin-hadoop2.7
2021-05-13 11:54:34 INFO     PySpark version: 3.1.1
2021-05-13 11:54:34 INFO     Reading spark input dataframe
2021-05-13 11:54:42 INFO     Converting spark DF to pandas DF                   
21/05/13 11:54:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
21/05/13 11:54:45 ERROR TaskResultGetter: Exception while getting task result
java.io.StreamCorruptedException: invalid stream header: 204356EC
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:936)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:394)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:64)
    at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:64)
    at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:123)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(TaskResultGetter.scala:97)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:63)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "./test.py", line 23, in <module>
    pandas_df = test_df.toPandas()
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/pandas/conversion.py", line 141, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 677, in collect
    sock_info = self._jdf.collectToPython()
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/username/server/spark-3.1.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.io.StreamCorruptedException: invalid stream header: 204356EC
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Pierre Delecto
  • The error means that the data being processed isn't in the Java-specific object-serialization format expected by `ObjectInputStream`. Given that your code - AFAICT - doesn't do anything that explicitly uses object serialization or deserialization, it is probably something internal to Spark doing things wrong. – Mark Rotteveel May 13 '21 at 16:24
  • @MarkRotteveel I agree. FYI the source data was created by Spark as well. – Pierre Delecto May 13 '21 at 16:36
  • I hit the same problem with PySpark 3.1.2; maybe it still isn't solved. – Key.L Aug 13 '21 at 11:51
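
For reference, here is a minimal sketch of what the failing header check amounts to. The magic constants are Java's STREAM_MAGIC and STREAM_VERSION from java.io.ObjectStreamConstants; the received bytes are taken from the error message above.

# Java object-serialization streams must begin with the magic bytes
# 0xAC 0xED (STREAM_MAGIC) followed by the version 0x00 0x05.
expected = bytes([0xAC, 0xED, 0x00, 0x05])

# The error reports the first four bytes actually read: 0x20 0x43 0x56 0xEC.
# The task result was therefore written by something other than plain Java
# serialization (e.g. a different serializer or a mismatched Spark build).
received = bytes([0x20, 0x43, 0x56, 0xEC])

# JavaDeserializationStream raises StreamCorruptedException at this point
assert received != expected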

2 Answers


The issue was resolved for me by ensuring that the serialisation option (registered in the configuration under spark.serializer) was compatible with pyarrow (which is typically used during the conversion between pandas and PySpark DataFrames, if you have it enabled).
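
As a quick check, Arrow-backed conversion is opt-in in Spark 3.x, so you can toggle it to see whether that code path is involved (a minimal sketch):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Toggle the Arrow-based toPandas() path off (or on) to test its involvement
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'false')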

The fix was to remove the often-recommended spark.serializer: org.apache.spark.serializer.KryoSerializer from the configuration and rely instead on the potentially slower default (JavaSerializer).
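
For illustration, a minimal sketch of the change (the app name is a placeholder; the commented-out config is the setting we removed):

from pyspark.sql import SparkSession

# Before (problematic): Kryo explicitly configured
# spark = (SparkSession.builder
#          .appName('app')
#          .config('spark.serializer',
#                  'org.apache.spark.serializer.KryoSerializer')
#          .getOrCreate())

# After (the fix): no spark.serializer override, so Spark falls back to the
# default org.apache.spark.serializer.JavaSerializer
spark = (SparkSession.builder
         .appName('app')
         .getOrCreate())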

For context, our setup used an ML version of the Databricks Spark cluster (runtime v7.3).


I hit this exception with Spark Thrift Server.

The driver's Spark version and the cluster's Spark version were different.

In my case, I deleted this setting so that the driver's version is used across the whole cluster:

spark.yarn.archive=hdfs:///spark/3.1.1.zip
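
To confirm a mismatch like this, a small sketch that compares the JVM-side Spark version with the Python-side pyspark package (both attributes are standard):

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print('JVM/driver Spark version:', spark.version)        # reported by the JVM
print('pyspark package version:', pyspark.__version__)   # Python-side install
# If these disagree (or the executors run yet another Spark build), task
# results can fail to deserialize with StreamCorruptedException.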
BioQwer