I have a Kafka Connect sink. Within my topic, I have a field expressed in ticks rather than a proper timestamp. I ultimately want to use that field as the partitioning field in the destination (in this case Azure Data Lake Gen2).
I have tried using TimeBasedPartitioner along with timestamp.extractor and TimestampConverter, but it just errors out on the format. From what I can see, all of these timestamp converters expect a "timestamp" field, whereas mine is in ticks, so I need an additional transformation before I can use TimestampConverter. I am not sure how to do that, as none of the SMTs I have looked at provide such a conversion.
The error I get:
java.lang.IllegalArgumentException: Invalid format: "20204642-05-16 21:34:40.000+0000" is malformed at " 21:34:40.000+0000"
(The year 20204642 suggests the ticks value is being interpreted directly as epoch milliseconds.)
This is how my sink configuration looks:
{
  "name": "azuredatalakegen2-sink-featuretracking-dl",
  "config": {
    "connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
    "topics": "sometopic",
    "topics.dir": "filesystem/folder",
    "flush.size": "1",
    "file.delim": "-",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "UTC",
    "timezone": "UTC",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "300000",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "event_date_time_ticks",
    "format.class": "io.confluent.connect.azure.storage.format.parquet.ParquetFormat",
    "transforms": "flatten,TimestampConverter",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSZ",
    "transforms.TimestampConverter.field": "event_date_time_ticks",
    "transforms.TimestampConverter.target.type": "string",
    "max.retries": "288",
    "retry.backoff.ms": "300000",
    "errors.retry.timeout": "3600000",
    "errors.retry.delay.max.ms": "60000",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    .......other conn related configs.......
  }
}
Here are the SMTs I have looked at: SMTs in Confluent Platform
How can I partition the data in the destination using the field event_date_time_ticks, which is in ticks? E.g. 637535500015510000 means 2021-04-09T07:26:41.551Z.
Tick conversion to datetime: Tick to Datetime
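For what it's worth, since these are .NET ticks (100-nanosecond intervals counted from 0001-01-01T00:00:00Z), I believe the arithmetic itself is straightforward in Java (the class and method names below are just mine for illustration):

```java
import java.time.Instant;

public class TickConversion {
    // .NET ticks elapsed between 0001-01-01T00:00:00Z and the Unix epoch
    private static final long TICKS_AT_UNIX_EPOCH = 621_355_968_000_000_000L;
    // One millisecond = 10,000 ticks (a tick is 100 ns)
    private static final long TICKS_PER_MILLISECOND = 10_000L;

    // Convert .NET ticks to Unix epoch milliseconds.
    public static long ticksToEpochMillis(long ticks) {
        return (ticks - TICKS_AT_UNIX_EPOCH) / TICKS_PER_MILLISECOND;
    }

    public static void main(String[] args) {
        long ticks = 637_535_500_015_510_000L;
        // Prints 2021-04-09T07:26:41.551Z, matching the example above
        System.out.println(Instant.ofEpochMilli(ticksToEpochMillis(ticks)));
    }
}
```

The question is where to put this arithmetic in the Connect pipeline, since TimestampConverter has no notion of a ticks input.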
Even if I try FieldPartitioner, how can I convert that tick value into a datetime format in the sink configuration above? Or do I have to write something custom?
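If custom code is the answer, this is the kind of SMT I imagine writing (an untested sketch; the class name TicksToTimestamp and its "field" config are my own invention). It would rewrite the ticks field as a Connect Timestamp, so that TimeBasedPartitioner with timestamp.extractor=RecordField could consume it directly and TimestampConverter would no longer be needed:

```java
import java.util.Date;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT: replaces a .NET-ticks long field with a Connect Timestamp.
public class TicksToTimestamp<R extends ConnectRecord<R>> implements Transformation<R> {

    // .NET ticks elapsed between 0001-01-01T00:00:00Z and the Unix epoch
    private static final long TICKS_AT_UNIX_EPOCH = 621_355_968_000_000_000L;

    private String fieldName; // set via the "field" config, e.g. event_date_time_ticks

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = (String) configs.get("field");
    }

    @Override
    public R apply(R record) {
        Struct value = (Struct) record.value();

        // Rebuild the value schema, swapping the ticks field's type for Timestamp.
        SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
        for (Field f : value.schema().fields()) {
            builder.field(f.name(),
                    f.name().equals(fieldName) ? Timestamp.SCHEMA : f.schema());
        }
        Schema newSchema = builder.build();

        // Copy all fields, converting the ticks value to a java.util.Date.
        Struct newValue = new Struct(newSchema);
        for (Field f : newSchema.fields()) {
            if (f.name().equals(fieldName)) {
                long ticks = ((Number) value.get(fieldName)).longValue();
                newValue.put(f.name(), new Date((ticks - TICKS_AT_UNIX_EPOCH) / 10_000L));
            } else {
                newValue.put(f.name(), value.get(f.name()));
            }
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), newSchema, newValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("field", ConfigDef.Type.STRING,
                ConfigDef.Importance.HIGH, "Name of the ticks field to convert");
    }

    @Override
    public void close() {}
}
```

The idea would be to package this as a jar on the Connect plugin path and chain it after the flatten transform in place of TimestampConverter, but I would prefer a built-in solution if one exists.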