Questions tagged [flink-sql]

Apache Flink features two relational APIs, SQL and Table API, as unified APIs for stream and batch processing.

Apache Flink features two relational APIs:

  1. SQL (via Apache Calcite)
  2. Table API, a language-integrated query (LINQ) interface

Both APIs are unified APIs for stream and batch processing. This means that a query returns the same result regardless of whether it is applied to a static data set or a data stream. SQL queries are parsed and optimized by Apache Calcite; Table API queries are optimized by Calcite as well.

Both APIs are tightly integrated with Flink's DataStream and DataSet APIs.

667 questions
8
votes
4 answers

Get nested fields from Kafka message using Apache Flink SQL

I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects. The documentation suggests that it should…
bash721
  • 140
  • 2
  • 10
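In recent Flink versions, nested JSON objects are typically declared as ROW types in the source DDL and addressed with dot notation. A minimal sketch, assuming a hypothetical `events` topic and illustrative field names:

```sql
-- Hypothetical Kafka source; topic, fields, and broker address are illustrative.
CREATE TABLE events (
  id STRING,
  -- nested JSON object mapped to a ROW type
  payload ROW<`user` ROW<name STRING, city STRING>, score INT>
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- nested fields are then addressed with dot notation
SELECT id, payload.`user`.name FROM events;
```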
6
votes
1 answer

Does AWS Glue Schema Registry support being used as a Flink SQL Catalog?

Does AWS Schema Registry support being used as an SQL Catalog within Flink SQL applications? For instance, the documentation shows an example of using a Hive Catalog: CREATE CATALOG hive WITH…
John
  • 10,837
  • 17
  • 78
  • 141
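For reference, the Hive catalog pattern the question alludes to looks roughly like this (the configuration path is illustrative); whether a Glue-backed equivalent exists is exactly what the question asks:

```sql
-- Hive catalog registration as shown in the Flink docs;
-- 'hive-conf-dir' is an illustrative path.
CREATE CATALOG hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive/conf'
);
USE CATALOG hive;
```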
6
votes
3 answers

Apache Flink: How to enable "upsert mode" for dynamic tables?

I have seen several mentions of an "upsert mode" for dynamic tables based on a unique key in the Flink documentation and on the official Flink blog. However, I do not see any examples / documentation regarding how to enable this mode on a dynamic…
Austin York
  • 808
  • 2
  • 10
  • 24
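Since Flink 1.12, upsert semantics on a unique key are typically expressed by declaring a PRIMARY KEY on the table, for example with the upsert-kafka connector. A sketch with hypothetical names:

```sql
-- Hypothetical upsert sink: rows with the same user_id overwrite each other.
CREATE TABLE user_scores (
  user_id STRING,
  score BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user-scores',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
```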
5
votes
1 answer

How can we define nested json properties (including arrays) using Flink SQL API?

We have the following problem while using Flink SQL: we have configured Kafka Twitter connector to add tweets to Kafka and we want to read the tweets from Kafka in a table using Flink SQL. How can we define nested json properties (including arrays)…
mricat
  • 51
  • 1
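A common pattern for JSON arrays is to declare them as `ARRAY<ROW<...>>` and flatten them with CROSS JOIN UNNEST. A sketch with hypothetical tweet fields:

```sql
-- Hypothetical schema: each tweet carries an array of hashtag objects.
CREATE TABLE tweets (
  id STRING,
  hashtags ARRAY<ROW<tag STRING>>
) WITH (
  'connector' = 'kafka',
  'topic' = 'tweets',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- one output row per (tweet, hashtag) pair
SELECT t.id, h.tag
FROM tweets AS t
CROSS JOIN UNNEST(t.hashtags) AS h (tag);
```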
5
votes
0 answers

PyFlink extract nested fields from JSON array

I'm trying to extract a few nested fields in PyFlink from JSON data received from Kafka. The JSON record schema is as follows. Basically, each record has a Result object within which there's an array of objects called data. I'm trying to extract the…
sumeetkm
  • 189
  • 1
  • 7
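When only a single element is needed, Flink SQL arrays can also be indexed directly (subscripts are 1-based). A sketch assuming the hypothetical `Result`/`data` structure described in the question:

```sql
-- Assumes a column declared roughly as:
--   Result ROW<data ARRAY<ROW<sensor_id STRING, `value` DOUBLE>>>
-- Array subscripts in Flink SQL start at 1.
SELECT Result.data[1].sensor_id, Result.data[1].`value`
FROM readings;
```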
5
votes
2 answers

Apache Flink Resource Planning best practices

I'm looking for recommendations/best practices in determining the required optimal resources for deploying a streaming job on a Flink cluster. The resources are: no. of task slots per TaskManager, optimal memory allocation for TaskManager, max parallelism
ardhani
  • 303
  • 1
  • 11
5
votes
1 answer

Is there a way to determine total job parallelism or the number of slots required to run a Flink job (before it is run)?

Is there a way to determine the total number of task slots that will be required to run the job from either the execution plan or in some other way without having to actually start the job first. According to this doc:…
SherinThomas
  • 1,881
  • 4
  • 16
  • 20
4
votes
0 answers

FLINK SQL: row.getFieldAs returns a LocalDateTime instead of a Timestamp?

Flink: 1.13.2 I have a StreamTableEnvironment tableEnv that reads streaming data from a KafkaSource. From this tableEnv, I filter my data and transform it back to a DataStream. DataStream myStreamData = env.fromSource(source,…
4
votes
1 answer

How can I use Flink to implement a streaming join between different data sources?

I have data coming from two different Kafka topics, served by different brokers, with each topic having different numbers of partitions. One stream has events about ads being served, the other has clicks: ad_serves: ad_id, ip, sTime ad_clicks:…
David Anderson
  • 39,434
  • 4
  • 33
  • 60
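With time attributes declared on both tables, this kind of serve/click correlation is usually written as an interval join. A sketch reusing the field names from the question (the window length is illustrative):

```sql
-- Interval join: each click matches a serve of the same ad_id
-- that happened up to one hour earlier.
SELECT s.ad_id, s.sTime, c.cTime
FROM ad_serves AS s
JOIN ad_clicks AS c
  ON s.ad_id = c.ad_id
 AND c.cTime BETWEEN s.sTime AND s.sTime + INTERVAL '1' HOUR;
```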
4
votes
1 answer

How to sort a stream by event time using Flink SQL

I have an out-of-order DataStream that I want to sort so that the events are ordered by their event time timestamps. I've simplified my use case down to where my Event class has just a single field -- the timestamp field: public static void…
David Anderson
  • 39,434
  • 4
  • 33
  • 60
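In Flink SQL, a stream can be sorted when the leading ORDER BY expression is an ascending event-time attribute. A minimal sketch, assuming a hypothetical `events` table with a WATERMARK declared on `event_time`:

```sql
-- Valid on a stream only because event_time is a time attribute
-- (declared with a WATERMARK clause on the source table).
SELECT *
FROM events
ORDER BY event_time ASC;
```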
4
votes
2 answers

Apache Flink error java.lang.ClassNotFoundException: org.apache.flink.table.sources.TableSource?

I am writing a streaming service in Apache Flink. I am basically picking up data from a CSV file using org.apache.flink.table.sources.CsvTableSource. Below is the code for the same: StreamTableEnvironment streamTableEnvironment = TableEnvironment …
Srivatsa Sinha
  • 193
  • 2
  • 12
3
votes
1 answer

How does parallelism work when using Flink SQL?

I understand that in the Flink Datastream world parallelism means each slot will get a subset of events [1]. A Flink program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into several…
John
  • 10,837
  • 17
  • 78
  • 141
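For SQL jobs, operator parallelism is typically steered through configuration rather than per-operator calls. A sketch using SQL-client syntax (option names as in recent Flink versions):

```sql
-- Default parallelism for all operators generated from SQL queries
SET 'parallelism.default' = '4';
-- Table/SQL-specific override (takes precedence when set)
SET 'table.exec.resource.default-parallelism' = '8';
```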
3
votes
2 answers

How to reference nested JSON within PyFlink SQL when JSON schema varies?

I have a stream of events I wish to process using PyFlink, where the events are taken from AWS EventBridge. The events in this stream share a number of common fields, but their detail field varies according to the value of the source and/or…
John
  • 10,837
  • 17
  • 78
  • 141
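A common workaround for a varying payload is to declare the `detail` column as a plain STRING and extract fields lazily with JSON_VALUE (available since roughly Flink 1.15). A sketch with hypothetical EventBridge fields:

```sql
-- Keep the variable part opaque and parse it per-source at query time.
CREATE TABLE eventbridge_events (
  source STRING,
  detail STRING  -- raw JSON, schema varies by source
) WITH (
  'connector' = 'kafka',
  'topic' = 'eventbridge',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

SELECT JSON_VALUE(detail, '$.state') AS instance_state
FROM eventbridge_events
WHERE source = 'aws.ec2';
```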
3
votes
1 answer

What is the difference between Lookup and Processing Time Temporal join in Flink?

In my opinion, Processing Time Temporal Join is used for a stream and an external database and always joins against the latest value in the external database based on the join condition. Also, Processing Time Temporal Join is used when the external table is…
Dilibaba
  • 123
  • 8
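Both variants share the FOR SYSTEM_TIME AS OF syntax; what differs is whether the right side is a lookup table (e.g. JDBC) probed per record, or a versioned table tracked by watermarks. A processing-time sketch with hypothetical tables:

```sql
-- Processing-time temporal join: each order is enriched with the
-- latest known rate at the moment it is processed.
SELECT o.order_id, o.amount * r.rate AS converted
FROM orders AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.proc_time AS r
  ON o.currency = r.currency;
```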
3
votes
2 answers

Flink sink filesystem as parquet - error on saving nested data

I am trying to convert JSON data to Parquet so that I can use Trino or Presto to query it. A sample JSON is as follows: {"name": "success","message": "test","id": 1, "test1": {"one": 1, "two": 2, "three": "t3"}, "test2": [1,2,3], "test3": [{"a":…
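Nested JSON can generally be carried into a Parquet filesystem sink by mirroring the structure with ROW and ARRAY types in the sink DDL. A sketch modeled on the sample record (field names taken from the question; the path and the truncated `test3` element type are assumptions):

```sql
-- Sink schema mirroring the nested JSON sample; the path is illustrative.
CREATE TABLE parquet_sink (
  name STRING,
  message STRING,
  id INT,
  test1 ROW<one INT, two INT, three STRING>,
  test2 ARRAY<INT>,
  test3 ARRAY<ROW<a STRING>>
) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/output/',
  'format' = 'parquet'
);
```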