0

I'm trying to convert parquet files into SourceRecords. I'm currently stuck on generating connect schema from avro schema. I'm able to read parquets to GenericRecords:

public static Seq<GenericRecord> genericRecordsOf(Seq<String> parquets) {
    Configuration config = new Configuration();
    config.setBoolean(AVRO_COMPATIBILITY, true);
    config.setBoolean("parquet.avro.add-list-element-records", false);
    config.setBoolean("parquet.avro.write-old-list-structure", false);
    config.setClass("parquet.avro.data.supplier", SchemaTest.class, AvroDataSupplier.class);
    config.set("fs.s3a.impl", S3AFileSystem.class.getCanonicalName());
    return parquets.flatMap(input -> {
          Builder<Record> builder = Try(() -> AvroParquetReader
              .<Record>builder(HadoopInputFile.fromPath(new Path(input), config))
              )
              .get();
          return readRecords(builder);
        }
    );
  }

  private static List<GenericRecord> readRecords(Builder<Record> builder) {
    return Try
        .withResources(builder::build)
        .of(SchemaTest::readRecords)
        .get();
  }

  private static List<GenericRecord> readRecords(ParquetReader<Record> reader) {
    List<GenericRecord> records = new LinkedList<>();
    Record genericRecord = readRecord(reader);
    while (genericRecord != null) {
      records.add(genericRecord);
      genericRecord = readRecord(reader);
    }
    return records;
  }

  private static Record readRecord(ParquetReader<Record> reader) {
    return Try.of(reader::read).get();
  }

The issue is while I'm trying to make connect data from it, using io.confluent.connect.avro.AvroData.toConnectData(avroSchema, avroValue). The exception:


Exception in thread "main" org.apache.kafka.connect.errors.DataException: Mismatched names: name already added to SchemaBuilder (org.apache.kafka.connect.data.Decimal) differs from name in source schema (cedingrate)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1969)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1669)
        at io.confluent.connect.avro.AvroData.toConnectSchemaWithCycles(AvroData.java:2000)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1836)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1669)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1803)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1645)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1326)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1307)
        at com.tryg.data.ingestors.guidewire.factories.SchemaTest.lambda$main$0(SchemaTest.java:103)
        at io.vavr.Value.forEach(Value.java:340)
        at com.tryg.data.ingestors.guidewire.factories.SchemaTest.main(SchemaTest.java:102)

and avro schema generated by AvroParquetReader is (fragment):

"type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "cedingrate",
      "type": [
        "null",
        {
          "type": "fixed",
          ***"name": "cedingrate",
          "size": 16,
          "logicalType": "decimal",
          "precision": 38,
          "scale": 6
        }
      ],
      "default": null
    },
...

I have debugged into the AvroData code and found out that the issue is that AvroData expects "name": "org.apache.kafka.connect.data.Decimal" instead of "name": "cedingrate" in the place marked with ***. Cedingrate is the field name in parquet file. The parquet files are being generated by Guidewire cloud data access framework. I dont know where to look for further. Any tips apreciated. Regards, Dawid.

kalosh
  • 63
  • 1
  • 5
  • I suggest you simply use SparkSQL to read parquet and write to Kafka. Otherwise, Confluent already has an S3 source connector – OneCricketeer Oct 08 '22 at 13:17
  • Thanks for quick answer. Ad1. I have read the data already. The schema is my problem. Can sparkSQL provide shema for confluent? Ad2. I would have to write my own format by extending `StorageObjectFormat`. In this class I would have to implement `extractRecord` method and provide kafka connect value and schema. Getting schema from parquet file is what I'm struggling right now in question. – kalosh Oct 10 '22 at 07:01
  • SparkSQL can read Parquet, yes, in one line of code, too. After you get a dataframe, its schema can easily be extracted, and Spark has Avro functions to convert dataframe schemas to Avro schemas... It [can integrate with schema registry since data is just bytes](https://stackoverflow.com/questions/48882723/integrating-spark-structured-streaming-with-the-confluent-schema-registry), although, Flink has better support for Registry builtin – OneCricketeer Oct 10 '22 at 13:27

1 Answers1

0

TLDR; use io.confluent.connect.avro.AvroData class from kafka-connect-avro-data:6.2.6 dependency to convert avro to connect data.

The above solution will work with io.confluent.connect.avro.AvroData class which is delivered in kafka-connect-avro-data (maven dependency) with version up to 6.2.6. In version 7.0.0, avro fixed schemas, which are being converted to connect schemas by io.confluent.connect.avro.AvroData, are treated as schemas with type FIXED. Up to 7.0.0, they are treated as BYTE type schemas.

There is some kind of incompatibility between parquet-avro dependency (the way how AvroParquetReader builds fixed schemas from parquet files) and kafka-connect-avro-data (the way how AvroData converts schemas from avro to connect). I wasn't able to find out which of those components work according to avro schema requirements.

kalosh
  • 63
  • 1
  • 5