I have a PySpark dataframe containing lat/lon points for different trajectories, identified by a column "trajectory_id":
| trajectory_id | latitude | longitude |
|---|---|---|
| 1 | 45 | 5 |
| 1 | 45 | 6 |
| 1 | 45 | 7 |
| 2 | 46 | 5 |
| 2 | 46 | 6 |
| 2 | 46 | 7 |
What I want to do is extract, for each trajectory_id, a LineString and store it in another dataframe, where each row represents a trajectory with "trajectory_id" and "geometry" columns. For this example, the output should be:
| trajectory_id | geometry |
|---|---|
| 1 | LINESTRING (5 45, 6 45, 7 45) |
| 2 | LINESTRING (5 46, 6 46, 7 46) |
This is similar to what has been asked in this question, but in my case I need to use PySpark.
I have tried the following:
import pandas as pd
import pyspark.sql.functions as F
from shapely.geometry import Point, LineString

# Sample data
df = pd.DataFrame(
    [[1, 45, 5], [1, 45, 6], [1, 45, 7], [2, 46, 5], [2, 46, 6], [2, 46, 7]],
    columns=['trajectory_id', 'latitude', 'longitude'])
df1 = spark.createDataFrame(df)

# Collect the distinct trajectory ids on the driver
idx_ = df1.select("trajectory_id").rdd.flatMap(lambda x: x).distinct().collect()

geo_df = pd.DataFrame(index=range(len(idx_)), columns=['geometry', 'trajectory_id'])
k = 0
for i in idx_:
    # Pull the points of one trajectory to the driver and build its LineString
    df2 = df1.filter(F.col("trajectory_id").isin(i)).toPandas()
    df2['points'] = df2[["longitude", "latitude"]].apply(Point, axis=1)
    geo_df.geometry.iloc[k] = str(LineString(df2['points']))
    geo_df['trajectory_id'].iloc[k] = i
    k = k + 1
This code works, but in my actual task I am working with many more trajectories (> 2 million), and this takes forever because I convert to Pandas in every iteration. Is there a way to obtain the same output more efficiently? As mentioned, I know that toPandas() (and/or collect()) is something I should avoid, especially inside a for loop.
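Would something along these lines be the right direction? This is only a rough, untested sketch: "seq" is an ordering column I would have to add (in my real data I would derive it from a timestamp; here monotonically_increasing_id() just keeps the example self-contained), since collect_list alone does not guarantee the order of the points.

import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from shapely.geometry import LineString

# "seq" is a hypothetical ordering column so that sort_array can restore point order
df1 = df1.withColumn("seq", F.monotonically_increasing_id())

@F.udf(returnType=StringType())
def to_linestring(points):
    # points is the collected list of (seq, longitude, latitude) structs, sorted by seq
    return LineString([(p["longitude"], p["latitude"]) for p in points]).wkt

geo_df1 = (
    df1.groupBy("trajectory_id")
       .agg(F.sort_array(F.collect_list(F.struct("seq", "longitude", "latitude"))).alias("points"))
       .withColumn("geometry", to_linestring("points"))
       .select("trajectory_id", "geometry")
)
geo_df1.show(truncate=False)

I am not sure whether a plain Python UDF like this actually scales much better than my loop, or whether there is a more idiomatic approach (e.g. pandas_udf / applyInPandas), so any guidance would be appreciated.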