I call a UDF in the following Flink SQL query:
SELECT dvid, rank_name, rank_type, window_start, window_end, RankDif(rank_order,rank_pt) AS rank_cur
FROM TABLE(
HOP(TABLE UniqueRankTable, DESCRIPTOR(rank_pt), INTERVAL '1' DAY, INTERVAL '2' DAY)
)
GROUP BY dvid, rank_name, rank_type, window_start, window_end;
Here the RankDif function returns a ROW type.
This is the definition of RankDif:
@FunctionHint(output = @DataTypeHint("ROW<order INT, diff INT, pt TIMESTAMP_LTZ(0)>"))
public class RankDif extends AggregateFunction<Row, RankDif.RankRecordAccumulator> {
// ....
}
I want to unpack the output of RankDif into multiple columns in the query above, like this:
SELECT dvid, rank_name, rank_type, window_start, window_end, RankDif(rank_order,rank_pt) AS (rank_order, rank_diff, rank_pt)
FROM TABLE(
HOP(TABLE UniqueRankTable, DESCRIPTOR(rank_pt), INTERVAL '1' DAY, INTERVAL '2' DAY)
)
GROUP BY dvid, rank_name, rank_type, window_start, window_end;
but this fails Flink SQL grammar validation.
The only way I know to unpack the fields of a ROW type is with the Flink Table API:
table.select(call("RankDif",$("rank_order"),$("rank_pt")).as("rank_order, rank_diff, rank_pt"))
That works, so what is the equivalent in SQL?
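For reference, one direction that might work is to keep the aggregate in a subquery under a single alias and pick the ROW fields out in the outer query. This is only a sketch: it assumes dot-style field access on a ROW column is accepted by the Flink version in use. The field names order, diff and pt come from the @DataTypeHint above, and order is quoted with backticks because it is a reserved keyword.

```sql
-- Sketch only: assumes nested field access (column.field) works on the ROW
-- returned by RankDif; the inner query is unchanged from the first query above.
SELECT
  dvid, rank_name, rank_type, window_start, window_end,
  rank_cur.`order` AS rank_order,  -- `order` is reserved, so it needs backticks
  rank_cur.diff    AS rank_diff,
  rank_cur.pt      AS rank_pt
FROM (
  SELECT dvid, rank_name, rank_type, window_start, window_end,
         RankDif(rank_order, rank_pt) AS rank_cur
  FROM TABLE(
    HOP(TABLE UniqueRankTable, DESCRIPTOR(rank_pt), INTERVAL '1' DAY, INTERVAL '2' DAY)
  )
  GROUP BY dvid, rank_name, rank_type, window_start, window_end
);
```

I have not confirmed that this validates, so a more direct SQL form (closer to the Table API `as(...)` call) would still be welcome.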