
I am receiving JSON strings from Kafka in Spark Streaming (Scala). Processing each string takes some time, so I want to distribute the processing over the X nodes of a cluster.

Currently I am just running tests on my laptop. So, for simplicity, let's assume that the processing I should apply to each JSON string is just some normalization of fields:

  // Parse the incoming JSON, take the first record, and keep only the fields we need
  def normalize(json: String): String = {
    val parsedJson = Json.parse(json)
    val parsedRecord = (parsedJson \ "records")(0)
    val idField = parsedRecord \ "identifier"
    val titleField = parsedRecord \ "title"

    // The identifier is rewritten with '/' replaced by '-'
    val output = Json.obj(
      "id" -> Json.parse(idField.get.toString().replace('/', '-')),
      "publicationTitle" -> titleField.get
    )
    output.toString()
  }
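For reference, a quick check of normalize on a made-up record (the field layout here is inferred from the lookups above, so the real Kafka payload may differ):

val sample = """{"records":[{"identifier":"10.1000/xyz123","title":"Some article"}]}"""
println(normalize(sample))
// prints something like: {"id":"10.1000-xyz123","publicationTitle":"Some article"}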

This is my attempt to distribute the normalize operation over the "cluster" (each JSON string must be processed as a whole; JSON strings cannot be split). How do I deal with the Task not serializable error raised at the line val callRDD = JSONstrings.map(normalize(_))?

val conf = new SparkConf().setAppName("My Spark Job").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))

val topicMap = topic.split(",").map((_, numThreads)).toMap

val JSONstrings = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

val callRDD = JSONstrings.map(normalize(_))

ssc.start()
ssc.awaitTermination()

UPDATE

This is the complete code:

package org.consumer.kafka

import java.util.Properties
import java.util.concurrent._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import play.api.libs.json.{JsObject, JsString, JsValue, Json}
import scalaj.http.{Http, HttpResponse}

class KafkaJsonConsumer(val datasource: String,
                        val apiURL: String,
                        val zkQuorum: String,
                        val group: String,
                        val topic: String) extends Logging
{
  val delay = 1000
  val config = createConsumerConfig(zkQuorum, group)
  val consumer = Consumer.create(config)
  var executor: ExecutorService = null

  def shutdown() = {
    if (consumer != null)
      consumer.shutdown()
    if (executor != null)
      executor.shutdown()
  }

  def createConsumerConfig(zkQuorum: String, group: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zkQuorum)
    props.put("group.id", group)
    props.put("auto.offset.reset", "largest")
    props.put("zookeeper.session.timeout.ms", "2000")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")
    val config = new ConsumerConfig(props)
    config
  }

  def run(numThreads: Int) = {
    val conf = new SparkConf()
                              .setAppName("TEST")
                              .setMaster("local[*]")
                              //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("checkpoint")

    val topicMap = topic.split(",").map((_, numThreads)).toMap

    val rawdata = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val parsed = rawdata.map(Json.parse(_))

    val result = parsed.map(record => {
      val parsedRecord = (record \ "records")(0)
      val idField = parsedRecord \ "identifier"
      val titleField = parsedRecord \ "title"
      val journalTitleField = parsedRecord \ "publicationName"
      Json.obj(
        "id" -> Json.parse(idField.get.toString().replace('/', '-')),
        "publicationTitle" -> titleField.get,
        "journalTitle" -> journalTitleField.get)
    })

    result.print

    val callRDD = result.map(JsonUtils.normalize(_))

    callRDD.print()

    ssc.start()
    ssc.awaitTermination()
  }

  object JsonUtils {
    def normalize(json: JsValue): String = {
      (json \ "id").as[JsString].value
    }
  }

}

I launch this KafkaJsonConsumer class as follows:

package org.consumer

import org.consumer.kafka.KafkaJsonConsumer

object TestConsumer {

  def main(args: Array[String]) {

    if (args.length < 6) {
      System.exit(1)
    }

    val Array(datasource, apiURL, zkQuorum, group, topic, numThreads) = args

    val processor = new KafkaJsonConsumer(datasource, apiURL, zkQuorum, group, topic)
    processor.run(numThreads.toInt)

    //processor.shutdown()

  }

}
Klue

1 Answer


It looks like the normalize method is part of some class. On the line where you use it inside the map operation, Spark needs to serialize not only the method itself, but the whole instance it belongs to. The easiest solution would be to move normalize into a singleton object:

object JsonUtils {
  def normalize(json: String): String = ???
}

and invoke it like this:

val callRDD = JSONstrings.map(JsonUtils.normalize(_))
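For completeness, a minimal sketch of what this could look like in context, based on the code from the question (placement matters: the object must sit at the top level of the file, not nested inside KafkaJsonConsumer):

package org.consumer.kafka

import play.api.libs.json.Json

// Top-level, stateless singleton: the map closure captures only this object,
// not the non-serializable KafkaJsonConsumer instance.
object JsonUtils {
  def normalize(json: String): String = {
    val parsedRecord = (Json.parse(json) \ "records")(0)
    Json.obj(
      "id" -> Json.parse((parsedRecord \ "identifier").get.toString().replace('/', '-')),
      "publicationTitle" -> (parsedRecord \ "title").get
    ).toString()
  }
}

// Inside the streaming job:
// val callRDD = JSONstrings.map(JsonUtils.normalize(_))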
Paweł Jurczenko
  • It still says that the task is not serializable. Does it make sense to try Kryo? – Klue Jul 21 '16 at 07:29
  • No, this problem is not related to using or not using Kryo. Are you running your code in the spark-shell or in an application? – Paweł Jurczenko Jul 21 '16 at 08:06
  • I'm running my application from IntelliJ, not spark-shell. I use local[*] just for testing. Once it works locally, my idea is to use YARN and distribute the calculations over different machines. I've already spent quite a lot of time on this issue, and at this point I have no idea how to deal with it. – Klue Jul 21 '16 at 08:14
  • Don't you think that my issue relates to this post?: `http://stackoverflow.com/questions/28554141/how-to-let-spark-serialize-an-object-using-kryo?rq=1`. In case it helps to pinpoint the issue, I now get this error: `Caused by: java.io.NotSerializableException: org.consumer.kafka.KafkaJsonConsumer`, where `KafkaJsonConsumer` is the class with all the mentioned code – Klue Jul 21 '16 at 08:17
  • You should provide complete code, with classes as well. – Paweł Jurczenko Jul 21 '16 at 08:21
  • Ok, please see my update. I posted the code of `KafkaJsonConsumer` and also (just in case) the class that starts the application. – Klue Jul 21 '16 at 08:34
  • You have defined `JsonUtils` in the wrong place: currently it is part of `KafkaJsonConsumer`, while it should be defined completely outside that class. In other words: it should not be nested in any class. – Paweł Jurczenko Jul 21 '16 at 08:37
  • Great! It has worked. You've mentioned that this is the easiest solution... What if I need to make HTTP calls to a remote REST API from the function `normalize` (using `scalaj.http`)? Can I use the same approach (i.e. singleton object) to implement it? Or would it be better in this case to use Kryo? – Klue Jul 21 '16 at 08:42
  • 1. Making HTTP calls from `normalize` should work as well in the singleton object approach (see the sketch after these comments). 2. Kryo has nothing to do with the case you've described. Kryo is just a replacement for the default Java serializer and relates to the performance of your serialization. – Paweł Jurczenko Jul 21 '16 at 08:48
  • If you have time, I'd appreciate it if you could give a small example of a suitable application of Kryo. I think I misunderstand in which situations it should be applied. For instance, is it possible to adapt my case into one where Kryo would be applicable? – Klue Jul 21 '16 at 08:54
  • You should start by reading the docs: http://spark.apache.org/docs/latest/tuning.html#data-serialization – Paweł Jurczenko Jul 21 '16 at 09:09
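To illustrate the HTTP point discussed above, here is a hedged sketch of normalize making a REST call with scalaj-http from a top-level singleton object; the endpoint URL is a placeholder and error handling is omitted:

import scalaj.http.{Http, HttpResponse}

object JsonUtils {
  // Placeholder endpoint; in the real job it would come from configuration (e.g. the apiURL argument).
  val normalizeEndpoint = "http://localhost:8080/normalize"

  def normalize(json: String): String = {
    // POST the raw JSON string and return the response body
    val response: HttpResponse[String] = Http(normalizeEndpoint)
      .postData(json)
      .header("Content-Type", "application/json")
      .asString
    response.body
  }
}

As for Kryo: spark.serializer controls how data is serialized for shuffles and caching, not how closures are serialized, so it does not address Task not serializable. Enabling it is roughly a SparkConf setting (the registered class name below is a placeholder):

val conf = new SparkConf()
  .setAppName("TEST")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering frequently serialized classes improves performance; MyRecord is a placeholder.
  .registerKryoClasses(Array(classOf[MyRecord]))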