
I wonder whether I can change the operators of a job that has already been submitted to Flink. Suppose I have a word count program with a filter on it; in the code below, the filter only emits words whose count is at least 3. I want to change the parameters of this filter at runtime. My first guess is that Flink (and other dataflow engines such as Spark, Storm, and Apache Edgent) cannot do that, because the job has already been submitted by env.execute(). Does anyone know an approach to do this?

This question (Deploy stream processing topology on runtime?) seems related to what I want, but its solution is still not as dynamic as I need.

Thanks

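StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// SplitterFlatMap (not shown) is assumed to emit a (word, 1) tuple for every word in a line
DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9000)
        .flatMap(new SplitterFlatMap())
        .keyBy(0)   // key by the word (field f0)
        .sum(1)     // running count per word (field f1)
        .filter(word -> word.f1 >= 3); // the threshold 3 is fixed once env.execute() is called
dataStream.print();
env.execute("WordCountSocketFilterQEP");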
Felipe

3 Answers


With Flink, you can connect a broadcast stream to a keyed stream and broadcast in the parameters, or even the code, you want to use. TaxiQuery is an example of that, using Janino to compile Java expressions, but you could probably load a class dynamically instead. I've also seen this done with Rhino/JavaScript, JRuby, etc.
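To make the routing idea concrete, here is a framework-free sketch in plain Java; it does not use Flink's API. Records are routed to one of several parallel filter instances by key (as `keyBy` does), while each rule update is delivered to every instance (as a broadcast does). The rule is a class name loaded via reflection, standing in for the "dynamically load a class" option. `ParallelFilter`, `FilterInstance`, `LongWords` and `ShortWords` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BroadcastRuleSketch {

    // Hypothetical rules; a real job would ship these classes in its jar.
    public static class LongWords implements Predicate<String> {
        @Override
        public boolean test(String w) { return w.length() > 3; }
    }

    public static class ShortWords implements Predicate<String> {
        @Override
        public boolean test(String w) { return w.length() <= 3; }
    }

    // One parallel instance of the filter operator, holding its own copy of the rule.
    static class FilterInstance {
        private Predicate<String> rule = w -> true; // default: pass everything
        final List<String> emitted = new ArrayList<>();

        // Data input: evaluated with whatever rule this instance holds right now.
        void onRecord(String word) {
            if (rule.test(word)) {
                emitted.add(word);
            }
        }

        // Broadcast input: load the named class reflectively and swap the rule in.
        @SuppressWarnings("unchecked")
        void onRule(String className) {
            try {
                rule = (Predicate<String>) Class.forName(className)
                        .getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot load rule " + className, e);
            }
        }
    }

    static class ParallelFilter {
        final List<FilterInstance> instances = new ArrayList<>();

        ParallelFilter(int parallelism) {
            for (int i = 0; i < parallelism; i++) {
                instances.add(new FilterInstance());
            }
        }

        // Keyed routing: each record goes to exactly one instance, chosen by its hash.
        void sendRecord(String word) {
            int idx = Math.floorMod(word.hashCode(), instances.size());
            instances.get(idx).onRecord(word);
        }

        // Broadcast routing: each rule update reaches every instance.
        void broadcastRule(String className) {
            for (FilterInstance inst : instances) {
                inst.onRule(className);
            }
        }

        List<String> allEmitted() {
            List<String> all = new ArrayList<>();
            for (FilterInstance inst : instances) {
                all.addAll(inst.emitted);
            }
            return all;
        }
    }

    public static void main(String[] args) {
        ParallelFilter filter = new ParallelFilter(4);
        filter.broadcastRule(LongWords.class.getName());
        for (String w : "the quick brown fox jumps".split(" ")) {
            filter.sendRecord(w);
        }
        filter.broadcastRule(ShortWords.class.getName());
        for (String w : "a lazy dog".split(" ")) {
            filter.sendRecord(w);
        }
        // Contains quick, brown, jumps, a, dog (order depends on the routing)
        System.out.println(filter.allEmitted());
    }
}
```

Because each word is checked by exactly one instance, a rule sent along the keyed path would reach only one instance; broadcasting is what lets every instance switch rules.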

David Anderson
  • Thanks @David. I was wondering whether I can get the same behavior as CoFlatMapFunction using other operators, because I am also using Apache Edgent and I want the same behavior there: https://edgent.apache.org/javadoc/latest/index.html?org/apache/edgent/topology/class-use/TStream.html – Felipe Dec 21 '18 at 08:22
  • @FelipeOliveiraGutierrez Please expand on your question -- I don't understand what you are looking for. – David Anderson Dec 21 '18 at 08:40
  • Yep, I posted it here: https://stackoverflow.com/questions/53881770/is-that-possible-to-have-the-same-behavior-of-coflatmapfunction-using-other-basi Thanks! – Felipe Dec 21 '18 at 08:58

In order for your parameterStream to have its values sent to all operator instances, you have to use a BroadcastStream. Note that (as of Flink 1.5) this also lets you maintain broadcast state, where the "rules" or config settings that you send to all instances of your DynamicFilterCoFlatMapper are automatically checkpointed as state.
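Why keeping the rules in state matters can be sketched without Flink, assuming a simplified checkpoint model: the operator's rules live in a map that is copied on snapshot and handed back on restore, roughly what broadcast state provides. `RangeFilter`, `snapshot` and `restore` are hypothetical names, not Flink's API; the default range (-1000.0, 1000.0) is taken from the DynamicFilterCoFlatMapper in the implementation further down.

```java
import java.util.HashMap;
import java.util.Map;

public class BroadcastStateSketch {

    // One parallel instance; its rules live in a map, like broadcast state.
    static class RangeFilter {
        private final Map<String, Double> state;

        RangeFilter() {
            // Same defaults as DynamicFilterCoFlatMapper: (-1000.0, 1000.0).
            this(Map.of("min", -1000.0, "max", 1000.0));
        }

        private RangeFilter(Map<String, Double> restored) {
            this.state = new HashMap<>(restored);
        }

        // Parameter input: every instance receives the same update.
        void onParameters(double min, double max) {
            state.put("min", min);
            state.put("max", max);
        }

        // Data input: keep readings within [min, max].
        boolean accept(double reading) {
            return reading >= state.get("min") && reading <= state.get("max");
        }

        // Checkpoint: copy the rules so later updates do not mutate the snapshot.
        Map<String, Double> snapshot() {
            return new HashMap<>(state);
        }

        // Recovery: rebuild an instance from the last snapshot.
        static RangeFilter restore(Map<String, Double> snapshot) {
            return new RangeFilter(snapshot);
        }
    }

    public static void main(String[] args) {
        RangeFilter before = new RangeFilter();
        before.onParameters(10.0, 30.0);
        Map<String, Double> checkpoint = before.snapshot();

        // Simulated failure: a fresh instance falls back to the defaults...
        RangeFilter fresh = new RangeFilter();
        // ...while a restored one keeps the last broadcast range.
        RangeFilter restored = RangeFilter.restore(checkpoint);

        System.out.println(fresh.accept(50.0));    // true: default range
        System.out.println(restored.accept(50.0)); // false: outside [10, 30]
    }
}
```

A plain field such as `range` behaves like `fresh` here: after a restart it is re-initialized to its default, so the last parameter update is lost until a new one arrives.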

kkrugler

In Flink, I can use a CoFlatMapFunction (see "Flink: How to handle external app configuration changes in flink"). In Apache Edgent, however, I am not sure there is a way to do that. Here is my implementation:

package org.sense.flink.examples.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import org.sense.flink.mqtt.FlinkMqttConsumer;
import org.sense.flink.mqtt.MqttMessage;

public class SensorsDynamicFilterMqttEdgentQEP {

    public SensorsDynamicFilterMqttEdgentQEP() throws Exception {

        // Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Run this example in "ingestion time"
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Sensor readings: each MQTT payload is a single number
        DataStream<MqttMessage> temperatureStream = env.addSource(new FlinkMqttConsumer("topic-edgent"));
        // Filter parameters: each MQTT payload is "min,max"
        DataStream<Tuple2<Double, Double>> parameterStream = env.addSource(new FlinkMqttConsumer("topic-parameter"))
                .map(new ParameterMapper());

        // broadcast() sends every parameter update to all parallel instances of the filter
        DataStream<MqttMessage> filteredStream = temperatureStream.connect(parameterStream.broadcast())
                .flatMap(new DynamicFilterCoFlatMapper());

        filteredStream.print();

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        env.execute("SensorsDynamicFilterMqttEdgentQEP");
    }

    public static class DynamicFilterCoFlatMapper
            implements CoFlatMapFunction<MqttMessage, Tuple2<Double, Double>, MqttMessage> {

        private static final long serialVersionUID = -8634404029870404558L;
        private Tuple2<Double, Double> range = new Tuple2<Double, Double>(-1000.0, 1000.0);

        @Override
        public void flatMap1(MqttMessage value, Collector<MqttMessage> out) throws Exception {

            double payload = Double.parseDouble(value.getPayload());

            if (payload >= this.range.f0 && payload <= this.range.f1) {
                out.collect(value);
            }
        }

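        @Override
        public void flatMap2(Tuple2<Double, Double> value, Collector<MqttMessage> out) throws Exception {
            // A parameter update replaces the current range. This plain field is not
            // checkpointed, so after a restart the range falls back to the default above.
            this.range = value;
        }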
    }

    public static class ParameterMapper implements MapFunction<MqttMessage, Tuple2<Double, Double>> {

        private static final long serialVersionUID = 7322348505833012711L;

        @Override
        public Tuple2<Double, Double> map(MqttMessage value) throws Exception {
            String[] array = value.getPayload().split(",");
            double min = Double.parseDouble(array[0]);
            double max = Double.parseDouble(array[1]);
            return new Tuple2<Double, Double>(min, max);
        }
    }
}
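One consequence of this design: `flatMap1` and `flatMap2` run on the same instance, but there is no ordering guarantee between the two connected inputs, so a reading can be checked against the old range if it arrives before the parameter update. The plain-Java sketch below replays two possible interleavings of the same events; it uses no Flink or MQTT classes, and the `Event` class is a hypothetical stand-in for the two message types.

```java
import java.util.ArrayList;
import java.util.List;

public class InterleavingSketch {

    // One event on either input: a sensor reading or a "min,max" parameter payload.
    static final class Event {
        final boolean isParameter;
        final String payload;

        Event(boolean isParameter, String payload) {
            this.isParameter = isParameter;
            this.payload = payload;
        }
    }

    // Same logic as DynamicFilterCoFlatMapper, minus the Flink types.
    static final class RangeFilter {
        double min = -1000.0;
        double max = 1000.0;
        final List<String> out = new ArrayList<>();

        void process(Event e) {
            if (e.isParameter) {
                // Like ParameterMapper, but trims whitespace around each value.
                String[] parts = e.payload.split(",");
                min = Double.parseDouble(parts[0].trim());
                max = Double.parseDouble(parts[1].trim());
            } else {
                double reading = Double.parseDouble(e.payload);
                if (reading >= min && reading <= max) {
                    out.add(e.payload);
                }
            }
        }
    }

    // Feed events to a fresh filter in the given order and return what it emits.
    static List<String> replay(List<Event> events) {
        RangeFilter f = new RangeFilter();
        for (Event e : events) {
            f.process(e);
        }
        return f.out;
    }

    public static void main(String[] args) {
        Event update = new Event(true, "10,30");
        Event hot = new Event(false, "50.0");

        // The reading arrives first, so it is checked against the default range.
        System.out.println(replay(List.of(hot, update))); // [50.0]
        // The update arrives first, so the same reading is dropped.
        System.out.println(replay(List.of(update, hot))); // []
    }
}
```

In practice, this means a parameter change takes effect at some point after it is published, not at an exact position in the sensor stream.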
Felipe