2

OVERVIEW

I am working with Java Spark to compute huge amount of data. I am loading into a Dataset the content of lots of .DAT files called the feeders. These .DAT files contain among others fields, a timestamp (number of seconds spanned from the 1st, Jan of 1970) when such data started to be recorded. In here, every row within a file represents a second in time. Example of how a .DAT file when data started to be recorded 1546297200 (Monday, December 31, 2018 11:00:00 PM, GMT), looks like:


id & deltaRSTs| timestamp| 
              |          |
SQXCXBAXY4P-02,1546297200,825,2065,391
1,0,-8,0      |1546297200|
1,0,-2,0      |1546297201|
1,0,0,0       |1546297202|
1,0,10,0      |1546297203|
1,0,-6,0      |1546297204|
1,0,-4,0      |1546297205|
1,0,0,0       ... 
1,0,6,0       ...
1,0,1,0       ...
1,0,-8,0      ...

On the other side, I have another Dataset that contains information about some electrical device (in .csv format). The important part here, is that such device created a set of events (let's coin it with EVT) with a different timestamps (number of seconds spanned from the 1st, Jan of 1970).

I want to take all rows from the .DATs files that fulfill with a certain temporal condition: given an event, take all DAT rows such that EVT(timestamp) is in DAT(timestamp) taking into account an offset to create a window, namely:

maxEpoch = DAT(timestamp) + rows_of_DAT
DAT(timestamp) + offset <= EVT(timestamp) && EVT(timestamp) <= maxEpoch - offset

Don't worry if you do not understand this completely, it is for the sake of giving some background. But it's necessary that you grasp the idea.

PROBLEM

I'm going to present the classes that I considered suitable for solving the aforementioned case:

CLASS ReadCSV (main):

public class ReadCSV {

    private static final SparkSession spark = new SparkSession
            .Builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        spark.sparkContext().setLogLevel("ERROR");

        Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs

        Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs

        Dataset<ProcessEvRow> processEvRowDataset = eventCSVRowDataset
            .map(new ProcessEvTransformation(feederFileRowDataset), Encoders.bean(ProcessEvRow.class))

    }
}

Note in this class, when an object of type ProcessEvTransformation is created, I am passing as an argument the Dataset of all DAT rows.

CLASS ProcessEvTransformation:

public class ProcessEvTransformation implements MapFunction<EventCSVRow, ProcessEvRow> {

    private Dataset<FeederFile> feederFileDataset;
    private int offset = 40;

    public ProcessEvTransformation(Dataset<FeederFile> feederFileDataset) {
        this.feederFileDataset = feederFileDataset;
        // I did here, this.feederFileDataset.show(); and it was successfull
    }

    public ProcessEvTransformation withOffset(int offset) {
        this.offset = offset;
        return this;
    }

    @Override
    public ProcessEvRow call(EventCSVRow eventCSVRow) throws Exception {
        String stdPattern = ...
        String rejectedFlag = ...
        Dataset<FeederFile> deltaRSTs = this.feederFileDataset
                .filter(feederFileRow -> {
                    final long epochTime = Long.parseLong(eventCSVRow.getEpochTime());
                    final long maxEpoch = Long.parseLong(feederFileRow.getEpoch()) + feederFileRow.getLineCount();
                    return Long.parseLong(feederFileRow.getEpoch()) + offset <= epochTime && epochTime <= maxEpoch - offset;
                });

        String[] rstDistances = getRstDistancesAndMinimum(deltaRSTs, eventCSVRow.getIncrements()); // whatever algorithmic procedure
        ...
    }
}

The problem here is that I am getting a NullPointerException because somewhat feederFileDataset attribute is being null. The curious part is that I am pretty sure it is arriving 100% defined, but when the call method is invoked it turns into something null or Invalid tree; null: (is the message displayed when printing it)

QUESTIONS & CONSLUSION

  • Does anyone know how to pass successfully a Dataset as a parameter to a MapFunction interface class based?
  • Why the Dataset is turning into something invalid when I pass it correctly? does it have something to do with internal procedures of Java Spark?

I hope I have been clear. Thanks for any help you can provide.

Best wishes, Thomas.

  • Take a look at [Why does this Spark code make NullPointerException?](https://stackoverflow.com/questions/47111607/why-does-this-spark-code-make-nullpointerexception), [Serializing RDD](https://stackoverflow.com/q/29567247/10465355), and many similar questions. Nested operations like this one, are in general not supported. – 10465355 Dec 05 '19 at 11:02
  • Did not help. Anyway based on my problem I decided that my best option was to use the ``crossJoin`` method of a ``Dataset``. In there I would count with all the info needed. – Tomás Denis Reyes Sánchez Dec 05 '19 at 17:00

1 Answers1

0

Since you have to keep all the data, I would recommend using the crossJoin method. Please be aware that this method is tremendously expensive.

public class ReadCSV {

    private static final SparkSession spark = new SparkSession
            .Builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        spark.sparkContext().setLogLevel("ERROR");

        Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs

        Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs

        Dataset<Andamyo> joined = eventCSVRowDataset
            .crossJoin(feederFileRowDataset).as(Encoders.bean(Andamyo.class))
            .filter(andamyo -> {
                final long eventEpoch = Long.parseLong(andamyo.getEventEpoch());
                final long maxEpoch = Long.parseLong(andamyo.getFeederEpoch()) + andamyo.getLineCount();
                return Long.parseLong(andamyo.getFeederEpoch()) <= eventEpoch && eventEpoch <= maxEpoch;
            });

    }
}

On the other side, class Andamyo represent a row with all the joined information of an event and a feederFileRow.

Example of the output:

+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
|eventEpoch|          increments|internalId|phase|    deltaRST|feederEpoch|feederId|lineCount|      mrtCode|
+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       46|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       47|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       48|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       49|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       50|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       51|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       52|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       53|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       54|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       55|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       56|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       57|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       58|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       59|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       60|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       61|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       62|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       63|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       64|MRT0000020611|
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       65|MRT0000020611|
+----------+--------------------+----------+-----+------------+-----------+--------+---------+-------------+
only showing top 20 rows

root
 |-- eventEpoch: string (nullable = true)
 |-- increments: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- internalId: string (nullable = true)
 |-- phase: string (nullable = true)
 |-- deltaRST: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- feederEpoch: string (nullable = true)
 |-- feederId: string (nullable = true)
 |-- lineCount: long (nullable = true)
 |-- mrtCode: string (nullable = true)

CHECK

The offset applied was 0. Check whether eventEpoch was in the range of feederEpoch plus its line count (normally 3600 seconds), as you may know, every row represents a second in time. See the example of how to check.

for first row:
|1564995646|[2, 2, 2, 75, 33,...|        11|    R|[1, 0, 0, 0]| 1564995600|      02|       46|MRT0000020611|

1564995646 >= 1564995600 and 1564995646 <= 1564995600 + 3600 ===> TRUE