
I am trying to read the kafka messages in google dataproc using pyspark - structured streaming.

Version details are :

  1. Dataproc image version is 2.0.0-RC22-debian10 (to get PySpark 3.0.1 with Delta Lake 0.7.0, as I finally have to write this data to a Delta table hosted on Google Cloud Storage)
  2. PySpark version is 3.0.1 and the Python version used by PySpark is 3.7.3
  3. The packages I am using are:
     org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
     io.delta:delta-core_2.12:0.7.0
     org.apache.spark:spark-avro_2.12:3.0.1

Snippet of the code is :

import os
from pyspark.sql.avro.functions import from_avro

__my_dir = os.path.dirname("<directory_path>")
jsonFormatSchema = open(os.path.join(__my_dir, 'avro_schema_file.avsc'), "r").read()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<kafka_broker>") \
    .option("subscribe", "<kafka_topic>") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_avro("value", jsonFormatSchema).alias("element"))

df.printSchema()
 
df_output = df.select("element.after.id", "element.after.name", "element.after.attribute", "element.after.quantity")

StreamQuery = (df_output.writeStream
               .format("delta")
               .outputMode("append")
               .option("checkpointLocation", "<check_point_location>")
               .trigger(once=True)
               .start("<target_delta_table>"))

Error I am getting is :

java.io.InvalidClassException: org.apache.kafka.common.TopicPartition;
class invalid for deserialization

Why does Spark fail to deserialize TopicPartition, and how can I solve it?

thebluephantom
Rak
  • Same configuration as yours, but with Delta 0.8.0 and without spark-avro. I get the error when the master is "yarn"; it disappears if the master is set to local. However, I am not able to find a solution – Galuoises Apr 12 '21 at 19:18
  • Your question helped me fix version dependency. – Pranav Nandan May 10 '21 at 06:32

2 Answers


The following post helped to resolve this issue: How to use Confluent Schema Registry with from_avro standard function?

In addition, we started pointing to the following jar for the Kafka client:

kafka-clients-2.4.1.jar
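For context, the linked post deals with messages encoded in Confluent's wire format: each Kafka message value starts with a magic byte (0) and a 4-byte big-endian schema ID before the Avro payload, which is why the snippet quoted in the comments skips the first 5 bytes. A minimal sketch of that framing (the message bytes below are fabricated for illustration; this is not the OP's data):

```python
import struct

def split_confluent_header(value: bytes):
    """Split a Confluent-framed Kafka message value into
    (magic_byte, schema_id, avro_payload)."""
    if len(value) < 5:
        raise ValueError("message too short for Confluent wire format")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema id)
    magic, schema_id = struct.unpack(">bI", value[:5])
    return magic, schema_id, value[5:]

# Fabricated example: magic byte 0, schema id 42, then the raw Avro bytes.
msg = struct.pack(">bI", 0, 42) + b"\x02avro-payload"
magic, schema_id, payload = split_confluent_header(msg)
print(magic, schema_id, payload)  # 0 42 b'\x02avro-payload'
```

This is also what `substring(value, 6)` achieves in the Spark SQL snippet from the comments: Spark's `substring` is 1-indexed, so position 6 is the first byte after the 5-byte header.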

Rak
  • This doesn't work for me. Apparently the issue happens when yarn is used and disappears when the master is set to local – Galuoises Apr 12 '21 at 19:19
  • Check this one https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry Snippet: # The magic goes here: skip the first 5 bytes (reserved by the schema registry encoding protocol) .selectExpr("substring(value, 6) as avro_value").select(from_avro(col("avro_value"), schema).alias("data")).select(col("data.my_field")) – Rak Apr 13 '21 at 04:59
  • I might have found a solution, which is related to an issue between the driver (serialize) and executor (deserialize) jar conflict. https://forums.aws.amazon.com/thread.jspa?messageID=953144 – Galuoises Apr 13 '21 at 08:32

The error disappears when you set the master to local[*]. Anyhow, the problem seems to be related to a jar conflict between the driver and the executor: they use different versions of the kafka-clients library.

To solve the problem you may want to launch jobs with

gcloud dataproc jobs submit spark \
  --class <YOUR_CLASS> \
  --jars target/scala-2.12/<COMPILED_JAR_FILE>.jar,kafka-clients-2.4.1.jar \
  --cluster <CLUSTER_NAME> \
  --region=<YOUR_REGION> \
  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,spark.executor.extraClassPath=org.apache.kafka_kafka-clients-2.4.1.jar,spark.driver.extraClassPath=org.apache.kafka_kafka-clients-2.4.1.jar

This works in my case. Please check https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.0.1 for further details on versions.
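Since the question's job is PySpark rather than a compiled Scala class, the equivalent submission would presumably use `gcloud dataproc jobs submit pyspark` instead. A hedged sketch (the script name and placeholders below are illustrative, not taken from the answer above):

```shell
# Sketch: same jar-pinning approach for a PySpark job on Dataproc.
# my_streaming_job.py and the placeholders are hypothetical.
gcloud dataproc jobs submit pyspark my_streaming_job.py \
  --cluster <CLUSTER_NAME> \
  --region=<YOUR_REGION> \
  --jars kafka-clients-2.4.1.jar \
  --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,io.delta:delta-core_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1,spark.executor.extraClassPath=kafka-clients-2.4.1.jar,spark.driver.extraClassPath=kafka-clients-2.4.1.jar
```

The key point is the same as in the Scala command: make sure both `spark.driver.extraClassPath` and `spark.executor.extraClassPath` point at the same kafka-clients jar, so the driver serializes and the executor deserializes `TopicPartition` with one consistent class version.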

Galuoises