1

I am currently solving a problem involving GPS data from buses. The issue I am facing is to reduce computation in my process.

There are about 2 billion GPS-coordinate points (Lat-Long degrees) in one table and about 12,000 bus-stops with their Lat-Long in another table. It is expected that only 5-10% of the 2-billion points are at bus-stops.

Problem: I need to tag and extract only those points (out of the 2-billion) that are at bus-stops (the 12,000 points). Since this is GPS data, I cannot do exact matching of the coordinates, but rather do a tolerance based geofencing.

Issue: The process of tagging bus-stops is taking extremely long time with the current naive approach. Currently, we are picking each of the 12,000 bus-stop points, and querying the 2-billion points with a tolerance of 100m (by converting degree-differences into distance).

Question: Is there an algorithmically efficient process to achieve this tagging of points?

OrangeRind
  • 4,798
  • 13
  • 45
  • 57
  • Using k-d tree would be place to start. –  Dec 07 '16 at 15:58
  • I worked on a similar usecase. We used the properties of `GeoHashes` to define cells and define the process per cell instead. This is still a broad question. Maybe you could show the code of your current approach to drive the discussion? – maasg Dec 07 '16 at 17:47
  • @LostInOverflow - sure, going through it. – OrangeRind Dec 07 '16 at 20:12
  • @maasg - that seems to be a rather good idea - I shall give it a shot! Currently, the code is just a set of hive queries. But the final work needs to be done in Spark - hence the problem. – OrangeRind Dec 07 '16 at 20:14

1 Answers1

1

Yes you can use something like SpatialSpark. It only works with Spark 1.6.1 but you can use BroadcastSpatialJoin to create an RTree which is extremely efficient.

Here's an example of me using SpatialSpark with PySpark to check if different polygons are within each other or are intersecting:

from ast import literal_eval as make_tuple
print "Java Spark context version:", sc._jsc.version()
spatialspark = sc._jvm.spatialspark

rectangleA = Polygon([(0, 0), (0, 10), (10, 10), (10, 0)])
rectangleB = Polygon([(-4, -4), (-4, 4), (4, 4), (4, -4)])
rectangleC = Polygon([(7, 7), (7, 8), (8, 8), (8, 7)])
pointD = Point((-1, -1))

def geomABWithId():
  return sc.parallelize([
    (0L, rectangleA.wkt),
    (1L, rectangleB.wkt)
  ])

def geomCWithId():
  return sc.parallelize([
    (0L, rectangleC.wkt)
  ])

def geomABCWithId():
  return sc.parallelize([
  (0L, rectangleA.wkt),
  (1L, rectangleB.wkt),
  (2L, rectangleC.wkt)])

def geomDWithId():
  return sc.parallelize([
    (0L, pointD.wkt)
  ])

dfAB                 = sqlContext.createDataFrame(geomABWithId(), ['id', 'wkt'])
dfABC                = sqlContext.createDataFrame(geomABCWithId(), ['id', 'wkt'])
dfC                  = sqlContext.createDataFrame(geomCWithId(), ['id', 'wkt'])
dfD                  = sqlContext.createDataFrame(geomDWithId(), ['id', 'wkt'])

# Supported Operators: Within, WithinD, Contains, Intersects, Overlaps, NearestD
SpatialOperator      = spatialspark.operator.SpatialOperator 
BroadcastSpatialJoin = spatialspark.join.BroadcastSpatialJoin

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0)

joinRDD.count()

results = joinRDD.collect()
map(lambda result: make_tuple(result.toString()), results)

# [(0, 0), (1, 1), (2, 0)] read as:
# ID 0 is within 0
# ID 1 is within 1
# ID 2 is within 0

Note the line

joinRDD = BroadcastSpatialJoin.apply(sc._jsc, dfABC._jdf, dfAB._jdf, SpatialOperator.Within(), 0.0)

the last argument is a buffer value, in your case it would be the tolerance you want to use. It will probably be a very small number if you are using lat/lon since it's a radial system and depending on the meters you want for your tolerance you will need to calculate based on lat/lon for your area of interest.

Community
  • 1
  • 1
Shoaib Burq
  • 384
  • 2
  • 13