
Trips (coordinates.csv)

id,timestamp
1008,2003-11-03 15:00:31
1008,2003-11-03 15:02:38
1008,2003-11-03 15:03:04
1008,2003-11-03 15:18:00
1009,2003-11-03 22:00:00
1009,2003-11-03 22:02:53
1009,2003-11-03 22:03:44 
1009,2003-11-14 10:00:00
1009,2003-11-14 10:02:02
1009,2003-11-14 10:03:10

Prompts (prompts.csv)

id,timestamp,mode
1008,2003-11-03 15:18:49,car 
1009,2003-11-03 22:04:20,metro
1009,2003-11-14 10:04:20,bike 

Read the CSV files:

import pandas as pd

coordinates = pd.read_csv('coordinates.csv')
mode = pd.read_csv('prompts.csv')
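
Note: by default the timestamp columns come back as plain strings. If they are needed as real datetimes, one option (assuming the column names shown in the samples above) is to have pandas parse them while reading:

coordinates = pd.read_csv('coordinates.csv', parse_dates=['timestamp'])
mode = pd.read_csv('prompts.csv', parse_dates=['timestamp'])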

I need to assign each mode to the last timestamp of its trip (the row that marks the end of the trip).

Expected result:

id, timestamp, mode
1008, 2003-11-03 15:00:31, null
1008, 2003-11-03 15:02:38, null
1008, 2003-11-03 15:03:04, null
1008, 2003-11-03 15:18:00, car
1009, 2003-11-03 22:00:00, null
1009, 2003-11-03 22:02:53, null
1009, 2003-11-03 22:03:44, metro
1009, 2003-11-14 10:00:00, null
1009, 2003-11-14 10:02:02, null
1009, 2003-11-14 10:03:10, bike 

Note

The trips dataset is large (4 GB) and the prompts (modes) dataset is comparatively small (500 MB).

adil blanco
  • Is it required to be at the end of the trip only? Otherwise you can just use `coordinates.merge(mode, on='id')`. That will fill in the mode of transportation for all rows with the specified id. – tobsecret Mar 20 '18 at 21:34
  • @tobsecret I want to assign the mode only at the end of each trajectory, not to every row. When I did the merge, my program took a long time (about 5 hours). – adil blanco Mar 20 '18 at 21:39
  • Why is this tagged pyspark? – pault Mar 20 '18 at 22:23
  • @pault I use PySpark and Jupyter notebook. – adil blanco Mar 20 '18 at 22:45
  • @adilblanco but all of this code is pandas. Are you asking how to do this in spark? – pault Mar 20 '18 at 22:46
  • @pault Normally I use PySpark, but since I am new to it, I looked for a solution with pandas first and then converted it to Spark. If you have a PySpark solution, that would be perfect. – adil blanco Mar 20 '18 at 23:00
  • Do you want to `update` the trips table, or do you just want a result that looks like that? What if the timestamp in prompts matches the _first_ one in trips rather than the last one? –  Mar 21 '18 at 19:33
  • No, I really want it to be the last one, because I want to detect the end of the trip; the user enters the mode of transport just after the trip ends. I have no problem with updating the Trips table. – adil blanco Mar 21 '18 at 20:32

2 Answers


Based on your updated example, you can denote a trip by finding the first prompt timestamp that is greater than or equal to the trip timestamp. All rows with the same prompt timestamp then correspond to the same trip. Then you want to set the mode only for the greatest of the trip timestamps in each group.

One way to do this is by using two pyspark.sql.Window specifications.

Suppose you start with the following two PySpark DataFrames, trips and prompts:

trips.show(truncate=False)
#+----+-------------------+
#|id  |timestamp          |
#+----+-------------------+
#|1008|2003-11-03 15:00:31|
#|1008|2003-11-03 15:02:38|
#|1008|2003-11-03 15:03:04|
#|1008|2003-11-03 15:18:00|
#|1009|2003-11-03 22:00:00|
#|1009|2003-11-03 22:02:53|
#|1009|2003-11-03 22:03:44|
#|1009|2003-11-14 10:00:00|
#|1009|2003-11-14 10:02:02|
#|1009|2003-11-14 10:03:10|
#|1009|2003-11-15 10:00:00|
#+----+-------------------+

prompts.show(truncate=False)
#+----+-------------------+-----+
#|id  |timestamp          |mode |
#+----+-------------------+-----+
#|1008|2003-11-03 15:18:49|car  |
#|1009|2003-11-03 22:04:20|metro|
#|1009|2003-11-14 10:04:20|bike |
#+----+-------------------+-----+
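
If you are starting from the CSV files in the question, one way to build these two DataFrames (just a sketch, assuming an existing SparkSession named spark and the file names from the question) is:

import pyspark.sql.functions as f

# read the raw CSVs, then cast the string timestamps so comparisons and min/max behave chronologically
trips = spark.read.csv('coordinates.csv', header=True)\
    .withColumn('timestamp', f.to_timestamp('timestamp'))
prompts = spark.read.csv('prompts.csv', header=True)\
    .withColumn('timestamp', f.to_timestamp('timestamp'))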

Join these two tables together on the id column with the condition that the prompt timestamp is greater than or equal to the trip timestamp. For some trip timestamps this will match multiple prompt timestamps. We can eliminate the extras by keeping only the minimum prompt timestamp for each ('id', 'trips.timestamp') group. I call this temporary column indicator, and I use the window w1 to compute it.

Next, do a window over ('id', 'indicator') and find the maximum trip timestamp within each group. Set the mode on that row only; all other rows get pyspark.sql.functions.lit(None).

Finally, compute all of the entries in trips whose trip timestamp is greater than the maximum prompt timestamp for that id. These are trips that did not match any prompt. Union the matched and unmatched rows together.

import pyspark.sql.functions as f
from pyspark.sql import Window

w1 = Window.partitionBy('id', 'trips.timestamp')
w2 = Window.partitionBy('id', 'indicator')

matched = trips.alias('trips').join(prompts.alias('prompts'), on='id', how='left')\
    .where('prompts.timestamp >= trips.timestamp')\
    .select(
        'id',
        'trips.timestamp',
        'mode',
        f.when(
            f.col('prompts.timestamp') == f.min('prompts.timestamp').over(w1),
            f.col('prompts.timestamp'),
        ).otherwise(f.lit(None)).alias('indicator')
    )\
    .where(~f.isnull('indicator'))\
    .select(
        'id',
        f.col('trips.timestamp').alias('timestamp'),
        f.when(
            f.col('trips.timestamp') == f.max(f.col('trips.timestamp')).over(w2),
            f.col('mode')
        ).otherwise(f.lit(None)).alias('mode')
    )

unmatched = trips.alias('t').join(prompts.alias('p'), on='id', how='left')\
    .withColumn('max_prompt_time', f.max('p.timestamp').over(Window.partitionBy('id')))\
    .where('t.timestamp > max_prompt_time')\
    .select('id', 't.timestamp', f.lit(None).alias('mode'))\
    .distinct()

Output:

matched.union(unmatched).sort('id', 'timestamp').show()

+----+-------------------+-----+
|  id|          timestamp| mode|
+----+-------------------+-----+
|1008|2003-11-03 15:00:31| null|
|1008|2003-11-03 15:02:38| null|
|1008|2003-11-03 15:03:04| null|
|1008|2003-11-03 15:18:00|  car|
|1009|2003-11-03 22:00:00| null|
|1009|2003-11-03 22:02:53| null|
|1009|2003-11-03 22:03:44|metro|
|1009|2003-11-14 10:00:00| null|
|1009|2003-11-14 10:02:02| null|
|1009|2003-11-14 10:03:10| bike|
|1009|2003-11-15 10:00:00| null|
+----+-------------------+-----+
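
If you also want to persist this as an updated Trips table (as discussed in the comments), one option, sketched here with a hypothetical output path, is to write the unioned result out, for example as Parquet:

result = matched.union(unmatched)
result.write.mode('overwrite').parquet('trips_with_mode')  # hypothetical output path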
pault
  • Thank you for your answer, it helps me a lot, but I noticed that it does not work for users with several prompts. I will update the data to include a user with a second prompt. – adil blanco Mar 22 '18 at 16:42
  • I added the last 3 lines in Trips and the last line in Prompts. – adil blanco Mar 22 '18 at 16:48
  • Works fine, Thank you. – adil blanco Mar 22 '18 at 18:56
  • In the case where the user does not give a prompt, for example if we eliminate the last line of prompts, the result is wrong: I get 7 lines instead of 10. – adil blanco Mar 22 '18 at 19:10
  • I tried all the join types but still get 7 lines instead of 10. As for the upvote, it was done with another account, because the account I am using does not have enough reputation. Thank you again for the help. – adil blanco Mar 22 '18 at 19:33
  • @adilblanco not sure if it's the best solution, but one option is to union back the original trips DataFrame for the unmatched. See the update. – pault Mar 22 '18 at 19:52

This is a naive solution which assumes that your coordinates DataFrame is already sorted by timestamp, that ids are unique (one trip per id), and that your data set fits into memory. If the latter is not the case, I recommend using dask and partitioning your DataFrames by id.

Imports:

import pandas as pd
import numpy as np

First we join the two DataFrames. This will fill the whole mode column for each id. We join on the index because that will speed up the operation, see also "Improve Pandas Merge performance".

mode = mode.set_index('id')
coordinates = coordinates.set_index('id')
merged = coordinates.join(mode, how='left')

We need the index to contain unique values in order for our groupby operation to work.

merged = merged.reset_index()

Then we apply a function that replaces all but the last value in the mode column with NaN for each id.

def clean_mode_col(df):
    cleaned_mode_col = df['mode'].copy()
    cleaned_mode_col.iloc[:-1] = np.nan
    df['mode'] = cleaned_mode_col
    return df
merged = merged.groupby('id').apply(clean_mode_col)

As mentioned above, you can use dask to parallelize the execution of the merge code like this:

import dask.dataframe as dd

# npartitions (or chunksize) controls how the data is split across workers; 8 is just an example
dd_coordinates = dd.from_pandas(coordinates, npartitions=8).set_index('id')
dd_mode = dd.from_pandas(mode, npartitions=1).set_index('id')
merged = dd.merge(dd_coordinates, dd_mode, left_index=True, right_index=True)
merged = merged.compute()  # returns a pandas DataFrame

The set_index operations are slow but make the merge way faster.

I did not test this code. Please provide copy-pasteable code that includes your DataFrames so that I don't have to copy and paste all those tables from your description (hint: use pd.DataFrame.to_dict to export your DataFrame as a dictionary, and copy and paste that into your code).
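
For example, a small sketch of that round trip (the dict shown is just the first two rows of the sample coordinates data):

print(coordinates.head(2).to_dict())
# the printed dict can then be pasted and rebuilt by anyone with:
coordinates = pd.DataFrame({'id': {0: 1008, 1: 1008},
                            'timestamp': {0: '2003-11-03 15:00:31', 1: '2003-11-03 15:02:38'}})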

tobsecret
  • Thanks for your answer; my problem is with the merge. In my case the merge takes a lot of time to execute (about 5 hours), so I want to optimize it. I tried your code and it works perfectly with a small dataset. – adil blanco Mar 22 '18 at 01:41
  • Yeah, that's likely because of your big tables. What you can try is to make the id column the index (coordinates = coordinates.set_index('id'), mode = mode.set_index('id')) in both DataFrames and then merge like this: merged = coordinates.merge(mode, right_index=True, left_index=True). Again, I encourage you to use dask for this, as it lets you parallelize the process and it's basically the same thing. – tobsecret Mar 22 '18 at 15:40
  • Thanks, but then I can't handle users with several prompts, right? – adil blanco Mar 22 '18 at 16:53
  • Please see the updated answer - joining on the index should drastically speed up the process. As for the dask part, it's only intended for the merge; as long as your data fits into memory, you probably want to do the groupby-apply with pandas as before. – tobsecret Mar 22 '18 at 17:39