
I have data in the following format

agent_id, client_id, client_long, client_lat
1, 1, 39.777982, -7.004599
1, 2, 39.677982, -7.094599
1, 3, 39.577982, -7.084599
2, 4, 39.477982, -7.074599
2, 5, 39.377982, -7.064599

I want to get the average distance between the clients for each agent

So I need to get the distances between clients 1, 2, and 3 (all combinations) for agent 1 and the distances between clients 4 and 5 for agent 2, then average these distances for each agent.

How do I go about doing this using PySpark?

  • Have you found out how to calculate distances between two co-ordinates? – aksappy Jul 28 '21 at 17:58
  • While the suggestion is truly useful, it does not answer what I am trying to do. My main problem is how I can calculate all distances between all clients' locations, so that I can then average those distances for each agent. So it's more about calculating the distances between all possible combinations of locations, not only two points – amro_ghoneim Jul 28 '21 at 19:43
  • 1
    Okay, I see the `all combinations` now. So is your question now - how to find all combinations? – aksappy Jul 28 '21 at 20:41
  • yeah exactly. I'll edit the question to make it clear. – amro_ghoneim Jul 29 '21 at 06:07
  • Following this [post](https://stackoverflow.com/questions/53630342/sparksql-pyspark-crossjoin-over-dimension-for-a-specific-window), it is a simple inner join over the agent_id (see the sketch below) – amro_ghoneim Jul 29 '21 at 06:44
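
A minimal sketch of that self-join idea, assuming the column names from the question and a hypothetical plain-Python UDF `geo_dist` built on geopy (not the answer's approach, just the comment's alternative):

from geopy import distance
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

@udf('double')
def geo_dist(lat1, lon1, lat2, lon2):
    # geodesic distance in km between two (lat, lon) points
    return distance.distance((lat1, lon1), (lat2, lon2)).km

pairs = (
    df.alias('a')
    .join(df.alias('b'), on='agent_id')                   # inner join on agent_id -> every client pair per agent
    .where(F.col('a.client_id') < F.col('b.client_id'))   # keep each unordered pair once
)

avg_dist_df = (
    pairs
    .withColumn('dist_km', geo_dist('a.client_lat', 'a.client_long',
                                    'b.client_lat', 'b.client_long'))
    .groupBy('agent_id')
    .agg(F.avg('dist_km').alias('avg_distance'))
)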

1 Answer


You can do it by using Pandas UDFs.
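
For a reproducible setup, here is a minimal sketch that recreates the sample data as the `df` used below (the existing SparkSession named `spark` is an assumption):

# rows mirror the sample data from the question
df = spark.createDataFrame(
    [
        (1, 1, 39.777982, -7.004599),
        (1, 2, 39.677982, -7.094599),
        (1, 3, 39.577982, -7.084599),
        (2, 4, 39.477982, -7.074599),
        (2, 5, 39.377982, -7.064599),
    ],
    ['agent_id', 'client_id', 'client_long', 'client_lat'],
)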

from itertools import combinations
import numpy as np
import pandas as pd
from geopy import distance
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def avg_distance_across_points(lat: pd.Series, lon: pd.Series) -> float:
    # Series-to-scalar pandas UDF: receives all of an agent's coordinates at once
    points = list(zip(lat, lon))
    # if we have only one point for an agent
    if len(points) <= 1:
        d = 0.0
    else:
        # average geodesic distance over all unordered pairs of points
        d = np.mean([distance.distance(p1, p2).km for p1, p2 in combinations(points, 2)])
    return float(d)

dist_df = df\
  .groupby('agent_id')\
  .agg(avg_distance_across_points('client_lat', 'client_long').alias('avg_distance'))

dist_df.show()

+--------+------------------+
|agent_id|      avg_distance|
+--------+------------------+
|       1|16.591805652402446|
|       2|11.103091027786775|
+--------+------------------+

Here I used kilometers as the distance unit; you can also use meters or other units listed in the geopy documentation.
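
For example, swapping the attribute in the same list comprehension (same `points` as above) gives meters or miles instead:

# other attributes of geopy's Distance object
d_meters = np.mean([distance.distance(p1, p2).meters for p1, p2 in combinations(points, 2)])
d_miles = np.mean([distance.distance(p1, p2).miles for p1, p2 in combinations(points, 2)])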

If you need more information on Pandas UDFs in Spark, I suggest you read these useful resources: link, link, link.
