I'm trying to use the Snowplow MaxMind library to look up geo data for each IP address in a DataFrame.
We are using Spark SQL (Spark version 2.1.0), and I created a UDF in the following class:
class UdfDefinitions @Inject() extends Serializable with StrictLogging {
  sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
  val s3Config = configuration.databases.dataWarehouse.s3
  val lruCacheConst = 20000
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName)),
    ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)

  def lookupIP(ip: String): LookupIPResult = {
    val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
    loc match {
      case None => LookupIPResult("", "", "")
      case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""),
        x.city.getOrElse(""), x.regionName.getOrElse(""))
    }
  }

  val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)
}
The intention is to create the handle to the file (ipLookups) once, outside the UDF, and reuse it inside, so that the file is not opened for every row. This fails with a "Task not serializable" error. When we move the addFile call into the UDF instead, we get a "Too many open files" error on a large dataset (on a small dataset it works).
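To make the goal concrete, here is a minimal sketch of the pattern I am after: a serializable wrapper that ships only the file name and lazily rebuilds the handle wherever it is deserialized. It uses only the Scala standard library. FakeGeoDb, GeoLookup and roundTrip are hypothetical names for illustration, and FakeGeoDb is a stand-in, not the real IpLookups API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a non-serializable, expensive-to-open handle
// such as ipLookups. This is NOT the real MaxMind/Snowplow API.
class FakeGeoDb(path: String) {
  FakeGeoDb.opens.incrementAndGet() // count how often the "file" is opened
  def country(ip: String): String = if (ip.startsWith("10.")) "private" else "unknown"
}
object FakeGeoDb { val opens = new AtomicInteger(0) }

// Serializable wrapper: only `path` is serialized. The handle is @transient
// and lazy, so it is rebuilt on first use after deserialization.
class GeoLookup(path: String) extends Serializable {
  @transient private lazy val db = new FakeGeoDb(path)
  def lookup(ip: String): String = db.country(ip)
}

object TransientLazyDemo {
  // Round trip through Java serialization, roughly what happens to a closure
  // that is shipped from the driver to an executor.
  def roundTrip[T <: Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // The handle has not been touched yet, so nothing non-serializable is written.
    val onExecutor = roundTrip(new GeoLookup("GeoIPCity.dat"))
    // Several lookups share the single handle created on first use.
    val results = Seq("10.0.0.1", "8.8.8.8", "10.1.2.3").map(onExecutor.lookup)
    println(results)
    println(s"handles opened: ${FakeGeoDb.opens.get()}")
  }
}
```

In the Spark case, I assume the lazy initializer would call SparkFiles.get on the executor and build the real IpLookups there, and the UDF would close over a GeoLookup-like instance instead of over ipLookups directly. I have not verified this against the real library. As far as I understand, the handle would then be opened once per deserialized copy of the wrapper rather than once per row.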
This thread shows how to solve the problem using RDDs, but we would like to use Spark SQL: using maxmind geoip in spark serialized
Any thoughts? Thanks