
I have a Kafka Connect sink. Within my topic, I have a field which is expressed in ticks, not a proper timestamp. I would ultimately like to use that as the partitioning field in the destination (in this case an Azure Data Lake Gen 2).

I have tried using TimeBasedPartitioner along with timestamp.extractor and TimestampConverter, but it just errors out on the format. From what I can see, all these timestamp converters expect a field that already holds a timestamp, whereas mine is in ticks, so I would have to do an additional transformation before I can use TimestampConverter. I am not sure how, as the SMTs I have looked into do not provide any such conversion.

The error I get:

java.lang.IllegalArgumentException: Invalid format: "20204642-05-16 21:34:40.000+0000" is malformed at " 21:34:40.000+0000"

This is how my sink configuration looks:

{
    "name": "azuredatalakegen2-sink-featuretracking-dl",
    "config": {
      "connector.class": "io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
      "topics": "sometopic",
      "topics.dir": "filesystem/folder",
      "flush.size": "1",
      "file.delim": "-",
      "path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
      "locale": "UTC",
      "timezone": "UTC",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "partition.duration.ms": "300000",
      "timestamp.extractor": "RecordField",
      "timestamp.field": "event_date_time_ticks",
      "format.class":"io.confluent.connect.azure.storage.format.parquet.ParquetFormat",
      "transforms": "flatten,TimestampConverter",
      "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
      "transforms.flatten.delimiter": "_",
      "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
      "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSSZ",
      "transforms.TimestampConverter.field":"event_date_time_ticks",
      "transforms.TimestampConverter.target.type": "string",
      "max.retries": "288",
      "retry.backoff.ms": "300000",
      "errors.retry.timeout":"3600000",
      "errors.retry.delay.max.ms":"60000",
      "errors.log.enable":"true",
      "errors.log.include.messages":"true",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
     "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
.......other conn related configs.......
    }
    }

Here are the SMTs I have looked at: SMTs in Confluent Platform

How can I partition the data in the destination using the field event_date_time_ticks, which is in ticks? E.g. 637535500015510000 means 2021-04-09T07:26:41.551Z.

Tick conversion to datetime: Tick to Datetime

Even if I try FieldPartitioner, how can I convert that tick value into a datetime format in the sink configuration above? Or do I have to write something custom?
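
For reference, this is the conversion I am expecting, sketched with java.time (my own snippet, not from any SMT; 621355968000000000 is the tick count at the Unix epoch, since .NET ticks are 100-nanosecond intervals counted from 0001-01-01T00:00:00Z):

    long ticks = 637_535_500_015_510_000L;
    // drop the 0001-01-01 -> 1970-01-01 offset, then 10,000 ticks per millisecond
    long epochMillis = (ticks - 621_355_968_000_000_000L) / 10_000L;
    System.out.println(java.time.Instant.ofEpochMilli(epochMillis));
    // prints 2021-04-09T07:26:41.551Z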

  • I don't understand what you mean by ticks, but `yyyy-MM-dd HH:mm:ss.SSSZ` clearly doesn't match at `20204642` unless you're trying to capture dates millions of years in the future... In other words, sounds like your timestamp is off by an order of magnitude (nano or milliseconds instead of seconds) – OneCricketeer Apr 22 '21 at 13:02

1 Answer


TimestampConverter expects Unix epoch time, not ticks. Read as epoch milliseconds, a ticks value that size lands roughly 20 million years in the future, which is exactly the 20204642-05-16 date in your error.

You'll need to convert it first, either in a custom transform or in your Producer (which shouldn't be a major problem, because most languages have datetime/epoch functions).

Convert ticks to unix timestamp
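
If you go the custom transform route, a minimal sketch could look like the following (the package, class name, and the in-place Struct mutation are illustrative, not a drop-in implementation; the constant is the tick count at the Unix epoch):

    package com.example.kafka.smt;

    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.transforms.Transformation;

    // Rewrites a .NET-ticks field (100-ns intervals since 0001-01-01T00:00:00Z)
    // to Unix epoch milliseconds, so TimestampConverter and
    // TimeBasedPartitioner can consume it downstream.
    public class TicksToEpochMillis<R extends ConnectRecord<R>> implements Transformation<R> {

      private static final long TICKS_AT_UNIX_EPOCH = 621_355_968_000_000_000L;
      private static final long TICKS_PER_MILLISECOND = 10_000L;

      private String field;

      @Override
      public void configure(Map<String, ?> configs) {
        field = (String) configs.get("field");
      }

      @Override
      public R apply(R record) {
        if (record.value() instanceof Struct) {
          Struct value = (Struct) record.value();
          long ticks = value.getInt64(field);
          // int64 -> int64, so the existing value schema still fits
          value.put(field, (ticks - TICKS_AT_UNIX_EPOCH) / TICKS_PER_MILLISECOND);
        }
        return record;
      }

      @Override
      public ConfigDef config() {
        return new ConfigDef().define("field", ConfigDef.Type.STRING,
            ConfigDef.Importance.HIGH, "Name of the ticks field to convert");
      }

      @Override
      public void close() {}
    }

Packaged onto the plugin path, it would slot into your existing chain before the converter, e.g. "transforms": "flatten,ticks,TimestampConverter" with "transforms.ticks.type": "com.example.kafka.smt.TicksToEpochMillis" and "transforms.ticks.field": "event_date_time_ticks".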

  • Yeah, I thought so. For now, I used the timestamp.extractor set to Record, to get the time of ingestion into Kafka, which is good enough. For future cases, I will probably have the logic in the Producer, like you said. – Saugat Mukherjee Apr 23 '21 at 14:34