I have 1 TB of records structured as a pair RDD, and I want to group all my records by key and then apply a function only to the values.
My code is the following:
rdd = sc.textFile("path").map(lambda l: l.split(";"))
rdd_pair = rdd.map(lambda a: (a[0], a))
rdd_pair.take(3)
# output format: (id_client, [id_client, time, city])
# [('1', ['1', '2013/03/12 23:59:59', 'London']),
#  ('1', ['1', '2013/12/03 10:43:12', 'Rome']),
#  ('1', ['1', '2013/05/01 00:09:59', 'Madrid'])]
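To make this reproducible without the full 1 TB file, here is a minimal sketch of the same pipeline on just the three sample rows (the lines list and the sample_* names are mine, made up from the output above; it assumes a live SparkContext sc):
lines = [
    "1;2013/03/12 23:59:59;London",
    "1;2013/12/03 10:43:12;Rome",
    "1;2013/05/01 00:09:59;Madrid",
]
# same two map steps as above, on an in-memory sample
sample_rdd = sc.parallelize(lines).map(lambda l: l.split(";"))
sample_pair = sample_rdd.map(lambda a: (a[0], a))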
I want to group all the records by id_client and then apply the function matrix only to the values. For each key, the function sorts the list of tuples by "time" and then extracts the transitions from one city to the next.
grouped = rdd_pair.groupByKey(200)
grouped.take(1)
#output [("1",<pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210)]
def matrix(input):
    output = []
    # sort one client's records chronologically by the "time" field
    input_bag = sorted(input, key=lambda x: x[1])
    loc0 = input_bag[0]
    # pair each record's city with the next record's city
    for loc in input_bag[1:]:
        output.append((loc0[2], loc[2]))
        loc0 = loc
    return output
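When I call matrix on a plain Python list, it behaves as I expect, so the function itself looks correct in isolation:
sample = [['1', '2013/03/12 23:59:59', 'London'],
          ['1', '2013/12/03 10:43:12', 'Rome'],
          ['1', '2013/05/01 00:09:59', 'Madrid']]
print(matrix(sample))
# [('London', 'Madrid'), ('Madrid', 'Rome')]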
transition = grouped.mapValues(matrix).filter(lambda l: l[1] != [])
The output that I want is:
#output transition: [('1', [('London', 'Madrid'),('Madrid', 'Rome')])]
Instead, I get a Python error: list index out of range.
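My only guess so far is that some input rows have fewer than three fields (a header line, empty lines, or rows with a missing city), which would make loc[2] fail. This is a sketch of the check I would run (an assumption on my part, not something I have verified on the full data):
# count rows that would make a[0] or loc[2] raise IndexError
rdd.filter(lambda a: len(a) < 3).count()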
Could someone help me? Thanks!