I have a problem that's been bothering me for a few days, and I'm pretty much out of ideas.
I built a Spark Docker container where Spark runs in standalone mode; both the master and a worker are started there. The machine is running in Azure.
Then I tried to deploy my Spark Scala app from a separate container (same machine), passing the Spark master URL and the other settings I need to connect to Spark. The connection itself is seamless.
The first problem I encountered was:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition
Then I gathered my dependencies (everything except Spark) into a folder alongside my app JAR file and added them to the SparkConf using SparkConf.setJars.
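To be concrete, setJars ends up receiving the absolute paths of those JARs, something along these lines (the folder path and file names here are made up purely for illustration; the real list is built at runtime from the folder next to my app JAR, as shown in the code at the end of the question):

conf.setJars(Seq(
  "/opt/app/libs/spark-streaming-kafka-0-8_2.11-2.0.0.jar",   // made-up path, just to show the shape of the list
  "/opt/app/libs/spark-cassandra-connector_2.11-2.0.0-M3.jar",
  "/opt/app/libs/some-other-dependency.jar"
  // ...plus the rest of my non-Spark dependencies
))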
Now the strange thing happens:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
What's more, if I just run the Scala app from my local machine using java -cp <dependencies(including spark jars) cp> myApp.jar, it works perfectly; the jobs run fine.
I don't have any SPARK_HOME set locally, and setJars basically receives an empty list, as if I weren't using it at all, yet it still works. I guess it uses the JARs provided on the classpath when I run my app, so I don't need to provide anything else.
If any of you have any ideas I would be grateful; I really can't explain why this doesn't work. I haven't done any Spark deployments until now, I mostly ran Spark embedded.
Spark is the same version (2.0.0) in my app dependencies as the one running in the Docker container.
I used:
Scala 2.11.7 for my app
Java 1.8 on both containers (app, Spark)
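For completeness, the dependency section of my build looks roughly like this (a sketch from memory rather than a copy of the actual build file; the Kafka and Cassandra connector versions are what I believe I'm using):

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "2.0.0",
  "org.apache.spark"   %% "spark-streaming"           % "2.0.0",
  "org.apache.spark"   %% "spark-streaming-kafka-0-8" % "2.0.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0-M3"
)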
As requested, here is the code of my app:
val jars = Option(new File(Properties.spark_jars_path).listFiles()).toList.flatten.map(_.getAbsolutePath)

val conf = new SparkConf()
  .setMaster(RunUtils.determineMasterUrl(Properties.mode))
  .setAppName(RunUtils.SPARK_APP_NAME)
  .setJars(jars)
  .set("spark.cassandra.connection.host", Properties.cassandra_connection_host)

val ssc = new StreamingContext(conf, Seconds(1))

case class Result(buyDate: Timestamp, endDate: Timestamp, maxDate: Timestamp, buyAmount: Double, buyRate: Double)

def main(args: Array[String]): Unit = {
  val DateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd")
  val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)

  //
  // BITSTAMP
  //
  val bitstampTopic = Set("bitstamp_trades")
  val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)

  val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
    val jsonNode = JsonMapper.readTree(trade)
    Trade(
      "BITSTAMP",
      "BTC_USD",
      if (jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
      DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
      new Date(jsonNode.get("timestamp").asLong() * 1000),
      jsonNode.get("amount").asDouble(),
      jsonNode.get("price").asDouble()
    )
  }

  bitstampTradeStream.saveToCassandra("coin_master", "trades", SomeColumns(
    "exchange_house",
    "exchange_currencies",
    "exchange_type",
    "date",
    "trade_time",
    "amount",
    "price")
  )

  ssc.start()
  ssc.awaitTermination()
}