
I have a PySpark DataFrame containing lat/lon points for different trajectories, identified by a column "trajectory_id".

trajectory_id latitude longitude
1 45 5
1 45 6
1 45 7
2 46 5
2 46 6
2 46 7

What I want to do is extract a LineString for each trajectory_id and store it in another DataFrame, where each row represents a trajectory with "id" and "geometry" columns. In this example, the output should be:

trajectory_id geometry
1 LINESTRING (5 45, 6 45, 7 45)
2 LINESTRING (5 46, 6 46, 7 46)

This is similar to what has been asked in this question, but in my case I need to use PySpark.

I have tried the following:

import pandas as pd
import pyspark.sql.functions as F
from shapely.geometry import Point, LineString

df = pd.DataFrame(
    [[1, 45, 5], [1, 45, 6], [1, 45, 7], [2, 46, 5], [2, 46, 6], [2, 46, 7]],
    columns=['trajectory_id', 'latitude', 'longitude'])
df1 = spark.createDataFrame(df)
idx_ = df1.select("trajectory_id").rdd.flatMap(lambda x: x).distinct().collect()
geo_df = pd.DataFrame(index=range(len(idx_)), columns=['geometry', 'trajectory_id'])
k = 0
for i in idx_:
    df2 = df1.filter(F.col("trajectory_id").isin(i)).toPandas()
    df2['points'] = df2[["longitude", "latitude"]].apply(Point, axis=1)
    geo_df.geometry.iloc[k] = str(LineString(df2['points']))
    geo_df['trajectory_id'].iloc[k] = i
    k += 1

This code works, but in my real task I am working with many more trajectories (more than 2 million), and it takes forever because I convert to Pandas in each iteration. Is there a way to obtain the same output more efficiently? As mentioned, I know that toPandas() (and/or collect()) is something I should avoid, especially inside a for loop.
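One way to avoid the per-trajectory toPandas() round trip is a single groupBy that collects each trajectory's points and assembles the WKT string with native Spark functions, with no loop and no geometry library. The string logic itself is trivial; below is a minimal pure-Python sketch of the WKT construction (the helper name linestring_wkt is hypothetical), with a possible Spark equivalent in comments:

```python
def linestring_wkt(points):
    # points: list of (longitude, latitude) tuples, already in trajectory order
    # e.g. [(5, 45), (6, 45), (7, 45)] -> "LINESTRING (5 45, 6 45, 7 45)"
    body = ", ".join(f"{lon} {lat}" for lon, lat in points)
    return f"LINESTRING ({body})"

# A possible single-pass Spark equivalent (untested sketch, assumes the
# DataFrame rows arrive in trajectory order):
# df1.groupBy("trajectory_id").agg(
#     F.concat(
#         F.lit("LINESTRING ("),
#         F.concat_ws(", ", F.collect_list(
#             F.format_string("%s %s", F.col("longitude"), F.col("latitude")))),
#         F.lit(")"),
#     ).alias("geometry"))

print(linestring_wkt([(5, 45), (6, 45), (7, 45)]))  # LINESTRING (5 45, 6 45, 7 45)
```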

anc_ca

3 Answers


Note: the points of the LineString will be in the same order as the trajectory rows appear in the table.

This adapts Drashti's snippet slightly, since it does not fully convert the set of points into a LineString. Apache Sedona must be installed.

import pyspark.sql.functions as func

long_lat_df = result.withColumn(
    'joined_long_lat',
    func.concat(func.col("longitude"), func.lit(","), func.col("latitude")))

grouped_df = long_lat_df.groupby('trajectory_id').agg(
    func.collect_list('joined_long_lat').alias("geometry"))

final_df = grouped_df.withColumn('geometry', func.concat_ws(",", func.col("geometry")))

final_df.createOrReplaceTempView("final_df")

query = """select *, ST_LineStringFromText(final_df.geometry, ',') as linestring from final_df"""

final_df = spark.sql(query)
final_df.show()
SparkyT
paraxor

Another option is to use st_point, st_makeline and st_aswkt from the Mosaic library (see documentation). This will give you the LineString in Well Known Text (WKT) format (as was asked in the question).

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
import mosaic

# Create the PySpark DataFrame
pd_df = pd.DataFrame(
    [
        [1,45,5],
        [1,45,6],
        [1,45,7],
        [2,46,5],
        [2,46,6],
        [2,46,7],
    ],
    columns=["trajectory_id", "latitude", "longitude"]
)
spark_df = spark.createDataFrame(pd_df)
# Create LineString column
final_df = (
    spark_df
    .withColumn("longitude", F.col("longitude").cast(T.DoubleType()))
    .withColumn("latitude", F.col("latitude").cast(T.DoubleType()))
    .withColumn("point", mosaic.st_point(F.col("longitude"), F.col("latitude")))
    
    .groupBy("trajectory_id")
    .agg(F.collect_list(F.col("point")).alias("list_of_points"))
    
    .withColumn("linestring_geom", mosaic.st_makeline("list_of_points"))
    .withColumn("linestring", mosaic.st_aswkt("linestring_geom"))
    
    .select("trajectory_id", "linestring")
)

Note that collect_list does not guarantee that the order of the points is preserved (see this answer).

To keep the correct ordering of points, you can use the following:

# Create LineString column
final_df = (
    spark_df
    .withColumn("longitude", F.col("longitude").cast(T.DoubleType()))
    .withColumn("latitude", F.col("latitude").cast(T.DoubleType()))
    .withColumn("point", mosaic.st_point(F.col("longitude"), F.col("latitude")))

    .withColumn("point_id", F.monotonically_increasing_id())
    .withColumn("list_of_points", F.collect_list("point").over(
        Window.partitionBy("trajectory_id").orderBy("point_id")
    ))
    .withColumn("max_point_id", F.max("point_id").over(
        Window.partitionBy("trajectory_id")
    ))
    .where(F.col("point_id") == F.col("max_point_id"))
    
    .withColumn("linestring_geom", mosaic.st_makeline("list_of_points"))
    .withColumn("linestring", mosaic.st_aswkt("linestring_geom"))
    
    .select("trajectory_id", "linestring")
)
kiae

You can do this using PySpark SQL's native functions.

import pyspark.sql.functions as func

long_lat_df = df.withColumn(
    'joined_long_lat',
    func.concat(func.col("longitude"), func.lit(" "), func.col("latitude")))

grouped_df = long_lat_df.groupby('trajectory_id').agg(
    func.collect_list('joined_long_lat').alias("geometry"))

final_df = grouped_df.withColumn('geometry', func.concat_ws(", ", func.col("geometry")))
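Note that this yields geometry as a plain coordinate list such as "5 45, 6 45, 7 45", not a full WKT LINESTRING. A minimal sketch of the missing wrapping step (the helper name wrap_linestring is hypothetical):

```python
def wrap_linestring(coords):
    # "5 45, 6 45, 7 45" -> "LINESTRING (5 45, 6 45, 7 45)"
    return f"LINESTRING ({coords})"

# In PySpark, the same wrapping could be done with native functions:
# final_df = final_df.withColumn(
#     'geometry',
#     func.concat(func.lit("LINESTRING ("), func.col("geometry"), func.lit(")")))

print(wrap_linestring("5 45, 6 45, 7 45"))  # LINESTRING (5 45, 6 45, 7 45)
```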
Drashti Dobariya