1

I'm trying to determine the schema of a json kafka topic. To achieve that, I lifted a code part from this blog(https://medium.com/wehkamp-techblog/streaming-kafka-topic-to-delta-table-s3-with-spark-structured-streaming-2bb3027c7565).

Background : I have a topic streaming json messages which are base64 encoded. So I'm able to decode the value using unbase64(value) casting it to string which gives me the required json in the form of a string. Now because I have to flatten this string json without any information on the schema, I'm using the below function.

`

def infer_topic_schema_json(topic):

    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               # filter out empty values
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull())
               # get latest version of each record
               .select("key", expr("struct(offset, value) r"))
               .groupBy("key").agg(expr("max(r) r")) 
               .select("r.value"))
    
    # decode the json values
    df_read = spark.read.json(
      df_json.rdd.map(lambda x: x.value), multiLine=True)
    
    # drop corrupt records
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
                   .filter(col("_corrupt_record").isNotNull())
                   .drop("_corrupt_record"))
 
    return df_read.schema.json()

However, this code piece in databricks is taking too much time (as long as 3 hours) for a topic which has less than 1000 messages so far. Config of the cluster I'm using is below :

2-10 workers 256-1280 GB Memory
             32-160 cores
Driver  -    128 GB memory, 16 cores 
Runtime -    10.4.x-scala2.12
Standard E16ds_v4   12-44 DBU/h

Any suggestions on how to speed up the above process as it is not feasible to let the schema inferring function for 3+ hours as I already know that schema would be updated dynamically quite frequently (once a day or 2).

I already tried reading a small batch of 2 records to see if that speeds up the process but it did not. Also, the plan is to maintain a checkpoint in the above batch function so that I do not have to process all the already processed messages when I'm rerunning to update schema.

Glimpse
  • 23
  • 4
  • Why do you need to map over the entire rdd? Why pass a dataframe into `spark.read.json`? SparkSQL already has json parsing functions from existing dataframes. You don't need to read it again – OneCricketeer Nov 09 '22 at 15:56
  • The value part which I have from the topic is base64 encoded. So when we decode it, we get a json in the form of a string. Now, to consume that data we need to flatten that for which we need a schema to actually infer it as a json. Now since the schema is a dynamic one, meaning there can be new fields added to the topic, I want to build the schema first so that I would be able to flatten that data out. For this purpose, tha above function is used. Also added this same info in the question. Thanks. – Glimpse Nov 09 '22 at 17:32
  • [link](https://stackoverflow.com/questions/48361177/spark-structured-streaming-kafka-convert-json-without-schema-infer-schema) This is similar to my issue. I'm following the same solution the user suggested but that is like running for hours. – Glimpse Nov 09 '22 at 17:46
  • When you're calling `spark.read.json`- Are you reading a local filesystem, or HDFS/S3? How large is that dataset? Both linked solutions seem to suggest it's only one record that's needed to infer the schema of that data. Obviously doing that over and over for every event will be slow, but there's not much you can do about that since there may be no unique identifier to cache on – OneCricketeer Nov 10 '22 at 13:09
  • Why is kafka data dynamic, anyway? Any given producer-send call should always be sending the same serialized object format, then other producers into the same topic should do the same. Why not use distinct topics for different record types? Spark can read multiple topics at a time. Or have you looked at the CloudEvents spec, which includes a `type` field that can be filtered on, and mapped into a specific schema? – OneCricketeer Nov 10 '22 at 13:14
  • Apologies for probably asking this question but can't a kafka topic have different schemas ? Like there might be newer fields being added which we need to pick up. And unfortunately, we won't have a communication when a newer field is being added so we have to infer that ourselves. Also confluent kafka docs say that "If you are using a data encoding such as JSON, without a statically defined schema, you can easily put many different event types in the same topic ". Now I'm genuinely confused and lost. – Glimpse Nov 10 '22 at 18:38
  • Yes, you can do that, as documented, but Spark (or anything that needs a static schema) for a topic, will not be happy about it. That's the tradeoff - Very flexible/plaintext serialization == slow speed. Very strict + binary structured protocol (Avro, Protobuf, etc) == fast. Anyways, the linked answers/blogs are "correct", but you need to limit each `spark.read.json` call to ensure you only read one event. If you max the offset, per partition, you still have N records to parse for N partitions... That shouldn't take "hours" however, – OneCricketeer Nov 10 '22 at 23:03

0 Answers0