
I have a huge XML file containing many unsorted rows:

<row>
  <field name="f1">group</field>
  <field name="f2">number</field>
  <field name="f3">number2</field>
</row>

and I want to use Spark to extract each group (f1) with its distinct numbers into a separate file.

For now, I first extract all groups (~50) using map, distinct, and collect, then iterate over that group array: using the original RDD, I filter all rows matching the current group and saveAsTextFile them. This works, but I am pretty sure it is the least efficient way (rough sketch below).
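Roughly, in code (extractGroup and formatNumbers are placeholders for my XML importer's helpers):

// Sketch of the current approach; one full scan of the RDD per group.
val groups = rdd.map(extractGroup).distinct().collect()

groups.foreach { group =>
  rdd
    .filter(row => extractGroup(row) == group)
    .map(formatNumbers)
    .saveAsTextFile(s"$target$group")
}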

Therefore I changed my XML importer to return (f1, (f2, f3)) and tried:

rdd.groupByKey(numPartitions).mapPartitions(part => {
  part.map(data => {
    val group = data._1
    val numbers = data._2
    ...
  })
})

This is as far as I get, because numbers is now an Iterable[(String, String)], not an RDD.

My idea was basically (pseudocode):

rdd.groupByKey(numPartitions).map { case (group, numbers) =>
  numbers.distinct.map(OutputFormatter(_))
    .saveAsTextFile(s"$target$group")
}

numPartitions here should match the number of workers, so that there is one group per worker (I'd also like to benchmark the behavior here, but that's off-topic).

Is it correct that numbers cannot be an RDD here, since groupByKey needs to collect the data for each key first? If not, what did I miss?

Is there a common best practice for this operation? I am new to Spark/Hadoop/etc.

Thank you.

MomStopFlashing
  • No, you don't want `numPartitions` to be the same as the number of workers; you want around 2-4 partitions per CPU, though in general I tend to use many more for stability and performance. Anyway, as I understand it the question is a duplicate of: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – samthebest Sep 07 '14 at 07:54
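For reference, a minimal sketch of the multiple-outputs-by-key pattern from that linked question, applied to the (f1, (f2, f3)) RDD above (the GroupOutputFormat name is mine, and OutputFormatter is the formatting helper from the question):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Write each record to a file named after its key, omitting the key itself.
class GroupOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

rdd
  .distinct()  // distinct (f1, (f2, f3)) pairs == distinct numbers per group
  .map { case (group, numbers) => (group, OutputFormatter(numbers)) }
  .saveAsHadoopFile(target, classOf[String], classOf[String], classOf[GroupOutputFormat])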

1 Answer


I can think of the following approach; I guess it's similar in spirit to your original solution with the filtering, except we do not explicitly collect the groups into an array. Is this close to what you need?

def extractGroup(row: Row) = ???

def extractNumbers(row: Row) = ???

// Cache, since the loop below scans this RDD once per group.
val keyed = rdd.keyBy(extractGroup _).cache()

keyed
  .map { case (group, _) => group }
  .distinct
  .toLocalIterator          // iterate the groups on the driver; an RDD
  .foreach { group =>       // cannot be used inside another RDD's foreach
    keyed
      .filter { case (g, _) => g == group }
      .map { case (_, row) => extractNumbers(row) }
      .distinct
      .map(OutputFormatter(_))
      .saveAsTextFile(???)
  }
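Note that the loop over the groups has to happen on the driver (hence the toLocalIterator): an RDD like keyed cannot be referenced from inside another RDD's foreach. Caching keyed also matters here, since each group would otherwise trigger a fresh read and parse of the XML.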
Svend