I have three Kafka Streams inputs in my Spring Cloud Stream application. It processes hundreds of thousands of records per day, but about 3 or 4 per day disappear: I can see them arrive in the logs, but they never complete all the joins.
The code:
@StreamListener
fun processOrderEvent(
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_CHARGE_IN)
    chargeEvent: KStream<String, ChargeEvent>,
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_IN)
    orderEvent: KStream<String, OrderChargedEvent>,
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CREATED_CHARGE_IN)
    orderCreatedEvent: KStream<String, OrderCreatedEvent>
) {
    val tracing = Tracing.newBuilder().build()
    val kafkaStreamsTracing = KafkaStreamsTracing.create(tracing)
    val chargeKeyValue = chargeEvent
        .filter { _, event -> shouldProcessBusinessConditions(event) && (event.flow == "___________" || event.flow == "____") }
        .transform(
            kafkaStreamsTracing.map<String, ChargeEvent, ByteArray, ByteArray>("processOrderEvent_ChargeEvent") { _, value ->
                val traceId = tracing.tracer().currentSpan().context().traceIdString()
                val keyValue = KeyValue(value.id.toString().toByteArray(), objectMapper.writeValueAsString(
                    Charge(
                        value.id.toString(),
                        value.amount?.value,
                        value.status?.name,
                        LocalDateTime.now().toString(),
                        value.paymentMethod?.type?.name,
                        value.creditor?.customerId,
                        value.channel?.name,
                        value.amount?.currency?.name,
                        value.paymentMethod?.installments,
                        value.card?.brand,
                        buildSellerEmail(value),
                        value.createdAt,
                        value.amount?.summary?.total,
                        value.amount?.summary?.paid,
                        value.amount?.summary?.refunded,
                        value.connect?.id,
                        value.connect?.name,
                        value.flow
                    )).toByteArray())
                log.info("m=processOrderEvent traceId=$traceId chargeId=${value.id} step=chargeKeyValue")
                keyValue
            }
        )
    val orderKeyValue = orderEvent
        .transform(
            kafkaStreamsTracing.map<String, OrderChargedEvent, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent") { _, value ->
                val traceId = tracing.tracer().currentSpan().context().traceIdString()
                log.info("m=processOrderEvent traceId=$traceId chargeId=${value.chargeId} orderId=${value.orderId} step=orderKeyValue")
                KeyValue(value.chargeId.toByteArray(), objectMapper.writeValueAsString(Order(value.orderId, value.chargeId)).toByteArray())
            }
        )
    val orderCreatedKeyValue = orderCreatedEvent
        .transform(
            kafkaStreamsTracing.map<String, OrderCreatedEvent, ByteArray, ByteArray>("processOrderEvent_OrderCreatedEvent") { _, value ->
                val traceId = tracing.tracer().currentSpan().context().traceIdString()
                log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} step=before_orderCreatedKeyValue")
                val originalValue = buildOrderOriginalValue(value)
                val keyValue = KeyValue(value.orderId.toByteArray(), objectMapper.writeValueAsString(OrderCreated(value.orderId, originalValue)).toByteArray())
                log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} originalValue=${originalValue} step=orderCreatedKeyValue")
                keyValue
            }
        )
    chargeKeyValue.join(orderKeyValue, OrderChargeValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
        .transform(
            kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent") { _, value ->
                val traceId = tracing.tracer().currentSpan().context().traceIdString()
                val orderWithChargeJson = objectMapper.readValue(value, OrderWithCharge::class.java)
                val keyValue = KeyValue(orderWithChargeJson.order.orderId!!.toByteArray(), value)
                log.info("m=processOrderEvent traceId=$traceId chargeId=${orderWithChargeJson.charge.chargeId} orderId=${orderWithChargeJson.order.orderId} step=orderKeyValueJoin")
                keyValue
            }
        )
        .join(orderCreatedKeyValue, OrderCreatedValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
        .transform(
            kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent_OrderCreatedEvent") { key, value ->
                val traceId = tracing.tracer().currentSpan().context().traceIdString()
                val event = objectMapper.readValue(value, OrderWithChargeAndOrderCreated::class.java)
                log.info("m=processOrderEvent traceId=$traceId chargeId=${event.charge.chargeId} orderId=${event.order.orderId} step=orderCreatedKeyValueJoin")
                KeyValue(key, objectMapper.writeValueAsString(OrderWithChargeAndOrderCreatedTraceId(traceId, event)).toByteArray())
            }
        ).process(ProcessorSupplier { businessConditionCkoutEventProcessor })
}
The logs for one of the few records that do not complete (newest entry first):
9/1/21 5:48:15.863 AM
e2e64c0faa9e 05:48:15.863 [orders-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891--58ecdbf7ccf7 orderId=ORDE__-413D-4D5C-_-1873F0DB5ADE step=orderKeyValueJoin
9/1/21 5:48:15.763 AM
c232aa303f2f 05:48:15.763 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891--58ecdbf7ccf7 orderId=ORDE__-413D-4D5C-_-1873F0DB5ADE step=orderKeyValue
9/1/21 5:48:15.749 AM
c232aa303f2f 05:48:15.749 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=7f8c7bb5bbdb422b chargeId=-c798-4891--58ecdbf7ccf7 step=chargeKeyValue
8/31/21 6:31:50.499 PM
c232aa303f2f 18:31:50.499 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=95fc78a495f03b64 orderId=ORDE__-413D-4D5C-_-1873F0DB5ADE originalValue=22000 step=orderCreatedKeyValue
8/31/21 6:31:50.499 PM
c232aa303f2f 18:31:50.499 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=95fc78a495f03b64 orderId=ORDE_3_-413D-4D5C-_-1873F0DB5ADE step=before_orderCreatedKeyValue
The logs for one of the thousands of records that work (newest entry first):
9/2/21 6:34:33.547 PM
f84980d99867 18:34:33.547 [orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a--8576-3cb1173804d5 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A step=orderCreatedKeyValueJoin
9/2/21 6:34:33.446 PM
e2e64c0faa9e 18:34:33.446 [orders-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=3834ad80-243a-4ffe-8576-3cb1173804d5 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A step=orderKeyValueJoin
9/2/21 6:34:33.346 PM
f84980d99867 18:34:33.346 [orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=6501784c0bf59948 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A originalValue=85400 step=orderCreatedKeyValue
9/2/21 6:34:33.346 PM
f84980d99867 18:34:33.346 [orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=6501784c0bf59948 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A step=before_orderCreatedKeyValue
9/2/21 6:34:33.344 PM
c232aa303f2f 18:34:33.344 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a-4ffe--3cb1173804d5 orderId=-084F-4CCE--BA4DCAF1D50A step=orderKeyValue
9/2/21 6:34:33.326 PM
c232aa303f2f 18:34:33.326 [orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1] INFO u.p.p.s.SomeEventStreams - m=processOrderEvent traceId=efd790c85a8f053c chargeId=-4ffe--3cb1173804d5 step=chargeKeyValue
The step=orderCreatedKeyValueJoin log line marks the end of the pipeline. It appears in most cases, but for a few events it never shows up.
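One thing that can be checked directly from the logs above is how far apart the two sides of the second join are, compared with the 5-hour window passed to JoinWindows.of. In the failing example, the step=orderCreatedKeyValue line and the step=orderKeyValueJoin line are about 11 hours apart; in the working example they are 100 ms apart. The sketch below only illustrates that arithmetic. `withinJoinWindow` is a hypothetical helper, not part of the application or of Kafka Streams, and it assumes the log times roughly match the record timestamps that the join actually compares, which may not be true.

```kotlin
import java.time.Duration
import java.time.LocalDateTime

// Window size used in the topology above: JoinWindows.of(Duration.ofHours(5)).
val joinWindow: Duration = Duration.ofHours(5)

// Hypothetical helper: true when two timestamps are no further apart than the
// window, in either direction (a symmetric window is assumed here).
fun withinJoinWindow(a: LocalDateTime, b: LocalDateTime, window: Duration): Boolean =
    Duration.between(a, b).abs() <= window

fun main() {
    // Log times copied from the failing example (assumed to approximate the
    // record timestamps; the real record timestamps may differ).
    val failingOrderCreated = LocalDateTime.of(2021, 8, 31, 18, 31, 50, 499_000_000)
    val failingOrderChargeJoin = LocalDateTime.of(2021, 9, 1, 5, 48, 15, 863_000_000)

    // Log times copied from the working example.
    val workingOrderCreated = LocalDateTime.of(2021, 9, 2, 18, 34, 33, 346_000_000)
    val workingOrderChargeJoin = LocalDateTime.of(2021, 9, 2, 18, 34, 33, 446_000_000)

    println("failing gap = ${Duration.between(failingOrderCreated, failingOrderChargeJoin)}")
    println("failing within window = ${withinJoinWindow(failingOrderCreated, failingOrderChargeJoin, joinWindow)}")
    println("working gap = ${Duration.between(workingOrderCreated, workingOrderChargeJoin)}")
    println("working within window = ${withinJoinWindow(workingOrderCreated, workingOrderChargeJoin, joinWindow)}")
}
```

If the record timestamps show the same gap as the log times, the order-created event in the failing case would fall outside the 5-hour window of the second join, which would be consistent with the missing step=orderCreatedKeyValueJoin line.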