I'm trying to execute a Python UDF from SQL DDL in Flink 1.14.0.
The Python file (udfTest.py):
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
    return a + 1
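As a side note, the UDF itself can be sanity-checked locally from Python. This is just a minimal sketch of such a check, assuming udfTest.py is importable from the working directory; it is not part of the submitted job:

# Minimal local sanity check for the UDF (a sketch; assumes udfTest.py is importable).
from pyflink.table import EnvironmentSettings, TableEnvironment
from udfTest import add_one

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
t_env.create_temporary_system_function("add1", add_one)
# Should print a single row containing 4 if the UDF is wired up correctly.
t_env.execute_sql("SELECT add1(3)").print()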
Then I start the Flink cluster:
➜ flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.
The Java code:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PyUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Point the Python runtime at the UDF file and the client interpreter
        tEnv.getConfig().getConfiguration().setString("python.files",
                "/home/magic/workspace/python/flinkTestUdf/udfTest.py");
        tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

        // Register the Python UDF via DDL and call it
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
        );
        TableResult ret1 = tEnv.executeSql("select add1(3)");
        ret1.print();

        env.execute();
    }
}
Then I run the job through the Flink client:
flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar
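I also wondered whether the jar submission needs the Flink CLI's Python options on top of the table config; this is only a guess on my side, mirroring the sql-client flags below, and I haven't confirmed that these options apply to a Java job:
flink run -pyfs /home/magic/workspace/python/flinkTestUdf/udfTest.py -pyexec /usr/bin/python3 /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar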
The error is:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.
But when I use the SQL Client to execute my Python UDF, it runs successfully.
Start the SQL Client:
PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3 ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py
Then
create temporary system function add1 as 'udfTest.add_one' language python;
Then
select add1(3);
and I get the correct result, 4.
So is there something wrong with my code?
I see that Python UDFs in SQL DDL have been supported since version 1.11 (FLIP-106: https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL), and I'm using 1.14.0.
Can anyone help me out?