
TL;DR: I have a large file that I iterate over three times to get three different sets of counts out. Is there a way to get three maps out in one pass over the data?

Some more detail:

I'm trying to compute PMI between words and features that are listed in a large file. My pipeline looks something like this:

val wordFeatureCounts = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  for (feature <- features) yield ((word, feature), 1)
})

And then I repeat this to get word counts and feature counts separately:

val wordCounts = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  for (feature <- features) yield (word, 1)
})

val featureCounts = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  for (feature <- features) yield (feature, 1)
})

(I realize I could just iterate over wordFeatureCounts to get the wordCounts and featureCounts, but that doesn't answer my question, and looking at running times in practice, I'm not sure it's actually faster to do it that way. Also note that there are some reduceByKey operations and other things I do with these counts after they are computed that aren't shown, as they aren't relevant to the question.)

What I would really like to do is something like this:

val (wordFeatureCounts, wordCounts, featureCounts) = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  val wfCounts = for (feature <- features) yield ((word, feature), 1)
  val wCounts = for (feature <- features) yield (word, 1)
  val fCounts = for (feature <- features) yield (feature, 1)
  ??.setOutput1(wfCounts)
  ??.setOutput2(wCounts)
  ??.setOutput3(fCounts)
})

Is there any way to do this with Spark? While looking for how to do this, I've seen questions about multiple outputs when you're saving the results to disk (not helpful), and I've seen a bit about accumulators (which don't look like what I need), but that's it.

Also note that I can't just yield all of these results in one big list, because I need three separate maps out. If there's an efficient way to split a combined RDD after the fact, that could work, but the only way I can think of to do this would end up iterating over the data four times, instead of the three I currently do (once to create the combined map, then three times to filter it into the maps I actually want).

mattg
  • Take a look at my post here: http://stackoverflow.com/questions/27231524/scala-spark-split-collection-into-several-rdd/32817565#32817565. This is about filtering, but I believe the same idea can also be applied to maps. – Jason Scott Lenderman Feb 02 '16 at 20:56
  • That looks like a nice approach when the filtered RDDs fit into memory. What if they are big enough that they don't? Will it still work? – mattg Feb 02 '16 at 21:14
  • It is not possible to generate multiple RDDs in a single transformation. The approach provided by @jasonl should work just fine as long as the output from a single partition fits into memory. Personally I would go with multiple maps, which is much cleaner and doesn't create large objects, but it is mostly a matter of preference (see the discussion). See also http://stackoverflow.com/a/32971246/1560062 – zero323 Feb 02 '16 at 23:40
  • It really depends on what you eventually want to do with the resulting `RDDs`, and also on the nature of the maps (e.g. do the maps share a lot of computation?). Calling the `map` method on the `RDD` multiple times is an obvious solution which probably works fine for the majority of applications, but I've also encountered situations (e.g. an implementation of variational EM for LDA) where the less obvious approach that I indicated on that other thread resulted in fairly significant computational savings. – Jason Scott Lenderman Feb 03 '16 at 02:49
  • As seen in the code above, the maps do almost exactly the same thing; they just output slightly different objects. I'm guessing the biggest bottleneck in the computation is file IO from iterating over a very large file three times. The output from each partition should easily fit in memory, so I think I'll try a variation on your filter approach (see the sketch below). Thanks for the suggestion. If you want to put it as an answer, I'll accept it. – mattg Feb 03 '16 at 16:59
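
A rough sketch of that filter-based variation, for reference (this is my own reading of the idea, not code from the linked answer; it reuses inputFile, getWordFromLine, and getFeaturesFromLine from the question, and the reduceByKey calls stand in for the aggregation mentioned above):

// Emit all three kinds of records in a single pass, tagged by type, cache the
// combined RDD, then split it afterwards. Note that collect(pf) below is the
// RDD transformation (filter + map via a partial function), not the collect() action.
sealed trait Rec extends Serializable
case class WF(word: String, feature: String) extends Rec
case class W(word: String) extends Rec
case class F(feature: String) extends Rec

val combined = sc.textFile(inputFile).flatMap { line =>
  val word = getWordFromLine(line)
  getFeaturesFromLine(line).flatMap { feature =>
    Seq[(Rec, Int)]((WF(word, feature), 1), (W(word), 1), (F(feature), 1))
  }
}.cache()  // the single pass over the file is reused by the three derived RDDs

val wordFeatureCounts = combined.collect { case (WF(w, f), n) => ((w, f), n) }.reduceByKey(_ + _)
val wordCounts        = combined.collect { case (W(w), n)     => (w, n)      }.reduceByKey(_ + _)
val featureCounts     = combined.collect { case (F(f), n)     => (f, n)      }.reduceByKey(_ + _)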

3 Answers


It is not possible to split an RDD into multiple RDDs. This is understandable if you think about how this would work under the hood. Say you split RDD x = sc.textFile("x") into a = x.filter(_.head == 'A') and b = x.filter(_.head == 'B'). Nothing happens so far, because RDDs are lazy. But now you print a.count. So Spark opens the file, and iterates through the lines. If the line starts with A it counts it. But what do we do with lines starting with B? Will there be a call to b.count in the future? Or maybe it will be b.saveAsTextFile("b") and we should be writing these lines out somewhere? We cannot know at this point. Splitting an RDD is just not possible with the Spark API.

But nothing stops you from implementing something yourself if you know what you want. If you want to get both a.count and b.count, you can map lines starting with A to (1, 0) and lines starting with B to (0, 1), and then sum up the tuples element-wise in a reduce. If you want to save the lines starting with B to a file while counting the lines starting with A, you could use an accumulator in a map before filter(_.head == 'B').saveAsTextFile.
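
A minimal sketch of that counting idea (assuming x is the RDD of lines from above and that every line is non-empty):

// Each line contributes a pair of partial counts; one pass over the data,
// with the pairs summed element-wise in the reduce.
val (aCount, bCount) = x
  .map(line =>
    if (line.head == 'A') (1L, 0L)
    else if (line.head == 'B') (0L, 1L)
    else (0L, 0L))
  .reduce((p, q) => (p._1 + q._1, p._2 + q._2))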

The only generic solution is to store the intermediate data somewhere. One option is to just cache the input (x.cache). Another is to write the contents into separate directories in a single pass, then read them back as separate RDDs. (See Write to multiple outputs by key Spark - one Spark job.) We do this in production and it works great.
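
Applied to the question, a sketch of the caching option could look like the following (it reuses inputFile and the two helper functions from the question, assumes the parsed pairs fit in the configured cache, and the reduceByKey calls stand in for the aggregation the question says happens afterwards):

// Parse each line once and cache the (word, features) pairs; the three counts
// below then iterate over the cached data instead of re-reading the file.
val parsed = sc.textFile(inputFile)
  .map(line => (getWordFromLine(line), getFeaturesFromLine(line)))
  .cache()

val wordFeatureCounts = parsed
  .flatMap { case (word, features) => features.map(feature => ((word, feature), 1)) }
  .reduceByKey(_ + _)

val wordCounts = parsed
  .flatMap { case (word, features) => features.map(_ => (word, 1)) }
  .reduceByKey(_ + _)

val featureCounts = parsed
  .flatMap { case (_, features) => features.map(feature => (feature, 1)) }
  .reduceByKey(_ + _)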

Daniel Darabos

This is one of the major disadvantages of Spark compared to traditional map-reduce programming. An RDD/DF/DS can be transformed into another RDD/DF/DS, but you cannot map an RDD into multiple outputs. To avoid recomputation you need to cache the results into some intermediate RDD and then run multiple map operations to generate multiple outputs. The caching solution will work if you are dealing with reasonably sized data. But if the data is large compared to the memory available, the intermediate outputs will be spilled to disk and the advantage of caching will not be that great. Check out the discussion here: https://issues.apache.org/jira/browse/SPARK-1476. It is an old Jira, but still relevant. In particular, see the comment by Mridul Muralidharan.

Spark needs to provide a solution where a map operation can produce multiple outputs without the need to cache. It may not be elegant from a functional programming perspective, but I would argue it would be a good compromise to achieve better performance.
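
One hedged note on the spilling point: a plain RDD .cache() uses MEMORY_ONLY, so partitions that do not fit in memory are recomputed rather than spilled; if spilling to local disk is acceptable, it can be made explicit with a storage level (the sketch below reuses inputFile and the helpers from the question):

import org.apache.spark.storage.StorageLevel

// Keep what fits in memory and spill the rest to local disk, instead of
// recomputing dropped partitions from the input file on every downstream map.
val parsed = sc.textFile(inputFile)
  .map(line => (getWordFromLine(line), getFeaturesFromLine(line)))
  .persist(StorageLevel.MEMORY_AND_DISK)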

Shirish Kumar

I was also quite disappointed to see that this is a hard limitation of Spark compared to classic MapReduce. I ended up working around it by using multiple successive maps in which I filter out the data I need.

Here's a schematic toy example that performs two different calculations on the numbers 0 to 49 and writes the results to different output files.

from functools import partial
import os
from pyspark import SparkContext

# Generate mock data
def generate_data():
    for i in range(50):
        yield 'output_square', i * i
        yield 'output_cube', i * i * i

# Map function to siphon data to a specific output
def save_partition_to_output(part_index, part, filter_key, output_dir):
    # Initialise output file handle lazily to avoid creating empty output files
    file = None
    
    try:
        for key, data in part:
            if key != filter_key:
                # Pass through non-matching rows and skip
                yield key, data
                continue

            if file is None:
                file = open(os.path.join(output_dir, '{}-part{:05d}.txt'.format(filter_key, part_index)), 'w')

            # Consume data
            file.write(str(data) + '\n')

        yield from []

    finally:
        if file is not None:
            file.close()

def main():
    sc = SparkContext()
    rdd = sc.parallelize(generate_data())

    # Repartition to number of outputs
    # (not strictly required, but reduces number of output files).
    #
    # To split partitions further, use repartition() instead or
    # partition by another key (not the output name).
    rdd = rdd.partitionBy(numPartitions=2)

    # Map and filter to first output.
    rdd = rdd.mapPartitionsWithIndex(partial(save_partition_to_output, filter_key='output_square', output_dir='.'))

    # Map and filter to second output.
    rdd = rdd.mapPartitionsWithIndex(partial(save_partition_to_output, filter_key='output_cube', output_dir='.'))

    # Trigger execution.
    rdd.count()

if __name__ == '__main__':
    main()

This will create two output files, `output_square-part00000.txt` and `output_cube-part00000.txt`, with the desired output splits.

Janek Bevendorff