I am using Spark 1.0.1 to process a large amount of data. Each row contains an ID number, and some rows share the same ID. I want to save all the rows with the same ID number in the same location, but I am having trouble doing it efficiently. I create an RDD[(String, String)] of (ID number, data row) pairs:

val mapRdd = rdd.map{ x=> (x.split("\\t+")(1), x)} 

A way that works, but is not performant, is to collect the ID numbers, filter the RDD for each ID, and save the RDD of values with the same ID as a text file.

val ids = mapRdd.keys.distinct.collect
ids.foreach({ id =>
    val dataRows = mapRdd.filter(_._1 == id).values
    dataRows.saveAsTextFile(id)
})

I also tried groupByKey and reduceByKey so that each tuple in the RDD contains a unique ID number as the key and a string of combined data rows, separated by newlines, for that ID number. I want to iterate through the RDD only once, using foreach to save the data, but foreach cannot give me the values as an RDD:

groupedRdd.foreach({ tup =>
  val data = sc.parallelize(List(tup._2)) //nested RDD does not work
  data.saveAsTextFile(tup._1)
})

Essentially, I want to split an RDD into multiple RDDs by an ID number and save the values for that ID number into their own location.

smli
  • What's wrong with saving the file after it's grouped by ID, they won't necessarily each be in separate files, but they won't be split among files, and you can control the number of partitions you create which should correspond to the number of files created – aaronman Jul 30 '14 at 20:27
  • @aaronman That doesn't work because I need to split the original data source and store the data in separate locations based on id number. Eventually, the data will be requested on-demand based on the id number and it is a very large dataset. – smli Jul 30 '14 at 20:45
  • If you save it in the fashion I suggested an RDD can definitely re read the data and get data by user ID, would that be an acceptable solution – aaronman Jul 30 '14 at 20:48
  • I had to perform this same operation a few days ago and ran into the same problems as you. As far as I can tell there is no way to group an RDD and then persist the values of that grouping without bringing them in memory to the driver. Have you considered the mailing list? If you find something please update this question so we can get the details. – jhappoldt Jul 30 '14 at 22:38
  • @jhappoldt this is most definitely not the case I think I'll just answer the question – aaronman Jul 30 '14 at 23:07
  • There is an open request to add a feature to Spark that allows you to save a single RDD to multiple locations at once efficiently: [SPARK-3533](https://issues.apache.org/jira/browse/SPARK-3533) That's probably what you're looking for. – Nick Chammas Dec 15 '14 at 22:50
  • Possible duplicate of [Write to multiple outputs by key Spark - one Spark job](http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job) – Nick Chammas May 10 '16 at 23:34

3 Answers

I think this problem is similar to "Write to multiple outputs by key Spark - one Spark job".

Please refer to the answer there.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

import org.apache.spark._
import org.apache.spark.SparkContext._

// Writes each record to a file named after its key.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the output line so that only the value is written.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key itself as the output file name.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
      .map(a => (k, v)) // Your own implementation: produce a (key, value) pair
      .partitionBy(new HashPartitioner(num)) // num: the number of partitions you want
      .saveAsHadoopFile("output/path", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop() // stop the SparkContext created above
  }
}

I just saw a similar answer, but we don't actually need a custom partitioner. MultipleTextOutputFormat creates a file for each key, so it is enough that all records with the same key fall into the same partition, which HashPartitioner guarantees.

In new HashPartitioner(num), num is the number of partitions you want. If you have a large number of distinct keys, you can set num to a large value. That way, each partition will not open too many HDFS file handles.
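A variation on the output format above, sketched here under assumptions rather than taken from the answer: returning key + "/" + name instead of the bare key should place each ID's rows in its own subdirectory of the output path. Because every task then writes its own part file inside that subdirectory, the partitionBy shuffle is no longer needed to keep tasks from writing to the same file. The names KeyAsDirectoryOutputFormat and SplitById, and the command-line arguments, are hypothetical; the key extraction is the one from the question.

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.x

// Hypothetical output format: one subdirectory per key.
class KeyAsDirectoryOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the output line so that only the value is written.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // `name` is the task's default leaf file name (e.g. part-00000);
  // prefixing it with the key yields <output>/<key>/<name>.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

object SplitById {
  // Key extraction from the question: the second tab-separated field.
  def idOf(row: String): String = row.split("\\t+")(1)

  def main(args: Array[String]): Unit = {
    // Assumed arguments: input path, then output path.
    val Array(inputPath, outputPath) = args
    val sc = new SparkContext(new SparkConf().setAppName("SplitById"))
    try {
      sc.textFile(inputPath)
        .map(row => (idOf(row), row))
        .saveAsHadoopFile(outputPath, classOf[String], classOf[String],
          classOf[KeyAsDirectoryOutputFormat])
    } finally {
      sc.stop()
    }
  }
}
```

The trade-off is more, smaller files: an ID whose rows are spread over many input partitions gets one part file per partition inside its directory.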

zhang zhan
  • Is there a Python equivalent? I am not sure what to do with `saveAsHadoopFile()` in PySpark (https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.saveAsHadoopFile). – Eric O. Lebigot Jun 14 '15 at 15:25
  • Does this actually work with S3 Native FS without having HDFS? I'm wondering when the file would actually get uploaded to S3, probably when the job finishes? Because the last X records could belong to all X files... So nothing can be uploaded to S3 before the last record is processed, right? – lisak May 23 '16 at 10:41

This will save the data per user ID:

rdd.map { x => (x.split("\\t+")(1), x) }
  .groupByKey(numPartitions)
  .saveAsObjectFile("file")

If you need to retrieve the data again based on user id you can do something like

val userIdLookupTable = sc.objectFile[(String, Iterable[String])]("file").cache() // could use persist() with a different storage level if the data is too big for memory
val data = userIdLookupTable.lookup(id) // note this returns a sequence; in this case you can just take the first element

Note that there is no particular reason to save to a file in this case; I only did it because the OP asked for it. That said, saving to a file does let you load the RDD at any time after the initial grouping has been done.

One last thing: lookup is faster than the filter approach for accessing IDs, but if you're willing to work from a Spark pull request, you can check out this answer for a faster approach.
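To illustrate why lookup can beat filter, here is a minimal sketch on a small in-memory dataset. It assumes a local master and made-up sample rows; LookupById and idOf are hypothetical names. When an RDD has a known partitioner, lookup only searches the partition the key maps to, whereas filter scans every partition. An RDD reloaded with objectFile has no partitioner, so re-partitioning it after loading may be needed to get the same benefit.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.x

object LookupById {
  // Key extraction from the question: the second tab-separated field.
  def idOf(row: String): String = row.split("\\t+")(1)

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, for illustration only.
    val conf = new SparkConf().setAppName("LookupById").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample rows; the second field is the ID.
      val rows = sc.parallelize(Seq("a\t1\tx", "b\t2\ty", "c\t1\tz"))

      // Give the pair RDD a known partitioner and keep it in memory.
      val byId = rows
        .map(row => (idOf(row), row))
        .partitionBy(new HashPartitioner(4))
        .cache()

      // Only the partition that "1" hashes to is searched.
      val rowsForId: Seq[String] = byId.lookup("1")
      rowsForId.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```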

aaronman
  • This will serialize the grouping in the form of (Key, Seq(Values)) and the question asked how to group then persist only the Seq(Values). Do you know of a way to save the Values in parallel without collecting it to the driver? Thanks! – jhappoldt Jul 31 '14 at 15:09
  • @jhappoldt this is in parallel, why would you want just `Seq(Values)` then you lose the user info? – aaronman Jul 31 '14 at 15:17

You can call saveAsTextFile directly on the grouped RDD; it saves the data by partition. For example, if you have 4 distinct IDs and you set the grouped RDD's number of partitions to 4, Spark stores each partition's data in one file, so you can end up with one file per ID, as long as each ID lands in its own partition (the default hash partitioning does not guarantee this). You can even see the data as the iterable of values for each ID in the filesystem.
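One way to make the one-file-per-ID outcome explicit, rather than depending on how the IDs happen to hash, is a partitioner that assigns each distinct ID its own partition. The following is a minimal sketch, assuming the set of distinct IDs is small enough to collect to the driver; ExactKeyPartitioner, OneFilePerId, and the paths are hypothetical.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.x

// Hypothetical partitioner: one partition per distinct ID known up front.
class ExactKeyPartitioner(ids: Array[String]) extends Partitioner {
  private val indexOf: Map[String, Int] = ids.zipWithIndex.toMap

  override def numPartitions: Int = ids.length

  // Throws NoSuchElementException for an ID that was not in `ids`.
  override def getPartition(key: Any): Int = indexOf(key.asInstanceOf[String])
}

object OneFilePerId {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("OneFilePerId"))
    try {
      val mapRdd = sc.textFile("input/path")
        .map(x => (x.split("\\t+")(1), x))
        .cache() // the RDD is used twice below

      // Assumption: the distinct IDs fit in driver memory.
      val ids = mapRdd.keys.distinct().collect().sorted

      // Partition i holds exactly the rows of ids(i), so the output
      // directory gets one part file per ID.
      mapRdd
        .partitionBy(new ExactKeyPartitioner(ids))
        .values
        .saveAsTextFile("output/path")
    } finally {
      sc.stop()
    }
  }
}
```

The part files are still named by partition index rather than by ID, so the sorted ids array is what maps a file back to its ID.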

napster