
I have created a HiveContext in the main() function in Scala, and I need to pass this hiveContext as a parameter to other functions. This is the structure:

object Project {
  def main(name: String): Int = {
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    ...
  }
  def read (streamId: Int, hc:hiveContext): Array[Byte] = {
    ...
  }
  def close (): Unit = {
    ...
  }
}

but it doesn't work. The read() function is called inside main().

Any ideas?

Ali AzG

2 Answers


I declare the hiveContext as an implicit value; this works for me:

implicit val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)

Defined in MyJob:

override def run(config: Config)(implicit sqlContext: SQLContext): Unit = ...
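
For reference, a minimal sketch of what the callee side might look like (the Config type and the query inside run are illustrative assumptions, not part of the original code):

import org.apache.spark.sql.SQLContext

case class Config(table: String) // hypothetical config type

object MyJob {
  def run(config: Config)(implicit sqlContext: SQLContext): Unit = {
    // sqlContext is supplied automatically from the implicit val at the call site
    sqlContext.table(config.table).show()
  }
}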

But if you don't want it to be implicit, passing it explicitly should work the same way:

val sqlContext: HiveContext = new HiveContext(sc)
MyJob.run(conf)(sqlContext)

override def run(config: Config)(sqlContext: SQLContext): Unit = ...

Also, your read function should declare HiveContext (the class) as the type of the parameter hc, not hiveContext (the value):

def read (streamId: Int, hc:HiveContext): Array[Byte] = 
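
Putting it together, a minimal sketch of the asker's Project object with the context passed explicitly (the SparkConf setup and the body of read are illustrative assumptions):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object Project {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Project"))
    val hiveContext = new HiveContext(sc)

    // pass the context explicitly to read()
    val bytes = read(1, hiveContext)

    close()
    sc.stop()
  }

  def read(streamId: Int, hc: HiveContext): Array[Byte] = {
    // hypothetical query; the real body was elided in the question
    hc.sql(s"SELECT payload FROM streams WHERE id = $streamId").head().getAs[Array[Byte]](0)
  }

  def close(): Unit = ()
}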
SCouto
  • I don't understand your answer; I don't see how I can pass hiveContext as an argument of a function, in my case the read() function – Loreto Gómez Arias Jun 01 '16 at 10:54
  • Hi @LoretoGómezArias, you should define your function as follows (HiveContext with a capital H): def read (streamId: Int, hc:HiveContext): Array[Byte] = { Also, please provide information about your error to make it easier for us to help you – SCouto Jun 01 '16 at 11:05
  • the compiler says: :185: not found: type HiveContext. – Loreto Gómez Arias Jun 01 '16 at 11:12
  • Have you got this import? import org.apache.spark.sql.hive.HiveContext Otherwise you need to define your function like this: def read (streamId: Int, hc:org.apache.spark.sql.hive.HiveContext): Array[Byte] = { – SCouto Jun 01 '16 at 11:15

I tried several options; this is what eventually worked for me:

object SomeName extends App {

  val conf = new SparkConf()...
  val sc = new SparkContext(conf)

  implicit val sqlC = SQLContext.getOrCreate(sc)
  getDF1(sqlC)

  def getDF1(sqlCo: SQLContext): Unit = {
    val query1 = "SomeQuery here"
    val df1 = sqlCo.read.format("jdbc").options(Map("url" -> dbUrl, "dbtable" -> query1)).load.cache()

    // iterate through df1 and retrieve the 2nd DataFrame based on some values in the Row of the first DataFrame
    df1.foreach(x => {
      getDF2(x.getString(0), x.getDecimal(1).toString, x.getDecimal(3).doubleValue)(sqlCo)
    })
  }

  def getDF2(a: String, b: String, c: Double)(implicit sqlCont: SQLContext): Unit = {
    val query2 = "Somequery"

    val sqlcc = SQLContext.getOrCreate(sc)
    //val sqlcc = sqlCont //Did not work for me. Also, omitting (implicit sqlCont: SQLContext) altogether did not work
    val df2 = sqlcc.read.format("jdbc").options(Map("url" -> dbUrl, "dbtable" -> query2)).load().cache()
    ...
  }
}

Note: In the above code, if I omitted the (implicit sqlCont: SQLContext) parameter from the getDF2 method signature, it would not work. I tried several other options for passing the sqlContext from one method to the other; they always gave me a NullPointerException or a Task not serializable exception. The good thing is that it eventually worked this way, and I could retrieve parameters from a row of DataFrame 1 and use those values to load DataFrame 2.
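
For what it's worth, those NullPointerException / Task not serializable errors are typical of using an SQLContext inside df1.foreach, whose closure runs on the executors where the context is not usable. A hedged alternative sketch, assuming df1 is small enough to collect to the driver (the function name, table, and columns are hypothetical):

import org.apache.spark.sql.{DataFrame, SQLContext}

def loadDetails(df1: DataFrame, dbUrl: String)(implicit sqlContext: SQLContext): Unit = {
  // collect() brings the rows back to the driver, so the SQLContext is only
  // used in driver-side code and never shipped inside an executor task
  df1.collect().foreach { row =>
    val query2 = s"(SELECT * FROM details WHERE id = '${row.getString(0)}') AS t"
    val df2 = sqlContext.read
      .format("jdbc")
      .options(Map("url" -> dbUrl, "dbtable" -> query2))
      .load()
      .cache()
    // ... use df2 ...
  }
}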

Yusuf Arif