I have a consumer running in its own thread. It polls records from a topic, extracts the relevant data, and stores the result in a database. Is it possible to achieve exactly-once semantics with this setup? That is, can I ensure that each record is stored in the database exactly once?
The consumer config is:
kafka:
  bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS}
  kafkaSecurityProtocol: ${KAFKA_DK_SECURITY_PROTOCOL}
  schemaRegistryUrl: ${SCHEMA_REGISTRY_URL}
  autoOffsetReset: earliest
  enableAutoCommit: false
  sessionTimeoutMs: 60000
  heartbeatIntervalMs: 6000
  defaultApiTimeoutMs: 120000
  inputTopic: ${KAFKA_INPUT_TOPIC}
  keyDeserializerClass: org.apache.kafka.common.serialization.StringDeserializer
  valueDeserializerClass: org.apache.kafka.common.serialization.StringDeserializer
My consumer thread looks like the following:
import dataSource
import dbContext
import extractRelevantData
class ConsumerThread(
    name: String,
    private val consumer: Consumer<String, String>,
    private val sleepTime: Long,
) : Thread(name) {
    private val saveTimer = Metrics.gauge("time.used.on.saving", AtomicLong(0))!!
    private val receiveCounter = Metrics.counter("received")
    override fun run() {
        while (true) {
            try {
                consumer.poll(Duration.ofSeconds(30)).let { rs ->
                    // Save each record in its own database connection and transaction
                    rs.forEach { r ->
                        val data = extractRelevantData(r.value())
                        dbContext.startConnection(dataSource).use {
                            val time = measureTimeMillis {
                                TpAcknowledgementDao(dbContext).saveData(data)
                            }
                            saveTimer.set(time)
                        }
                    }
                    log.info("Received ${rs.count()} {}", this.name)
                    receiveCounter.increment(rs.count().toDouble())
                    // Commit offsets only after the whole batch has been saved
                    consumer.commitSync()
                }
            } catch (e: Exception) {
                log.error("Unhandled exception when fetching {} from kafka", this.name, e)
                sleep(sleepTime)
            }
        }
    }
}
My DAO looks like this:
class TpAcknowledgementDao(private val dbContext: DbContext) {
    private val table: DbContextTable = dbContext.table("table")
    private val reasonTable: DbContextTable = dbContext.table("reason")
    fun saveData(data: DataType): String {
        dbContext.ensureTransaction().use {
            // Do changes to database (i.e. save data to database and create a saveStatus object)
            it.setComplete()
            return saveStatus.id.toString()
        }
    }
}
I thought my current setup ensured exactly-once semantics: if an exception is thrown, the consumer does not commit, and the database transaction ensures that the changes are rolled back. At restart, the records will be consumed again and the database transaction will be re-attempted.
However, I get the following exception:
java.sql.SQLRecoverableException: I/O-error: The Network Adapter could not establish the connection
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:862)
    at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793)
    at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57)
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747)
    at oracle.jdbc.pool.OracleDataSource.getPhysicalConnection(OracleDataSource.java:413)
    at oracle.jdbc.pool.OracleDataSource.getConnection(OracleDataSource.java:298)
    at oracle.jdbc.pool.OracleDataSource.getConnection(OracleDataSource.java:213)
    at oracle.jdbc.pool.OracleDataSource.getConnection(OracleDataSource.java:191)
    at org.fluentjdbc.DbContext$TopLevelDbContextConnection.getConnection(DbContext.java:274)
    at org.fluentjdbc.DbContext.getThreadConnection(DbContext.java:151)
    at org.fluentjdbc.DbContext.ensureTransaction(DbContext.java:184)
Caused by: java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.Net.connect0(Native Method)
    at java.base/sun.nio.ch.Net.connect(Unknown Source)
    at java.base/sun.nio.ch.Net.connect(Unknown Source)
    at java.base/sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
    at java.base/java.nio.channels.SocketChannel.open(Unknown Source)
    at oracle.net.nt.TimeoutSocketChannel.connect(TimeoutSocketChannel.java:99)
    at oracle.net.nt.TimeoutSocketChannel.<init>(TimeoutSocketChannel.java:77)
    at oracle.net.nt.TcpNTAdapter.connect(TcpNTAdapter.java:192)
    ... 19 common frames omitted
When this exception occurs, one or more records are never stored in the database. How can I ensure that every record is stored in the database exactly once?
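One behaviour of the loop above that may explain the missing records: when saveData throws, the catch block sleeps and the loop calls poll() again, but poll() continues from the consumer's in-memory position, not from the last committed offset. The next successful commitSync() then commits past the records that failed. The following is a minimal sketch of seeking back to the committed offsets before retrying. It assumes a kafka-clients version that provides committed(Set<TopicPartition>), and the helper name rewindToLastCommitted is hypothetical:

```kotlin
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper (not part of the original code): moves the consumer's
// in-memory position back to the last committed offset of every assigned partition.
fun <K, V> rewindToLastCommitted(consumer: Consumer<K, V>) {
    val assigned: Set<TopicPartition> = consumer.assignment()
    // Partitions that have never been committed are missing or null in this map.
    val committed = consumer.committed(assigned)
    for (tp in assigned) {
        val offset = committed[tp]
        if (offset != null) {
            // Re-read everything after the last successful commit.
            consumer.seek(tp, offset.offset())
        } else {
            // Nothing committed yet: start over, matching autoOffsetReset: earliest.
            consumer.seekToBeginning(listOf(tp))
        }
    }
}
```

It would be called in the catch block, before sleep(sleepTime). Even with the rewind, this setup is at-least-once rather than exactly-once: if the process dies after a database transaction has committed but before commitSync() succeeds, those records are processed again. Avoiding duplicates would also require the write itself to be idempotent, for example through a unique key on topic, partition and offset, which is an assumption about the schema and not something shown above.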