OVERVIEW
I am working with Java Spark to process a huge amount of data. I load into a Dataset the content of lots of .DAT files, which I call the feeders. Among other fields, these .DAT files contain a timestamp (seconds elapsed since Jan 1, 1970) indicating when the data started to be recorded, and every row within a file represents one second in time. Here is how a .DAT file looks when its recording started at 1546297200 (Monday, December 31, 2018 11:00:00 PM, GMT):
id & deltaRSTs                          | timestamp  |
----------------------------------------|------------|
SQXCXBAXY4P-02,1546297200,825,2065,391  |            |
1,0,-8,0                                | 1546297200 |
1,0,-2,0                                | 1546297201 |
1,0,0,0                                 | 1546297202 |
1,0,10,0                                | 1546297203 |
1,0,-6,0                                | 1546297204 |
1,0,-4,0                                | 1546297205 |
1,0,0,0                                 | ...        |
1,0,6,0                                 | ...        |
1,0,1,0                                 | ...        |
1,0,-8,0                                | ...        |
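Each .DAT row is mapped to a bean roughly like this (a simplified sketch of the FeederFile type used in the code further below; only getEpoch and getLineCount are actually used there, the other fields and types are illustrative):

// Simplified bean for one feeder (.DAT) row; the real class has more fields.
public class FeederFile implements java.io.Serializable {
    private String id;          // e.g. "SQXCXBAXY4P-02"
    private String epoch;       // recording start timestamp, seconds since Jan 1, 1970
    private long lineCount;     // number of data rows in the file (one row per second)
    private String deltaRSTs;   // e.g. "1,0,-8,0"

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getEpoch() { return epoch; }
    public void setEpoch(String epoch) { this.epoch = epoch; }
    public long getLineCount() { return lineCount; }
    public void setLineCount(long lineCount) { this.lineCount = lineCount; }
    public String getDeltaRSTs() { return deltaRSTs; }
    public void setDeltaRSTs(String deltaRSTs) { this.deltaRSTs = deltaRSTs; }
}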
On the other side, I have another Dataset containing information about some electrical device (in .csv format). The important part is that this device created a set of events (let's call them EVT), each with its own timestamp (again, seconds elapsed since Jan 1, 1970).
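The event rows are mapped to a bean along these lines (again a simplified sketch; only the getters used further below are shown, and the field types are my assumption except epochTime, which I parse as a long):

// Simplified bean for one event (EVT) row parsed from the .csv files.
public class EventCSVRow implements java.io.Serializable {
    private String epochTime;   // event timestamp, seconds since Jan 1, 1970
    private String increments;  // payload later fed to the distance computation

    public String getEpochTime() { return epochTime; }
    public void setEpochTime(String epochTime) { this.epochTime = epochTime; }
    public String getIncrements() { return increments; }
    public void setIncrements(String increments) { this.increments = increments; }
}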
I want to take all rows from the .DAT files that satisfy a certain temporal condition: given an event, take every DAT row such that EVT(timestamp) falls inside the window built around DAT(timestamp) with an offset, namely:
maxEpoch = DAT(timestamp) + rows_of_DAT
DAT(timestamp) + offset <= EVT(timestamp) && EVT(timestamp) <= maxEpoch - offset
Don't worry if you do not understand this completely; it is only meant as background, but you need to grasp the general idea.
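To make the window explicit, here is the same check written as a plain Java method (just a sketch of the condition above; the names mirror the pseudo-notation):

// True when the event timestamp falls inside the feeder file's recording
// window, shrunk by `offset` seconds at both ends.
static boolean isInWindow(long datTimestamp, long rowsOfDat, long evtTimestamp, long offset) {
    long maxEpoch = datTimestamp + rowsOfDat;          // last second covered by the file
    return datTimestamp + offset <= evtTimestamp
            && evtTimestamp <= maxEpoch - offset;
}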
PROBLEM
These are the classes I consider relevant to the case described above:
CLASS ReadCSV (main):
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ReadCSV {

    private static final SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {
        spark.sparkContext().setLogLevel("ERROR");

        Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv");   // events from the CSVs
        Dataset<FeederFile> feederFileDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs

        Dataset<ProcessEvRow> processEvRowDataset = eventCSVRowDataset
                .map(new ProcessEvTransformation(feederFileDataset), Encoders.bean(ProcessEvRow.class));
    }
}
Note that in this class, when the ProcessEvTransformation object is created, I pass the Dataset of all DAT rows as a constructor argument.
CLASS ProcessEvTransformation:
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;

public class ProcessEvTransformation implements MapFunction<EventCSVRow, ProcessEvRow> {

    private Dataset<FeederFile> feederFileDataset;
    private int offset = 40;

    public ProcessEvTransformation(Dataset<FeederFile> feederFileDataset) {
        this.feederFileDataset = feederFileDataset;
        // I called this.feederFileDataset.show() here and it was successful
    }

    public ProcessEvTransformation withOffset(int offset) {
        this.offset = offset;
        return this;
    }

    @Override
    public ProcessEvRow call(EventCSVRow eventCSVRow) throws Exception {
        String stdPattern = ...
        String rejectedFlag = ...
        // keep only the feeder rows whose recording window, shrunk by `offset`
        // seconds at both ends, contains the event's timestamp
        Dataset<FeederFile> deltaRSTs = this.feederFileDataset
                .filter(feederFileRow -> {
                    final long epochTime = Long.parseLong(eventCSVRow.getEpochTime());
                    final long maxEpoch = Long.parseLong(feederFileRow.getEpoch()) + feederFileRow.getLineCount();
                    return Long.parseLong(feederFileRow.getEpoch()) + offset <= epochTime && epochTime <= maxEpoch - offset;
                });
        String[] rstDistances = getRstDistancesAndMinimum(deltaRSTs, eventCSVRow.getIncrements()); // whatever algorithmic procedure
        ...
    }
}
The problem is that I get a NullPointerException because the feederFileDataset attribute is somehow null. The curious part is that I am pretty sure it arrives 100% defined, but when the call method is invoked it turns into something null, or into Invalid tree; null (which is the message displayed when printing it).
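Condensing the flow into one place (a sketch, not my exact code), this is where it works and where it breaks:

Dataset<EventCSVRow> events = getEventCSVRow(".\\eventLog\\*.csv");
Dataset<FeederFile> feeders = getFeederFileDataset(".\\feeder\\*.DAT");

ProcessEvTransformation transformation = new ProcessEvTransformation(feeders);
// inside the constructor, this.feederFileDataset.show() prints the rows just fine

Dataset<ProcessEvRow> result = events
        .map(transformation, Encoders.bean(ProcessEvRow.class));
result.show();
// as soon as call() runs for the event rows, this.feederFileDataset is no longer
// usable: printing it gives "Invalid tree; null" and the filter throws the NPE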
QUESTIONS & CONCLUSION
- Does anyone know how to successfully pass a Dataset as a parameter to a class implementing the MapFunction interface?
- Why does the Dataset become invalid even though I pass it correctly? Does it have something to do with the internals of Java Spark?
I hope I have been clear. Thanks for any help you can provide.
Best wishes, Thomas.