I am learning to use Flink to pre-aggregate metrics.
I installed an agent on a cluster that reports many metrics (CPU/memory metrics as well as user-defined metrics) every minute. These metrics (which carry [app, hostname, ip] tags) are at the granularity of a single host.
I want a Flink job to pre-aggregate some of these metrics according to rules stored in a DB, and write the new metrics (which carry only the [app] tag) to a time-series database.
How should a Flink operator read external configuration? When is the best time to do it, and what is the best way, ideally without blocking Flink's processing flow?
My operators need some external configuration at runtime, which may be:
- stored in MySQL/Redis, or
- accessible only via HTTP.
Moreover, the configuration required for each key (assuming a keyBy is applied) may be different.
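One non-blocking approach I have sketched (plain Java, no Flink APIs; `RefreshingConfigStore` and its `Loader` interface are names I made up) is to load all configuration into local memory once, then refresh it on a background daemon thread, so lookups on the hot path never touch MySQL/Redis/HTTP:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: a config holder that refreshes in the background, so that
// per-record lookups never block on the external system. The Loader is
// where the real MySQL/Redis/HTTP call would go.
class RefreshingConfigStore {
    interface Loader {
        Map<String, Integer> loadAll() throws Exception; // config_id -> threshold
    }

    private final ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "config-refresh");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    RefreshingConfigStore(Loader loader, long periodSeconds) {
        Runnable refresh = () -> {
            try {
                cache.putAll(loader.loadAll()); // swap in fresh values
            } catch (Exception e) {
                // on failure, keep serving the last known (stale) config
            }
        };
        refresh.run(); // initial synchronous load (e.g. from a RichFunction's open())
        scheduler.scheduleAtFixedRate(refresh, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    // Non-blocking lookup for the hot path; falls back to a default.
    int thresholdFor(String configId, int defaultThreshold) {
        return cache.getOrDefault(configId, defaultThreshold);
    }

    void close() {
        scheduler.shutdownNow();
    }
}
```

In a Flink job, an instance of something like this could be created in a `RichFunction`'s `open()` and shut down in `close()`, but I am not sure this is idiomatic.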
The following is pseudocode for my program:

```
env.fromSource(someKafkaTopic)
    .filter(data -> {
        // Keep only data with value > threshold, where the threshold
        // must be fetched from an external system by config_id.
        int threshold = fetchThresholdForConfig(data.config_id);
        return data.value > threshold;
    })
    .keyBy(data -> data.config_id)
    .window(...)
    .aggregate(/* every config has its own aggregation rule */ ...);
```
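As an alternative to calling the external system for every record, the filter could go through a lazy per-key cache with a TTL, so only the first record for each config_id (or the first one after the entry expires) pays the remote-call cost. A rough sketch (`ThresholdCache` and the injected `fetchRemote` function are hypothetical names):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;

// Sketch: lazy per-config_id cache with a TTL. fetchRemote stands in for
// the real MySQL/Redis/HTTP lookup.
class ThresholdCache {
    private static final class Entry {
        final int value;
        final long loadedAtMillis;
        Entry(int value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();
    private final ToIntFunction<String> fetchRemote;
    private final long ttlMillis;

    ThresholdCache(ToIntFunction<String> fetchRemote, long ttlMillis) {
        this.fetchRemote = fetchRemote;
        this.ttlMillis = ttlMillis;
    }

    int get(String configId) {
        long now = System.currentTimeMillis();
        Entry e = entries.compute(configId, (id, old) -> {
            if (old != null && now - old.loadedAtMillis < ttlMillis) {
                return old; // still fresh: no remote call
            }
            return new Entry(fetchRemote.applyAsInt(id), now); // (re)load
        });
        return e.value;
    }
}
```

The filter body would then become `return data.value > cache.get(data.config_id);`, at the price of serving a slightly stale threshold until the TTL elapses.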
I found a similar question: Flink: How to handle external app configuration changes in flink. Flink maintaining configuration state
Is the broadcast-stream approach mentioned there the recommended best practice?
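For context, my understanding of the broadcast-state pattern from that question looks roughly like the following pseudocode (`ConfigRule`, `PollingRuleSource`, and the descriptor contents are placeholders, not working code):

```
// A source that periodically polls MySQL/HTTP for rule changes.
MapStateDescriptor<String, ConfigRule> ruleDescriptor = ...;
BroadcastStream<ConfigRule> ruleBroadcast =
    env.addSource(new PollingRuleSource())
       .broadcast(ruleDescriptor);

metricStream
    .keyBy(data -> data.config_id)
    .connect(ruleBroadcast)
    .process(new KeyedBroadcastProcessFunction<...>() {
        // processBroadcastElement: store the updated rule in broadcast state
        // processElement: read the current rule for this config_id and
        //                 apply the threshold / aggregation logic
    });
```

This keeps config updates inside Flink's own dataflow (no per-record external calls), but I am unsure whether it is preferable to a cache-based approach for my case.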