
I have a broadcast variable which is constructed in the following manner:

// Function
import java.nio.charset.CodingErrorAction

import org.apache.spark.SparkContext

import scala.io.Codec

def loadMovieNames(sparkContext: SparkContext): Map[Int, String] = {

    // Handle character encoding issues:
    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

    // Create a Map of Ints to Strings, and populate it from u.item.
    var movieNames: Map[Int, String] = Map()

    val lines = sparkContext.textFile("s3a://bucket/movies.dat")
    for (line <- lines) {
        val fields = line.split("::")
        if (fields.length > 1) {
            movieNames += (fields(0).toInt -> fields(1))
        }
    }

    return movieNames
}

// Main
val nameDict = loadMovieNames(spark.sparkContext)
val broadcastNames = spark.sparkContext.broadcast(nameDict)

Below is the code in main used to access the broadcast variable.

val resultDF = recommendationsDF.sort($"score".desc).limit(30)

val check = (id1: Int, id2: Int) => if (id1 == movie) broadcastNames.value(id2) else broadcastNames.value(id1)

val getName = udf(check)

val results = resultDF.withColumn("movie", getName($"movieId1", $"movieId2"))

results.show(30)

But when I try to do a lookup in the broadcast variable later in main, I get the following exception.

Caused by: java.util.NoSuchElementException: key not found: 1196
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at com.spark.movierec.MovieRecDF$$anonfun$5.apply(MovieRecDF.scala:144)
at com.spark.movierec.MovieRecDF$$anonfun$5.apply(MovieRecDF.scala:143)

I converted the Map to a broadcast variable when I initially ran into the same issue. After reading the answer to this question here, I've realized it could be an issue with closures, but I'm still not sure how to tackle it.
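For reference, the closure pitfall can be reproduced with a minimal, self-contained sketch (the sample data below is made up, not from the question). The `for (line <- lines)` loop over an RDD runs inside the executors against a serialized copy of the captured `movieNames` variable, so the driver's map is never updated and later lookups throw `NoSuchElementException`:

```scala
import org.apache.spark.sql.SparkSession

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("closure-demo")
      .getOrCreate()
    val sc = spark.sparkContext

    var movieNames: Map[Int, String] = Map()

    // The loop body executes in tasks, mutating a deserialized
    // copy of movieNames, not the driver's original.
    sc.parallelize(Seq("1::Toy Story", "1196::The Empire Strikes Back"))
      .foreach { line =>
        val fields = line.split("::")
        if (fields.length > 1) {
          movieNames += (fields(0).toInt -> fields(1))
        }
      }

    // On a cluster this prints an empty Map: the updates happened
    // on the executors' copies and were discarded.
    println(movieNames)

    spark.stop()
  }
}
```

This is exactly why the broadcast map can end up empty even though the loading code looks correct.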

zero323
blessonm
  • You might want to consider using the get method of Map rather than the apply method. The former yields an Option[V], in your case, Option[String] which you can then deal with in a logical manner. The problem with apply is that it will throw an exception if you don't provide a default value. – Phasmid Feb 25 '17 at 00:21
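Phasmid's suggestion can be illustrated with a plain Scala `Map`, no Spark required. `apply` throws `NoSuchElementException` for a missing key, while `get`/`getOrElse` make the missing-key case explicit:

```scala
object SafeLookupDemo {
  def main(args: Array[String]): Unit = {
    val movieNames: Map[Int, String] = Map(1 -> "Toy Story")

    // movieNames(1196) would throw:
    //   java.util.NoSuchElementException: key not found: 1196

    // get returns an Option, so the absent case is handled logically
    println(movieNames.get(1196))                     // prints "None"
    println(movieNames.getOrElse(1196, "Unknown"))    // prints "Unknown"
  }
}
```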

1 Answer


One way to create a local map is to use collectAsMap:

val nameDict = sparkContext.broadcast(sparkContext
  .textFile(path)
  .map(_.split("::"))
  .filter(_.size > 1)
  .map(arr => (arr(0).toInt, arr(1)))
  .collectAsMap())

You should also consider using DataFrames with a broadcast join in place of a UDF and a broadcast variable, but the logic of your application is not entirely clear.
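A sketch of that broadcast-join alternative; the column names and sample data below are assumptions, since the question doesn't show the full schema:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("broadcast-join-demo")
      .getOrCreate()
    import spark.implicits._

    // Small dimension table of movie names
    val movies = Seq((1, "Toy Story"), (1196, "The Empire Strikes Back"))
      .toDF("movieId", "title")

    // Fact table of recommendations
    val recommendations = Seq((1196, 0.97), (1, 0.84))
      .toDF("movieId", "score")

    // broadcast() hints Spark to ship the small table to every executor,
    // avoiding a shuffle and the hand-rolled Map lookup entirely
    val results = recommendations.join(broadcast(movies), Seq("movieId"))
    results.show()

    spark.stop()
  }
}
```

With a join, a missing `movieId` simply produces no row (inner join) or a null `title` (left join) instead of throwing `NoSuchElementException`.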

zero323