4

Correct me if I'm wrong.. A parquet file is self describing, means it contains its proper schema.

I would like to use S3 sink confluent connector ( especially because it handles correctly the Exactly Once semantic with S3) to read JSON records from our Kafka and then create parquet files in s3 ( partitioned by event time). Our JSON records don't have a schema embedded.

I know it's not supported yet, but I have few questions regarding parquet and AVRO as well.

As there is no schema embedded in our JSON records, it would mean that the connector task would have to infer the data from the JSON fields it self ? ( is that a doable solution ?)

There is no such thing like schema registry but for parquet, in Kafka.., is that right ?

AVRO seems well integrated to Kafka, means the schema is read using schema registry.. Does it mean the confluent S3 sink will be smart enough to create files in s3 containing the schema as header and then bunch of records in s3 files ?

I know that guy was working on an implementation of parquet for this s3 sink connector :

https://github.com/confluentinc/kafka-connect-storage-cloud/pull/172

But I don't understand, it seems it's using AVRO schema in the code, does this imply having AVRO records in Kafka to use this Parquet implementation ?

I'm starting to think that it would be easier to target AVRO files on S3 ( I can afford it by loosing some OLAP capabilities), but wanted to be sure before going AVRO.

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
Yannick
  • 1,240
  • 2
  • 13
  • 25

1 Answers1

3

Correct me if I'm wrong.. A parquet file is self describing, means it contains its proper schema

Correct. If you have a parquet file, you can get the schema from it.

How do I get schema / column names from parquet file?

create files in s3 containing the schema as header and then bunch of records in s3 files ?

Yes, that's exactly how the S3 Connector works for Avro files.

it seems it's using AVRO schema in the code, does this imply having AVRO records in Kafka to use this Parquet implementation ?

I've not looked too extensively at the PR, but I think the Parquet storage format only requires a Connect Schema, not Avro data because using the AvroData class, it's possible to translate back and forth between Connect Schemas and Avro schemas like avroData.fromConnectSchema(schema). This parses the Connect Schema structure and forms a new Avro schema, and doesn't work against the Registry or require input data to be Avro.

That being said, if your JSON objects did have a schema, then it might be possible to write them with options other JSONFormat because the format.class setting gets applied after the Converter. Anecdotally, I know I was able to write Avro input records out as JSON files with AvroConverter + JSONFormat, but I've not tried using JSONConverter + schema'd JSON with AvroFormat.

Update: Read docs

You must use ProtobufConverter or JSONSchemaConverter to get Parquet output. JSONConverter (with or without schemas) will not work.

I'm starting to think that it would be easier to target AVRO files on S3

Probably... Note, you could use Secor instead, which has Hive table integration and claims to have Parquet support for JSON

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • Thanks for the answer parts, but I still don't get where the parquet schema lies in Kafka. Should it appears in each records that form the parquet file ..? – Yannick Sep 17 '19 at 20:28
  • The parquet schema not in Kafka. It is part of the uploaded file, after the Schema + Avro conversion. https://github.com/confluentinc/kafka-connect-storage-cloud/pull/172/files#diff-9591b626c53dece290f001cceadcb283R78-R91 – OneCricketeer Sep 17 '19 at 21:09
  • Thanks, but from what I read, the schema is currently in a record value ``` schema = record.valueSchema();``` , and from my understanding, this code assumes that the schema won't change ( as it does not check the schema in each records) . And it assumes ,as well, that the schema could be in any record value, like each record would contain the schema ..? What do you mean by the uploaded file ? – Yannick Sep 17 '19 at 21:32
  • Why wouldn't `valueSchema()` change? If you added new, nullable field, then the schema changes. That would just make the current file-handle close, write one file, then open a new file with the next schema for all new records. That is what the `backwards.compatibility` setting is for. By uploaded file, I mean the file that is sent to S3. – OneCricketeer Sep 17 '19 at 21:58