I'm using Flink to compute a series of operations. Each operation produces a table that is both fed into the next operation and stored in S3. This makes it possible to inspect the data at each intermediate step in the calculation and see the effect of each operation.
I need to assign a unique identifier to each row in each table, so that when that identifier appears again in the following step (possibly in a different column) I know that two rows are associated with each other.
The first obvious candidate for this seems to be the ROW_NUMBER() function, but:
It doesn't seem to be anywhere in the table expression API. Do I have to construct SQL strings?
How do I use it? When I try this query:
SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp
I get this error:
org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
Does it always require sorting the table? This seems like an overhead I'd rather avoid.
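For completeness, here is a sketch of what I imagine the SQL-string route would look like, reading the error as "the OVER window must be ordered by a time attribute". I haven't verified that Flink actually accepts a bare ROW_NUMBER() like this in streaming mode; the datagen source and the pt column are just placeholders of my own:

```scala
// Unverified sketch: declare a processing-time attribute in the DDL
// and order the OVER window by it instead of an ordinary column.
env.executeSql("""
  CREATE TABLE inp_src (
    id INTEGER,
    pt AS PROCTIME()
  ) WITH ('connector' = 'datagen')
""")
val numbered = env.sqlQuery(
  "SELECT id, ROW_NUMBER() OVER (ORDER BY pt) AS rn FROM inp_src"
)
```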
The next option was just to generate a random UUID for every row. But when I try this, the same UUID never appears twice, so it's useless for linking rows. Here's an example:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Sandbox {

  def main(args: Array[String]): Unit = {
    val env = StreamTableEnvironment.create(
      StreamExecutionEnvironment.getExecutionEnvironment
    )

    val inp = env.fromValues(1.as("id"))
    val out1 = inp.addColumns(uuid().as("u"))
    val out2 = out1.addColumns($"u".as("u2"))

    env.executeSql("""
      CREATE TABLE out1 ( id INTEGER, u VARCHAR(36) )
      WITH ('connector' = 'print')
    """)
    env.executeSql("""
      CREATE TABLE out2 ( id INTEGER, u VARCHAR(36), u2 VARCHAR(36) )
      WITH ('connector' = 'print')
    """)

    env.createStatementSet()
      .addInsert("out1", out1)
      .addInsert("out2", out2)
      .execute()

    // Equivalent to the createStatementSet call above:
    // out1.executeInsert("out1")
    // out2.executeInsert("out2")
  }
}
The output I get:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)
I need the UUID from out1 to reappear in out2 in both columns, e.g.:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)
I suppose this is due to this note in the docs:
This function is not deterministic which means the value would be recalculated for each record.
How can I calculate a UUID just once and make it 'concrete', so that the same value is sent to both out1 and out2?
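To make explicit what I mean by 'concrete', in plain Scala I would simply bind the value once and reuse it. The derive-from-content fallback below is my own idea, not anything Flink offers, and has an obvious caveat:

```scala
import java.util.UUID

// Plain-Scala picture of the behaviour I want: bind the value once,
// and every later reference sees the same UUID.
val u = UUID.randomUUID().toString
val row1 = (1, u)
val row2 = (1, u, u) // the same value reappears in both columns

// A possible fallback (my own idea, untested in Flink): derive the UUID
// deterministically from row content, so recomputation is harmless.
// Caveat: rows with identical content would collide.
def contentUuid(id: Int): String =
  UUID.nameUUIDFromBytes(s"row-$id".getBytes("UTF-8")).toString
```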
I get a similar result with a user-defined function:

import java.util.UUID
import org.apache.flink.table.functions.ScalarFunction

class UuidUdf extends ScalarFunction {
  def eval(): String = UUID.randomUUID().toString
}

val out1 = inp.addColumns(call(new UuidUdf()).as("u"))
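One thing I noticed while trying this: ScalarFunction inherits isDeterministic (true by default), so I assume the planner is free to re-evaluate or pre-compute the call. I don't know whether declaring it non-deterministic changes anything across the two sinks, but for reference this is the variant I mean:

```scala
import java.util.UUID
import org.apache.flink.table.functions.ScalarFunction

// Same UDF, but explicitly declared non-deterministic so the planner
// cannot constant-fold it. (Whether this affects re-evaluation across
// the two sinks is unverified on my side.)
class UuidUdf extends ScalarFunction {
  def eval(): String = UUID.randomUUID().toString
  override def isDeterministic: Boolean = false
}
```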