6

I want to perform GeoIP lookups on my data in Spark. To do that I'm using MaxMind's GeoIP database.

What I want to do is initialize a GeoIP database object once on each partition, and later use it to look up the city related to an IP address.

Does Spark have an initialization phase for each node, or should I instead check whether an instance variable is undefined and, if so, initialize it before continuing? E.g. something like this (it is Python, but I want a Scala solution):

class IPLookup(object):
    database = None

    def getCity(self, ip):
        # initialise lazily, on first use only
        if not self.database:
            self.database = self.initialise(geoipPath)
        ...

Of course, doing this requires Spark to serialise the whole object, something the docs caution against.

maasg
jbrown

3 Answers

6

In Spark, per-partition operations can be done using mapPartitions:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)

This mapper executes the function f once per partition, over an iterator of that partition's elements. The idea is that the cost of setting up resources (like DB connections) is amortised over the many elements in the iterator.

Example:

val logsRDD = ???
logsRDD.mapPartitions{iter =>
   val geoIp = new GeoIPLookupDB(...)
   // this is local map over the iterator - do not confuse with rdd.map
   iter.map(elem => (geoIp.resolve(elem.ip),elem)) 
}
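
To make the "once per partition" behaviour concrete, here is a minimal sketch in plain Scala, with no Spark dependency. FakeGeoDb, SetupCounter and PerPartition are hypothetical names invented for illustration; FakeGeoDb stands in for whatever expensive lookup object you actually construct.

```scala
// Hypothetical counter, only here to make the number of setups observable.
object SetupCounter { var count = 0 }

// Hypothetical stand-in for an expensive, non-serializable lookup object.
final class FakeGeoDb {
  SetupCounter.count += 1                      // record each costly setup
  def resolve(ip: String): String = s"city-of-$ip"
}

object PerPartition {
  // Same shape as the function passed to mapPartitions:
  // Iterator[T] => Iterator[U]. The setup runs once per call,
  // which in Spark means once per partition.
  def resolvePartition(iter: Iterator[String]): Iterator[(String, String)] = {
    val db = new FakeGeoDb
    iter.map(ip => (db.resolve(ip), ip))       // lazy, element by element
  }
}
```

With an RDD[String] of IP addresses, a function of this shape could be passed directly, as in rdd.mapPartitions(PerPartition.resolvePartition). Because the returned iterator is lazy, any resource that needs closing must stay open until the iterator has been fully consumed.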
maasg
  • A good solution, but in this case I want to reuse the object in multiple operations so a broadcast variable looks more useful for me. – jbrown Nov 25 '14 at 10:14
2

This seems like a good use of a broadcast variable. Have you looked at the documentation for that functionality and, if you have, does it fail to meet your requirements in some way?

bearrito
  • I tried using a broadcast variable, but it did not work, possibly because com.maxmind.geoip.LookupService is not serializable. I tried the SparkContext.addFile method instead, adding the files GeoIPCity.dat and GeoIPASNum.dat, and that worked fine. – Gösta Forsum Mar 10 '15 at 18:46
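
One way to reuse a non-serializable lookup object across several operations, without broadcasting it, is to hold it in a top-level Scala object behind a lazy val. The sketch below is plain Scala so it runs on its own; StubLookup, InitCounter, GeoIpHolder and Demo are hypothetical names, and StubLookup only imitates a real lookup class. In a Spark job the path would presumably come from SparkFiles.get("GeoIPCity.dat") after the file was shipped with SparkContext.addFile.

```scala
// Hypothetical counter, only here to show how often the lookup is built.
object InitCounter { var count = 0 }

// Hypothetical stand-in for a non-serializable class such as
// com.maxmind.geoip.LookupService; it does not read the file at `path`.
final class StubLookup(path: String) {
  InitCounter.count += 1
  def city(ip: String): String =
    if (ip.startsWith("10.")) "private" else "unknown"
}

// A top-level object is referenced statically from closures rather than
// captured and serialised, so each JVM (each executor) builds its own copy.
// The lazy val initialises on first access and is then reused.
object GeoIpHolder {
  lazy val lookup: StubLookup = new StubLookup("GeoIPCity.dat")
}

object Demo {
  // Any number of operations can call GeoIpHolder.lookup;
  // only the first call pays the setup cost.
  def resolveAll(ips: Seq[String]): Seq[(String, String)] =
    ips.map(ip => (ip, GeoIpHolder.lookup.city(ip)))
}
```

The trade-off compared with mapPartitions is scope: the object lives for the lifetime of the JVM rather than one partition, so it suits read-only resources that never need to be closed between stages.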
0

As @bearrito mentioned, you can load your GeoDB and then broadcast it from your driver. Another option to consider is an external service that you query for each lookup. It could be an in-memory cache such as Redis, Memcached, or Tachyon, or a regular datastore.

Soumya Simanta