3

I've an issue that i tried searching for a solution for and couldn't reach anything and would like any *pointers i can get.

So I am trying to integrate Spark structured streaming with Apache Kudu, I am reading the stream from Kafka and doing some processing and should now write to Kudu tables,the problem is that spark structured streaming doesn't provide support for a Kudu sink (that I know of?), and I am using the foreach writer but as soon as try to create a dataframe inside the "ForeachWriter.process()" it just hangs and never move on

import org.apache.spark.sql.ForeachWriter
val foreachWriter = new  ForeachWriter[Row] {


  override def open(partitionId: Long,version: Long): Boolean = {
    val mySchema = StructType(Array(
      StructField("id", IntegerType),
      StructField("value", DoubleType),
      StructField("EventTimestamp", TimestampType)
    ))
       true
  }

  override def process(value: Row): Unit = {
    println("values\n------------------")

    val spark = SparkSession.builder.appName("Spark-Kafka-Integrations").master("local").getOrCreate()
    val valRDD=spark.sparkContext.parallelize(value.toSeq)
    val valRDF=valRDD.map(x=>x.toString.split(",").to[List])
    println(value)

    val valDF=spark.createDataFrame(valRDF)
    valDF.show()
    println("End values\n///////////////////")
    //shoud insert into kudu here
   }

  override def close(errorOrNull: Throwable): Unit = {
   }
}
   //count is a Dstream/streaming dataframe

count.writeStream.foreach(foreachWriter).outputMode("complete") .option("truncate", "false").start().awaitTermination()
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • is you issue resolved..can you please provide resolution – BigD Jan 11 '19 at 12:25
  • Revisiting this question after ~4years. Did you find a solution to create dataframe? I have a similar issue with ForeachWriter on Spark 2.3.2. Please post here if you have found a solution. – underwood Jun 23 '21 at 19:08

0 Answers0