
Problem: When I restart/complete/stop the stream, the old consumer does not die/shut down:

[INFO ] a.a.RepointableActorRef -
  Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] 
  from Actor[akka://ufo-sightings/deadLetters]
  to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
  was not delivered. [1] dead letters encountered.

Description: I'm building a service that receives a message from a Kafka topic and sends it to an external service via an HTTP request.

  1. A connection with the external service can be broken, and my service needs to retry the request.

  2. Additionally, if there is an error in the stream, the entire stream needs to restart.

  3. Finally, sometimes I don't need the stream and its corresponding Kafka consumer, and I would like to shut down the entire stream.

So I have a Stream:

Consumer.committableSource(customizedSettings, subscriptions)
  .flatMapConcat(sourceFunction)
  .toMat(Sink.ignore)
  .run

The HTTP request is sent in sourceFunction.
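For illustration, a hypothetical sourceFunction along these lines could satisfy the per-request retry requirement by wrapping the HTTP call in its own restart-on-failure source. Everything here (the URI, the message types, the backoff values) is an assumption, not the original code, and it requires an Akka version that provides RestartSource.onFailuresWithBackoff:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.stream.scaladsl.{RestartSource, Source}
import scala.concurrent.duration._

// Illustrative sketch only: URI, types, and retry policy are assumptions.
def sourceFunction(msg: CommittableMessage[String, String])(
    implicit system: ActorSystem): Source[HttpResponse, NotUsed] =
  // Retry only on failure, with exponential backoff, until the request succeeds.
  // (onFailuresWithBackoff does not restart on normal completion, so the
  // request is not re-sent once it has succeeded.)
  RestartSource.onFailuresWithBackoff(
    minBackoff = 1.second,
    maxBackoff = 30.seconds,
    randomFactor = 0.2
  ) { () =>
    Source
      .single(HttpRequest(
        uri = "http://external-service/endpoint", // assumed endpoint
        entity = HttpEntity(msg.record.value)))
      .mapAsync(1)(Http().singleRequest(_))
  }
```

Offset committing is deliberately omitted from this sketch.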

I followed the new Kafka consumer restart instructions in the new documentation:

  RestartSource.withBackoff(
      minBackoff = 20.seconds,
      maxBackoff = 5.minutes,
      randomFactor = 0.2 ) { () =>
          Consumer.committableSource(customizedSettings, subscriptions)
            .watchTermination() {
                case (consumerControl, streamComplete) =>
                  logger.info(s"Started watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is completed: ${streamComplete.isCompleted}")
                  consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
                  streamComplete
                    .flatMap { _ =>
                      consumerControl.shutdown().map(_ => logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
                    }
                    .recoverWith {
                      case _ =>
                        consumerControl.shutdown().map(_ => logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
                    }
             }
            .flatMapConcat(sourceFunction)
      }
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.left)
      .run
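If the goal is to be able to stop the consumer from outside the RestartSource, one possible workaround is to capture the Consumer.Control materialized by each (re)started inner source in an AtomicReference and call shutdown() on it directly, rather than relying on a KillSwitch placed outside the RestartSource (which cancels the restart wrapper but not necessarily the inner consumer). A minimal sketch, reusing customizedSettings, subscriptions, and sourceFunction from above, and assuming an Alpakka Kafka version that provides Consumer.NoopControl:

```scala
import java.util.concurrent.atomic.AtomicReference
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Sink}
import scala.concurrent.duration._

// Holds the Control of the most recently (re)started consumer.
val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)

val done = RestartSource
  .withBackoff(minBackoff = 20.seconds, maxBackoff = 5.minutes, randomFactor = 0.2) { () =>
    Consumer
      .committableSource(customizedSettings, subscriptions)
      .mapMaterializedValue { c => control.set(c); c } // remember the live Control
      .flatMapConcat(sourceFunction)
  }
  .runWith(Sink.ignore)

// When the stream is no longer needed, shut down the Kafka consumer itself,
// not just the surrounding graph:
control.get().shutdown()
```

Because the Control is refreshed on every restart, the shutdown() call always reaches the currently running KafkaConsumerActor instead of a stale one.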

There is an open issue that discusses this non-terminating consumer in a complex Akka stream, but there is no solution yet.

Is there a workaround that forces the Kafka consumer to terminate?

Milad
Rabzu

1 Answer


How about wrapping the consumer in an Actor and registering a KillSwitch? See: https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling

Then in the Actor postStop method you can terminate the stream. By wrapping the Actor in a BackoffSupervisor, you get the exponential backoff.

Example actor: https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27
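A minimal sketch of that idea, assuming Akka 2.5 and reusing customizedSettings and subscriptions from the question; all names here are illustrative and not the linked code:

```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.scaladsl.Consumer
import akka.pattern.BackoffSupervisor
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink}
import scala.concurrent.duration._

class KafkaStreamActor extends Actor with ActorLogging {
  implicit val mat = ActorMaterializer()(context)
  import context.dispatcher

  // Materialize both the kill switch and the stream-completion future.
  val (killSwitch, done) =
    Consumer.committableSource(customizedSettings, subscriptions)
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()

  // Forward stream failures to self so the actor can stop itself.
  done.failed.foreach(e => self ! e)

  override def postStop(): Unit = {
    killSwitch.shutdown() // terminate the stream when the actor stops
    super.postStop()
  }

  def receive = {
    case e: Throwable =>
      log.error(e, "Stream failed, stopping so the BackoffSupervisor restarts us")
      context.stop(self)
  }
}

// Wrapping in a BackoffSupervisor restarts the stopped actor (and thus the
// stream) with exponential backoff.
val supervisorProps = BackoffSupervisor.props(
  Props[KafkaStreamActor],
  childName = "kafka-stream",
  minBackoff = 3.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2)
```

Note that, as the comment below points out, the KillSwitch may still fail to kill the underlying KafkaConsumerActor when the stream contains nested streams.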

Bennie Krijger
  • KillSwitch also does not work. The example above will only work with a simple consumer stream, and it will fail to kill the Kafka ConsumerActor if the stream contains nested streams. But I will try this anyway – Rabzu May 23 '18 at 14:28