
I'm trying to look up values in a HashMap from inside a function in Spark 2.0, but if I parallelize the List, the lookup fails. If I don't parallelize, it works, and if I don't use a case class as the key, it also works.

Here's some sample code of what I'm trying to do:

case class TestData(val s: String)

def testKey(testData: TestData): Unit = {
  println(f"Current Map: $myMap")
  println(f"Key sent into function: $testData")
  println("Key isn't found in Map:")
  println(myMap(testData)) // fails here
}

val myList = sc.parallelize(List(TestData("foo")))
val myMap = Map(TestData("foo") -> "bar")
myList.collect.foreach(testKey) // collect to see println

Here's the exact output:

Current Map: Map(TestData(foo) -> bar)
Key sent into function: TestData(foo)
Key isn't found in Map:
java.util.NoSuchElementException: key not found: TestData(foo)

The code above is similar to what I'm actually doing, except that my real case class is more complicated and the HashMap has Lists as values. Also, in the sample above I call collect so that the print statements are shown; the sample still fails the same way without collect, just without the prints. For comparison, the non-parallelized variant that works is sketched below.
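A minimal sketch of that working variant (assuming the same definitions as above, all in one notebook cell):

// Plain Scala collection: the key never round-trips through Spark
// serialization, so the Map lookup succeeds
List(TestData("foo")).foreach(testKey) // prints "bar"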

The hashCodes already match, but I also tried overriding equals and hashCode on the case class; same problem.

This is running on Databricks, so I don't believe I have access to the REPL or spark-submit.

– TBhimdi

2 Answers


Thanks to the comments for pointing out the similar question, which linked to the Spark issue that led me to this solution for my case:

case class TestData(val s: String) {
  override def equals(obj: Any): Boolean =
    obj.isInstanceOf[TestData] && obj.asInstanceOf[TestData].s == this.s
}

Overriding equals with an explicit isInstanceOf check fixes the issue. It may not be the best solution, but it's definitely the simplest workaround. (The likely culprit is the known Spark issue where case classes defined in a REPL or notebook cell get wrapped in a generated outer class, so the synthesized equals can fail for instances deserialized on executors.)
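For reference, a quick check of the workaround against the original sample (a sketch assuming the same notebook session, with sc in scope and the overridden TestData above):

val myMap = Map(TestData("foo") -> "bar")
sc.parallelize(List(TestData("foo")))
  .collect                           // keys come back deserialized from the cluster
  .foreach(td => println(myMap(td))) // now prints "bar" instead of throwing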

– TBhimdi

Your logic is circular and wrong: you are building the Map from the same data that you parallelize, and then calling the Map with a TestData key from the RDD. Update it to make it sequential, as below:

case class TestData(val s: String)

def testKey(testData: TestData): Unit = {
  val myMap = Map(testData -> "bar")
  println(f"Current Map: $myMap")
  println(f"Key sent into function: $testData")
  println("Key isn't found in Map:")
  println(myMap(testData)) // succeeds here and prints "bar"
}

val myList = sc.parallelize(List(TestData("foo")))
myList.collect.foreach(testKey)

The output is:

Current Map: Map(TestData(foo) -> bar)
Key sent into function: TestData(foo)
Key isn't found in Map:
bar

I hope this is what you are expecting...

– KiranM
  • The above was just code I wrote to recreate the problem I was having. In my production code I have to send in different keys to get back their values; sometimes they match, sometimes they don't, and I wanted to keep the work parallel across nodes without too much data/network usage. But due to the Spark bug, I _never_ got a match; I had to override equals in the case class to get any matches. – TBhimdi Jan 12 '17 at 13:49
  • Even in that case, your code is wrong: you are passing an RDD and then invoking myMap with a TestData key. As an update to the code above, define myMap right after the TestData declaration (instead of inside testKey()) to avoid declaring it repeatedly. – KiranM Jan 12 '17 at 14:07
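For completeness, a sketch of the restructuring suggested in that last comment (hypothetical; as the OP notes, without the equals override from the accepted answer the parallelized lookup can still miss):

case class TestData(val s: String)

// Defined once, right after the case class, instead of inside testKey
val myMap = Map(TestData("foo") -> "bar")

def testKey(testData: TestData): Unit = {
  // Total lookup: reports a miss instead of throwing NoSuchElementException
  println(myMap.getOrElse(testData, s"key not found: $testData"))
}

sc.parallelize(List(TestData("foo"))).collect.foreach(testKey)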