The framework does not provide any KinesisClientLibConfiguration bean. It is your project's responsibility to expose such a bean, with whatever options you need: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#kinesis-consumer-properties
Starting with version 2.0.1, beans of KinesisClientLibConfiguration type can be provided in the application context to have full control over Kinesis Client Library configuration options.
The producer side, indeed, is covered by the KinesisProducerConfiguration bean in the KinesisBinderConfiguration:
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
public KinesisProducerConfiguration kinesisProducerConfiguration() {
    KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
    kinesisProducerConfiguration.setCredentialsProvider(this.awsCredentialsProvider);
    kinesisProducerConfiguration.setRegion(this.region);
    return kinesisProducerConfiguration;
}
I don't see a big problem in declaring such a bean in your own configuration with any additional properties you'd like to have, including the mentioned metrics.
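A minimal sketch of such a user-declared bean, assuming the KPL's standard setters. The class name ProducerMetricsConfig, the region, the namespace and the use of DefaultAWSCredentialsProviderChain are placeholders for illustration, not something the binder prescribes. Because the binder's own bean is marked @ConditionalOnMissingBean, a bean like this one should take its place:

```java
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProducerMetricsConfig { // hypothetical class name

    @Bean
    public KinesisProducerConfiguration kinesisProducerConfiguration() {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        // Assumption: default credentials chain and a fixed region; use your own values.
        config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
        config.setRegion("us-east-1");
        // Metrics options: level is "none", "summary" or "detailed";
        // granularity is "global", "stream" or "shard".
        config.setMetricsLevel("summary");
        config.setMetricsGranularity("stream");
        config.setMetricsNamespace("MyApp/KPL"); // placeholder namespace
        return config;
    }
}
```

Anything you do not set explicitly keeps the KPL default.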
If this is still not OK for you, you can inject the existing bean into a bean of your own and mutate it whatever way you want:
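@Bean
String configurerBean(KinesisProducerConfiguration kinesisProducerConfiguration) {
    // Mutate the injected configuration; "summary" is only an example value
    // (the KPL accepts "none", "summary" or "detailed").
    kinesisProducerConfiguration.setMetricsLevel("summary");
    return null;
}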
UPDATE
The consumer part:
This is a bean based on the default config instance for KCL that we create internally:
@Bean
KinesisClientLibConfiguration kinesisClientLibConfiguration() {
    return new KinesisClientLibConfiguration(this.consumerGroup,
            this.stream,
            null,
            null,
            this.streamInitialSequence,
            this.kinesisProxyCredentialsProvider,
            null,
            null,
            KinesisClientLibConfiguration.DEFAULT_FAILOVER_TIME_MILLIS,
            this.workerId,
            KinesisClientLibConfiguration.DEFAULT_MAX_RECORDS,
            this.idleBetweenPolls,
            false,
            KinesisClientLibConfiguration.DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
            new ClientConfiguration(),
            new ClientConfiguration(),
            new ClientConfiguration(),
            this.consumerBackoff,
            KinesisClientLibConfiguration.DEFAULT_METRICS_BUFFER_TIME_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE,
            KinesisClientLibConfiguration.DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
            null,
            KinesisClientLibConfiguration.DEFAULT_SHUTDOWN_GRACE_MILLIS,
            KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
            new SimpleRecordsFetcherFactory(),
            DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS,
            DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
            DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}
Whatever you see with the this. prefix has to be replaced with the respective value from your environment. Probably KinesisClientLibConfiguration.DEFAULT_METRICS_MAX_QUEUE_SIZE is what you are looking for in this case.
The this.consumerGroup and this.stream values must be the same as in the binding you want to configure the consumer for.
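If you do not need every constructor argument, a shorter variant may be enough. This is only a sketch, assuming KCL 1.x's four-argument constructor and its fluent with...() methods. The class name, the group and stream names, the credentials provider and the metrics values are placeholders to replace with the values from your own binding:

```java
import java.util.UUID;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerMetricsConfig { // hypothetical class name

    @Bean
    KinesisClientLibConfiguration kinesisClientLibConfiguration() {
        return new KinesisClientLibConfiguration(
                "my-consumer-group",  // placeholder: must match the binding's consumer group
                "my-stream",          // placeholder: must match the binding's stream (destination)
                new DefaultAWSCredentialsProviderChain(), // assumption: default credentials chain
                UUID.randomUUID().toString())             // worker id, unique per instance
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
                // Metrics-related options; the values below are examples only.
                .withMetricsLevel("SUMMARY")
                .withMetricsBufferTimeMillis(10_000L)
                .withMetricsMaxQueueSize(5_000);
    }
}
```

Options that are not set through a with...() call stay at the KCL defaults, so this is not identical to the instance the binder builds internally.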