3

Let us assume I have key-value pairs in Spark, such as the following.

[ (Key1, Value1), (Key1, Value2), (Key1, Value3), (Key2, Value4), (Key2, Value5) ]

Now I want to reduce this to something like the following.

[ (Key1, [Value1, Value2, Value3]), (Key2, [Value4, Value5]) ]

That is, from Key-Value to Key-List of Values.

How can I do that using the map and reduce functions in Python or Scala?

Vishnu Upadhyay
MetallicPriest
  • So you want a Spark solution in either Scala or Python, or a plain Scala/Python solution? In other words, is your starting collection an RDD? – maasg Nov 06 '14 at 17:59
  • Possible duplicate of [Reduce a key-value pair into a key-list pair with Apache Spark](http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark) – Christian Strempfer Feb 08 '16 at 13:11
  • What a mess. Why is it tagged with both `scala` and `python`... – Andrey Tyukin Jul 06 '18 at 14:14

6 Answers

4

collections.defaultdict can be the solution: https://docs.python.org/2/library/collections.html#collections.defaultdict

>>> from collections import defaultdict
>>> d = defaultdict(list)
>>> for key, value in [('Key1', 'Value1'), ('Key1', 'Value2'), ('Key1', 'Value3'), ('Key2', 'Value4'), ('Key2', 'Value5')]:
...     d[key].append(value)

>>> print d.items()
[('Key2', ['Value4', 'Value5']), ('Key1', ['Value1', 'Value2', 'Value3'])]
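
If the starting collection is actually a Spark RDD rather than a plain Python list, a rough PySpark equivalent could look like the following; sc is assumed to be an existing SparkContext, and the key ordering of the collected result is not guaranteed:

pairs = [('Key1', 'Value1'), ('Key1', 'Value2'), ('Key1', 'Value3'),
         ('Key2', 'Value4'), ('Key2', 'Value5')]

rdd = sc.parallelize(pairs)                  # RDD of (key, value) tuples
rdd.groupByKey().mapValues(list).collect()   # turn each grouped iterable into a list
# e.g. [('Key1', ['Value1', 'Value2', 'Value3']), ('Key2', ['Value4', 'Value5'])]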
Vishnu Upadhyay
2
val data = Seq(("Key1", "Value1"), ("Key1", "Value2"), ("Key1", "Value3"), ("Key2", "Value4"), ("Key2", "Value5"))

data
  .groupBy(_._1)
  .mapValues(_.map(_._2))

res0: scala.collection.immutable.Map[String,Seq[String]] =
     Map(
        Key2 -> List(Value4, Value5), 
        Key1 -> List(Value1, Value2, Value3))
Sergii Lagutin
1

I'm sure there's a more readable way to do this, but the first thing that comes to mind is using itertools.groupby. Sort the list by the first element of the tuple (the key). Then use a list comprehension to iterate over the groups.

from itertools import groupby

l = [('key1', 1),('key1', 2),('key1', 3),('key2', 4),('key2', 5)]
l.sort(key=lambda i: i[0])

[(key, [i[1] for i in values]) for key, values in groupby(l, lambda i: i[0])]

Output

[('key1', [1, 2, 3]), ('key2', [4, 5])]
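
A slightly tidier variant of the same idea, assuming operator.itemgetter for the key function:

from itertools import groupby
from operator import itemgetter

l = [('key1', 1), ('key1', 2), ('key1', 3), ('key2', 4), ('key2', 5)]
l.sort(key=itemgetter(0))  # groupby only groups consecutive items, so sort by key first

[(key, [v for _, v in values]) for key, values in groupby(l, key=itemgetter(0))]
# [('key1', [1, 2, 3]), ('key2', [4, 5])]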
Cory Kramer
0

Something like this:

newlist = dict()
for x in l:
    if x[0] not in newlist:
        newlist[x[0]] = list()
    newlist[x[0]].append(x[1])
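
For what it's worth, dict.setdefault folds the membership check into a single call; a minimal equivalent sketch, assuming the same list l of (key, value) pairs:

newlist = {}
for key, value in l:
    # setdefault returns the existing list for key, or inserts and returns a new empty one
    newlist.setdefault(key, []).append(value)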
Darth Kotik
0

The shortest, using a defaultdict, is the following; there is no requirement for the input to be sorted.

>>> from collections import defaultdict                                                                                       
>>> collect = lambda tuplist: reduce(lambda acc, (k,v): acc[k].append(v) or acc,\
                                     tuplist, defaultdict(list))
>>> collect( [(1,0), (2,0), (1,2), (2,3)])
defaultdict(<type 'list'>, {1: [0, 2], 2: [0, 3]})
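
Note that this relies on Python 2 features (a built-in reduce and tuple parameters in lambdas); a rough Python 3 equivalent of the same fold might be:

from collections import defaultdict
from functools import reduce  # reduce is not a builtin in Python 3

# list.append returns None, so "or acc" hands the accumulator back to reduce
collect = lambda tuplist: reduce(lambda acc, kv: acc[kv[0]].append(kv[1]) or acc,
                                 tuplist, defaultdict(list))

collect([(1, 0), (2, 0), (1, 2), (2, 3)])
# defaultdict(<class 'list'>, {1: [0, 2], 2: [0, 3]})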
emvee
  • While I can appreciate the functional style, using it just to call a function with side-effects looks a bit odd – loopbackbee Nov 06 '14 at 13:50
  • Main reason to write it like this is that the code does not rely on modification of a variable in the outer scope. I'm not sure which side effect you mean? – emvee Nov 06 '14 at 13:51
0

Another Scala one, avoiding groupBy/mapValues (although that is the obvious Scala solution, this one follows the Python answer given by Vishnu, since @MetallicPriest commented that it was "much easier").

val data = Seq(("Key1", "Value1"), ("Key1", "Value2"), ("Key1", "Value3"),
               ("Key2", "Value4"), ("Key2", "Value5"))

val dict = Map[String, Seq[String]]() withDefaultValue(Nil)

data.foldLeft(dict){ case (d, (k,v)) => d updated (k, d(k) :+ v) }

// Map(Key1 -> List(Value1, Value2, Value3), Key2 -> List(Value4, Value5))

(This appends each value, to give exactly the result shown in the question; a prepend would be more efficient, though.)

Mutable version, even closer to the Python one:

import scala.collection.mutable.{Map, Seq}
val dict = Map[String, Seq[String]]() withDefaultValue(Seq())

for ((k,v) <- data) dict(k) :+= v
dict
// Map(Key2 -> ArrayBuffer(Value4, Value5),
//     Key1 -> ArrayBuffer(Value1, Value2, Value3))
The Archetypal Paul