The toy examples that show how to program in Spark make it look simple: you just import, create, use, and discard, all in one little function.
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.hive.HiveContext

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._
  import hiveContext.sql

  // load data from HDFS
  val df1 = sc.textFile("hdfs://.../myfile.csv").map(...)
  val df1B = sc.broadcast(df1)

  // load data from Hive
  val df2 = sql("select * from mytable")

  // transform df2 using the broadcast df1B
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df2_new = df2.withColumn("myCol", cleanCol)

  ...

  sc.stop()
}
In the real world, I find myself writing quite a few functions to modularize the tasks. For example, I would have a few functions just to load the different data tables. And in these load functions I would call other functions to do necessary data cleaning/transformation as I load the data. Then I would pass the contexts like so:
def loadHdfsFileAndBroadcast(sc: SparkContext) = {
  // use sc here
  val df = sc.textFile("hdfs://.../myfile.csv").map(...)
  val dfB = sc.broadcast(df)
  dfB
}
def loadHiveTable(hiveContext: HiveContext, df1B: Broadcast[Map[String, String]]) = {
  import hiveContext.implicits._
  import org.apache.spark.sql.functions.{col, udf}
  val data = hiveContext.sql("select * from myHiveTable")
  // data cleaning
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df_cleaned = data.withColumn("myCol", cleanCol)
  df_cleaned
}
As you can see, the load function signatures get heavy quite easily.
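For instance, once a loader needs the HiveContext plus a couple of previously broadcast lookup tables, the signature ends up looking something like this (the table and column names here are made up purely for illustration):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.hive.HiveContext

// made-up example: the parameter list grows as soon as a loader needs
// the HiveContext plus a couple of broadcast lookup tables
def loadAndCleanOtherTable(hiveContext: HiveContext,
                           countryLookupB: Broadcast[Map[String, String]],
                           productLookupB: Broadcast[Map[String, String]]): DataFrame = {
  val data = hiveContext.sql("select * from anotherHiveTable")
  // replace codes with names via the broadcast maps
  val fixCountry = udf((c: String) => countryLookupB.value.getOrElse(c, c))
  val fixProduct = udf((p: String) => productLookupB.value.getOrElse(p, p))
  data.withColumn("country", fixCountry(col("country")))
      .withColumn("product", fixProduct(col("product")))
}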
I've tried to put these context imports at the class level, outside the main function, but that causes problems (see this issue), which leaves me no option but to pass them around.
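For reference, what I tried looked roughly like this (simplified, not the exact code):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object MyJob {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val hiveContext = new HiveContext(sc)
  // the idea: import the implicits once at the top level so every helper
  // can use them without taking the context as a parameter
  import hiveContext.implicits._

  def loadHiveTable() = hiveContext.sql("select * from myHiveTable")

  def main(args: Array[String]): Unit = {
    val df2 = loadHiveTable()
    // ...
    sc.stop()
  }
}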
Is this the way to go or is there a better way to do this?