2

I've created a spark job that reads in a textfile everyday from my hdfs and extracts unique keys from each line in the text file. There are roughly 50000 keys in each text file. The same data is then filtered by the extracted key and saved to the hdfs.

I want to create a directory in my hdfs with the structure: hdfs://.../date/key that contains the filtered data. The problem is that writing to the hdfs takes a very very long time because there are so many keys.

The way it's written right now:

val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
    val filteredData = cleanedData.filter(line => line.contains(key))
    filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})

Is there a way to make this faster? I've thought about repartitioning the data into the number of keys extracted but then I can't save in the format hdfs://.../date/key. I've also tried groupByKey but I can't save the values because they aren't RDDs.

Any help is appreciated :)

eliasah
  • 39,588
  • 11
  • 124
  • 154
akinos
  • 21
  • 1
  • 4
  • This question is a duplicate http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – samthebest Jul 03 '14 at 08:12
  • I'm looking for a solution that uses **saveAsTextFile** instead of saveAsHadoopFile, and saves them into separate directories instead of just different files with different names. I've implemented the solution you've linked to. But specifically I wanted to know if there's a faster way to create many directories. – akinos Jul 04 '14 at 18:03
  • Also the solution linked to is still slow if I have 50,000 keys and I need to create 50,000 partitions to map each key to. – akinos Jul 04 '14 at 18:39
  • Hmm, you don't need 50,000 partitions in the spark sense (but yes in the dir sense). The solution might not really scale well in the number of keys due to opening so many file handles. I guess you could design it so that it closes and opens them up to some limit. 50,000 is a lot of directories, can't see any way of doing this without it just being a bit slow. – samthebest Jul 04 '14 at 18:55

3 Answers3

0
  def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) map
  try {
  while (iterator.hasNext) {
    val item = iterator.next()
    val key = item._1
    val line = item._2
    val writer = writers.get(key) match {
      case Some(writer) => writer
      case None =>
        val path = arg(1) + key
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        writer = new BufferedWriter(outputStream)
    }
    writer.writeLine(line)
    } finally {
    writers.values.foreach(._close())
    }
}

val inputData = sc.textFile()    
val keyValue = inputData.map(line => (key, line))
val partitions = keyValue.partitionBy(new MyPartition(10))    
partitions.foreachPartition(writeLines)


class MyPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
        // make sure lines with the same key in the same partition 
        (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions 
    }
}
fengyun
  • 434
  • 1
  • 5
  • 11
0

I think the approach should be similar to Write to multiple outputs by key Spark - one Spark job. The partition number has nothing to do with the directory number. To implement it, you may need to override generateFileNameForKeyValue with your customized version to save to different directory.

Regarding scalability, it is not an issue of spark, it is hdfs instead. But no matter how you implemented, as long as the requirements is not changed, it is unavoidable. But I think Hdfs is probably OK with 50,000 file handlers

Community
  • 1
  • 1
zhang zhan
  • 1,596
  • 13
  • 10
0

You are specifying just 2 partitions for the input, and 1 partition for the output. One effect of this is severely limiting the parallelism of these operations. Why are these needed?

Instead of computing 50,000 filtered RDDs, which is really slow too, how about just grouping by the key directly? I get that you want to output them into different directories but that is really causing the bottlenecks here. Is there perhaps another way to architect this that simply lets you read (key,value) results?

Sean Owen
  • 66,182
  • 23
  • 141
  • 173