My Spring Cloud application joins 3 different Kafka streams. It handles hundreds of thousands of records per day, but about 3 or 4 per day disappear: I can see them arriving in the logs, yet they never complete all the joins.

The code:

@StreamListener
fun processOrderEvent(
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_CHARGE_IN)
    chargeEvent: KStream<String, ChargeEvent>,
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_IN)
    orderEvent: KStream<String, OrderChargedEvent>,
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CREATED_CHARGE_IN)
    orderCreatedEvent: KStream<String, OrderCreatedEvent>
) {

        val tracing = Tracing.newBuilder().build()
        val kafkaStreamsTracing = KafkaStreamsTracing.create(tracing)

        val chargeKeyValue = chargeEvent
            .filter { _, event -> shouldProcessBusinessConditions(event) && (event.flow == "___________" || event.flow == "____")}
            .transform(
                kafkaStreamsTracing.map<String, ChargeEvent, ByteArray, ByteArray>("processOrderEvent_ChargeEvent") { _, value ->
                    val traceId = tracing.tracer().currentSpan().context().traceIdString()
                    val keyValue = KeyValue(value.id.toString().toByteArray(), objectMapper.writeValueAsString(
                        Charge(
                            value.id.toString(),
                            value.amount?.value,
                            value.status?.name,
                            LocalDateTime.now().toString(),
                            value.paymentMethod?.type?.name,
                            value.creditor?.customerId,
                            value.channel?.name,
                            value.amount?.currency?.name,
                            value.paymentMethod?.installments,
                            value.card?.brand,
                            buildSellerEmail(value),
                            value.createdAt,
                            value.amount?.summary?.total,
                            value.amount?.summary?.paid,
                            value.amount?.summary?.refunded,
                            value.connect?.id,
                            value.connect?.name,
                            value.flow
                        )).toByteArray())

                    log.info("m=processOrderEvent traceId=$traceId chargeId=${value.id} step=chargeKeyValue")
                    keyValue
                }
            )

        val orderKeyValue = orderEvent
            .transform(
                kafkaStreamsTracing.map<String, OrderChargedEvent, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent") { _, value ->
                    val traceId = tracing.tracer().currentSpan().context().traceIdString()
                    log.info("m=processOrderEvent traceId=$traceId chargeId=${value.chargeId} orderId=${value.orderId} step=orderKeyValue")
                    KeyValue(value.chargeId.toByteArray(), objectMapper.writeValueAsString(Order(value.orderId, value.chargeId)).toByteArray())
                }
            )

        val orderCreatedKeyValue = orderCreatedEvent
            .transform(
                kafkaStreamsTracing.map<String, OrderCreatedEvent, ByteArray, ByteArray>("processOrderEvent_OrderCreatedEvent") { _, value ->
                    val traceId = tracing.tracer().currentSpan().context().traceIdString()
                    log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} step=before_orderCreatedKeyValue")
                    val originalValue = buildOrderOriginalValue(value)
                    val keyValue = KeyValue(value.orderId.toByteArray(), objectMapper.writeValueAsString(OrderCreated(value.orderId, originalValue)).toByteArray())
                    log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} originalValue=${originalValue} step=orderCreatedKeyValue")
                    keyValue
                }
            )

        chargeKeyValue.join(orderKeyValue, OrderChargeValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
            .transform(
                kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent") { _, value ->
                    val traceId = tracing.tracer().currentSpan().context().traceIdString()
                    val orderWithChargeJson = objectMapper.readValue(value, OrderWithCharge::class.java)
                    val keyValue = KeyValue(orderWithChargeJson.order.orderId!!.toByteArray(), value)
                    log.info("m=processOrderEvent traceId=$traceId chargeId=${orderWithChargeJson.charge.chargeId} orderId=${orderWithChargeJson.order.orderId} step=orderKeyValueJoin")
                    keyValue
                }
            )
            .join(orderCreatedKeyValue, OrderCreatedValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
            .transform(
                kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent_OrderCreatedEvent") { key, value ->
                    val traceId = tracing.tracer().currentSpan().context().traceIdString()
                    val event = objectMapper.readValue(value, OrderWithChargeAndOrderCreated::class.java)
                    log.info("m=processOrderEvent traceId=$traceId chargeId=${event.charge.chargeId} orderId=${event.order.orderId} step=orderCreatedKeyValueJoin")
                    KeyValue(key, objectMapper.writeValueAsString(OrderWithChargeAndOrderCreatedTraceId(traceId, event)).toByteArray())
                }
            ).process(ProcessorSupplier { businessConditionCkoutEventProcessor })
}

The logs for one of the few records that do not complete (newest entry first):

9/1/21 5:48:15.863 AM
e2e64c0faa9e 05:48:15.863 [orders-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891--58ecdbf7ccf7 orderId=ORDE__-413D-4D5C-_-1873F0DB5ADE step=orderKeyValueJoin

9/1/21 5:48:15.763 AM
c232aa303f2f 05:48:15.763 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891--58ecdbf7ccf7 orderId=ORDE__-413D-4D5C-_-1873F0DB5ADE step=orderKeyValue

9/1/21 5:48:15.749 AM
c232aa303f2f 05:48:15.749 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=7f8c7bb5bbdb422b chargeId=-c798-4891--58ecdbf7ccf7 step=chargeKeyValue

8/31/21 6:31:50.499 PM
c232aa303f2f 18:31:50.499 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=95fc78a495f03b64 orderId=ORDE__-413D-4D5C-_-1873F0DB5ADE originalValue=22000 step=orderCreatedKeyValue

8/31/21 6:31:50.499 PM
c232aa303f2f 18:31:50.499 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=95fc78a495f03b64 orderId=ORDE_3_-413D-4D5C-_-1873F0DB5ADE step=before_orderCreatedKeyValue

The logs for one of the thousands of records that work (newest entry first):

9/2/21 6:34:33.547 PM
f84980d99867 18:34:33.547 [orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a--8576-3cb1173804d5 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A step=orderCreatedKeyValueJoin

9/2/21 6:34:33.446 PM
e2e64c0faa9e 18:34:33.446 [orders-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=3834ad80-243a-4ffe-8576-3cb1173804d5 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A step=orderKeyValueJoin

9/2/21 6:34:33.346 PM
f84980d99867 18:34:33.346 [orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=6501784c0bf59948 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A originalValue=85400 step=orderCreatedKeyValue

9/2/21 6:34:33.346 PM
f84980d99867 18:34:33.346 [orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=6501784c0bf59948 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A step=before_orderCreatedKeyValue

9/2/21 6:34:33.344 PM
c232aa303f2f 18:34:33.344 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a-4ffe--3cb1173804d5 orderId=-084F-4CCE--BA4DCAF1D50A step=orderKeyValue

9/2/21 6:34:33.326 PM
c232aa303f2f 18:34:33.326 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=efd790c85a8f053c chargeId=-4ffe--3cb1173804d5 step=chargeKeyValue

The step=orderCreatedKeyValueJoin log line marks the end of the pipeline. It appears in most cases, but for a few records the event never reaches that step.

1 Answer

You should look into implementing a dead letter queue (DLQ) with Kafka.

To implement a DLQ, create a separate topic and send every record that fails processing to it. Then write retry logic that reads from the DLQ.

This documentation may help: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC1/multi/multi_kafka-dlq-processing.html

See also the related question "Dead letter queue (DLQ) for Kafka with spring-kafka".
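
As a rough illustration of the DLQ-plus-retry idea, here is a minimal sketch in plain Kotlin. It uses in-memory queues as stand-ins for Kafka topics, so `Record`, `DeadLetterRouter`, `maxAttempts` and `parked` are all hypothetical names for this example and not part of Spring Cloud Stream or Kafka. It only covers records whose processing throws an exception.

```kotlin
// Hypothetical record type; a real application would use its own event classes.
data class Record(val key: String, val value: String, val attempts: Int = 0)

// In-memory stand-in for a DLQ topic plus retry logic (assumption: not a Kafka API).
class DeadLetterRouter(
    private val maxAttempts: Int,
    private val handler: (Record) -> Unit
) {
    // Records that failed and are waiting for another attempt.
    val deadLetters = ArrayDeque<Record>()
    // Records that exhausted their attempts and need manual inspection.
    val parked = mutableListOf<Record>()

    // Run the handler; on any exception, move the record to the DLQ.
    fun process(record: Record) {
        try {
            handler(record)
        } catch (e: Exception) {
            deadLetters.addLast(record.copy(attempts = record.attempts + 1))
        }
    }

    // Retry what is queued right now; records that fail again wait for the next pass.
    fun retryDeadLetters() {
        repeat(deadLetters.size) {
            val record = deadLetters.removeFirst()
            if (record.attempts >= maxAttempts) parked.add(record) else process(record)
        }
    }
}

fun main() {
    val router = DeadLetterRouter(maxAttempts = 2) { record ->
        require(record.value != "bad") { "cannot process ${record.key}" }
    }
    router.process(Record("k1", "ok"))
    router.process(Record("k2", "bad"))
    router.retryDeadLetters()
    router.retryDeadLetters()
    println("deadLetters=${router.deadLetters.size} parked=${router.parked.size}")
}
```

With the Kafka Streams binder, the linked documentation describes how to route failed records to a real DLQ topic instead of an in-memory queue.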

Pratik
    All the called methods have logging and try/catch blocks. If an error had happened as you describe, it would show up in the logs, wouldn't it? I think there is some gap between the JoinWindows. – Rafael Firmino Sep 03 '21 at 11:02
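
The join-window idea in the comment above can be checked against the failing logs in the question. The sketch below takes the two wall-clock log times for the failing order (step=orderCreatedKeyValue and step=orderKeyValueJoin) and compares their distance with the 5-hour `JoinWindows` from the code. It assumes the log times are close to the record timestamps that Kafka Streams actually uses for windowing, which the logs do not confirm. `exceedsWindow` is a hypothetical helper name.

```kotlin
import java.time.Duration
import java.time.LocalDateTime

// Log times copied from the failing record's logs (fractions of a second dropped).
// Assumption: these approximate the record timestamps used by the join.
val orderCreatedLoggedAt: LocalDateTime = LocalDateTime.of(2021, 8, 31, 18, 31, 50)
val orderChargeJoinLoggedAt: LocalDateTime = LocalDateTime.of(2021, 9, 1, 5, 48, 15)

// Same size as JoinWindows.of(Duration.ofHours(5)) in the question.
val joinWindow: Duration = Duration.ofHours(5)

// True when the two timestamps are further apart than the window, in either direction.
fun exceedsWindow(a: LocalDateTime, b: LocalDateTime, window: Duration): Boolean =
    Duration.between(a, b).abs() > window

fun main() {
    val gap = Duration.between(orderCreatedLoggedAt, orderChargeJoinLoggedAt)
    val exceeds = exceedsWindow(orderCreatedLoggedAt, orderChargeJoinLoggedAt, joinWindow)
    println("gap=$gap window=$joinWindow exceedsWindow=$exceeds")
}
```

For the failing record the two log lines are roughly 11 hours 16 minutes apart, while in the working example all steps are logged within a fraction of a second. If the record timestamps show the same gap, the second join would find no OrderCreated record inside its 5-hour window, which would fit the missing step=orderCreatedKeyValueJoin line.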