14

Does Spark support distributed Map collection types?

So if I have a HashMap[String,String] of key/value pairs, can it be converted to a distributed Map collection type? To access an element I could use "filter", but I doubt this performs as well as a Map?
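
For reference, the filter-based approach mentioned above would look roughly like this in spark-shell (a minimal sketch; the map contents are invented for illustration):

// A local map, parallelized into an RDD of (key, value) pairs.
val localMap = Map("a" -> "1", "b" -> "2")
val kvRdd = sc.parallelize(localMap.toSeq)

// "Lookup" via filter: this scans every partition, a full pass over the
// data rather than the O(1) hash access a local Map gives you.
kvRdd.filter { case (k, _) => k == "a" }.values.collect()
// res: Array[String] = Array(1)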

maasg
blue-sky
  • basically the same question http://stackoverflow.com/questions/24513440/creating-a-large-dictionary-in-pyspark/24513951#24513951 – aaronman Jul 14 '14 at 13:51

2 Answers

9

Since I found some new info, I thought I'd turn my comments into an answer. @maasg already covered the standard `lookup` function; I would like to point out that you should be careful, because if the RDD's partitioner is `None`, `lookup` just uses a filter anyway (see the sketch after the example below). As for the (K,V) store on top of Spark, it looks like this is in progress, but a usable pull request has been made here. Here is an example usage:

import org.apache.spark.rdd.IndexedRDD

// Create an RDD of key-value pairs with Long keys.
val rdd = sc.parallelize((1 to 1000000).map(x => (x.toLong, 0)))
// Construct an IndexedRDD from the pairs, hash-partitioning and indexing
// the entries.
val indexed = IndexedRDD(rdd).cache()

// Perform a point update.
val indexed2 = indexed.put(1234L, 10873).cache()
// Perform a point lookup. Note that the original IndexedRDD remains
// unmodified.
indexed2.get(1234L) // => Some(10873)
indexed.get(1234L) // => Some(0)

// Efficiently join derived IndexedRDD with original.
val indexed3 = indexed.innerJoin(indexed2) { (id, a, b) => b }.filter(_._2 != 0)
indexed3.collect // => Array((1234L, 10873))

// Perform insertions and deletions.
val indexed4 = indexed2.put(-100L, 111).delete(Array(998L, 999L)).cache()
indexed2.get(-100L) // => None
indexed4.get(-100L) // => Some(111)
indexed2.get(999L) // => Some(0)
indexed4.get(999L) // => None

It seems like the pull request was well received and will probably be included in a future version of Spark, so it is probably safe to use that pull request in your own code. Here is the JIRA ticket in case you were curious.

aaronman
  • nice :) thanks. Just curious, but how did you become aware of this? The Spark mailing list? – blue-sky Jul 14 '14 at 19:42
  • @blue-sky I am on the mailing list but actually the way I figured this out is browsing [spark jira tickets](https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:issues-panel) – aaronman Jul 14 '14 at 19:43
3

The quick answer: Partially.

You can transform a Map[A,B] into an RDD[(A,B)] by first forcing the map into a sequence of (k,v) pairs, but by doing so you lose the constraint that the keys of a map must form a set, i.e. you lose the semantics of the Map structure.

From a practical perspective, you can still resolve an element into its corresponding value using kvRdd.lookup(element), but the result will be a sequence, given that you have no guarantee that there's a single value per key, as explained before.

A spark-shell example to make things clear:

val englishNumbers = Map(1 -> "one", 2 -> "two", 3 -> "three")
val englishNumbersRdd = sc.parallelize(englishNumbers.toSeq)

englishNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one) 

val spanishNumbers = Map(1 -> "uno", 2 -> "dos", 3 -> "tres")
val spanishNumbersRdd = sc.parallelize(spanishNumbers.toList)

val bilingueNumbersRdd = englishNumbersRdd union spanishNumbersRdd

bilingueNumbersRdd.lookup(1)
res: Seq[String] = WrappedArray(one, uno)
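
If you need the one-value-per-key property back after such a union, one option (an illustration, not part of the original answer) is to collapse duplicates before looking up:

// Keep a single value per key; which value survives is arbitrary here.
val dedupedNumbersRdd = bilingueNumbersRdd.reduceByKey((a, b) => a)
dedupedNumbersRdd.lookup(1)
// res: Seq[String] = WrappedArray(one) (or WrappedArray(uno))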
maasg
  • where is the lookup method documented? It does not seem to be available at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD – blue-sky Jul 14 '14 at 12:57
  • oh, I wasn't aware of the implicit conversion: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions – blue-sky Jul 14 '14 at 12:58
  • Import `org.apache.spark.SparkContext._` and you are good to go. – maasg Jul 14 '14 at 14:19
  • also, behind the scenes, if the RDD's partitioner is `None`, `lookup` just uses a filter anyway; worth mentioning – aaronman Jul 14 '14 at 16:18