Databricks Community Cloud is throwing an org.apache.spark.SparkException: Task not serializable that my local machine does not throw when executing the same code.
The code comes from the Spark in Action book. It reads a JSON file of GitHub activity data, then reads a file of employee usernames from a fictional company, and finally ranks the employees by number of pushes.
To avoid extra shuffling, the variable holding the set of employees is broadcast. However, when it's time to return the ranking, Databricks Community Cloud throws the exception.
import org.apache.spark.sql.SparkSession
import scala.io.Source.fromURL
val spark = SparkSession.builder()
.appName("GitHub push counter")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
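// load one day of GitHub activity and keep only the push events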
val inputPath = "/FileStore/tables/2015_03_01_0-a829c.json"
val pushes = spark.read.json(inputPath).filter("type = 'PushEvent'")
// count pushes per user, then order descending (grouped cannot reference itself,
// so the ordering goes in a second step)
val grouped = pushes.groupBy("actor.login").count
val ordered = grouped.orderBy(grouped("count").desc)
val empPath = "https://raw.githubusercontent.com/spark-in-action/first-edition/master/ch03/ghEmployees.txt"
val employees = Set() ++ (for { line <- fromURL(empPath).getLines } yield line.trim)
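// broadcast the employee set so each executor receives it once instead of per task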
val bcEmployees = sc.broadcast(employees)
import spark.implicits._
// the parameter type is required here, otherwise the lambda does not compile
val isEmp = (user: String) => bcEmployees.value.contains(user)
val isEmployee = spark.udf.register("SetContainsUdf", isEmp)
val filtered = ordered.filter(isEmployee($"login"))
filtered.show()
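In case it helps to see what I was considering: my guess is that the lambda drags the notebook's outer (non-serializable) scope into the task closure, since Databricks wraps each cell in its own object. Below is a minimal sketch of the workaround I would try, assuming that closure capture is the culprit; EmpCheck is a hypothetical helper name of mine, not from the book.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.udf

// standalone Serializable object, so the predicate captures only the
// broadcast handle rather than the enclosing notebook instance
object EmpCheck extends Serializable {
  def isEmp(bc: Broadcast[Set[String]])(user: String): Boolean =
    bc.value.contains(user)
}

// the partially applied method closes over bcEmployees, which is serializable
val isEmployee = udf(EmpCheck.isEmp(bcEmployees) _)
val filtered = ordered.filter(isEmployee($"login"))
filtered.show()

Is that the right direction, or is something else in the closure causing the exception?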