I want to create a UDF in Spark to work with a Java data structure that doesn't fit the Scala FP style and only modifies its internal state. To simplify, here is the skeleton of the Java class I'm working with:
import java.util.ArrayList;
import java.util.List;

public class MagicStats {
    private List<Long> rawData = new ArrayList<>();

    public void processDataPoint(long dataPoint) {
        rawData.add(dataPoint);
        //some magic processing
    }

    public void merge(MagicStats anotherMagicStats) {
        //merge with another instance to combine states
    }

    public long eval() {
        //do some magic with the data
        //return the result
    }
}
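For context, the class is meant to be used in the usual imperative Java way: mutate internal state point by point, then read a result. A trivial sketch (the values are made up):

// Typical stateful usage of MagicStats: accumulate, then eval.
val stats = new MagicStats()
stats.processDataPoint(3000L)
stats.processDataPoint(4500L)
val result: Long = stats.eval()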
A bit more background on what I'm trying to do with this class. I have some data bucketed by day, and for each day's partition I will produce some summary stats, including count, avg, etc., plus this special MagicStats result (obtained via eval() in the class), and save them in a database. What makes this MagicStats special is:
- I need a daily MagicStats result for the data.
- I need a monthly aggregation of the daily MagicStats results (which cannot be computed arithmetically from the daily numbers and can only be produced by the class itself).
As you can see, the second requirement means I have to take a snapshot of the MagicStats object for each daily partition and store it as raw bytes in a database column, so that when it comes to the monthly aggregation I can reconstruct all 30 MagicStats objects in memory from the byte arrays, call merge(MagicStats) across them, and then eval() to aggregate properly.
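To make that concrete, here is roughly the monthly roll-up I have in mind, in plain Scala. This is a sketch only: it assumes MagicStats implements java.io.Serializable (which the skeleton above doesn't show), and monthlyEval is just a made-up helper name for illustration.

import java.io.{ByteArrayInputStream, ObjectInputStream}

// Deserialize each day's snapshot, fold the instances together
// with merge(), then eval() the combined state.
def monthlyEval(dailySnapshots: Seq[Array[Byte]]): Long = {
  val dailyStats = dailySnapshots.map { bytes =>
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[MagicStats] finally in.close()
  }
  dailyStats.reduce { (acc, next) => acc.merge(next); acc }.eval()
}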
Now comes the hard part: how do I create a UDF that doesn't simply map an input to an output, but instead accumulates state inside a Java object? Here is where I'm stuck (pseudo code below):
//input_monthly_data
// +----------+------+
// |       day| value|
// +----------+------+
// |2020-01-01|  3000|
// |2020-01-02|  4500|
// |2020-01-03|  3500|
// |..........|  ....|
// +----------+------+
val df = spark.read.json("input_monthly_data.json")
df.groupBy("day")
  .agg(MyUDF(col("value")).as("daily_stats"), avg("value").as("avg"), count("value").as("count"))
  .select("day", "daily_stats", "avg", "count")
  .write.saveAsTable("output_monthly_data")
class MyUDF extends UserDefinedFunction {
  def apply(input: Long): Column = {
    //create a static MagicStats instance
    //update the state of the instance per data point
    //serialize the instance to bytes once each day's partition is done
    //return the bytes for later persistence
  }
}
//output_monthly_data
// +----------+------+-----------------+
// |       day| count| MagicStats bytes|
// +----------+------+-----------------+
// |2020-01-01|    10|      some binary|
// |2020-01-02|    20|      some binary|
// |2020-01-03|    25|      some binary|
// |..........|  ....|      ...........|
// +----------+------+-----------------+
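For what it's worth, the closest I've come is trying to express this as a typed Aggregator wrapped with udaf() instead of a plain UDF. This is only a sketch of the direction, not something I'm sure is right: it assumes Spark 3.x (for functions.udaf) and, again, that MagicStats implements java.io.Serializable; MagicStatsAgg is my own name.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Sketch: a typed Aggregator wrapping the stateful MagicStats.
object MagicStatsAgg extends Aggregator[Long, MagicStats, Array[Byte]] {
  // fresh state for each group (each day)
  def zero: MagicStats = new MagicStats()

  // feed one data point into the mutable state
  def reduce(buffer: MagicStats, dataPoint: Long): MagicStats = {
    buffer.processDataPoint(dataPoint)
    buffer
  }

  // combine partial states built on different partitions
  def merge(b1: MagicStats, b2: MagicStats): MagicStats = {
    b1.merge(b2)
    b1
  }

  // snapshot the final per-day state as bytes for persistence
  def finish(reduction: MagicStats): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(reduction)
    oos.close()
    bos.toByteArray
  }

  def bufferEncoder: Encoder[MagicStats] = Encoders.javaSerialization[MagicStats]
  def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}

// Usage: register via udaf() and call it like any other aggregate column.
val magicStatsUdaf = udaf(MagicStatsAgg)
df.groupBy("day").agg(magicStatsUdaf(col("value")).as("magic_stats_bytes"))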
Any suggestions on how to make this UDF work, or pointers to another way to achieve my goal, would be greatly appreciated!