I recently came across documentation showing that PyFlink can work with pandas DataFrames via the Table API. My goal was therefore to:
- Receive a DataStream from a Kafka source
- Convert it to a Table API instance, which can then be converted to a pandas DataFrame
- Apply the pandas processing logic
- Convert the pandas DataFrame back to a Table instance
- Convert that Table back to a DataStream and sink it back to Kafka
Based on the Flink documentation, I referenced the code for converting between DataStream <-> Table and between Table <-> pandas.
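In isolation, those conversion calls from the docs boil down to roughly the following (a minimal sketch using a dummy in-memory stream in place of my Kafka source, not my actual job):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# stand-in for the Kafka-backed stream in my real job
ds = env.from_collection(["dummy record"], type_info=Types.STRING())

# DataStream -> Table
table = t_env.from_data_stream(ds)

# Table -> pandas DataFrame
df = table.to_pandas()

# pandas DataFrame -> Table
table = t_env.from_pandas(df)

# Table -> DataStream
res_ds = t_env.to_data_stream(table)

My full script: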
import os
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.serialization import Encoder
from pyflink.common.serialization import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, StreamingFileSink
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.data_stream import DataStream
from pyflink.datastream.functions import MapFunction
from pyflink.table.table import Table
from pyflink.table.table_environment import StreamTableEnvironment
KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"kafka:9094").split(',')
KAFKA_USERNAME = "user"
KAFKA_PASSWORD = "123"
KAFKA_SOURCE_TOPIC = 'topic_one'
KAFKA_SINK_TOPIC = 'topic_two'
# creating a kafka source for the pipeline
def create_kafka_source(usern: str, password: str, topic: str):
    kafka_props = {
        'bootstrap.servers': ','.join(KAFKA_SERVERS),
        'group.id': 'testgroup12',
        'auto.offset.reset': 'earliest',
        'sasl.mechanism': 'PLAIN',
        'sasl.jaas.config': f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{usern}\" password=\"{password}\";",  # correct one
        'security.protocol': 'SASL_PLAINTEXT',
        "enable.auto.commit": "true",
        "auto.commit.enable": "true",
        "auto.commit.interval.ms": "1000",
        "session.timeout.ms": "30000",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
    }

    kafka_source = FlinkKafkaConsumer(
        topics=[topic],
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_props
    )
    kafka_source.set_commit_offsets_on_checkpoints(True)
    return kafka_source
# creating a kafka sink for the pipeline
def create_kafka_sink(usern: str, password: str, topic: str):
    kafka_producer = FlinkKafkaProducer(
        topic=topic,
        serialization_schema=SimpleStringSchema(),
        producer_config={
            'bootstrap.servers': ','.join(KAFKA_SERVERS),
            'group.id': 'testgroup12',
            'sasl.mechanism': 'PLAIN',
            'sasl.jaas.config': f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{usern}\" password=\"{password}\";",  # correct one
            'security.protocol': 'SASL_PLAINTEXT'
        }
    )
    return kafka_producer
# the pipeline which will run
def pipeline():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
    env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.12-1.14.0.jar")
    env.set_parallelism(1)
    env.enable_checkpointing(5000)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    kafka_source = create_kafka_source(KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_SOURCE_TOPIC)
    ds = env.add_source(kafka_source)

    # Stream to Table
    table: Table = t_env.from_data_stream(ds)
    pd = table.to_pandas()

    # custom pandas logic
    pd["Testing"] = 'new Vals'

    # Table to stream
    table = t_env.from_pandas(pd, pd.columns.tolist())
    t_env.create_temporary_view("InputTable", table)
    res_table = t_env.sql_query("SELECT * FROM InputTable")
    res_ds = t_env.to_data_stream(res_table)

    # Sink to file and Kafka
    res_ds.add_sink(StreamingFileSink
                    .for_row_format('/opt/flink/outputs_dumps', Encoder.simple_string_encoder())
                    .build())
    kafka_sink = create_kafka_sink(KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_SINK_TOPIC)
    res_ds.add_sink(kafka_sink)

    env.execute("proto_1")

if __name__ == '__main__':
    pipeline()
When I submit this to Flink, the job is created without any errors or exceptions:
$ /opt/flink/bin/flink run --python script.py
However, on the Flink UI I can see that the job name wasn't registered as expected, and the pandas logic is not reflected on the output topic. The intended flow is: (1) a JSON packet is received from the source topic, (2) pandas adds a new value to the packet, and (3) the packet is then sunk back to the output topic.
Received on Source Topic:
{"Cylinders":8.0,"Displacement":360.0,"Horsepower":215.0,"Weight":4615.0,"Acceleration":14.0,"Model Year":70.0,"USA":1.0,"Europe":0.0,"Japan":0.0}
Output on Destination Topic (the "Testing": "new Vals" field was not added):
{"Cylinders":8.0,"Displacement":360.0,"Horsepower":215.0,"Weight":4615.0,"Acceleration":14.0,"Model Year":70.0,"USA":1.0,"Europe":0.0,"Japan":0.0}
If my approach is incorrect, can someone please show me the correct way to implement this? It is supposed to run as an unbounded streaming operation (not as a batch operation, if I have my terminology right here...)
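For reference, the per-record effect I am after is equivalent to this plain-Python sketch (hypothetical helper, shown only to illustrate the transformation, not how I intend to implement it in Flink):

import json

def desired_transform(raw: str) -> str:
    # take the incoming JSON packet and add the field set in the pandas step
    record = json.loads(raw)
    record["Testing"] = "new Vals"
    return json.dumps(record)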