
I am using the Spring Cloud Data Flow Kinesis sample: https://github.com/spring-cloud/spring-cloud-dataflow-samples/tree/master/dataflow-website/recipes/kinesisdemo

The producer and consumer both start without errors, and the Kinesis stream is created. The producer prints its Before/After sysout lines, but the consumer prints no output at all. Has anybody faced this? Does this sample application work for anyone? If I am missing something, how do I fix it?

The consumer console output is below:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.6.RELEASE)

2020-06-27 21:34:58.397  INFO 14668 --- [           main] c.e.k.KinesisConsumerApplication         : Starting KinesisConsumerApplication on INBAN7C156Y2 with PID 14668 (C:\01_MAHESH_SPACE\SPRING_DATA_FLOW_RUNTIME_&_EXAMPLES\SAMPLES\spring-cloud-dataflow-samples\dataflow-website\recipes\kinesisdemo\kinesisconsumer\target\classes started by nampoom in C:\01_MAHESH_SPACE\SPRING_DATA_FLOW_RUNTIME_&_EXAMPLES\SAMPLES\spring-cloud-dataflow-samples\dataflow-website\recipes\kinesisdemo\kinesisconsumer)
2020-06-27 21:34:58.400  INFO 14668 --- [           main] c.e.k.KinesisConsumerApplication         : No active profile set, falling back to default profiles: default
2020-06-27 21:35:02.199  INFO 14668 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-06-27 21:35:02.204  INFO 14668 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-06-27 21:35:02.208  INFO 14668 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-06-27 21:35:02.230  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean '(inner bean)#7397c6' of type [com.amazonaws.auth.profile.ProfileCredentialsProvider] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.231  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'credentialsProvider' of type [org.springframework.cloud.aws.core.credentials.CredentialsProviderFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.232  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'credentialsProvider' of type [com.amazonaws.auth.AWSCredentialsProviderChain] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.250  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.260  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration$$EnhancerBySpringCGLIB$$3e2a3a55] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.268  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration$$EnhancerBySpringCGLIB$$19bc4575] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.274  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration$$EnhancerBySpringCGLIB$$52a3ee42] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.278  INFO 14668 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-06-27 21:35:02.599  INFO 14668 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-06-27 21:35:03.368  INFO 14668 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.input' has 1 subscriber(s).
2020-06-27 21:35:03.415  INFO 14668 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel input
2020-06-27 21:35:03.465  INFO 14668 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel nullChannel
2020-06-27 21:35:03.476  INFO 14668 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel errorChannel
2020-06-27 21:35:03.502  INFO 14668 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler org.springframework.cloud.stream.binding.StreamListenerMessageHandler@204d9edf
2020-06-27 21:35:03.540  INFO 14668 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageHandler errorLogger
2020-06-27 21:35:03.562  INFO 14668 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-06-27 21:35:03.562  INFO 14668 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-06-27 21:35:03.562  INFO 14668 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2020-06-27 21:35:05.421  INFO 14668 --- [           main] o.s.c.s.b.k.p.KinesisStreamProvisioner   : Using Kinesis stream for inbound: test-kinesis-stream
2020-06-27 21:35:06.752  INFO 14668 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel test-kinesis-stream.test-kinesis-stream-group.errors
2020-06-27 21:35:06.815  INFO 14668 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.test-kinesis-stream.test-kinesis-stream-group.errors' has 1 subscriber(s).
2020-06-27 21:35:06.816  INFO 14668 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.test-kinesis-stream.test-kinesis-stream-group.errors' has 0 subscriber(s).
2020-06-27 21:35:06.816  INFO 14668 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.test-kinesis-stream.test-kinesis-stream-group.errors' has 1 subscriber(s).
2020-06-27 21:35:06.816  INFO 14668 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.test-kinesis-stream.test-kinesis-stream-group.errors' has 2 subscriber(s).
2020-06-27 21:35:07.110  INFO 14668 --- [           main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='test-kinesis-stream', shard='shardId-000000000000', reset=false}], consumerGroup='test-kinesis-stream-group'}
2020-06-27 21:35:07.134  INFO 14668 --- [           main] c.e.k.KinesisConsumerApplication         : Started KinesisConsumerApplication in 9.119 seconds (JVM running for 10.455)
2020-06-27 21:35:08.111  INFO 14668 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='test-kinesis-stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
  • Did you find any solution? I'm facing the same issue. Thanks! – san Aug 30 '20 at 23:38
  • After trying for hours, I finally deleted my stream (and the DynamoDB tables) and let the demo application create a new one. Now the consumer is working. I still have to find the exact reason why the consumer didn't work with the existing stream in the first place. – san Aug 30 '20 at 23:58
