
I am passing a long JSON string to a Kafka topic, e.g.:

{
    "glossary": {
        "title": "example glossary",
        "GlossDiv": {
            "title": "S",
            "GlossList": {
                "GlossEntry": {
                    "ID": "SGML",
                    "SortAs": "SGML",
                    "GlossTerm": "Standard Generalized Markup Language",
                    "Acronym": "SGML",
                    "Abbrev": "ISO 8879:1986",
                    "GlossDef": {
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                        "GlossSeeAlso": ["GML", "XML"]
                    },
                    "GlossSee": "markup"
                }
            }
        }
    }
}

and want to create a stream from the Kafka topic with all the fields, without specifying every field in KSQL, e.g.:

CREATE STREAM pageviews_original (*) \
  WITH (kafka_topic='pageviews', value_format='JSON');
Matthias J. Sax
Mehul Gupta
  • JSON format requires you to set the fields. – OneCricketeer Oct 02 '18 at 07:43
    Can you explain _why_ you want to do this "without specifying every field"? If you don't specify the fields, you won't be able to manipulate the messages. What is it you're using KSQL for here? – Robin Moffatt Oct 02 '18 at 08:24
  • @RobinMoffatt I want KSQL to add all the fields automatically to the stream; I do not want to cherry-pick the fields. I have 300 fields in the JSON and want all of them in the stream. – Mehul Gupta Oct 02 '18 at 08:39

1 Answer


If you want the field names picked up automatically by KSQL, you need to use Avro. If you use Avro, the schema for the data is registered in the Confluent Schema Registry, and KSQL will retrieve it automatically when you use the topic.
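For example, if the topic's Avro schema is registered, the stream can be declared without any column list at all (the topic name `pageviews` here is just illustrative):

```sql
-- No columns declared: KSQL fetches the field names and types
-- from the schema registered for this topic in the Schema Registry
CREATE STREAM pageviews_original
  WITH (kafka_topic='pageviews', value_format='AVRO');
```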

If you are using JSON, you have to tell KSQL what the columns are. You can do this in the CREATE STREAM statement, using the STRUCT data type for nested elements.
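For the sample payload above, a sketch of such a declaration might look like this (the topic name `glossary` is an assumption; STRUCT support requires KSQL 5.0+):

```sql
-- Every nested object is declared as a STRUCT, arrays as ARRAY<type>
CREATE STREAM glossary_stream (
    glossary STRUCT<
        title VARCHAR,
        GlossDiv STRUCT<
            title VARCHAR,
            GlossList STRUCT<
                GlossEntry STRUCT<
                    ID VARCHAR,
                    SortAs VARCHAR,
                    GlossTerm VARCHAR,
                    Acronym VARCHAR,
                    Abbrev VARCHAR,
                    GlossDef STRUCT<
                        para VARCHAR,
                        GlossSeeAlso ARRAY<VARCHAR>>,
                    GlossSee VARCHAR>>>>)
  WITH (kafka_topic='glossary', value_format='JSON');
```

Nested fields are then addressed with the `->` operator, e.g. `SELECT glossary->GlossDiv->GlossList->GlossEntry->GlossTerm FROM glossary_stream;`.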

You can partly work around listing all fields by declaring only the high-level fields in the CREATE STREAM statement and then accessing nested elements with EXTRACTJSONFIELD for the fields you want to use. Be aware that there is an issue with this in 5.0.0, which will be fixed in 5.0.1. Also note that you can't use this for nested arrays etc., which you do have in the sample data you show.
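A minimal sketch of that workaround, assuming the topic is named `glossary` and that the nested object can be read into a single VARCHAR column:

```sql
-- Declare only the top-level field; the nested JSON stays an opaque string
CREATE STREAM glossary_raw (glossary VARCHAR)
  WITH (kafka_topic='glossary', value_format='JSON');

-- Pull individual nested values out with a JSONPath-style expression
SELECT EXTRACTJSONFIELD(glossary,
         '$.GlossDiv.GlossList.GlossEntry.GlossTerm') AS gloss_term
FROM glossary_raw;
```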

Robin Moffatt
  • Thanks for the suggestion. One more query: I am using a custom connector which uses SourceRecord to poll the messages, and I don't see a way to use it with Avro. Any suggestions? – Mehul Gupta Oct 02 '18 at 09:59