I am reading data from Kafka with Spark Structured Streaming, but the data arriving in Spark from Kafka is not in string format. Spark version: 2.3.4

Kafka Data format:

{"Patient_ID":316,"Name":"Richa","MobileNo":{"long":7049123177},"BDate":{"int":740},"Gender":"female"}

Here is the code for reading from Kafka with Spark Structured Streaming:

#  spark-submit --jars kafka-clients-0.10.0.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.3.4,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/kinjalpatel/kafka_sppark.py
import pyspark
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json
from pyspark.sql.functions import from_json, col, struct
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from pyspark.sql.column import Column, _to_java_column

sc = SparkContext()
sc.setLogLevel("ERROR")
spark = SparkSession(sc)
schema_registry_client = CachedSchemaRegistryClient(
url='http://localhost:8081')
serializer = MessageSerializer(schema_registry_client)
df = spark.readStream.format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "mysql-01-Patient") \
  .option("partition.assignment.strategy", "range") \
  .option("valueConverter", "org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter") \
  .load()
df.printSchema()
mta_stream=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)", "CAST(timestamp AS STRING)", "CAST(timestampType AS STRING)")
mta_stream.printSchema()
qry = mta_stream.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()

This is the output I get:

+----+--------------------+----------------+---------+------+--------------------+-------------+
| key|               value|           topic|partition|offset|           timestamp|timestampType|
+----+--------------------+----------------+---------+------+--------------------+-------------+
|null|�
Richa���...|mysql-01-Patient|        0|   160|2019-12-27 11:56:...|            0|
+----+--------------------+----------------+---------+------+--------------------+-------------+

How can I get the value column in string format?

OneCricketeer

2 Answers

From the Spark documentation:

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

// Read the raw Kafka records; `value` arrives as binary (Avro-encoded) data.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

From the Databricks documentation:

import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
Mehdi LAMRANI
  • This is only available from Databricks platform, not just any Spark consumer – OneCricketeer Dec 27 '19 at 23:46
  • Yes, I am. You can find issues on the spark-avro GitHub page saying as much. Also, spark-avro as of Spark 2.4 does not support Confluent Schema Registry, so I suggest you try your answers before copying from the documentation – OneCricketeer Dec 28 '19 at 17:16
There is no direct library for reading Avro messages from a Kafka topic and parsing them in PySpark Structured Streaming. However, you can read and parse Avro messages by writing a small wrapper function and calling that function as a UDF in your PySpark streaming code.

Please refer to:

Reading avro messages from Kafka in spark streaming/structured streaming
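
As a rough illustration of the first step such a wrapper might take, here is a minimal sketch. It assumes the messages were written in the Confluent Schema Registry wire format (one magic byte equal to 0, then a 4-byte big-endian schema ID, then the Avro binary payload), which seems likely given the registry client in the question but is not confirmed there. The function name split_confluent_header is hypothetical and is not part of any library.

```python
import struct

MAGIC_BYTE = 0   # first byte of the assumed Confluent wire format
HEADER_SIZE = 5  # 1 magic byte + 4-byte big-endian schema ID


def split_confluent_header(value):
    """Split a Kafka message value into (schema_id, avro_payload).

    Hypothetical helper: assumes the Confluent wire format described above.
    Accepts bytes or bytearray.
    """
    if value is None or len(value) < HEADER_SIZE:
        raise ValueError("message too short to carry a Confluent header")
    # ">bI" = big-endian, one signed byte followed by one unsigned 32-bit int
    magic, schema_id = struct.unpack(">bI", bytes(value[:HEADER_SIZE]))
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, bytes(value[HEADER_SIZE:])


# Usage with a made-up message: header for schema ID 7 plus some payload bytes
sample = b"\x00" + struct.pack(">I", 7) + b"\x02\x0aRicha"
schema_id, payload = split_confluent_header(sample)
print(schema_id, payload)
```

In a streaming job, a function like this could be wrapped in a UDF and applied to the binary value column instead of CAST(value AS STRING). The remaining payload would still need to be decoded by an Avro library using the writer schema that the registry returns for schema_id.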