I have written a Kafka streaming program in Scala and am running it on a Spark standalone cluster. The code works fine on my local machine. I have set up Kafka, Cassandra and Spark on an Azure VM, and I have opened all inbound and outbound ports to avoid port blocking.

Started the master

sbin# ./start-master.sh

Started the slave

sbin# ./start-slave.sh spark://vm-hostname:7077

I verified the status of both in the master web UI.

Submitted the job

bin# ./spark-submit --class x.y.StreamJob --master spark://vm-hostname:7077 /home/user/appl.jar

I noticed that the application was added and is displayed in the master web UI.

I published a few messages to the topic, but they are not received and not persisted to the Cassandra DB.

I clicked the application name in the master web console and noticed that the Streaming tab is not available on the application's page.

Why does the application work locally but not on the VM?

How can I debug the issue on the VM?

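import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

  def main(args: Array[String]): Unit = {
    val spark = SparkHelper.getOrCreateSparkSession()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    spark.sparkContext.setLogLevel("WARN")

    // Creates numPartitionsOfInputTopic direct streams, each subscribed to the same topic.
    val kafkaStreams = {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "vmip:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "loc",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val topics = Array("hello")
      val numPartitionsOfInputTopic = 3
      (1 to numPartitionsOfInputTopic).map { _ =>
        KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
      }
    }

    kafkaStreams.foreach { stream =>
      stream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // This closure runs on the executors, so println output goes to executor stdout.
        rdd.foreach { record =>
          try {
            println(record.value().trim)
            CassandraHelper.saveItemEvent(record.value().trim)
          } catch {
            case ex: Exception => println(ex.getMessage)
          }
        }
        // Offsets are committed on the stream, not on the RDD.
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
      // Printed once per stream while the job is being set up, not once per message.
      println("Read Msg")
    }
    println(" Spark parallel reader is ready !!!")
    ssc.start()
    ssc.awaitTermination()
  }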

  def getSparkConf(): SparkConf = {
    val conf = new SparkConf(true)
      .setAppName("TestAppl")
      .set("spark.cassandra.connection.host", "vmip")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      .setMaster("spark://vm-hostname:7077")

    conf
  }

Version

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"


libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion,
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)
mergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x => (mergeStrategy in assembly).value(x)
}
Gnana
  • Did you check whether the streaming jobs that get submitted each second are receiving the messages from Kafka? There should be a Streaming tab in the Spark UI that shows the number of input records for each submitted job. – Kiran Balakrishnan Apr 06 '18 at 15:14
  • As I mentioned in my question, the Streaming tab is not available, and I don't know why. Please tell me if there are any debugging steps to find out why the Streaming tab is not visible. – Gnana Apr 06 '18 at 17:00
  • What do you mean by `kafka_stream_version`? – Alper t. Turker Apr 08 '18 at 13:36
  • Sorry, I didn't use kafka_stream_version; it is a dummy variable. I have added all the dependencies to my question. Please take a look at them. – Gnana Apr 08 '18 at 16:12

1 Answer

To debug your issue, the first thing to do is make sure that messages are going through Kafka. To do so, port 9092 must be open on your VM; then try consuming directly from Kafka:

bin/kafka-console-consumer.sh --bootstrap-server vmip:9092 --topic hello --from-beginning

The --from-beginning option consumes everything still retained on the topic, that is, every message up to the maximum retention time you configured for it.
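If the console consumer cannot connect, it may help to first confirm that the broker port is reachable from the machine where the Spark worker runs. Below is a minimal sketch of such a check using only the JDK socket API. `PortCheck` and `isReachable` are hypothetical names introduced for illustration, and "vmip" and 9092 are the placeholders from the question. A successful connection only shows that the port is open; it does not prove that the broker is serving the topic.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Spark or Kafka.
object PortCheck {

  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def isReachable(host: String, port: Int, timeoutMs: Int = 2000): Boolean =
    Try {
      val socket = new Socket()
      // Any failure (connection refused, timeout, unknown host) throws here.
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess

  def main(args: Array[String]): Unit = {
    // Defaults are the placeholder host and port used in the question.
    val host = if (args.nonEmpty) args(0) else "vmip"
    val port = if (args.length > 1) args(1).toInt else 9092
    println(s"$host:$port reachable: ${isReachable(host, port)}")
  }
}
```

Running it on the worker node with the same host and port that the job passes as "bootstrap.servers" exercises the same network path the executors would use.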

Also check that you don't have two versions of Spark on your VM, and whether you need to use "spark2-submit" to submit a Spark 2 job.

Vincent
  • I already did that and confirmed that Kafka is receiving the messages. But the Spark stream is not receiving them. – Gnana May 14 '18 at 19:33
  • @Gnana Make sure you have Kafka 0.10, since you are using spark-streaming-kafka-0-10; otherwise it won't be able to get the last offset and therefore won't consume anything. With Kafka 0.8 or 0.9, offset management is done differently. – Vincent May 17 '18 at 13:28