
I have an RDD where each record is an int:

[0,1,2,3,4,5,6,7,8]

All I need to do is split this RDD into batches, i.e. make another RDD where each element is a fixed-size list of elements:

[[0,1,2], [3,4,5], [6,7,8]]

This sounds trivial; however, I have been puzzling over it for the last several days and cannot find anything except the following solution:

  1. Use zipWithIndex to enumerate the records in the RDD:

    [0,1,2,3,4,5] -> [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)]

  2. Iterate over this RDD using map() and compute a batch index as index = int(index / batchSize):

    [(0, 0),(1, 1),(2, 2),(3, 3),(4, 4),(5, 5)] -> [(0, 0),(0, 1),(0, 2),(1, 3),(1, 4),(1, 5)]

  3. Then group by the generated index:

    [(0, [0,1,2]), (1, [3,4,5])]

This gets me what I need; however, I do not want to use groupBy here. It is trivial with plain MapReduce or an abstraction like Apache Crunch, but is there a way to produce a similar result in Spark without the heavy groupBy?
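For reference, here is the approach above as a minimal PySpark sketch (assuming an existing SparkContext sc; batch_size = 3 is just for this example, and the index is carried through the groupBy because the order of values within a group is not guaranteed):

batch_size = 3
rdd = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8])

batched = (rdd
           .zipWithIndex()                                # (value, index)
           .map(lambda vi: (vi[1] // batch_size, vi))     # (batch_index, (value, index))
           .groupByKey()                                  # (batch_index, [(value, index), ...])
           .map(lambda kv: [v for v, _ in sorted(kv[1], key=lambda p: p[1])]))

# batched.collect() -> [[0, 1, 2], [3, 4, 5], [6, 7, 8]] (order of the batches may vary)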

Dmitry
  • You can a) apply multiple filters; b) use a custom partitioner and create RDDs from each partition. Although I can't imagine why you need fixed size RDDs. – khachik Nov 13 '17 at 14:35
  • @khachik Can you please elaborate on 'apply multiple filters' and 'implement a custom partitioner'? I do not need a fixed-size RDD. I need each record in the RDD to be an array of records (a batch). This is required because I have a math model that consumes not a single record but a batch of records and returns a batch of predictions. – Dmitry Nov 13 '17 at 14:47

2 Answers


You did not clearly explain why you need fixed-size RDDs; depending on what you are trying to accomplish there could be a better solution, but to answer the question as asked, I see the following options:
1) Implement filters based on the number of items and the batch size. For example, if you have 1000 items in the original RDD and want to split them into 10 batches, you end up applying 10 filters: the first one checks whether the index is in [0, 99], the second one [100, 199], and so on. After applying each filter, you have one RDD. It is important to note that the original RDD should be cached prior to filtering. Pros: each resulting RDD can be processed separately and does not have to be fully allocated on one node. Cons: this approach becomes slower as the number of batches grows. (See the sketch after this list.)
2) Logically similar to this, but instead of filters you implement a custom partitioner that returns a partition id based on the index (key), as described here: Custom partitioner for equally sized partitions. Pros: faster than filters. Cons: each partition has to fit on one node.
3) If the order in the original RDD is not important and you just need it roughly equally chunked, you can coalesce/repartition it, as explained here: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html
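A minimal PySpark sketch of option 1, under the assumption of an existing SparkContext sc; the data, batch_size, and variable names are illustrative:

# One filter per batch over an indexed, cached RDD.
batch_size = 100
indexed = sc.parallelize(range(1000)).zipWithIndex().cache()     # (value, index)
num_batches = (indexed.count() + batch_size - 1) // batch_size   # ceiling division

batches = [
    indexed
        .filter(lambda vi, b=b: b * batch_size <= vi[1] < (b + 1) * batch_size)
        .map(lambda vi: vi[0])                                   # drop the index
    for b in range(num_batches)
]
# batches[0].collect() -> the first 100 values, batches[1].collect() -> the next 100, ...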

khachik
    Thanks a lot for the detailed explanation. Let me provide some more background. I have a machine learning model that takes up to 1K records as input, does some magic, and returns the same number of records. All I need to do is 'score' all the records in my RDD using this model, so this is why I need to split the original RDD into chunks, where each chunk contains no more than 1000 records. Is this something that does not fit into the Spark paradigm? I have been working mostly with Crunch / MapReduce and there are no such issues there. Thanks in advance – Dmitry Nov 13 '17 at 18:21

Maybe you can use aggregateByKey; it is much faster and more lightweight than groupByKey in this case. I tried splitting five hundred million records into batches of size 256 on 10 executors, and it took only half an hour to finish.

from operator import add  # merges the per-partition lists

data = data.zipWithIndex().map(lambda x: (x[1] // 256, x[0]))      # (batch_index, value)
data = data.aggregateByKey(list(), lambda acc, v: acc + [v], add)  # (batch_index, [values])
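If the batch keys are not needed afterwards, something like the following (an assumption about your downstream use, not part of the original answer) leaves an RDD where each element is a list of up to 256 records:

batches = data.values()   # drop the batch index, keep only the lists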

For more info, see Spark difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey