I have two dataframes, df1 and ip2Country. df1 contains IP addresses, and I am trying to map each IP address to geolocation information such as longitude and latitude, which are columns in ip2Country.

I am running it as a spark-submit job, but the operations take a very long time even though df1 has fewer than 2500 rows.

My code:

import org.apache.spark.sql.functions._
import spark.implicits._

// Look up the geolocation of each source IP (sint)
val agg = df1.join(ip2Country, ip2Country("network_start_int") === df1("sint"), "inner")
  .select($"src_ip",
    $"country_name".alias("scountry"),
    $"iso_3".alias("scode"),
    $"longitude".alias("slong"),
    $"latitude".alias("slat"),
    $"dst_ip", $"dint", $"count")
  .filter($"slong".isNotNull)

// Repeat the lookup for each destination IP (dint)
val agg1 = agg.join(ip2Country, ip2Country("network_start_int") === agg("dint"), "inner")
  .select($"src_ip", $"scountry",
    $"scode", $"slong",
    $"slat", $"dst_ip",
    $"country_name".alias("dcountry"),
    $"iso_3".alias("dcode"),
    $"longitude".alias("dlong"),
    $"latitude".alias("dlat"), $"count")
  .filter($"dlong".isNotNull)

Is there any other way to join the two tables? Or am I doing it the wrong way?


1 Answer


If you have a big DataFrame that needs to be joined with a small one, broadcast joins are very effective. Read here: Broadcast Joins (aka Map-Side Joins)

import org.apache.spark.sql.functions.broadcast

bigdf.join(broadcast(smalldf))
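
Applied to the question's dataframes, a minimal sketch (assuming df1, at under 2500 rows, is the small side, and reusing the column names from the question):

import org.apache.spark.sql.functions.broadcast
import spark.implicits._

// Hint Spark to ship the small df1 to every executor, so the large
// ip2Country lookup table is never shuffled across the cluster.
val agg = broadcast(df1)
  .join(ip2Country, ip2Country("network_start_int") === df1("sint"), "inner")
  .select($"src_ip",
    $"country_name".alias("scountry"),
    $"iso_3".alias("scode"),
    $"longitude".alias("slong"),
    $"latitude".alias("slat"),
    $"dst_ip", $"dint", $"count")
  .filter($"slong".isNotNull)

The same hint works for the second join on dint. Note that Spark already broadcasts automatically when a table's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default), so the explicit hint mainly matters when the size statistics are missing or the threshold is too low.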