
I am trying to have the output of my reduceByKey function in PySpark be the range of the integers passed through for each key (i.e. the difference between the max and the min).

I tried writing a custom function:

def _range(x,y):
    return [max(x,y), min(x,y)]


data2 = (data_
    .map(lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp'])))
    .reduceByKey(lambda x, y: _range(x, y)))

Of course the output comes out as lists nested within lists within lists: once the reducer returns a list, the next call receives that list back as one of its arguments, so the nesting keeps growing.

I know one solution would be

.reduceByKey(max)

followed by

.reduceByKey(min)

and then combining the two results, but I do NOT want to perform two separate operations (a sketch of that two-pass version is shown below).

I would like to do this in one pass so the application is not inefficient, and I would also like to avoid first populating a list of integers per key. Any ideas? The data is in an RDD. Thanks.
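For reference, here is a rough sketch of that two-pass version I want to avoid, assuming the same keyed (key, timestamp) pairs produced by the map above; kv, maxes, mins and ranges are names made up for illustration:

kv = data_.map(lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp'])))

# One full pass over the data per extreme, plus a join to put min and max back together per key.
maxes = kv.reduceByKey(max)
mins = kv.reduceByKey(min)
ranges = maxes.join(mins).mapValues(lambda mm: abs(mm[0] - mm[1]))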


1 Answer


A correct approach here is aggregateByKey (which is built on combineByKey), with the sequence and combine operations defined as follows:

def seq_op(acc, x):
    # Fold a single value into the (running_min, running_max) accumulator.
    return (min(x, acc[0]), max(x, acc[1]))

def comb_op(acc1, acc2):
    # Merge two partial (min, max) accumulators coming from different partitions.
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))

# (+inf, -inf) is the neutral element for (min, max); sys.float_info.min would not work
# as the starting "max" because it is the smallest positive float, not the most negative value.
(pairs
    .aggregateByKey((float('inf'), float('-inf')), seq_op, comb_op)
    .mapValues(lambda minmax: abs(minmax[0] - minmax[1])))

where pairs is the result of:

pairs = data_.map(
    lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp'])))
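For illustration only, here is a minimal end-to-end run, assuming a local SparkContext and made-up records that carry just the three fields used here:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Hypothetical toy records; only driverId, afh and timestamp matter for this example.
data_ = sc.parallelize([
    {u'driverId': u'1', u'afh': u'A', u'timestamp': u'100'},
    {u'driverId': u'1', u'afh': u'A', u'timestamp': u'250'},
    {u'driverId': u'1', u'afh': u'A', u'timestamp': u'175'},
    {u'driverId': u'2', u'afh': u'B', u'timestamp': u'300'},
])

pairs = data_.map(
    lambda x: (x[u'driverId'] + ',' + x[u'afh'], int(x['timestamp'])))

result = (pairs
    .aggregateByKey((float('inf'), float('-inf')), seq_op, comb_op)
    .mapValues(lambda minmax: abs(minmax[0] - minmax[1])))

result.collect()
# [('1,A', 150), ('2,B', 0)]   (order of keys may vary)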

Since the key is generated dynamically, you cannot avoid the initial map, because the key has to be known up front for any *byKey operation. The cast of the values could be moved inside combineByKey (see the sketch below), but fundamentally that wouldn't change the number of times the data has to be accessed.
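A hedged sketch of what that would look like, keeping the key construction in the initial map but doing the int cast inside combineByKey (create_minmax, merge_value and merge_minmax are names made up here):

def create_minmax(x):
    # First value seen for a key: it is both the current min and the current max.
    v = int(x)
    return (v, v)

def merge_value(acc, x):
    # Fold one more raw value into the (min, max) accumulator, casting as we go.
    v = int(x)
    return (min(acc[0], v), max(acc[1], v))

def merge_minmax(acc1, acc2):
    # Merge accumulators coming from different partitions.
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))

ranges = (data_
    .map(lambda x: (x[u'driverId'] + ',' + x[u'afh'], x['timestamp']))
    .combineByKey(create_minmax, merge_value, merge_minmax)
    .mapValues(lambda mn_mx: abs(mn_mx[0] - mn_mx[1])))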
