4

I am trying to check with testcontainers a streaming pipeline as a integration test but I don´t know how get bootstrapServers, at least in last testcontainers version and create a specific topic there. How can I use 'containerDef' to extract bootstrapservers and add a topic?

import com.dimafeng.testcontainers.{ContainerDef, KafkaContainer}
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession

class Mykafkatest extends FunSuite with TestContainerForAll {
  //val kafkaContainer: KafkaContainer      = KafkaContainer("confluentinc/cp-kafka:5.4.3")
  override val containerDef: ContainerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Unit testing")
      .getOrCreate()

    // How add a topic in that container?

    // This is not posible:
    val servers=container.bootstrapServers

    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", "topic1")
      .load()


    df.show(false)

  })

}

My sbt configuration:

lazy val root = project
  .in(file("./pipeline"))
  .settings(
    organization := "org.example",
    name := "spark-stream",
    version := "0.1",
    scalaVersion := "2.12.10",
    libraryDependencies := Seq(
      "org.apache.spark" %% "spark-sql-kafka-0-10"       % "3.0.3"  % Compile,
      "org.apache.spark" %% "spark-sql"                  % "3.0.3"  % Compile,
      "com.dimafeng"     %% "testcontainers-scala-munit" % "0.39.5" % Test,
      "org.dimafeng"     %% "testcontainers-scala-kafka" % "0.39.5" % Test,
      "org.scalameta"    %% "munit"                      % "0.7.28" % Test
    ),
    testFrameworks += new TestFramework("munit.Framework"),
    Test / fork := true
  )

Documentation does not show a complete example: https://www.testcontainers.org/modules/kafka/

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
MrElephant
  • 302
  • 4
  • 26

1 Answers1

0

The only problem here is that you are explicitly casting that KafkaContainer.Def to ContainerDef.

The type of container provided by withContianers, Containter is decided by path dependent type in provided ContainerDef,

trait TestContainerForAll extends TestContainersForAll { self: Suite =>

  val containerDef: ContainerDef

  final override type Containers = containerDef.Container

  override def startContainers(): containerDef.Container = {
    containerDef.start()
  }

  // inherited from TestContainersSuite
  def withContainers[A](runTest: Containers => A): A = {
    val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
    runTest(c)
  }

}
trait ContainerDef {

  type Container <: Startable with Stoppable

  protected def createContainer(): Container

  def start(): Container = {
    val container = createContainer()
    container.start()
    container
  }
}

The moment you explicitly specify the type ContainerDef in override val containerDef: ContainerDef = KafkaContainer.Def(), this breaks the whole "type trickery" and thus Scala compiler is left with a type Container <: Startable with Stoppable instead of a KafkaContainer.

So, just remove that explicit type cast to ContainerDef, and that val servers = container.bootstrapServers will work as expected.

import com.dimafeng.testcontainers.KafkaContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
import munit.FunSuite

class Mykafkatest extends FunSuite with TestContainerForAll {
  override val containerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    //...

    val servers = container.bootstrapServers

    println(servers)

    //...
  })
}
sarveshseri
  • 13,738
  • 28
  • 47
  • Can you give more details? this code does not compiles – MrElephant Oct 07 '21 at 15:00
  • Do you mean the `Mykafkatest` code in the answer ? What compilation error are you getting ? – sarveshseri Oct 07 '21 at 15:06
  • added proper imports. Note that we need to import `com.dimafeng.testcontainers.KafkaContainer` and not `org.testcontainers.containers.KafkaContainer`. – sarveshseri Oct 07 '21 at 15:19
  • well now it is not found: https://repo1.maven.org/maven2/org/dimafeng/testcontainers-scala-kafka_2.12/0.39.5/testcontainers-scala-kafka_2.12-0.39.5.pom and really in this url https://repo1.maven.org/maven2/org/ does not exists 'dimafeng' folder – MrElephant Oct 07 '21 at 15:35
  • Why are you looking for that folder at that url ? `"com.dimafeng" %% "testcontainers-scala-munit" % "0.39.5"` is clrealy available at maven central - https://mvnrepository.com/artifact/com.dimafeng/testcontainers-scala-munit_3/0.39.5 . `sbt` will automatically downlod and add this to your project. Do you have some problem in your sbt project ? – sarveshseri Oct 07 '21 at 17:08
  • container.bootstrapServers is not a correct method. – MrElephant Oct 08 '21 at 10:08
  • Did you try the code which I have provided in my answer ? Or are you talking about your code ? Just copy-paste the code in my answer and try running that. – sarveshseri Oct 08 '21 at 12:36
  • 1
    you are right, it works, sorry – MrElephant Oct 08 '21 at 15:49