2

Let us Assume, I have a key value pair in Spark, such as the following.

[ (Key1, Value1), (Key1, Value2), (Key1, Vaue3), (Key2, Value4), (Key2, Value5) ]

Now I want to reduce this, to something like this.

[ (Key1, [Value1, Value2, Value3]), (Key2, [Value4, Value5]) ]

That is, from Key-Value to Key-List of Values.

How can I do that using the map and reduce functions in python?

Rohan
  • 95
  • 2
  • 12

3 Answers3

1

This is what map and reduce are for! First just make all the values a length one list, then reduce by key. Here is a tested example:

from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local[2]").setAppName("Spark Count")
sc = SparkContext(conf=conf)

key_val = [ ('Key1', 'Value1'), ('Key1', 'Value2'), ('Key1', 'Value3'), ('Key2', 'Value4'), ('Key2', 'Value5') ]
kv = sc.parallelize(key_val)
kv_list = kv.map(lambda kv_tuple: (kv_tuple[0], [kv_tuple[1]]))
# Now reduce to concatinate lists
kv_desired = kv_list.reduceByKey(lambda a,b: a+b)
print(kv_desired.collect())

# Prints [('Key1', ['Value1', 'Value2', 'Value3']), ('Key2', ['Value4', 'Value5'])]
Nate M
  • 96
  • 2
0
>>> rdd = sc.parallelize([("a1","b1","c1","d1","e1"), ("a2","b2","c2","d2","e2")])

>>> result = rdd.map(lambda x: (x[0], list(x[1:])))

>>> print result.collect()
[('a1', ['b1', 'c1', 'd1', 'e1']), ('a2', ['b2', 'c2', 'd2', 'e2'])]

Explanation of lambda x: (x[0], list(x[1:])):

x[0] will make the first element to be the first element of the output

x[1:] will make all the elements except the first one to be in the second element

list(x[1:]) will force that to be a list because the default will be a tuple

Prabhanj
  • 262
  • 2
  • 3
  • 16
-1

Doing it with map and reduce is certainly possible, but would surely be an exercise in obfuscation. Doing it iteratively is easy:

lists={}       # key -> list of values
output=[]
for k,v in input:
  l=lists.get(k)
  if l is None:
    l=lists[k]=[]
    output.append((k,l))  # empty for now
  l.append(v)

Notes (since it's hard to get requirements just from a single example):

  1. This assumes that the keys are hashable.
  2. It supports lists like [(k1,v1),(k2,v2),(k1,v3)] where not all the k1 pairs are adjacent.
  3. It puts keys in the output list in order of first appearance.
  4. It puts all the values (including duplicates) in the order they appear for a key.
Davis Herring
  • 36,443
  • 4
  • 48
  • 76
  • 1
    This code is not good for large number of key,value pairs since you will be iterating over all of them. – Gambit1614 Sep 15 '17 at 22:18
  • I'm not an expert with `pyspark`. Am I supposed to assume a requirement that the algorithm be [online](https://en.wikipedia.org/wiki/Online_algorithm)? (In Python 2, `map` and `reduce` can't do this!) If so, can we assume that all the pairs with a single key are consecutive? – Davis Herring Sep 15 '17 at 22:22
  • 3
    I did not mean to be rude. What I meant to say was that when considering problems related to Spark, the datasets are often large, and when you execute the code you wrote above, it will require calling collect() so that Master Node will need to load everything into it's memory which is not feasible. The OP meant `map` and `reduce` in Pyspark, not the ones used in Python. – Gambit1614 Sep 15 '17 at 22:51