Questions tagged [pyflink]

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. PyFlink makes it available to Python.

PyFlink makes all of Apache Flink available to Python, and at the same time Flink benefits from Python's rich ecosystem of scientific computing libraries.

See "What is PyFlink?" on apache.org.

258 questions
8 votes, 4 answers

Get nested fields from Kafka message using Apache Flink SQL

I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects. The documentation suggests that it should…
bash721
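For the question above, a minimal sketch of one common approach: declare the nested JSON object as a ROW type in the DDL and address its fields with dot notation. The topic, field names and broker address below are made up for illustration.

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # create a streaming TableEnvironment
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)

    # nested JSON objects are modelled with ROW<...> in the DDL
    t_env.execute_sql("""
        CREATE TABLE kafka_source (
            id STRING,
            payload ROW<city STRING, temperature DOUBLE>
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)

    # nested fields are then addressed with dot notation
    result = t_env.sql_query("SELECT id, payload.city, payload.temperature FROM kafka_source")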
6 votes, 0 answers

RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed

I am using PyFlink 1.17.1 and I am getting this error: "RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed". Need your help with this. When I try to sink data…
6 votes, 0 answers

PyFlink (Flink) 1.12.0 bug when casting a table to a datastream via to_append_stream (Java API: toAppendStream)

Thanks a lot for any help! Code: from pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo from pyflink.table import EnvironmentSettings, StreamTableEnvironment # create the env in stream mode env_settings_stream =…
SZ_MaNong
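Not a fix for the reported bug, but a minimal sketch of the intended call shape of to_append_stream in PyFlink, using a small in-memory table purely for illustration; the type information passed in has to match the table schema field by field.

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # a tiny in-memory table standing in for the real source
    table = t_env.from_elements([(1, 'a'), (2, 'b')], ['id', 'name'])

    # the row type must match the table schema field by field
    ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
    ds.print()

    env.execute("to_append_stream sketch")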
5 votes, 1 answer

Installing PyFlink: NumPy error on macOS M1

I am trying to install PyFlink via pip3 install apache-flink, and I get: Installing build dependencies ... error error: subprocess-exited-with-error × pip subprocess to install build dependencies did not run successfully. │ exit code: 1 ╰─>…
Sudip Adhikari
5 votes, 1 answer

How to submit a PyFlink job to a remote Kubernetes session cluster?

Currently, I have a running Flink Kubernetes session cluster (Flink version 1.13.2) and I can access the web UI by port-forward. I can also submit the WordCount jar example with this command: ./bin/flink run -m localhost:8081…
Amin
5 votes, 0 answers

PyFlink extract nested fields from JSON array

I'm trying to extract a few nested fields in PyFlink from JSON data received from Kafka. The JSON record schema is as follows. Basically, each record has a Result object within which there's an array of objects called data. I'm trying to extract the…
sumeetkm
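A hedged sketch of one way to flatten such an array in PyFlink SQL: model it as ARRAY<ROW<...>> and expand it with CROSS JOIN UNNEST. The field names (Result, data, metric, value) only mirror the structure described above and are not the real schema.

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)

    t_env.execute_sql("""
        CREATE TABLE events (
            `Result` ROW<`data` ARRAY<ROW<metric STRING, `value` DOUBLE>>>
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)

    # UNNEST turns the nested array into one output row per array element
    flattened = t_env.sql_query("""
        SELECT d.metric, d.`value`
        FROM events
        CROSS JOIN UNNEST(events.`Result`.`data`) AS d (metric, `value`)
    """)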
5 votes, 1 answer

Flink Python Custom Connector / Source

I'd like to create a custom user-defined connector / source in pyflink. I see documentation for doing so in Java / Scala but none for Python. Is this possible?
Awesome-o
4 votes, 0 answers

The DataStream.map() function in Flink Python API doesn't work

I'm trying Flink's DataStream API in Python. The environment is: Flink 1.16.0, Python package apache-flink==1.16.0. My code was: env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) …
Rinze
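A minimal, self-contained sketch of DataStream.map() that does run; the two things that most often make map() appear to do nothing are a missing output_type and a missing env.execute() call.

    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # a small bounded source standing in for the real one
    ds = env.from_collection([1, 2, 3], type_info=Types.INT())

    # declare the result type explicitly and remember to trigger execution
    ds.map(lambda x: x * 2, output_type=Types.INT()).print()
    env.execute("map sketch")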
4 votes, 2 answers

Using Python user defined function in a Java Flink Job

Is there any way to use a Python user-defined function within a Java Flink job, or any way to pass, for example, the result of a transformation done by Flink in Java to a Python user-defined function to apply some machine learning? I…
Alter
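One route that is often suggested for this: expose the Python logic as a Python scalar UDF and call it through SQL, since the Java and Python Table APIs share the same SQL surface (on the Java/SQL side the same function can reportedly be registered with a CREATE TEMPORARY FUNCTION ... LANGUAGE PYTHON statement). A hedged sketch of the Python side, with a placeholder function body:

    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.udf import udf

    @udf(result_type=DataTypes.DOUBLE())
    def score(x):
        # placeholder for the actual machine-learning logic
        return x * 0.5

    t_env = TableEnvironment.create(
        EnvironmentSettings.new_instance().in_streaming_mode().build())

    # register the UDF and call it from SQL like any built-in function
    t_env.create_temporary_function("score", score)
    features = t_env.from_elements([(1.0,), (2.0,)], ['x'])
    t_env.create_temporary_view("features", features)
    t_env.execute_sql("SELECT score(x) FROM features").print()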
4 votes, 2 answers

Apache Flink: Kafka connector in Python streaming API, "Cannot load user class"

I am trying out Flink's new Python streaming API and attempting to run my script with ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py. The Python script is fairly straightforward; I am just trying to consume from an existing topic…
adaris
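The Jython-based pyflink-stream.sh API from Flink 1.6 is long gone; in current PyFlink releases a "Cannot load user class" error around the Kafka connector often comes down to the connector jar not being on the classpath. A hedged sketch of shipping it with the job (path and version are placeholders):

    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

    # make the Kafka SQL connector classes available to the job; adjust path and version
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")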
3 votes, 2 answers

How to reference nested JSON within PyFlink SQL when JSON schema varies?

I have a stream of events I wish to process using PyFlink, where the events are taken from AWS EventBridge. The events in this stream share a number of common fields, but their detail field varies according to the value of the source and/or…
John
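One commonly suggested workaround, sketched below under assumed field names: declare the varying detail field as a plain STRING in the DDL and pull individual values out of it with a small Python UDF, so the DDL does not have to enumerate every per-source schema.

    import json

    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.udf import udf

    t_env = TableEnvironment.create(
        EnvironmentSettings.new_instance().in_streaming_mode().build())

    # extract a single top-level key from a JSON string, returning NULL when absent
    @udf(result_type=DataTypes.STRING())
    def json_field(detail, key):
        if detail is None:
            return None
        value = json.loads(detail).get(key)
        return None if value is None else str(value)

    t_env.create_temporary_function("json_field", json_field)
    # then in SQL (with `detail` declared as STRING in the source DDL):
    #   SELECT source, json_field(detail, 'instance-id') FROM eventbridge_events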
3 votes, 0 answers

AWS Kinesis Data Analytics: PyFlink with nested JSON data

With a Kinesis Analytics SQL application, we have the option to configure the input schema so that we can map the input data to a flat structure. I'm looking for a similar configuration with Kinesis Data Analytics for Apache Flink…
3 votes, 0 answers

Pyflink 1.14 Datastream source -> Pandas processing -> Datastream sink

I've recently seen documentation in PyFlink showing that it's possible to use pandas DataFrames in Flink via the Table API. My goal was thus to: receive a datastream from a Kafka source, convert it to a Table API instance -> where then able to convert to…
Paul
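A hedged sketch of the conversion chain described above, substituting a small in-memory collection for the Kafka source; note that Table.to_pandas() collects to the client and only terminates on bounded data.

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # stand-in for the Kafka source
    ds = env.from_collection([(1, 'a'), (2, 'b')])

    table = t_env.from_data_stream(ds)               # DataStream -> Table
    pdf = table.to_pandas()                          # Table -> pandas DataFrame (bounded data only)
    # ... pandas processing on pdf ...
    result_table = t_env.from_pandas(pdf)            # pandas -> Table
    result_ds = t_env.to_data_stream(result_table)   # Table -> DataStream for the sink
    result_ds.print()
    env.execute("pandas round-trip sketch")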
3 votes, 0 answers

PyFlink JDBC sink

I am trying to make use of PyFlink's JdbcSink to connect to an Oracle ADB instance. I can find examples of JdbcSink using Java in Flink's official documentation, but there is no content provided for the Python API to do the same. I was trying to…
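One route that avoids the DataStream-level API entirely is the JDBC table connector driven from SQL, which PyFlink can do directly. A hedged sketch; the URL, table name and credentials are placeholders, the JDBC connector jar plus the Oracle driver still have to be on the classpath, and Oracle dialect support depends on the Flink version.

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(
        EnvironmentSettings.new_instance().in_streaming_mode().build())

    # a JDBC sink table; connection details are placeholders
    t_env.execute_sql("""
        CREATE TABLE jdbc_sink (
            id INT,
            name STRING
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:oracle:thin:@//host:1521/service',
            'table-name' = 'MY_TABLE',
            'username' = 'user',
            'password' = 'secret'
        )
    """)

    # anything selectable can then be written with a plain INSERT
    t_env.execute_sql("INSERT INTO jdbc_sink VALUES (1, 'a'), (2, 'b')").wait()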
2 votes, 0 answers

Convert protobuf to Flink schema

I am using PyFlink to read protobuf and write it to a table. The protobuf looks like: message test { int32 id = 1; string val = 2; } Actually, the protobuf descriptor is too long, so I don't want to write the schema like Schema()\ .field('id',…