
I have 1 TB of records stored in a pair RDD. I want to group all records by key and then apply a function to the values only.

My code is the following:

# sc is the SparkContext (predefined in the pyspark shell)
rdd = sc.textFile("path").map(lambda l: l.split(";"))
rdd_pair = rdd.map(lambda a: (a[0], a))
rdd_pair.take(3)
# output: ('id_client', ('id_client', 'time', 'city')) pairs, e.g.
# [('1', [('1', '2013/03/12 23:59:59', 'London')]),
#  ('1', [('1', '2013/12/03 10:43:12', 'Rome')]),
#  ('1', [('1', '2013/05/01 00:09:59', 'Madrid')])]

I want to group all records by id_client and then apply the function `matrix` to the values only. For each key, the function sorts the list of tuples by "time" and then extracts the transitions from one city to the next.
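The per-key logic described above can be sketched in plain Python, independent of Spark. This is a minimal sketch that assumes each value is a flat `(id_client, time, city)` tuple. `city_transitions` is a hypothetical name and is not part of the original code. Sorting the raw time strings works here only because the sample timestamps are zero-padded and ordered from year down to second.

```python
def city_transitions(records):
    """Sort one client's records by time and return consecutive (from, to) city pairs.

    Assumes each record is a flat (id_client, time, city) tuple with
    "YYYY/MM/DD HH:MM:SS" time strings, as in the sample data.
    """
    ordered = sorted(records, key=lambda rec: rec[1])  # rec[1] is the time string
    # Pair each record with the next one; zip stops at the shorter sequence,
    # so a client with zero or one record yields an empty list.
    return [(prev[2], cur[2]) for prev, cur in zip(ordered, ordered[1:])]


records = [
    ('1', '2013/03/12 23:59:59', 'London'),
    ('1', '2013/12/03 10:43:12', 'Rome'),
    ('1', '2013/05/01 00:09:59', 'Madrid'),
]
print(city_transitions(records))  # → [('London', 'Madrid'), ('Madrid', 'Rome')]
```

Using `zip` over the sorted list and its one-step offset avoids indexing `[0]` directly, so an empty group cannot raise an IndexError.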

grouped = rdd_pair.groupByKey(200)  # 200 = number of partitions
grouped.take(1)
# output: [('1', <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>)]
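The `ResultIterable` shown above is just the iterable of all values collected for a key. In PySpark, `grouped.mapValues(list).take(1)` prints those values instead of the object's repr. The following is a local stand-in written for illustration. `group_by_key` is a hypothetical helper, not Spark's implementation. It shows what `matrix` receives for each key.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Local stand-in for RDD.groupByKey: gather every value under its key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # Dicts keep insertion order (Python 3.7+), so keys come out in first-seen order
    return list(groups.items())


pairs = [
    ('1', ('1', '2013/03/12 23:59:59', 'London')),
    ('1', ('1', '2013/12/03 10:43:12', 'Rome')),
    ('1', ('1', '2013/05/01 00:09:59', 'Madrid')),
]
for key, values in group_by_key(pairs):
    print(key, values)
```

This stand-in keeps values in input order. Spark does not guarantee the order of values within a group, which is why `matrix` must sort by time itself.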

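def matrix(input):
    output = []
    # sort this client's records by the time field (index 1)
    input_bag = sorted(input, key=lambda x: x[1], reverse=False)
    loc0 = input_bag[0]
    # pair each city (index 2) with the next city in time order
    for loc in input_bag[1:]:
        output.append((loc0[2], loc[2]))
        loc0 = loc
    return output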

# drop clients whose transition list is empty (only one record)
transition = grouped.mapValues(lambda k: matrix(k)).filter(lambda l: l[1] != [])

The output that I want is:

#output transition: [('1', [('London', 'Madrid'),('Madrid', 'Rome')])]

I get a Python error: `IndexError: list index out of range`.
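One plausible cause is that the values really have the shape printed by `take(3)`, with each record wrapped in a one-element list. In that case the sort key `x[1]` runs on the wrapper list rather than on the record tuple. This is a hypothesis, not something the question confirms. The sketch below reproduces the error with that assumed shape, using the sample rows.

```python
# Values shaped like the take(3) sample (assumed): each record is wrapped
# in a one-element list instead of being a flat (id_client, time, city) tuple.
wrapped = [
    [('1', '2013/03/12 23:59:59', 'London')],
    [('1', '2013/12/03 10:43:12', 'Rome')],
    [('1', '2013/05/01 00:09:59', 'Madrid')],
]

error_message = None
try:
    # x is a one-element list here, so x[1] is out of range
    sorted(wrapped, key=lambda x: x[1])
except IndexError as exc:
    error_message = str(exc)
print(error_message)  # → list index out of range

# Unwrapping first gives flat tuples, so x[1] is the time string again
flat = [item[0] for item in wrapped]
print(sorted(flat, key=lambda x: x[1])[0][2])  # → London
```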

Can someone help me? Thanks.

Asked by JulieP; edited by OneCricketeer.

  • Can you add some sample input, the desired output, and a clearer description of what it is you are trying to do? What is the function `matrix()` supposed to return? Please read this post on [how to make good reproducible apache spark dataframe examples](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples) and try to provide a [mcve]. – pault Mar 22 '18 at 13:45
  • I think data is malformed and some attributes are missing. Only line which can cause this error is `loc0 = input_bag[0]` all else (loc0, loc, x) are tuples. – pauli Mar 23 '18 at 09:15

1 Answer


I resolved it this way:

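def matrix(input):
    output = []
    # take the first item of each element (the record tuple) to get a flat list
    input2 = [i[0] for i in input]
    # sort the unwrapped records by the time field (index 1)
    input_bag = sorted(input2, key=lambda x: x[1], reverse=False)
    loc0 = input_bag[0]
    # pair each city (index 2) with the next city in time order
    for loc in input_bag[1:]:
        output.append((loc0[2], loc[2]))
        loc0 = loc
    return output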

Before calling Python's built-in `sorted` function, I transform `input` (an iterable object) into `input2` (a list of tuples) by taking the first item of each element.
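The answer's `i[0]` unwrapping assumes every element is wrapped. It would silently break on flat records such as the `split(";")` lists in the question's code, where `i[0]` is just the id. Below is a hedged variant, not the author's code. `matrix_any_shape` and `unwrap` are hypothetical names. It accepts either shape and parses timestamps with `datetime.strptime`, assuming the `YYYY/MM/DD HH:MM:SS` format seen in the sample.

```python
from datetime import datetime

TIME_FORMAT = "%Y/%m/%d %H:%M:%S"  # assumed from the sample timestamps


def unwrap(item):
    """Return the (id_client, time, city) record, whether flat or wrapped in a one-element list."""
    return item[0] if len(item) == 1 else item


def matrix_any_shape(values):
    """Sort one client's records by parsed time and return consecutive city transitions."""
    records = [unwrap(item) for item in values]
    records.sort(key=lambda rec: datetime.strptime(rec[1], TIME_FORMAT))
    return [(prev[2], cur[2]) for prev, cur in zip(records, records[1:])]


values = [
    [('1', '2013/03/12 23:59:59', 'London')],
    [('1', '2013/12/03 10:43:12', 'Rome')],
    [('1', '2013/05/01 00:09:59', 'Madrid')],
]
print(matrix_any_shape(values))  # → [('London', 'Madrid'), ('Madrid', 'Rome')]
```

In Spark this would be used as `grouped.mapValues(matrix_any_shape).filter(lambda kv: kv[1])`, which keeps only clients with at least one transition. That pipeline is untested here. Parsing the timestamps costs a little more than comparing strings, but it does not depend on the strings being zero-padded.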

Answered by JulieP.