
I need to write values with key 1 to file1.txt and values with key 2 to file2.txt:

val ar = Array(1 -> 1, 1 -> 2, 1 -> 3, 1 -> 4, 1 -> 5, 2 -> 6, 2 -> 7, 2 -> 8, 2 -> 9)
val distAr = sc.parallelize(ar)   // RDD[(Int, Int)]
val grk = distAr.groupByKey()     // RDD[(Int, Iterable[Int])]

How can I do this without iterating over the collection `grk` twice?

  • It's an inherently serial operation, so just run `foreach` over `ar` and write out each value to the correct file for its key (see the sketch after these comments). – The Archetypal Paul Dec 08 '14 at 17:03
  • I think this question is similar to this one: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – Chong Tang Dec 08 '14 at 18:23
  • As I understand it, that question has an HDFS-specific answer, not a general one. What is the 'general Spark way' to solve this problem? – zork Dec 08 '14 at 21:30
  • This question seems similar to [this one](http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job?lq=1). There is an open issue on the Spark issue tracker to add the ability to write out a single RDD to multiple locations by key in one pass: [SPARK-3533](https://issues.apache.org/jira/browse/SPARK-3533). Iterating over the RDD once per key will not scale well, unfortunately. – Nick Chammas Dec 12 '14 at 21:34
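
If the data really is a small local collection like `ar`, the first comment can be taken literally: skip Spark and write the array out serially with plain I/O. Below is a minimal sketch of that idea; the file names come from the question, while the one-writer-per-key setup is my own illustration.

import java.io.PrintWriter

// One writer per key; assumes the set of keys (1 and 2) is known up front.
val writers = Map(1 -> new PrintWriter("file1.txt"), 2 -> new PrintWriter("file2.txt"))

// Write each value to the file matching its key, then close the writers.
ar.foreach { case (key, value) => writers(key).println(value) }
writers.values.foreach(_.close())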

1 Answer


We write data from different customers to different tables, which is essentially the same use case. The common pattern we use is something like this:

val customers: List[String] = ???

customers.foreach { customer =>
  rdd.filter(record => belongsToCustomer(record, customer)).saveToFoo()
}

This probably does not fulfill the wish of 'not iterating over the RDD twice (or n times)', but `filter` is a cheap operation in a parallel distributed environment and it works, so I think it does comply with the 'general Spark way' of doing things.
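
Applied to the question's data, the same pattern might look like the sketch below. Note that `saveAsTextFile` writes a directory of part files rather than a single `file1.txt`; the output paths and the `keys` list are assumptions for illustration.

// One pass per key: filter the pair RDD, keep the values, and materialize each subset.
val keys = List(1, 2)
keys.foreach { key =>
  distAr.filter { case (k, _) => k == key }
        .map { case (_, v) => v }            // keep only the values
        .saveAsTextFile(s"output/key-$key")  // hypothetical output path
}

Each pass launches its own job, but `filter` is a narrow transformation, so no shuffle is involved.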

maasg
  • If I got you right, you use `belongsToCustomer` as a kind of 'multi-filter' which decides which file to write to. Correct? – zork Dec 09 '14 at 09:15
  • No, `filter` takes a boolean predicate, so it's a simple boolean operation, something like `record.customer == customer` – maasg Dec 09 '14 at 09:15
  • But what is `saveToFoo`? Is it just a plain Scala function that writes **a single record** to a file? How does Spark decide on which node to create this file? How would one use Spark's `saveAsTextFile` in this case? – zork Dec 09 '14 at 09:18
  • In each loop, a new `filteredRDD` is created for the specific predicate. Then you can use actions on that RDD to materialize it. You could do e.g. `.saveAsTextFile(s"$basePath/$customer/data.txt")` – maasg Dec 09 '14 at 09:23
  • Ok, I got it, you iterate over the RDD as many times as there are predicates. So, alas, there is no way to split an RDD into several RDDs, such as, for example, `val result: (RDD, RDD) = data.groupByKey()` – zork Dec 09 '14 at 09:54
  • Not that I know of. The thing to realize is that `filter` is much more efficient than an 'rdd.splitByKey' could be, because such an operation would incur a shuffle, whereas `filter` is just in-lined with whatever operation comes after. – maasg Dec 09 '14 at 09:57
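
Putting the comments together, here is a hedged sketch of the loop described above for the customer case. The `Record` type, `basePath`, and the `cache()` call are my assumptions; the cache simply avoids recomputing the source RDD on each of the n filter passes.

import org.apache.spark.rdd.RDD

case class Record(customer: String, payload: String)

val basePath = "/data/out"                          // assumed output root
val customers: List[String] = ???                   // as in the answer
val records: RDD[Record] = ???                      // source RDD

records.cache()                                     // reuse the data across the n filter passes

customers.foreach { customer =>
  records.filter(_.customer == customer)            // simple boolean predicate
         .map(_.payload)
         .saveAsTextFile(s"$basePath/$customer")    // one output directory per customer
}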