I have a very large dataframe of airplane location data with 5 rows per timestamp. The table below shows an example for 1 timestamp; 3 of the rows show only dots, but imagine they contain plane_nums and locations.

+---------------------------+-----------+-----------+-----------+-----------+
|         utc_time          | plane_num |    lat    |    lon    |  height   |
+---------------------------+-----------+-----------+-----------+-----------+
| 2021-06-02T05:01:40+00:00 |          1| 51.759014 | -1.256688 | 47.337597 |
| 2021-06-02T05:01:40+00:00 |          2| 41.758014 |  1.346678 | 41.632522 |
| ...                       |       ... |       ... |       ... |       ... |
| ...                       |       ... |       ... |       ... |       ... |
| ...                       |       ... |       ... |       ... |       ... |
+---------------------------+-----------+-----------+-----------+-----------+

For each timestamp, I want to collapse this dataframe down to 1 row that holds the lat/lon of every plane along with various calculated distances between pairs of planes.

I know I could accomplish this in pyspark by filtering the dataframe down to each plane (there will always be a plane 1, 2, 3, 4, 5 per timestamp). The code below creates a dataframe for 1 timestamp, but imagine there are millions of timestamps.

    from pyspark.sql import functions as F

    # Assumes an existing SparkSession named `spark`.
    dfAirplaneData = spark.createDataFrame([
        ("2021-06-02T05:01:40+00:00", 1, 51.759014, -1.256688, 47.337597),
        ("2021-06-02T05:01:40+00:00", 2, 41.758014, 1.346678, 41.632522),
        ("2021-06-02T05:01:40+00:00", 3, 41.758014, 1.346678, 11.632522),
        ("2021-06-02T05:01:40+00:00", 4, 41.758014, 1.346678, 21.632522),
        ("2021-06-02T05:01:40+00:00", 5, 11.758014, 1.346678, 41.632522)
    ], ("utc_time", "plane_num", "lat", "lon", "height"))

    # One filtered dataframe per plane
    dfPlane1 = dfAirplaneData.filter(F.col('plane_num') == 1)
    dfPlane2 = dfAirplaneData.filter(F.col('plane_num') == 2)
    dfPlane3 = dfAirplaneData.filter(F.col('plane_num') == 3)

I would then join these per-plane dataframes back together on utc_time to get one row per timestamp, but this feels inefficient. Is there a better way to do this in pyspark (perhaps using .groupBy with the timestamps)?

I am aware that instead of joining I could use w = Window.partitionBy('utc_time'), but I only need 1 row per group, so I would end up repeating the calculation for every row and then filtering down to 1 summary row. I would also need to know each row's plane_num when doing comparisons, and I was unsure whether that is easily doable with Window.partitionBy.

I have briefly read about Pandas UDF grouped map and I believe I could use a grouped map to iterate through a pandas dataframe if that is the best approach.

I'm interested in the simplest, most efficient way to solve this. If joining the dataframe to itself is the best option, I can use that approach.

WIT

2 Answers

Here's a pandas method that puts all of the data for a timestamp in one row before any calculations are done. Instead of the lambda function that reshapes the data, you could pass a function that does your calculations.

import io

import pandas as pd

data='''utc_time, plane_num, lat, lon, height
2021-06-02T05:01:40+00:00, 1, 51.759014, -1.256688, 47.337597
2021-06-02T05:01:40+00:00, 2, 41.758014, 1.346678, 41.632522
2021-06-02T05:01:40+00:00, 3, 41.758014, 1.346678, 11.632522
2021-06-02T05:01:40+00:00, 4, 41.758014, 1.346678, 21.632522
2021-06-02T05:01:40+00:00, 5, 11.758014, 1.346678, 41.632522
2021-07-02T05:01:40+00:00, 1, 51.759014, -1.256688, 47.337597
2021-07-02T05:01:40+00:00, 2, 41.758014, 1.346678, 41.632522
2021-07-02T05:01:40+00:00, 3, 41.758014, 1.346678, 11.632522
2021-07-02T05:01:40+00:00, 4, 41.758014, 1.346678, 21.632522
2021-07-02T05:01:40+00:00, 5, 11.758014, 1.346678, 41.632522
'''
df = pd.read_csv(io.StringIO(data), sep=',', engine='python')
# df.set_index('utc_time', inplace=True)
dft = df.groupby('utc_time').apply(lambda x: pd.DataFrame(x.iloc[:, 1:5].values.reshape(1,-1))).reset_index().drop('level_1', axis=1)
dft.columns = ['utc_time'] + ["plane_num", "lat", "lon", "height"]*5

dft

                    utc_time  plane_num        lat       lon     height  plane_num        lat       lon     height  plane_num  ...       lon     height  plane_num        lat       lon     height  plane_num        lat       lon     height
0  2021-06-02T05:01:40+00:00        1.0  51.759014 -1.256688  47.337597        2.0  41.758014  1.346678  41.632522        3.0  ...  1.346678  11.632522        4.0  41.758014  1.346678  21.632522        5.0  11.758014  1.346678  41.632522
1  2021-07-02T05:01:40+00:00        1.0  51.759014 -1.256688  47.337597        2.0  41.758014  1.346678  41.632522        3.0  ...  1.346678  11.632522        4.0  41.758014  1.346678  21.632522        5.0  11.758014  1.346678  41.632522

[2 rows x 21 columns]
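
The reshaped frame above repeats the same column names for every plane, which can make later calculations awkward. As a possible variation (a minimal sketch, not the method used above), pandas' DataFrame.pivot can produce one uniquely named column per plane and measurement. The lat_1, lon_2, ... naming scheme and the shortened two-plane sample are assumptions made for illustration.

```python
import io

import pandas as pd

# Shortened sample (two planes per timestamp), for illustration only.
data = """utc_time,plane_num,lat,lon,height
2021-06-02T05:01:40+00:00,1,51.759014,-1.256688,47.337597
2021-06-02T05:01:40+00:00,2,41.758014,1.346678,41.632522
2021-07-02T05:01:40+00:00,1,51.759014,-1.256688,47.337597
2021-07-02T05:01:40+00:00,2,41.758014,1.346678,41.632522
"""
df = pd.read_csv(io.StringIO(data))

# One row per timestamp, one column per (measurement, plane_num) pair.
wide = df.pivot(index="utc_time", columns="plane_num",
                values=["lat", "lon", "height"])

# Flatten the two-level column index into names such as "lat_1".
# This naming scheme is an assumption, not something from the question.
wide.columns = [f"{name}_{num}" for name, num in wide.columns]
wide = wide.reset_index()
```

With unique names, a calculation between plane 1 and plane 2 can refer to wide["lat_1"] and wide["lat_2"] directly.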
Jonathan Leon

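In Spark, you want the groupBy DataFrame API call. In this case, something like dfAirplaneData.groupBy('utc_time').agg(collect_list(struct(col('plane_num'), col('lat'), col('lon'), col('height')))) will give you an array of structs per timestamp that you can then perform computations on. Because plane_num is part of each struct, you can still tell the planes apart even though collect_list does not guarantee the order of the collected elements. Do I understand you right?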
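
To make the "perform computations" step concrete, here is a minimal plain-Python sketch of what could run on one timestamp's collected records. It assumes the haversine great-circle formula as the distance measure and ignores height; the question does not say which distance is wanted. The names haversine_km and pairwise_distances are hypothetical helpers, and wiring them into Spark (for example through a UDF or a grouped map) is not shown.

```python
import math
from itertools import combinations

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption for this sketch


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def pairwise_distances(planes):
    """Map each unordered pair of plane numbers to a distance in km.

    planes: list of (plane_num, lat, lon, height) tuples for one timestamp.
    Sorting by plane_num first means the input order does not matter.
    Height is ignored in this sketch.
    """
    ordered = sorted(planes)
    return {
        (a[0], b[0]): haversine_km(a[1], a[2], b[1], b[2])
        for a, b in combinations(ordered, 2)
    }


# Records for the single timestamp used in the question.
planes = [
    (1, 51.759014, -1.256688, 47.337597),
    (2, 41.758014, 1.346678, 41.632522),
    (3, 41.758014, 1.346678, 11.632522),
    (4, 41.758014, 1.346678, 21.632522),
    (5, 11.758014, 1.346678, 41.632522),
]
distances = pairwise_distances(planes)
```

Five planes give 10 pairs, keyed by plane numbers such as (1, 2), so each timestamp's result can be flattened into a single summary row.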

Peter Dowdy