- Figure out the correct way to scale up the remote function.
- Figure out the relationship between the number of remote-function replicas, Flink's `parallelism.default` configuration, the ingress topic's partition count, and the messages' partition keys. What are the design intentions behind these?
As the docs suggest, one of the benefits of Flink StateFun remote functions is that they can scale independently of the Flink workers and task parallelism. To understand how messages are dispatched to the remote function processes, I tried the following scenarios.
Preparation
- Used https://github.com/apache/flink-statefun-playground/blob/main/deployments/k8s as the basis for my experiment.
- Modified https://github.com/apache/flink-statefun-playground/blob/main/deployments/k8s/03-functions/functions.py as follows, to log how invocations are parallelized in practice:
```python
import os
import time
from datetime import datetime

from statefun import Context, Message, StatefulFunctions

functions = StatefulFunctions()

@functions.bind(typename="example/hello")
async def hello(context: Context, message: Message):
    arg = message.raw_value().decode('utf-8')
    hostname = os.getenv('HOSTNAME')
    # Log 10 times over ~10 seconds so concurrent requests across pods are visible.
    for _ in range(10):
        print(f"{datetime.utcnow()} {hostname}: Hello from {context.address.id}: you wrote {arg}!", flush=True)
        time.sleep(1)
...
```
- Played around with `parallelism.default` in the Flink configuration, the `replicas` count in the functions Deployment, and different partition counts on the ingress topic `names`.
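Concretely, these are the three knobs I varied. (The exact file layout follows the playground manifests and may differ between versions; the values below are just one configuration I tried.)

```yaml
# 1) Flink configuration: job-wide default parallelism
parallelism.default: 5
---
# 2) Remote-function Deployment (03-functions): number of function pods
spec:
  replicas: 5
# 3) Ingress topic `names`: created with 5 partitions
#    (e.g. kafka-topics --create --topic names --partitions 5)
```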
Observations
- When sending messages with the same partition key, everything runs sequentially. If I send 5 messages such as "key1:message1", "key1:message2", "key1:message3", "key1:message4", "key1:message5", only one pod receives requests, even though I have 5 replicas of the remote function in the Deployment. No matter how I configure the parallelism or how much I increase the ingress topic's partition count, the behavior stays the same.
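This sequential behavior follows from key-based partitioning: every message with the same key hashes to the same Kafka partition, so a single consumer reads all of them, and StateFun's address-based routing processes messages for one address sequentially. A minimal sketch, using `crc32` as a simplified stand-in for Kafka's murmur2-based default partitioner:

```python
import zlib

NUM_PARTITIONS = 5

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Simplified stand-in for Kafka's default partitioner:
    # a deterministic hash of the key, modulo the partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

messages = [f"key1:message{i}" for i in range(1, 6)]
partitions = {partition_for(m.split(":")[0]) for m in messages}
# All five messages share the key "key1", so they all land on one
# partition -> one consumer -> one replica handles them, in order.
print(partitions)
assert len(partitions) == 1
```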
- When sending messages with 10 distinct partition keys (the topic configured with 5 partitions, parallelism set to 5, and 5 replicas of the remote function), which replicas receive requests appears random. Sometimes all 5 receive requests at the same time and work in parallel, but sometimes only 2 of them are utilized while the other 3 just sit idle.
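The uneven utilization is consistent with hash skew: 10 keys spread over 5 partitions by a hash need not land evenly, so some partitions (and the replicas serving them) get more work than others. A sketch of the mechanism, again with `crc32` as a simplified stand-in for Kafka's actual partitioner:

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 5

def partition_for(key: str) -> int:
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

keys = [f"key{i}" for i in range(1, 11)]
# Count how many of the 10 keys hash onto each partition; the
# distribution is typically uneven rather than a perfect 2-per-partition.
load = Counter(partition_for(k) for k in keys)
print(dict(load))
```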
- Parallelism seems to determine the number of consumers in the consumer group subscribed to the ingress topic. I suspect that if I configure more parallelism than the topic has partitions, the extra parallel instances will just stay idle.
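That matches how Kafka consumer groups work: each partition is assigned to at most one consumer in the group, so consumers beyond the partition count receive nothing. A round-robin sketch (real assignors such as range or sticky differ in detail, but the one-consumer-per-partition bound is the same):

```python
def assign_partitions(num_partitions: int, num_consumers: int) -> dict:
    # Round-robin sketch of spreading partitions over the consumers
    # in one consumer group: partition p goes to consumer p % num_consumers.
    assignment = {c: [] for c in range(num_consumers)}
    for p in range(num_partitions):
        assignment[p % num_consumers].append(p)
    return assignment

# 5 partitions, parallelism 8: consumers 5, 6, 7 get no partitions at all.
a = assign_partitions(5, 8)
idle = [c for c, parts in a.items() if not parts]
print(idle)  # → [5, 6, 7]
```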
My Expectations
- What I expect is that all 5 remote-function replicas should be fully utilized as long as there is still a backlog in the ingress topic.
- When the ingress topic has multiple partitions, each partition should be batched separately, and those batches should be multiplexed with the batches from the other parallel consumers so that all of the remote-function processes are utilized.
Can a Flink expert help me understand the behavior and the design intentions above?