Apache Flink is an open-source platform for scalable batch and stream data processing. Flink supports batch and streaming analytics in one system. Analytical programs can be written in concise, elegant APIs in Java and Scala. Apache Flink features the Table API (and SQL API) as unified APIs for stream and batch processing.
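As a minimal, hypothetical sketch of that unified model (the table name, fields, and the choice of the `datagen` connector are illustrative assumptions, not taken from any question below):

```sql
-- Hypothetical sketch: the same declarative query runs over a bounded
-- (batch) or unbounded (streaming) source; only the connector differs.
CREATE TABLE orders (
    order_id BIGINT,
    amount   DOUBLE
) WITH (
    'connector' = 'datagen'   -- swap in e.g. 'kafka' or 'filesystem'
);

SELECT order_id, SUM(amount) AS total_amount
FROM orders
GROUP BY order_id;
```

Over an unbounded source this produces a continuously updating result; over a bounded source, a final one. Many of the questions tagged below concern the seams of this model: converting between `Table` and `DataStream`, registering UDFs, and connector behavior.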
Questions tagged [flink-table-api]
81 questions
6
votes
1 answer
How to convert a Table to a DataStream containing array types (Flink)?
I have issues concerning the table-api of Flink (1.13+). I have a POJO containing several fields, one of them being:
List my_list;
I create my table using the following declaration for this field:
"CREATE TABLE my_table (
...
my_list…

Fray
- 173
- 6
3
votes
2 answers
Apache Flink 1.14.0 - Unable to use python UDF through SQL DDL in Java
I'm trying to execute a Python UDF function via SQL DDL (1.14.0).
Python file here:
from pyflink.table import DataTypes
from pyflink.table.udf import udf
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int):
…

Liam Zee
- 196
- 1
- 5
3
votes
2 answers
Flink sink filesystem as parquet - error on saving nested data
I am trying to convert JSON data to Parquet so that I can query it with Trino or Presto. A sample of the JSON is as follows:
{"name": "success","message": "test","id": 1, "test1": {"one": 1, "two": 2, "three": "t3"}, "test2": [1,2,3], "test3": [{"a":…

success malla
- 80
- 5
3
votes
0 answers
Remote Flink job execution with query to Hive on Flink cluster
I use Flink 1.11.2, Hive 2.1.1, Java 8.
I attempt to execute a query to Hive remotely; I packaged it in a jar and run it with Flink's RestClient:
private static String jar = "/path/Job.jar";
Configuration config = RemoteConfiguration.getConfiguration(host,…

Ruslan
- 31
- 2
3
votes
1 answer
Flink Table API: GROUP BY in SQL Execution throws org.apache.flink.table.api.TableException
I have this very simplified use case: I want to use Apache Flink (1.11) to read data from a Kafka topic (let's call it source_topic), count an attribute in it (called b) and write the result into another Kafka topic (result_topic).
I have the…

Irina S.
- 133
- 8
2
votes
0 answers
Flink Table API : Flink Dynamic table produces incorrect intermediate CDC values
I am using the Flink Table API to calculate a few aggregations. I have a stream of data coming from Kafka, which is transformed into a stream of rows. From these rows I create a dynamic table.
Ex: consider below three records, primary key is…

Aishwarya
- 21
- 1
2
votes
0 answers
Flink Sql not converting RAW('org.apache.avro.util.Utf8', '...') to String
I'm reading from a Kafka stream, creating a table environment, calculating an average, and writing the data back to Kafka [SIMPLECUSTOMER].
This worked in Flink 1.12.5. I'm using Flink 1.13.2 and Flink 1.14.0.
customerId is read as…

Mahendran Ponnusamy
- 31
- 4
2
votes
2 answers
Flink nested classes toDataStream conversion error
I am using Flink 1.13. I am trying to convert table results to a DataStream in the following way but keep getting an error.
public class HybridTrial {
  public static class Address {
    public String street;
    public String houseNumber;
    public…

voidMainReturn
- 3,339
- 6
- 38
- 66
2
votes
1 answer
Sink flink DataStream using jdbc connector to mysql sink with overwrite
My use case is
Get data from an AWS Kinesis data stream and filter/map it using the Flink DataStream API
Use StreamTableEnvironment to group and aggregate the data
Use SQLTableEnvironment to write to MySQL using the JDBC connector
I am able to write my datastream…

Mujahid
- 114
- 1
- 3
- 14
2
votes
1 answer
Flink Table-API and DataStream ProcessFunction
I want to join a big table, too large to fit in TaskManager memory, with a stream (Kafka). In my tests I successfully joined the two, mixing the Table API with the DataStream API. I did the following:
val stream: DataStream[MyEvent] = env.addSource(...)
stream
…

Fray
- 173
- 6
2
votes
0 answers
How to assign a unique ID to each row in a table in the Flink Table API?
I'm using Flink to compute a series of operations. Each operation produces a table which is both used for the next operation as well as stored in S3. This makes it possible to view the data at each intermediate step in the calculation and see the…

Alex Hall
- 34,833
- 5
- 57
- 89
2
votes
1 answer
Apache-Flink 1.11 Unable to use Python UDF in SQL Function DDL
According to this confluence page:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
Python UDFs are available in Flink 1.11 to be used within SQL functions.
I went to the flink docs…

Jonathan Figueroa
- 31
- 2
1
vote
0 answers
Create separate Table for each key in Data Stream after doing keyBy operation in Flink
My requirement is to create separate tables for each key in two different data streams and then join them. I have successfully created two separate tables from the data streams in Flink and performed the join using the Table API. However, I am facing…

Vinay Cheguri
- 55
- 7
1
vote
0 answers
Is it currently possible to generate a changelog stream with the Data Generator Source from FLIP-238?
In FLIP-238 and the related merge, a new DataGeneratorSource was introduced. It appears that this allows us to easily create new data-generation sources. However, it's not clear whether, in its current form, it allows users to generate a changelog DataStream.…

discord
- 59
- 10
1
vote
0 answers
Flink SQL : How to unpack fields in ROW type as multiple columns?
I call a UDF in the following Flink SQL query:
SELECT dvid, rank_name, rank_type, window_start, window_end, RankDif(rank_order,rank_pt) AS rank_cur
FROM TABLE(
HOP(TABLE UniqueRankTable, DESCRIPTOR(rank_pt), INTERVAL '1' DAY, INTERVAL '2'…

Singleton
- 11
- 1