
I have a large CSV file (>20 GB) with data as follows:

ObjectID,Lon,Lat,Speed,GPSTime
182163,116.367520,40.024680,29.00,2016-07-04 09:01:09.000
116416,116.694693,39.785382,0.00,2016-07-04 09:01:17.000

Using PySpark (RDD, map and reduce), I want to process this geographic data: for each row, check whether the (Lon, Lat) point is inside a polygon, and if so, write the row to an output file.

This is the original code, without Spark:

import csv
import pandas as pd
from shapely.geometry import Point, Polygon

# `data` is the loaded GeoJSON; cityIdx selects the city feature
polygon = Polygon(data['features'][cityIdx]['geometry']['coordinates'][0])

with open(outFileName, 'w', newline='') as outfile:
    writer = csv.writer(outfile)
    # read the big file in chunks so it never has to fit in memory
    for chunk in pd.read_csv(inFileName, chunksize=chunksize, sep=','):
        # (the real file also has Direct and Mileage columns,
        # which the sample above omits)
        rows = zip(chunk['ObjectID'], chunk['Lon'], chunk['Lat'],
                   chunk['Speed'], chunk['Direct'], chunk['Mileage'],
                   chunk['GPSTime'])
        for ObjectID, Lon, Lat, Speed, Direct, Mileage, GPSTime in rows:
            # keep the row only if the point lies inside the polygon
            if polygon.contains(Point(Lon, Lat)):
                writer.writerow([ObjectID, Lon, Lat, Speed,
                                 Direct, Mileage, GPSTime])

How can I do this using .rdd.map(fun).reduce(fun)? I thought about lambda expressions, but I couldn't formulate runnable Spark code.
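For reference, a minimal sketch of what such an RDD pipeline could look like (untested; the GeoJSON path, feature index, input path, and output path are placeholders, and shapely must be installed on the executors). Note that the per-row membership test is really a filter, not a reduce:

import json
from pyspark.sql import SparkSession
from shapely.geometry import Point, Polygon

spark = SparkSession.builder.appName('polygon-filter').getOrCreate()
sc = spark.sparkContext

# build the polygon on the driver, as in the pandas version
# ('city.geojson' and the feature index 0 are placeholders)
with open('city.geojson') as f:
    data = json.load(f)
polygon = Polygon(data['features'][0]['geometry']['coordinates'][0])

lines = sc.textFile('gps.csv')   # placeholder input path
header = lines.first()           # the header line, to be dropped

def inside(line):
    # columns: ObjectID,Lon,Lat,Speed,GPSTime (adjust the indices
    # if the real file also carries Direct and Mileage)
    fields = line.split(',')
    return polygon.contains(Point(float(fields[1]), float(fields[2])))

# `polygon` lives in the closure, so Spark ships it to the executors;
# the output is a directory of part files, not a single CSV
(lines.filter(lambda line: line != header)
      .filter(inside)
      .saveAsTextFile('inside_polygon_out'))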

Ahmad Alhilal
  • Generally speaking, you'd read the CSV into a Spark DataFrame and add a filter that calls a UDF, which accepts the lon and lat and checks whether the point is in the polygon; lastly, write that DataFrame (after the filter) to disk. If you get stuck, update this question once you have some code. – Arnon Rotem-Gal-Oz Dec 18 '18 at 22:59
  • How do I write a UDF that accepts lon and lat and does the checking, processing, and writing? I mean, I need a UDF that accepts two columns, and I couldn't find an example close to this case. – Ahmad Alhilal Dec 20 '18 at 12:45
  • https://stackoverflow.com/questions/42540169/pyspark-pass-multiple-columns-in-udf – Arnon Rotem-Gal-Oz Dec 20 '18 at 13:31
  • Can I pass parameters that are not columns in the DataFrame, for instance polyIdx and polyLen in this code, and how do I pass them to the UDF? e.g. def FilterAndSave(Lon, Lat, polyIdx, polyLen): ....... – Ahmad Alhilal Dec 20 '18 at 14:35
  • Don't do the save itself in the UDF; let Spark save the result. As for additional parameters, you can use lit(param), or if they are fixed, just define them outside of the UDF (Spark will ship them). – Arnon Rotem-Gal-Oz Dec 20 '18 at 14:55
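
A minimal sketch of the DataFrame-and-UDF approach suggested in the comments above (untested; 'city.geojson', the feature index, and the input/output paths are placeholders):

import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from shapely.geometry import Point, Polygon

spark = SparkSession.builder.appName('polygon-filter').getOrCreate()

with open('city.geojson') as f:          # placeholder GeoJSON file
    data = json.load(f)
polygon = Polygon(data['features'][0]['geometry']['coordinates'][0])

# `polygon` is defined outside the UDF, so Spark ships it to the
# executors in the closure; non-column constants could alternatively
# be passed in as columns with lit(...)
@udf(returnType=BooleanType())
def in_polygon(lon, lat):
    return polygon.contains(Point(lon, lat))

# inferSchema makes Lon/Lat numeric; for a 20 GB file an explicit
# schema would avoid the extra pass over the data
df = (spark.read
           .option('header', True)
           .option('inferSchema', True)
           .csv('gps.csv'))              # placeholder input path

# filter with the UDF and let Spark do the writing (no I/O in the UDF)
(df.filter(in_polygon(df['Lon'], df['Lat']))
   .write
   .option('header', True)
   .csv('inside_polygon_out'))           # placeholder output directory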

0 Answers