
I have a consumer running in its own thread. It polls records from a topic, extracts the relevant data, and stores the result in a database. Is it possible to achieve exactly-once semantics with this setup, i.e. to ensure that each record is stored in the database only once?

The consumer config is:

kafka:
  bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS}
  kafkaSecurityProtocol: ${KAFKA_DK_SECURITY_PROTOCOL}
  schemaRegistryUrl: ${SCHEMA_REGISTRY_URL}
  autoOffsetReset: earliest
  enableAutoCommit: false
  sessionTimeoutMs: 60000
  heartbeatIntervalMs: 6000
  defaultApiTimeoutMs: 120000
  inputTopic: ${KAFKA_INPUT_TOPIC}
  keyDeserializerClass: org.apache.kafka.common.serialization.StringDeserializer
  valueDeserializerClass: org.apache.kafka.common.serialization.StringDeserializer

My consumer thread looks like the following:

import java.time.Duration
import java.util.concurrent.atomic.AtomicLong
import kotlin.system.measureTimeMillis
import org.apache.kafka.clients.consumer.Consumer
import io.micrometer.core.instrument.Metrics

// application-specific helpers (elided): dataSource, dbContext, extractRelevantData, Dao, log


class ConsumerThread(
    name: String,
    private val consumer: Consumer<String, String>,
    private val sleepTime: Long,
) :
    Thread(name) {

    private val saveTimer = Metrics.gauge("time.used.on.saving", AtomicLong(0))!!
    private val receiveCounter = Metrics.counter("received")

    override fun run() {
        while (true) {
            try {
                consumer.poll(Duration.ofSeconds(30)).let { rs ->
                    rs.forEach { r ->
                        val data = extractRelevantData(r.value())
                        dbContext.startConnection(dataSource).use {
                            val time = measureTimeMillis {
                                Dao(dbContext).saveData(data)
                            }
                            saveTimer.set(time)
                        }
                    }
                    log.info("Received {} records in {}", rs.count(), this.name)
                    receiveCounter.increment(rs.count().toDouble())
                    consumer.commitSync()
                }
            } catch (e: Exception) {
                log.error("Unhandled exception when fetching {} from kafka", this.name, e)
                sleep(sleepTime)
            }
        }
    }
}

and my Dao looks like:

class TpAcknowledgementDao(private val dbContext: DbContext) {

    private val table: DbContextTable = dbContext.table("table")
    private val reasonTable: DbContextTable = dbContext.table("reason")

    fun saveData(data: DataType): String {
        dbContext.ensureTransaction().use {
            // Do changes to database (i.e. save data to database and create a saveStatus object)
            it.setComplete()
            return saveStatus.id.toString()
        }
    }
}

I thought my current setup ensured exactly-once semantics: if an exception is thrown, the consumer does not commit, and the database transaction ensures the changes are rolled back. After a restart, the records will be consumed again and the database transaction will be re-attempted.
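The gap in this reasoning is that the consumer's *position* is tracked in memory, separately from the *committed offset*: each `poll()` advances the in-memory position, and because the exception is caught and the loop continues on the same consumer instance, the next `poll()` resumes from that position rather than from the last commit. A minimal pure-Kotlin simulation of this position/commit split (no Kafka involved; all names here are illustrative, not the real client API):

```kotlin
// Simulates the split between a Kafka consumer's in-memory position and its
// committed offset. In the real client, poll() advances the position even
// when commitSync() is never reached.
class SimulatedConsumer(private val records: List<String>) {
    var position = 0          // advances on every poll, survives a caught exception
        private set
    var committed = 0         // only advances on commitSync()
        private set

    fun poll(batchSize: Int): List<String> {
        val batch = records.subList(position, minOf(position + batchSize, records.size))
        position += batch.size
        return batch
    }

    fun commitSync() { committed = position }
}

fun main() {
    val consumer = SimulatedConsumer(listOf("r1", "r2", "r3", "r4"))
    val stored = mutableListOf<String>()

    // Batch 1: the "database" fails mid-batch, so commitSync() is never called...
    val batch1 = consumer.poll(2)
    try {
        stored.add(batch1[0])
        throw RuntimeException("Network Adapter could not establish the connection")
    } catch (e: Exception) {
        // swallowed, like the catch block in run()
    }

    // ...yet the next poll continues from the in-memory position: r2 is lost.
    stored.addAll(consumer.poll(2))
    consumer.commitSync()

    println(stored)               // [r1, r3, r4] -- r2 was never stored
    println(consumer.committed)   // 4
}
```

Only a restart (or an explicit `seek()` back to the committed offset) would make the consumer re-fetch the uncommitted records, which is why the rollback alone is not enough.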

However, when I get the following exception:

java.sql.SQLRecoverableException: I/O-error: The Network Adapter could not establish the connection
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:862)
    at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793)
    at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57)
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747)
    at oracle.jdbc.pool.OracleDataSource.getPhysicalConnection(OracleDataSource.java:413)
    at oracle.jdbc.pool.OracleDataSource.getConnection(OracleDataSource.java:298)
    at oracle.jdbc.pool.OracleDataSource.getConnection(OracleDataSource.java:213)
    at oracle.jdbc.pool.OracleDataSource.getConnection(OracleDataSource.java:191)
    at org.fluentjdbc.DbContext$TopLevelDbContextConnection.getConnection(DbContext.java:274)
    at org.fluentjdbc.DbContext.getThreadConnection(DbContext.java:151)
    at org.fluentjdbc.DbContext.ensureTransaction(DbContext.java:184)
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.Net.connect0(Native Method)
    at java.base/sun.nio.ch.Net.connect(Unknown Source)
    at java.base/sun.nio.ch.Net.connect(Unknown Source)
    at java.base/sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
    at java.base/java.nio.channels.SocketChannel.open(Unknown Source)
    at oracle.net.nt.TimeoutSocketChannel.connect(TimeoutSocketChannel.java:99)
    at oracle.net.nt.TimeoutSocketChannel.<init>(TimeoutSocketChannel.java:77)
    at oracle.net.nt.TcpNTAdapter.connect(TcpNTAdapter.java:192)
    ... 19 common frames omitted

When this exception occurs, one or more records are never stored in the database. Any idea how I can ensure that every record is stored in the database exactly once?
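One commonly suggested mitigation (a sketch, not the only answer): accept at-least-once delivery and make the database write idempotent, e.g. by keying each row on the record's (topic, partition, offset) with a unique constraint and an upsert (`MERGE` in Oracle). Then a redelivered record overwrites itself instead of creating a duplicate. A pure-Kotlin model of that idea, with the store standing in for the database:

```kotlin
// Sketch of an idempotent sink: each record is stored under its
// (topic, partition, offset) key, so a redelivered record collapses into
// the existing row instead of duplicating it. In a real database this
// would be a unique constraint plus an upsert (e.g. Oracle MERGE).
data class RecordKey(val topic: String, val partition: Int, val offset: Long)

class IdempotentStore {
    private val rows = LinkedHashMap<RecordKey, String>()

    fun upsert(key: RecordKey, data: String) { rows[key] = data }  // MERGE-like
    fun count() = rows.size
}

fun main() {
    val store = IdempotentStore()
    val key = RecordKey("inputTopic", 0, 42L)

    store.upsert(key, "payload")   // first delivery
    store.upsert(key, "payload")   // redelivery after a crash or seek-back

    println(store.count())         // 1 -- the duplicate collapsed into one row
}
```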

  • There's a related question here: https://stackoverflow.com/questions/58128037/does-kafka-supports-xa-transactions - stating that it's not easily possible. – SpaceTrucker May 11 '22 at 12:15
  • Thank you for the link! But is it possible to point out why my setup does not work? I cannot see how the code flows in the case where I get an exception. In my head, my setup should work. – Liverbird97 May 11 '22 at 12:29
  • I found this: https://stackoverflow.com/questions/43502188/consumer-poll-returns-new-records-even-without-committing-offsets The reason the consumer is not re-consuming the records is that the last consumed position is kept locally in the consumer. Since the consumer is never restarted, it never fetches the committed offset from the broker. – Liverbird97 May 11 '22 at 12:42
  • Your setup won't work. It's just not considering many of the simplest failure cases, like the program exiting in the middle of execution or a network failure. There's a reason why experts have spent a lot of time and care designing XA transactions. Do not assume that you are able to reinvent that wheel with a handful of lines of code. – SpaceTrucker May 11 '22 at 12:50
  • I see you are using Oracle JDBC connection. Have you tried to use Kafka Connect JDBC Sink? – OneCricketeer May 11 '22 at 18:28
  • In the `extractRelevantData()` method I am doing some data manipulation before saving the data. I do not think it would be possible to implement this logic with the Connector. – Liverbird97 May 12 '22 at 06:13

0 Answers