
I'm struggling to read my JSON data from a Kafka topic using Spark Structured Streaming.

Context:

I'm building a simple pipeline where I read data from a MongoDB (this database is frequently populated by another app) through Kafka, and then I want to get this data into Spark.

For that I'm using Spark Structured Streaming, which seems to work.

Here is my code:

import org.apache.spark.sql.SparkSession

object KafkaToParquetLbcAutomation extends App {

  val spark = SparkSession
    .builder
    .appName("Kafka-Parquet-Writer")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  val kafkaRawDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<BROKER IP>")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

  val testJsonDf = kafkaRawDf.selectExpr("CAST(value AS STRING)")






  // display the data
  val query = testJsonDf
    .writeStream
    .outputMode("append")
    .format("console")
    .queryName("test")
    .start()

  query.awaitTermination()
}

After reading this JSON data I want to apply some transformations.

Here the problem starts: I can't parse the JSON data because of a strange encoding that I'm not able to decode.

Therefore I can't go further in my pipeline.
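Once the payload on the topic really is plain JSON text, parsing it comes down to `from_json` with an explicit schema. A minimal sketch, assuming a hypothetical field name and using a local batch DataFrame to stand in for the streamed `CAST(value AS STRING)` column:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder
  .appName("json-parse-sketch")
  .master("local[1]")
  .getOrCreate()
import spark.implicits._

// Stand-in for kafkaRawDf.selectExpr("CAST(value AS STRING)"):
// one record whose "value" column holds the JSON text.
val testJsonDf = Seq("""{"field 1": "value 1"}""").toDF("value")

// Declare the expected fields explicitly; "field 1" is a hypothetical name.
val schema = new StructType().add("field 1", StringType)

// Parse the JSON string and flatten the struct into top-level columns.
val parsed = testJsonDf
  .select(from_json($"value", schema).as("data"))
  .select("data.*")

val firstValue = parsed.collect()(0).getString(0)
```

On the real stream the same `from_json` call would go on `testJsonDf` before the `writeStream`; `collect()` is only for checking the result here, since it isn't available on a streaming DataFrame.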

How I should get my data:

{
  "field 1": "value 1"
}

(with many other fields)

How I actually get the data:

VoituresXhttps://URL.fr/voitures/87478648654.htm�https://img5.url.fr/ad-image/49b7c279087d0cce09123a66557b71d09c01a6d2.jpg�https://img7.url.fr/ad-image/eab7e65419c17542840204fa529b02e64771adbb.jpg�https://img7.urln.fr/ad-image/701b547690e48f11a6e0a1a9e72811cc76fe803e.jpg

The issue might be the delimiter or something like that.

Can you please help me?

Thank you.

  • Whatever the format is, it doesn't look like JSON (malformed or not) at all. If it were, [How to read records in JSON format from Kafka using Structured Streaming?](https://stackoverflow.com/q/43297973/10938362) would apply; here I'd recommend providing a [mcve] (in particular how the data is produced). – user10938362 Mar 10 '19 at 22:44
  • Fix the encoding upstream when writing data, avoid pretty indent. – deo Mar 10 '19 at 23:03
  • What's the error ? – allthenutsandbolts Mar 11 '19 at 03:12

1 Answer

Problem solved.

It was a misconfiguration in the Kafka connector.

I simply had to add these fields to the connector configuration:

"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",

Nothing to do with Spark.
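For context, these converter properties sit at the top level of the connector's `config` block, next to the connector class. A hypothetical complete example, assuming the official MongoDB source connector (the connector name, connection URI, database, and collection are placeholders):

```json
{
  "name": "mongo-source-test",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "database": "mydb",
    "collection": "mycollection",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```

Without explicit converter settings, the connector falls back to whatever converters the Connect worker is configured with, which is a likely explanation for the unreadable bytes shown in the question.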
