2

I have the following code, that does not compile:

import akka.NotUsed
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.pattern.FutureRef
import akka.stream.scaladsl._
import akka.stream.typed.scaladsl.ActorMaterializer
import org.apache.kafka.clients.admin._
import scala.jdk.FutureConverters._


import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration

object KafkaDetectorActor {

  val create: Behavior[NotUsed] = Behaviors.setup { context =>
    implicit val system = context.system
    implicit val materializer = ActorMaterializer()
    implicit val dispatcher = context.system.dispatchers


    Behaviors.same
  }

  private def health(server: String)(implicit executor: ExecutionContext): Future[Boolean] = {
    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server)
    props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000")
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000")

        AdminClient
          .create(props)
          .listTopics()
          .names()
          .asScala

  }

where names returns KafkaFuture[java.util.Set[String]].

It does not recognize asScala method. The scala.jdk.FutureConverters._ library is imported. What am I doing wrong?

Mario Galic
  • 47,285
  • 6
  • 56
  • 98
softshipper
  • 32,463
  • 51
  • 192
  • 400

2 Answers2

3

scala.jdk.FutureConverters operate on CompletableFuture so try first converting KafkaFuture to CompletableFuture like so

  implicit class KafkaFutureToCompletableFuture[T](kafkaFuture: KafkaFuture[T]) {
    def toCompletableFuture: CompletableFuture[T] = {
      val wrappingFuture = new CompletableFuture[T]
      kafkaFuture.whenComplete((value, throwable) => {
        if (throwable != null) {
          wrappingFuture.completeExceptionally(throwable)
        }
        else {
          wrappingFuture.complete(value)
        }
      })
      wrappingFuture
    }
  }

Now we can call toCompletableFuture.asScala. For example,

import scala.jdk.FutureConverters._
KafkaFuture.completedFuture(42).toCompletableFuture.asScala.foreach(println)

outputs 42. In your case, try

AdminClient
  .create(props)
  .listTopics()
  .names()
  .toCompletableFuture
  .asScala
Mario Galic
  • 47,285
  • 6
  • 56
  • 98
0

As Alexey Romanov suggested, you could just convert directly to Scala's Future:

   implicit class KafkaFutureEx[T](kafkaFuture: KafkaFuture[T]) {
    val p = Promise[T]
    kafkaFuture.whenComplete { (value, throwable) =>
      {
        if (throwable != null) {
          p.failure(throwable)
        } else {
          p.success(value)
        }
      }
    }

    def toScalaFuture = p.future
  }
Barak BN
  • 442
  • 6
  • 14