I am trying to use the alpakka kinesis connector to send messages to a Kinesis Stream but I have no success with it. I tried the code below but nothing in my stream.
implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
println(reqEntry)
reqEntry
}
val entry = new PutRecordsRequestEntry()
.withData(ByteBuffer.wrap("Hello World".getBytes))
.withPartitionKey(Random.nextInt.toString)
Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()
// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
- Using a
Sink.foreach(println)
instead ofKinesisSink
prints out thePutRecordsRequestEntry
every 1 second => EXPECTED - Using
KinesisSink
, the entry is generated only once.
What Am I doing wrong ?
I am checking my stream with a KinesisSource
and reading is working ( tested with another stream)
Also the monitoring dashboard of AWS Kinesis doesnt show any PUT requests.
Note 1: I tried to enable the debug log of alpakka but with no effect
<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
in my logback.xml
+ debug on root level