
Here's the issue I want to solve: given a dataset as input, I want to generate a list of datasets by partitioning it. The split points are the Min and Max values of a certain attribute in a second dataset. Here's an example of what I want, taking Flight as the attribute and the two following datasets:

1)

  TicketId | Flight |           time       |
    ---------------------------------------|
      10   |    123 |   2020-11-27 05:48:02|
    ---------------------------------------|
      155  |    125 |   2020-11-27 05:49:02|
    ---------------------------------------|
      12   |    133 |   2020-11-27 05:50:02|
    ---------------------------------------|
      200  |    13  |   2020-11-27 06:49:02|
    ---------------------------------------|
      123  |    22  |   2020-11-27 06:50:02|
    ---------------------------------------|
      15   |    92  |   2020-11-27 05:51:02|
    ---------------------------------------|
      21   |    41  |   2020-11-27 05:49:02|
    ---------------------------------------|
      22   |    27  |   2020-11-27 05:50:02|
    ---------------------------------------|
      422  |    35  |   2020-11-27 05:51:02|
    ---------------------------------------

And the second dataset is like the following :

2)

  TicketId | Flight |           time       |
    ---------------------------------------|
      103  |    156 |   2020-11-27 05:48:02|
    ---------------------------------------|
      154  |    130 |   2020-11-27 05:49:02|
    ---------------------------------------|
      123  |    151 |   2020-11-27 05:50:02|
    ---------------------------------------|
      220  |    119 |   2020-11-27 06:49:02|
    ---------------------------------------|
      143  |    111 |   2020-11-27 06:50:02|
    ---------------------------------------|
      16   |    189 |   2020-11-27 05:51:02|
    ---------------------------------------|
      22   |    152 |   2020-11-27 05:49:02|
    ---------------------------------------|
      22   |    125 |   2020-11-27 05:50:02|
    ---------------------------------------|
      134  |    187 |   2020-11-27 05:51:02|
    ---------------------------------------

Since the Min value of the Flight attribute in dataset 2 is 111, the list of datasets resulting from partitioning dataset 1 would be:

  TicketId | Flight |           time       |
    ---------------------------------------|
      10   |    123 |   2020-11-27 05:48:02|
    ---------------------------------------|
      155  |    125 |   2020-11-27 05:49:02|
    ---------------------------------------|
      12   |    133 |   2020-11-27 05:50:02|
    ---------------------------------------|

AND

      TicketId | Flight |           time       |
        ---------------------------------------|
          200  |    13  |   2020-11-27 06:49:02|
        ---------------------------------------|
          123  |    22  |   2020-11-27 06:50:02|
        ---------------------------------------|
          15   |    92  |   2020-11-27 05:51:02|
        ---------------------------------------|
          21   |    41  |   2020-11-27 05:49:02|
        ---------------------------------------|
          22   |    27  |   2020-11-27 05:50:02|
        ---------------------------------------|
          422  |    35  |   2020-11-27 05:51:02|
        ---------------------------------------

This is because the Min value of dataset 2 splits dataset 1 into the two resulting datasets. My question is how to achieve that in Spark / Java (or even Scala). NB: the partitioning value (of the attribute Flight) could also have been the Max value of that attribute in dataset 2.

Thanks for the help.

  • You say `the Max value of dataset 2 according to the Flight attribute is 89` but there is no `89` in the 2nd dataset's `Flight` column (the same goes for the min value, because you indicate `187` as the min when there's `111`). Additionally, I can't quite figure out what you mean in your last paragraph. Which value can/should be used to partition the 1st dataset? The min of dataset 2? The max of dataset 2? And where are the min/max of the 1st dataset ever used? – Coursal May 28 '21 at 22:08
  • I updated the question. As for the Max/Min values, the split is done here according to the min of dataset 2 because the min of dataset 2 is within the range [Min,Max] of dataset 1. If the max of dataset 2 were within the range [Min,Max] of dataset 1, then the split would have been done according to the max of dataset 2. – John Campbell May 29 '21 at 07:54
  • If both the Min and Max values of dataset 2 were within the range [Min,Max] of dataset 1, then 2 splits are necessary and the resulting dataset count would be 3 (instead of just 2). Hope it is clear. – John Campbell May 29 '21 at 08:42

1 Answer

First things first: it is not possible to yield multiple RDDs from a single transformation (and since DataFrames and Datasets are built on top of RDDs, the same applies to them). This means that we cannot split the first Dataset with a single where/filter call. Instead, we can use conditions based on the min/max values of Flight in both Datasets to determine a) the value(s) to split on, and b) the number of split Datasets (since in one of your cases we want 3 split Datasets instead of 2).

There are 3 valid cases of overlapping min/max values:

  • Split by min(df2):
min(df1)------------------------------------max(df1)
               min(df2)----------------------------------max(df2)
  • Split by max(df2):
               min(df1)------------------------------------max(df1)
min(df2)----------------------------------max(df2)
  • Split by min(df2) and max(df2):
min(df1)-----------------------------------------------max(df1)
               min(df2)----------------max(df2)

All there's left to do is:

  1. Find the max and min Flight values from both Datasets,
  2. Use them in a series of if/else if statements to determine which case of value overlapping we have from the input data, and
  3. Create 2 or 3 new Datasets (either inside an if/else if statement's scope, or outside of it, depending on what you want to do) by filtering the first Dataset with one where call per resulting Dataset.
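
Step 2 boils down to asking which of the second Dataset's boundaries fall strictly inside the first Dataset's range. As a rough sketch (the `cutPoints` helper is a hypothetical name, not a Spark API), that check can be written as a plain Scala function and tried out without a cluster:

```scala
// Hypothetical helper, not part of Spark: returns the boundaries of the
// second Dataset that lie strictly inside (df1Min, df1Max), in ascending order.
// 0 results = no split, 1 result = two Datasets, 2 results = three Datasets.
def cutPoints(df1Min: Int, df1Max: Int, df2Min: Int, df2Max: Int): Seq[Int] =
  Seq(df2Min, df2Max)
    .distinct                              // min and max may coincide
    .filter(v => v > df1Min && v < df1Max) // strict comparisons, as in the cases above
    .sorted
```

With the example data from the question, `cutPoints(13, 133, 111, 189)` keeps only 111, so the first Dataset is split once.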

Here is the code for these steps, written in Scala for broader future reference within Spark. (Of course you can implement it in Java with very minor changes, since most of the calls in Spark's language APIs are interchangeable):

import org.apache.spark.sql.functions.{col, max, min}

// store the min/max values of `Flight` as integers
// (getInt assumes `Flight` is an integer column and both Datasets are non-empty)
val df1Max = df1.select(max("Flight")).head().getInt(0)
val df1Min = df1.select(min("Flight")).head().getInt(0)
val df2Max = df2.select(max("Flight")).head().getInt(0)
val df2Min = df2.select(min("Flight")).head().getInt(0)

if(df1Min < df2Min && df2Min < df1Max && df1Max < df2Max) // split by min(df2)
{
    val firstDf = df1.where(col("Flight") <= df2Min)
    val secondDf = df1.where(col("Flight") > df2Min)

    firstDf.show()
    secondDf.show()

    // ... (store them in disk, process them, do whatever you want)
}
else if(df1Min > df2Min && df1Min < df2Max && df2Max < df1Max) // split by max(df2)
{
    val firstDf = df1.where(col("Flight") <= df2Max)
    val secondDf = df1.where(col("Flight") > df2Max)

    firstDf.show()
    secondDf.show()

    // ... (store them in disk, process them, do whatever you want)
}
else if(df1Min < df2Min && df2Max < df1Max) // split by min(df2) and max(df2)
{
    val firstDf = df1.where(col("Flight") <= df2Min)
    // strictly greater than df2Min, so rows equal to df2Min are not duplicated in firstDf
    val secondDf = df1.where(col("Flight") > df2Min && col("Flight") <= df2Max)
    val thirdDf = df1.where(col("Flight") > df2Max)

    firstDf.show()
    secondDf.show()
    thirdDf.show()

    // ... (store them in disk, process them, do whatever you want)
}
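
The four `select(...).head()` calls above each trigger their own Spark job. As an optional variation (only a sketch; `minMax` is a hypothetical helper name), both aggregates of one Dataset can be fetched with a single `agg` call:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{max, min}

// Hypothetical helper: returns (min, max) of an integer column in one aggregation.
// Assumes the column is an integer column and the Dataset is non-empty;
// on an empty Dataset both aggregates are null and getInt would fail.
def minMax(df: DataFrame, column: String): (Int, Int) = {
  val row = df.agg(min(column), max(column)).head()
  (row.getInt(0), row.getInt(1))
}
```

With it, the four lookups become `val (df1Min, df1Max) = minMax(df1, "Flight")` and the same for `df2`.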
Coursal
  • One final quick question: what could I do if the split parts of dataset 1 had to undergo another splitting process according to some other dataset 3? – John Campbell May 29 '21 at 12:41
  • 1
    In that case you could do the same exact process inside each of the cases inside every `if` code block. It will be lengthy and not so good looking, but it can get the job done. Another solution would be to store the split parts to the disk individually, however I wouldn't advise you to do that because it would dramatically slow things down. Keep the data in-memory as much as you can. – Coursal May 29 '21 at 13:22
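
For the follow-up about a third dataset, one way to avoid nesting `if` blocks is to wrap the split in a function and apply it to every part. The following is only a sketch under assumptions: `splitBy` and `df3` are hypothetical names, `Flight` is assumed to be an integer column, and every call runs two small aggregation jobs.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, max, min}

// Hypothetical helper: splits `part` at whichever boundaries (min/max) of
// `other` fall strictly inside part's own range. Rows equal to a boundary
// go to the lower part, matching the `<=` / `>` filters used in the answer.
def splitBy(part: DataFrame, other: DataFrame, column: String): Seq[DataFrame] = {
  val p = part.agg(min(column), max(column)).head()
  val o = other.agg(min(column), max(column)).head()
  if (p.isNullAt(0) || o.isNullAt(0)) Seq(part) // one side is empty: nothing to split
  else {
    val cuts = Seq(o.getInt(0), o.getInt(1)).distinct
      .filter(v => v > p.getInt(0) && v < p.getInt(1))
      .sorted
    // Pair every lower bound with the next upper bound; None means "unbounded".
    val lowers = None +: cuts.map(v => Option(v))
    val uppers = cuts.map(v => Option(v)) :+ None
    lowers.zip(uppers).map { case (lo, hi) =>
      val above = lo.map(v => col(column) > v).getOrElse(lit(true))
      val below = hi.map(v => col(column) <= v).getOrElse(lit(true))
      part.where(above && below)
    }
  }
}

// Usage sketch (df3 is the hypothetical third dataset):
// val base  = df1.cache() // keep dataset 1 in memory across the repeated filters
// val parts = splitBy(base, df2, "Flight").flatMap(p => splitBy(p, df3, "Flight"))
```

Because the result is a plain `Seq[DataFrame]`, further datasets can be chained with additional `flatMap` calls instead of deeper nesting.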