
I have the following recursive function:

  @tailrec
  private def pool[F[_]: Monad, A]
  : Consumer[String, String] => (Vector[KkConsumerRecord] => F[A]) => IO[Unit]
  = consumer => cb => {
    val records: ConsumerRecords[String, String] = consumer.poll(Long.MaxValue)
    val converted = records.iterator().asScala.map(rec => {
      KkConsumerRecord(rec.key(), rec.value(), rec.offset(), rec.partition(), rec.topic())
    })

    val vec = converted.foldLeft(Vector.empty[KkConsumerRecord]) { (b, a) =>
      a +: b
    }
    cb(vec)
    pool(consumer)(cb)
  }

The compiler complains:

[error] /home/developer/Desktop/microservices/bary/kafka-api/src/main/scala/io/khinkali/Consumer/KkConsumer.scala:57:10: type mismatch;
[error]  found   : org.apache.kafka.clients.consumer.Consumer[String,String]
[error]  required: cats.Monad[?]
[error]     pool(consumer)(cb)
[error]          ^
[error] two errors found

What am I doing wrong?

Dmytro Mitin
softshipper

1 Answer

The following code compiles:

import cats.Monad
import cats.effect.IO
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords}
import scala.collection.JavaConverters._
import scala.annotation.tailrec

object App {
  case class KkConsumerRecord(key: String, value: String, offset: Long, partition: Int, topic: String)

//  @tailrec
  private def pool[F[_]: Monad, A]
  : Consumer[String, String] => (Vector[KkConsumerRecord] => F[A]) => IO[Unit]
  = consumer => cb => {
    val records: ConsumerRecords[String, String] = consumer.poll(Long.MaxValue)
    val converted = records.iterator().asScala.map(rec => {
      KkConsumerRecord(rec.key(), rec.value(), rec.offset(), rec.partition(), rec.topic())
    })

    val vec = converted.foldLeft(Vector.empty[KkConsumerRecord]) { (b, a) =>
      a +: b
    }
    cb(vec)
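    // .apply lets the compiler resolve the implicit Monad[F] first,
    // then passes consumer to the function that pool returns
    pool.apply(consumer)(cb)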
  }
}

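def pool[F[_]: Monad, A] is shorthand for def pool[F[_], A](implicit monad: Monad[F]), so in pool(consumer) the compiler treats consumer as an explicit argument for the implicit Monad[F] parameter, hence the type mismatch. Writing pool.apply(consumer)(cb) lets the implicit be resolved first and only then applies the returned function.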
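The same behaviour can be reproduced without cats or Kafka. This is a minimal sketch: the type class Label, the instance optionLabel, and the method describe are hypothetical names made up for illustration, with Label standing in for Monad and describe having the same shape as pool (a context bound plus a function as the result).

```scala
object ContextBoundDemo {
  // Hypothetical type class standing in for cats.Monad
  trait Label[F[_]] { def name: String }

  implicit val optionLabel: Label[Option] =
    new Label[Option] { val name = "Option" }

  // Same shape as pool: a context bound and a function as the result.
  // Equivalent to: def describe[F[_]](implicit ev: Label[F]): Int => String
  def describe[F[_]: Label]: Int => String =
    n => s"${implicitly[Label[F]].name} x $n"

  // In Scala 2, describe[Option](3) does not compile: 3 is taken as the
  // explicit argument for the implicit Label[Option] parameter.

  // Workaround 1: call apply explicitly on the returned function
  val viaApply: String = describe[Option].apply(3)

  // Workaround 2: bind the returned function to a value first
  val viaVal: String = {
    val f = describe[Option]
    f(3)
  }
}
```

Both workarounds give the compiler a chance to fill in the implicit argument before the returned function is applied.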

The @tailrec annotation is removed because this pool is not tail-recursive: the method itself only constructs and returns a lambda, and the recursive pool call sits inside that lambda, where its result still has to be applied to consumer and cb.
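To see the difference in isolation, here is a small sketch with two hypothetical methods, countDown and countDownFn (neither comes from the question). They compute the same sum, but only the one with explicit parameter lists is accepted with @tailrec.

```scala
import scala.annotation.tailrec

object TailrecDemo {
  // Method with explicit (curried) parameter lists: the recursive call is
  // the last operation, so the compiler turns it into a loop.
  @tailrec
  def countDown(n: Int)(acc: Long): Long =
    if (n == 0) acc else countDown(n - 1)(acc + n)

  // Method that returns a function: the recursive call happens inside the
  // lambda, and its result is then applied to arguments. Annotating this
  // with @tailrec would be rejected, and each step uses stack space.
  def countDownFn: Int => Long => Long =
    n => acc => if (n == 0) acc else countDownFn(n - 1)(acc + n)
}
```

With a large input such as countDown(1000000)(0L) the first version runs in constant stack space, while the function-returning version is only safe for small inputs.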


If you want to make it tail-recursive you can rewrite it as

  @tailrec
  private def pool[F[_]: Monad, A](consumer: Consumer[String, String])(cb: Vector[KkConsumerRecord] => F[A]): IO[Unit] = {
    val records: ConsumerRecords[String, String] = consumer.poll(Long.MaxValue)
    val converted = records.iterator().asScala.map(rec => {
      KkConsumerRecord(rec.key(), rec.value(), rec.offset(), rec.partition(), rec.topic())
    })

    val vec = converted.foldLeft(Vector.empty[KkConsumerRecord]) { (b, a) =>
      a +: b
    }
    // note: as in the question's code, the F[A] returned by cb is discarded here
    cb(vec)
    pool(consumer)(cb)
  }
Dmytro Mitin
  • First of all, thanks for your answer. Can you please explain how the lambda is constructed? To me, it looks like tail recursion. What is the difference between your syntax and mine? – softshipper Dec 02 '17 at 19:02
  • In Scala, functions and methods are not the same: https://stackoverflow.com/questions/2529184/difference-between-method-and-function-in-scala Your `pool` takes no explicit arguments (actually it takes one implicit argument) and returns a function; my `pool` is a method taking two explicit arguments and returning `IO[Unit]` (in curried form, so it's similar to your function). Your method is not tail-recursive. It can't simply transfer control to the inner `pool` call, because after this call it still has to construct the lambda `consumer => cb => ...`. – Dmytro Mitin Dec 02 '17 at 19:16
  • This is syntax sugar for `new Function1[Consumer[String, String], Function1[Vector[KkConsumerRecord] => F[A], IO[Unit]]] { override def apply ... }`. But my method is tail-recursive. You can transfer control to inner `pool` call and forget about outer `pool`. – Dmytro Mitin Dec 02 '17 at 19:17
  • `It can't transfer control to inner pool`: could you please show me an example? To me, `consumer => cb => ` is just currying. – softshipper Dec 02 '17 at 20:20
  • `def f(n: Int) = if (n == 0) 0 else f(n - 1)` is tail recursion but `def f(n: Int) = if (n == 0) 0 else f(n - 1) + 1` or `def f(n: Int) = (if (n == 0) 0 else f(n - 1)) + 1` are not. – Dmytro Mitin Dec 02 '17 at 20:38
  • @zero_coding, a "tail-recursive" method is a kind of method that the compiler can automatically rewrite to avoid recursion (and thus a potential stack overflow). The "tail" part means that the method should only call itself as the very last operation. Your method returns a function, so your last line can be rewritten as `val func = pool; func(consumer)(cb)`. Here you can see that the `pool` call is not in the "tail" position: the last operation is actually calling the function that is the result of the `pool` call. – SergGr Dec 02 '17 at 22:26