
In this Databricks blog post they show how to extract JSON data from Kafka:

# Construct a streaming DataFrame that reads from topic1
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, IntegerType, StringType

# value schema: { "a": 1, "b": "string" }
schema = StructType().add("a", IntegerType()).add("b", StringType())
df.select(
  col("key").cast("string"),
  from_json(col("value").cast("string"), schema))

Is there a way to create the schema from a known case class without manually defining all the fields?
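
For reference, a minimal sketch of that idea in Scala (since case classes are a Scala feature), using the public Encoders.product API; Value is a hypothetical case class mirroring the JSON above:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical case class matching the value schema { "a": 1, "b": "string" }
case class Value(a: Int, b: String)

// Derive the StructType from the case class instead of listing fields by hand
val schema = Encoders.product[Value].schema

// df is the streaming DataFrame built above
val parsed = df.select(
  col("key").cast("string"),
  from_json(col("value").cast("string"), schema).as("value"))

The nested fields can then be flattened with parsed.select("key", "value.*").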

Note: this is from the Spark 2.2 docs, and I believe you now have to add new in the schema line:
schema = new StructType().add("a", new IntegerType()).add("b", new StringType())
I was able to figure this out thanks to this SO question.

Joey Baruch
  • Did you consider manually parsing the JSON with a library like [circe](https://github.com/circe/circe)? – Yuval Itzchakov Jul 29 '18 at 14:02
  • @YuvalItzchakov, you mean with a custom UDF? I did consider it, but I wanted to know if there's a better way, since it's best to [avoid UDFs](https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html) as Spark doesn't optimize them – Joey Baruch Jul 29 '18 at 17:44
  • Is that what you want: https://stackoverflow.com/questions/51496844/create-json-schema-from-schema-string-java-spark/51500681#51500681 – lvnt Jul 29 '18 at 21:39

0 Answers