I've got a Spark job where I'm running a flatMap function that returns a list of tuples. The key in each tuple is a Timestamp and the value is a dict:

[(Timestamp('2000-01-01 00:00:00'),
  {'id': '1', 'val': '200M', 'date':Timestamp('2000-01-01 00:00:00')}),
 (Timestamp('2000-01-01 00:00:00'),
  {'id': '2', 'val': '10M', 'date':Timestamp('2000-01-01 00:00:00')}),
 (Timestamp('2000-01-01 00:00:00'),
  {'id': '3', 'val': '30M', 'date':Timestamp('2000-01-01 00:00:00')}),
 (Timestamp('2000-01-02 00:00:00'),
  {'id': '15', 'val': '120M', 'date':Timestamp('2000-01-02 00:00:00')}),
 (Timestamp('2000-01-02 00:00:00'),
  {'id': '3', 'val': '35M', 'date':Timestamp('2000-01-02 00:00:00')}),
 (Timestamp('2000-01-02 00:00:00'),
  {'id': '4', 'val': '56M', 'date':Timestamp('2000-01-02 00:00:00')}),
 (Timestamp('2000-01-03 00:00:00'),
  {'id': '6', 'val': '5M', 'date':Timestamp('2000-01-03 00:00:00')}),
 (Timestamp('2000-01-03 00:00:00'),
  {'id': '1', 'val': '25M', 'date':Timestamp('2000-01-03 00:00:00')}),
 (Timestamp('2000-01-03 00:00:00'),
  {'id': '2', 'val': '7M', 'date':Timestamp('2000-01-03 00:00:00')})]

I'm trying to run a reduceByKey function next, which should give me:

[ (Timestamp('2000-01-01 00:00:00'),
  [{'id': '1', 'val': '200M', 'date':Timestamp('2000-01-01 00:00:00')},
   {'id': '2', 'val': '10M', 'date':Timestamp('2000-01-01 00:00:00')},
   {'id': '3', 'val': '30M', 'date':Timestamp('2000-01-01 00:00:00')}]),
  (Timestamp('2000-01-02 00:00:00'),
  [{'id': '15', 'val': '120M', 'date':Timestamp('2000-01-02 00:00:00')},
   {'id': '3', 'val': '35M', 'date':Timestamp('2000-01-02 00:00:00')},
   {'id': '4', 'val': '56M', 'date':Timestamp('2000-01-02 00:00:00')}]),
  (Timestamp('2000-01-03 00:00:00'),
  [{'id': '6', 'val': '5M', 'date':Timestamp('2000-01-03 00:00:00')},
   {'id': '1', 'val': '25M', 'date':Timestamp('2000-01-03 00:00:00')},
   {'id': '2', 'val': '7M', 'date':Timestamp('2000-01-03 00:00:00')}]) ]

So far I've tried this: `output = rdd.flatMap(split_func).reduceByKey(lambda x, y: x+y).collect()`

but I get this error: `TypeError: unsupported operand type(s) for +: 'dict' and 'dict'`

Thanks in advance!

nrs90

1 Answer

This is more of a Python error than a Spark one. If `d1` and `d2` are dictionaries, then `d1 + d2` does not work. However, you can do `{**d1, **d2}` (Python 3.5+). If `d1` and `d2` have the same key, the merged dict takes the value from `d2`.
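
For example (plain Python, nothing Spark-specific):

    d1 = {'id': '1', 'val': '200M'}
    d2 = {'id': '2', 'val': '10M'}

    # d1 + d2 raises: TypeError: unsupported operand type(s) for +: 'dict' and 'dict'
    merged = {**d1, **d2}
    print(merged)  # {'id': '2', 'val': '10M'} -- overlapping keys keep d2's values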

So you could do `output = rdd.flatMap(split_func).reduceByKey(lambda x, y: {**x, **y}).collect()`

However, your desired result keeps all the dicts for each key, so in this case I think `groupByKey` is better: `output = rdd.flatMap(split_func).groupByKey().mapValues(list).collect()`
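
Here is a minimal, self-contained sketch of that pipeline (assuming pyspark and pandas are installed; the hard-coded `pairs` list stands in for what your `flatMap(split_func)` produces):

    from pyspark import SparkContext
    from pandas import Timestamp

    sc = SparkContext.getOrCreate()

    # stand-in for the (Timestamp, dict) pairs produced by flatMap(split_func)
    pairs = [
        (Timestamp('2000-01-01'), {'id': '1', 'val': '200M', 'date': Timestamp('2000-01-01')}),
        (Timestamp('2000-01-01'), {'id': '2', 'val': '10M', 'date': Timestamp('2000-01-01')}),
        (Timestamp('2000-01-02'), {'id': '15', 'val': '120M', 'date': Timestamp('2000-01-02')}),
    ]

    rdd = sc.parallelize(pairs)

    # group every dict that shares the same Timestamp key, then turn the
    # resulting iterable into a plain list
    output = rdd.groupByKey().mapValues(list).collect()
    # [(Timestamp('2000-01-01 00:00:00'), [{'id': '1', ...}, {'id': '2', ...}]),
    #  (Timestamp('2000-01-02 00:00:00'), [{'id': '15', ...}])]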

mattyx17
  • that doesn't actually seem to work - it seems to just give me the first `dict` of each key: `[(Timestamp('2000-01-03 00:00:00'), {'date': Timestamp('2000-01-03 00:00:00'), 'id': '2', 'val': '7M'}), (Timestamp('2000-01-02 00:00:00'), {'date': Timestamp('2000-01-02 00:00:00'), 'id': '15', 'val': '120M'}), (Timestamp('2000-01-01 00:00:00'), {'date': Timestamp('2000-01-01 00:00:00'), 'id': '1', 'val': '200M'})]` – nrs90 Apr 16 '20 at 22:39
  • ah ok. This is because they are tuples. In that case your best bet would be groupByKey: https://backtobazics.com/big-data/spark/apache-spark-groupbykey-example/ – mattyx17 Apr 16 '20 at 23:04
  • I wanted to avoid using `groupByKey` since it returns an iterable.. I'd rather have a large list of dicts.. (is it possible?) – nrs90 Apr 17 '20 at 10:20
  • I have updated my answer. You can change the iterable to a list by adding a `mapValues(list)` before the collect. But I really think your only option is groupByKey unless you want to write a massive UDF https://stackoverflow.com/questions/37580303/using-reducebykey-to-group-list-of-values – mattyx17 Apr 17 '20 at 12:57
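
For reference, the usual `reduceByKey` workaround along the lines of the linked question is to wrap each dict in a one-element list first, so that `+` concatenates lists rather than dicts (a sketch, assuming the same `rdd` and `split_func` as in the question):

    # wrap each dict so the values are lists, then concatenate the lists per key
    output = (rdd.flatMap(split_func)
                 .mapValues(lambda d: [d])
                 .reduceByKey(lambda x, y: x + y)
                 .collect())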