
I want to create a UDF in Spark to work with a Java data structure that doesn't fit the Scala functional style and only modifies its internal state. To simplify, here is the skeleton of the Java class I'm working with.

import java.util.ArrayList;
import java.util.List;

public class MagicStats {
    private final List<Long> rawData = new ArrayList<>();

    public void processDataPoint(long dataPoint) {
        rawData.add(dataPoint);
        //some magic processing
    }

    public void merge(MagicStats anotherMagicStats) {
        //merge with another instance to combine states
    }

    public long eval() {
        //do some magic with data
        //return result
    }
}

A bit more background on what I'm trying to do with this class. I have some data bucketed by day, and for each day's partition I produce some summary stats (count, avg, etc.) plus this special MagicStats result (obtained from eval() in the class), and I save them in a database. What makes MagicStats special is:

  1. I need a daily MagicStats result for the data.
  2. I need a monthly aggregation of the daily MagicStats results (which cannot be calculated arithmetically from the daily results and can only be handled by the class).

As you can see, the second requirement means I have to take a snapshot of the MagicStats object for each daily partition and store it as raw bytes in a column in the database. When it comes to the monthly aggregation, I can then reconstruct all ~30 MagicStats objects in memory from the byte arrays, call merge(MagicStats) across them, and finally eval() to aggregate properly. A sketch of this round trip follows.
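For illustration, here is a minimal sketch of that snapshot/merge round trip, assuming MagicStats is (or can be made) java.io.Serializable; MagicStatsCodec and its method names are placeholders of mine, not part of the real class:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object MagicStatsCodec {
  //snapshot a MagicStats instance as raw bytes for the database column
  def toBytes(stats: MagicStats): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(stats)
    oos.close()
    bos.toByteArray
  }

  //reconstruct a MagicStats instance from the stored bytes
  def fromBytes(bytes: Array[Byte]): MagicStats = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[MagicStats] finally ois.close()
  }

  //monthly aggregation: rebuild each daily snapshot, merge them all, then eval
  def evalMonth(dailySnapshots: Seq[Array[Byte]]): Long = {
    val merged = dailySnapshots.map(fromBytes).reduce { (a, b) => a.merge(b); a }
    merged.eval()
  }
}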

Now comes the hard part. How do I create a UDF that doesn't just return a result computed from its input, but instead updates the internal state of a Java object? Here is where I'm stuck (pseudocode below):

//input_monthly_data
// +----------+------+
// |   day    | value|
// +----------+------+
// |2020-01-01|  3000|
// |2020-01-02|  4500|
// |2020-01-03|  3500|
// |..........|  ....|
// +----------+------+

val df = spark.read.json("input_monthly_data.json")
df.groupBy("day")
  .agg(MyUDF(col("value")).as("daily stats"))
  .select("daily stats", "avg", "count")
  .write.saveAsTable("...")

class MyUDF extends UserDefinedFunction {
    def apply(input: Long): Column = {
        //create a static MagicStats instance
        //update the state of the instance per data point
        //serialize this instance to bytes after done for each day's partition
        //return the bytes for later persistence
    }
}
//output_monthly_data
// +----------+------+-----------------+
// |   day    | count| MagicStats bytes|
// +----------+------+-----------------+
// |2020-01-01|  10  | some binary.    |
// |2020-01-02|  20  | some binary.    |
// |2020-01-03|  25  | some binary.    |
// |..........|  ....| some binary.    |
// +----------+------+-----------------|

Any suggestions on how to make this UDF work, or on another way to achieve my goal, would be greatly appreciated!

diyun

1 Answer


I think you want to implement UserDefinedAggregateFunction rather than UserDefinedFunction.

In order to produce an aggregate result, a UserDefinedAggregateFunction updates state for each data point within a given group, which seems to be exactly what you're after.
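For example, here's a rough sketch of what that could look like for your case, assuming the buffer simply accumulates the raw points as an array of longs (the buffer schema can only hold Spark SQL types, not a custom type like MagicStats) and the result is the serialized snapshot; MagicStatsCodec stands in for whatever serialization helper you end up using:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MagicStatsUDAF extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(Seq(StructField("value", LongType)))
  //the buffer holds plain Spark SQL types, not MagicStats itself
  override def bufferSchema: StructType = StructType(Seq(StructField("points", ArrayType(LongType))))
  override def dataType: DataType = BinaryType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.empty[Long]

  //called once per row within a group
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0))
      buffer(0) = buffer.getSeq[Long](0) :+ input.getLong(0)

  //called when partial aggregates from different partitions are combined
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Long](0) ++ buffer2.getSeq[Long](0)

  //build the MagicStats object once per group and snapshot it to bytes
  override def evaluate(buffer: Row): Array[Byte] = {
    val stats = new MagicStats()
    buffer.getSeq[Long](0).foreach(p => stats.processDataPoint(p))
    MagicStatsCodec.toBytes(stats)
  }
}

You'd then call it inside agg, e.g. df.groupBy("day").agg(count("value"), new MagicStatsUDAF()(col("value")).as("magic_stats_bytes")).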


Trevor Reid
  • I think **UserDefinedAggregateFunction** makes sense, but I realized that Spark no longer allows UserDefinedType (see this [answer](https://stackoverflow.com/questions/39096268/how-to-use-user-defined-types-in-spark-2-0)). That means I can't create a **UserDefinedAggregateFunction** that uses my custom data type (in my case MagicStats) for aggregation. Any ideas on how to work around this? – diyun May 14 '20 at 16:24
  • @diyun I haven't tried it with a UDAF, but I think you can make the result of the aggregation a StructType. Something like `StructType(Seq(StructField("rawData", ArrayType(LongType))))`. Though if you only actually need the array of longs, maybe just `ArrayType(LongType)`, then put it into instances of your class or whatever you need to do _after_ the aggregation. This post might help: https://stackoverflow.com/questions/46676785/scala-udaf-with-return-type-as-array-of-complex-object – Brad LaVigne May 16 '20 at 20:41
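For what it's worth, a sketch of that last suggestion, assuming the aggregation produced an ArrayType(LongType) column named points (the DataFrame and column names here are placeholders, and MagicStatsCodec is the hypothetical serialization helper from above):

import org.apache.spark.sql.functions.{col, udf}

//rebuild MagicStats from the aggregated array of longs after the groupBy,
//then snapshot it to bytes for the database column
val toMagicBytes = udf { points: Seq[Long] =>
  val stats = new MagicStats()
  points.foreach(p => stats.processDataPoint(p))
  MagicStatsCodec.toBytes(stats)
}

val withBytes = aggregated.withColumn("magic_stats_bytes", toMagicBytes(col("points")))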