I have a DataFrame as follows:

from pyspark.sql import SparkSession

sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
data = [(1,2,0.1,0.3),(1,2,0.1,0.3),(1,3,0.1,0.3),(1,3,0.1,0.3),
        (11, 12, 0.1, 0.3),(11,12,0.1,0.3),(11,13,0.1,0.3),(11,13,0.1,0.3)]

trajectory_df = sqlContext.createDataFrame(data, schema=['grid_id','rider_id','lng','lat'])
trajectory_df.show()

+-------+--------+---+---+
|grid_id|rider_id|lng|lat|
+-------+--------+---+---+
|      1|       2|0.1|0.3|
|      1|       2|0.1|0.3|
|      1|       3|0.1|0.3|
|      1|       3|0.1|0.3|
|     11|      12|0.1|0.3|
|     11|      12|0.1|0.3|
|     11|      13|0.1|0.3|
|     11|      13|0.1|0.3|
+-------+--------+---+---+

I want to merge the rows that belong to the same grid into a dict, where rider_id is the key and the list of [lng, lat] points is the value.

The results I expect are as follows:

[(1, {3:[[0.1, 0.3], [0.1, 0.3]],2:[[0.1, 0.3], [0.1, 0.3]]}),
 (11,{13:[[0.1, 0.3], [0.1, 0.3]],12:[[0.1, 0.3], [0.1, 0.3]]})]

I can use groupByKey() to group the points by (grid_id, rider_id):

def trans_point(row):
    return ((row.grid_id, row.rider_id), [row.lng, row.lat])
trajectory_df = trajectory_df.rdd.map(trans_point).groupByKey().mapValues(list)
print(trajectory_df.take(10))

[((1, 3), [[0.1, 0.3], [0.1, 0.3]]), ((11, 13), [[0.1, 0.3], [0.1, 0.3]]), ((1, 2), [[0.1, 0.3], [0.1, 0.3]]), ((11, 12), [[0.1, 0.3], [0.1, 0.3]])]

But I don't get the expected result when I combine the dicts for each grid_id:

trajectory_df = trajectory_df.map(lambda x:(x[0][0],{x[0][1]:x[1]})).reduceByKey(lambda x,y:x.update(y))
print(trajectory_df.take(10))
[(1, None), (11, None)]

For several reasons, I would like to do this at the RDD level. How can I achieve it? Thanks in advance.

giser_yugang
  • `dict.update` works in place and returns `None`. [How to combine dictionaries in a single statement](https://stackoverflow.com/questions/38987/how-to-merge-two-dictionaries-in-a-single-expression) – pault Jun 19 '19 at 10:50
  • @pault Thank you very much. Can you post as answer and I will accept it? – giser_yugang Jun 19 '19 at 12:10

1 Answer

dict.update works in place and returns None. From the docs:

Update the dictionary with the key/value pairs from other, overwriting existing keys. Return None.
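
The effect can be reproduced without Spark. The following is a minimal sketch in plain Python, where `functools.reduce` stands in for what `reduceByKey` does for a single key; the names `a`, `b`, and `result` are only illustrative:

```python
from functools import reduce

# Two single-entry dicts, like the values produced for grid_id 1.
a = {2: [[0.1, 0.3], [0.1, 0.3]]}
b = {3: [[0.1, 0.3], [0.1, 0.3]]}

# Same lambda as in the question: the lambda returns whatever
# dict.update returns, which is None.
result = reduce(lambda x, y: x.update(y), [a, b])

print(result)     # None
print(sorted(a))  # [2, 3]: a was updated in place, but that is not what the lambda returns
```

With exactly two dicts per key the reduction yields `None`, which matches the `[(1, None), (11, None)]` output in the question. With three or more dicts per key, the next step would call `.update` on `None` and raise an `AttributeError`.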

You need to write your own reduce function to combine the dictionaries. We can borrow from @Aaron Hall's answer on How to merge two dictionaries in a single expression?

def merge_two_dicts(x, y):
    """From https://stackoverflow.com/a/26853961/5858851"""
    z = x.copy()   # start with x's keys and values
    z.update(y)    # modifies z with y's keys and values & returns None
    return z

trajectory_df = trajectory_df.map(lambda x:(x[0][0],{x[0][1]:x[1]}))\
    .reduceByKey(merge_two_dicts)

print(trajectory_df.collect())
#[(1, {2: [[0.1, 0.3], [0.1, 0.3]], 3: [[0.1, 0.3], [0.1, 0.3]]}),
# (11, {12: [[0.1, 0.3], [0.1, 0.3]], 13: [[0.1, 0.3], [0.1, 0.3]]})]
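
On Python 3.5 or later, the same merge can also be written with dict unpacking, which builds a new dict and returns it in a single expression. The sketch below runs in plain Python; `reduce_by_key` is a hypothetical local stand-in for `reduceByKey` (not a Spark API), and `merge_unpack` is an illustrative name:

```python
# Input shaped like the RDD after the map step: (grid_id, {rider_id: points}).
pairs = [
    (1, {3: [[0.1, 0.3], [0.1, 0.3]]}),
    (11, {13: [[0.1, 0.3], [0.1, 0.3]]}),
    (1, {2: [[0.1, 0.3], [0.1, 0.3]]}),
    (11, {12: [[0.1, 0.3], [0.1, 0.3]]}),
]

def merge_unpack(x, y):
    """Return a new dict with x's entries, then y's (y wins on duplicate keys)."""
    return {**x, **y}

def reduce_by_key(items, func):
    """Hypothetical local stand-in for RDD.reduceByKey, for illustration only."""
    out = {}
    for key, value in items:
        out[key] = func(out[key], value) if key in out else value
    return sorted(out.items())

merged = reduce_by_key(pairs, merge_unpack)
print(merged)
```

In the Spark pipeline this would correspond to passing `merge_unpack` (or `lambda x, y: {**x, **y}`) to `reduceByKey`. Because the earlier `groupByKey()` step already produces one entry per (grid_id, rider_id), no rider_id should appear twice within a grid, so the "y wins" rule never discards data here.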
pault