
I'm new to Spark. In our project we are using Spark Structured Streaming to write a Kafka consumer. We have a use case where I need to modularize the code so that multiple people can work on different pieces of the Spark job simultaneously.

In the first step we read from different Kafka topics, so I have two datasets, say ds_input1 and ds_input2.

I need to pass these to the next step, which another person is working on. So I have done the following in Java 8:

class DriverClass {
    public static void main(String[] args) {
        Dataset<Row> ds_input1 = ...; // populated from a Kafka topic

        Dataset<Row> ds_output1 = null;
        SecondPersonClass.process(ds_input1, ds_output1);

        // here, outside the method, ds_output1 is still null
        // Why does this not work the way a List<Object> would in Java?
        // Is there anything wrong I am doing? What is the correct way to do this?

        Dataset<Row> ds_output2 = null;
        ThirdPersonClass.process2(ds_output1, ds_output2);

        // here, outside the method, ds_output2 is still null
        // though ds_output2 is populated inside the method, why is it still null outside?
    }
}


class SecondPersonClass {

    static void process(Dataset<Row> ds_input1, Dataset<Row> ds_output1) {
        // here the business logic works on the ds_input1 data,
        // then the result is assigned back to the output dataset,
        // i.e. ds_output1

        // for simplicity, let's say:
        ds_output1 = ds_input1;
        // here I see data in ds_output1, i.e. ds_output1 is not null
    }
}


class ThirdPersonClass {

    static void process2(Dataset<Row> ds_input2, Dataset<Row> ds_output2) {
        // here the business logic works on the ds_input2 data,
        // then the result is assigned back to the output dataset,
        // i.e. ds_output2

        // for simplicity, let's say:
        ds_output2 = ds_input2;
        // here I see data in ds_output2, i.e. ds_output2 is not null
    }
}

Question: even though the dataset is populated inside the static method, why is that not reflected outside the method, where it is still null? Why is Java's call-by-reference for objects not working here? How do I handle this?

Can we return multiple Datasets from a function? If so, how do we do it?
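
For example, would returning the datasets (instead of trying to fill an out-parameter) be the right approach? Below is a rough sketch of what I have in mind; the StepResult holder class and the method signatures are just made up for illustration:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// hypothetical holder class so one step can hand back more than one dataset
class StepResult {
    final Dataset<Row> first;
    final Dataset<Row> second;

    StepResult(Dataset<Row> first, Dataset<Row> second) {
        this.first = first;
        this.second = second;
    }
}

class SecondPersonClass {
    // return the result instead of assigning to a parameter copy
    static Dataset<Row> process(Dataset<Row> dsInput1) {
        // business logic on dsInput1; for simplicity just pass it through
        return dsInput1;
    }

    // returning two datasets by wrapping them in the holder object
    static StepResult processBoth(Dataset<Row> dsInput1, Dataset<Row> dsInput2) {
        return new StepResult(dsInput1, dsInput2);
    }
}

// in the driver:
// Dataset<Row> dsOutput1 = SecondPersonClass.process(dsInput1);
// StepResult outputs = SecondPersonClass.processBoth(dsInput1, dsInput2);

Or is something like scala.Tuple2<Dataset<Row>, Dataset<Row>> the more usual way to return two datasets?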

    Arguments of reference type get copied in Java. It's of no use to assign to that copy. – apophis Jul 11 '19 at 15:30
  • That is a pure Java 101 question; it has nothing to do with Spark. But yes, of course you can return collections and tuples (the generic type is Pair) with dataset references from a Java function. To give your question a Spark flair, I can add that you are writing the driver program, i.e. everything you do is building a DAG on the driver side that will be executed once you call an action. You shouldn't view the parameters and return values as anything other than a collection of instructions that the driver will partition into instructions executed on the workers. – Molotch Jul 11 '19 at 17:05
  • @apophis Yeah, I figured it is a Java issue... but why am I getting null outside, instead of the populated data? – BdEngineer Jul 11 '19 at 17:38
  • @Molotch thank you, I got that, but why am I getting null outside? What is the correct way to do it? – BdEngineer Jul 11 '19 at 17:39
  • Are you sure you understand the difference between [pass by value / pass by reference](https://stackoverflow.com/questions/40480/is-java-pass-by-reference-or-pass-by-value)? – apophis Jul 11 '19 at 17:57
  • @apophis ...I know this much: "Java passes references by value". I am using the same reference inside the function and outside the function... so what exactly do you see as wrong here? – BdEngineer Jul 12 '19 at 05:27
  • "Pass by value" means copying. Inside the function it is **not** "the same" - it is a **copy**. Your example code *assigns* the result to that *copy*; you are *not* "populating" the original. Please take a break and revisit the basics. – apophis Jul 12 '19 at 09:50
  • @apophis, but here we are copying the reference, not the object contents, right? If we copy a reference, it would still point to the same object, right? – BdEngineer Jul 15 '19 at 12:53
  • It points to the same object as long as you don't change what it points to by assigning `null` to the copied reference. – apophis Jul 29 '19 at 16:26
  • @apophis thank you for the confirmation. So I am copying the reference into the function; why is the changed content not reflected outside of the function, i.e. in the dataframe? – BdEngineer Jul 30 '19 at 13:16
  • Taking your `process` method in `SecondPersonClass`: `ds_input1` is assigned to the *copy* of `ds_output1`. You are **not** "populating" the dataframe pointed to by the original `ds_output1` reference outside of the process() method. I give up ... – apophis Aug 05 '19 at 17:51
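
To make the point from these comments concrete, here is a minimal plain-Java sketch (no Spark involved, names invented for illustration) of why assigning to a parameter inside a method never changes the caller's variable:

class PassByValueDemo {
    static void process(StringBuilder input, StringBuilder output) {
        // 'output' is a copy of the caller's reference; reassigning it
        // only changes this local copy, never the caller's variable
        output = input;
    }

    public static void main(String[] args) {
        StringBuilder input = new StringBuilder("data");
        StringBuilder output = null;
        process(input, output);
        System.out.println(output); // prints "null" - the caller's variable is untouched
    }
}

The same thing happens with Dataset<Row> parameters: the method receives a copy of the reference, so the caller only sees a change if the method returns the new dataset (or mutates an object the caller already holds).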

0 Answers