I want to match a CEP pattern in Flink 1.4.0 (streaming) with the following code:
```java
DataStream<Event> input = inputFromSocket
        .map(new IncomingMessageProcessor())
        .filter(new FilterEmptyAndInvalidEvents());
DataStream<Event> inputFiltered = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
KeyedStream<Event, String> partitionedInput = inputFiltered.keyBy(new MyKeySelector());

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .where(new ActionCondition("action1"))
        .followedBy("middle").where(new ActionCondition("action2"))
        .followedBy("end").where(new ActionCondition("action3"));
pattern = pattern.within(Time.seconds(30));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
```
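To pin down what the pattern above is meant to detect for a single key, here is a simplified plain-Java model with no Flink dependency: an `action1` event, later an `action2`, later an `action3`, with unrelated events allowed in between (the relaxed contiguity of `followedBy`), and the whole sequence completing within 30 seconds of the first event. This is only a sketch under those assumptions, not Flink's NFA; the names `PatternSketch`, `Ev`, `matches` and `indexOf` are hypothetical, and it only answers whether at least one match exists.

```java
import java.util.List;

/** Hypothetical, simplified model of begin/followedBy/followedBy/within for one key. */
class PatternSketch {
    static final long WITHIN_MS = 30_000; // corresponds to Time.seconds(30)

    /** Minimal stand-in for Event with only the fields the pattern needs. */
    static final class Ev {
        final String action;
        final long timestamp;

        Ev(String action, long timestamp) {
            this.action = action;
            this.timestamp = timestamp;
        }
    }

    /**
     * True if some action1 is followed (not necessarily directly) by an action2 and then
     * an action3, all within WITHIN_MS of that action1. Events must be sorted by timestamp.
     */
    static boolean matches(List<Ev> events) {
        for (int i = 0; i < events.size(); i++) {
            if (!"action1".equals(events.get(i).action)) {
                continue;
            }
            // For a fixed start, the earliest later action2 and then the earliest later
            // action3 give the earliest possible end, so checking only those is enough.
            int j = indexOf(events, "action2", i + 1);
            int k = j < 0 ? -1 : indexOf(events, "action3", j + 1);
            if (k >= 0 && events.get(k).timestamp - events.get(i).timestamp < WITHIN_MS) {
                return true; // boundary handling of within() is simplified here
            }
        }
        return false;
    }

    /** Index of the first event with the given action at or after 'from', or -1. */
    static int indexOf(List<Ev> events, String action, int from) {
        for (int i = from; i < events.size(); i++) {
            if (action.equals(events.get(i).action)) {
                return i;
            }
        }
        return -1;
    }
}
```

Under this model a sequence such as `action1, click, action2, action3` inside 30 seconds matches, while the same actions spread over 40 seconds, or arriving in the wrong order, do not.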
`Event` is just a POJO that gets extracted from my custom source (Google PubSub):

```java
public class Event {
    private UUID id;
    private String action;
    private String senderID;
    private long occurrenceTimeStamp;
    // ......
}
```
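Because the rest of `Event` is elided, here is a minimal sketch of a complete, Flink-friendly version, assuming only the four fields shown. Flink's POJO rules require a public class, a public no-argument constructor, and fields that are either public or reachable through getters and setters. The `toString()` format is an assumption reconstructed from the log lines further down; the extra all-arguments constructor is added only for convenience.

```java
import java.util.UUID;

// In the real job this would be `public class Event` in its own Event.java:
// Flink only analyzes public classes with a public no-arg constructor as POJOs.
class Event {
    private UUID id;
    private String action;
    private String senderID;
    private long occurrenceTimeStamp;

    public Event() {} // required by Flink's POJO rules

    public Event(UUID id, String action, String senderID, long occurrenceTimeStamp) {
        this.id = id;
        this.action = action;
        this.senderID = senderID;
        this.occurrenceTimeStamp = occurrenceTimeStamp;
    }

    public UUID getId() { return id; }
    public void setId(UUID id) { this.id = id; }
    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
    public String getSenderID() { return senderID; }
    public void setSenderID(String senderID) { this.senderID = senderID; }
    public long getOccurrenceTimeStamp() { return occurrenceTimeStamp; }
    public void setOccurrenceTimeStamp(long occurrenceTimeStamp) { this.occurrenceTimeStamp = occurrenceTimeStamp; }

    // Assumed format, reconstructed from the log output in the question.
    @Override
    public String toString() {
        return "Event::" + id + " --- action: " + action + ", sender: " + senderID
                + ", timestamp: " + occurrenceTimeStamp;
    }
}
```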
The first filter, `FilterEmptyAndInvalidEvents()`, only drops events that are incorrectly formatted etc., but that does not happen in this case; I can verify this from the logging output. So every event runs through the `MyKeySelector.getKey()` method.
The `BoundedOutOfOrdernessGenerator` just extracts the timestamp from one field:

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {

    private static final Logger LOG = LoggerFactory.getLogger(BoundedOutOfOrdernessGenerator.class);

    private final long maxOutOfOrderness = 5500; // 5.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        long timestamp = element.getOccurrenceTimeStamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        Watermark newWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        return newWatermark;
    }
}
```
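The assigner above only moves its watermark forward when an element with a newer timestamp arrives. A plain-Java copy of just its arithmetic (the class name `WatermarkArithmetic` is hypothetical and no Flink types are involved) makes it easy to see where the watermark sits after the three logged events.

```java
/** Mirrors the arithmetic of BoundedOutOfOrdernessGenerator without Flink types. */
class WatermarkArithmetic {
    private final long maxOutOfOrderness = 5500; // 5.5 seconds, as in the question
    private long currentMaxTimestamp;            // starts at 0, as in the question

    /** Same bookkeeping as extractTimestamp(): remember the highest timestamp seen. */
    long extractTimestamp(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    /** The value getCurrentWatermark() would wrap in a Watermark. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

Feeding it the logged timestamps 1518194448701, 1518194448702 and 1518194448703 gives a watermark of 1518194448703 - 5500 = 1518194443203, which is earlier than all three events. Flink calls `getCurrentWatermark()` periodically, but with no newer element it keeps reporting that same value.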
`MyKeySelector` just extracts a string value from one field:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyKeySelector implements KeySelector<Event, String> {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeySelector.class);

    @Override
    public String getKey(Event value) throws Exception {
        String senderID = value.getSenderID();
        LOG.info("Partioning event {} by key {}", value, senderID);
        return senderID;
    }
}
```
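Conceptually, `keyBy(new MyKeySelector())` routes every event with the same `senderID` to the same keyed instance of the CEP operator, which keeps separate pattern state per key. The plain-Java analogue below uses `Collectors.groupingBy` on the same key; it is only an illustration (Flink physically partitions by hashing keys into key groups), and `KeyByAnalogue` and `Ev` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java analogue of keyBy(senderID); not how Flink physically partitions. */
class KeyByAnalogue {
    /** Minimal stand-in for Event: only the key field and the action. */
    static final class Ev {
        final String senderID;
        final String action;

        Ev(String senderID, String action) {
            this.senderID = senderID;
            this.action = action;
        }
    }

    /** Groups actions by the key MyKeySelector.getKey() returns, keeping arrival order. */
    static Map<String, List<String>> byKey(List<Ev> events) {
        return events.stream()
                .collect(Collectors.groupingBy(
                        (Ev e) -> e.senderID,
                        Collectors.mapping((Ev e) -> e.action, Collectors.toList())));
    }
}
```

With the three logged events, all of which carry sender `RHHLWUi8sXH33AJIAAAA`, this yields a single group, so one keyed pattern instance sees all three.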
`ActionCondition` just compares one field of the event and looks like this:

```java
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActionCondition extends SimpleCondition<Event> {

    private static final Logger LOG = LoggerFactory.getLogger(ActionCondition.class);

    private final String filterForCommand;

    public ActionCondition(String filterForCommand) {
        this.filterForCommand = filterForCommand;
    }

    @Override
    public boolean filter(Event value) throws Exception {
        LOG.info("Filtering event for {} action: {}", filterForCommand, value);
        if (value == null) {
            return false;
        }
        if (value.getAction() == null) {
            return false;
        }
        if (value.getAction().equals(filterForCommand)) {
            LOG.info("It's a hit for the {} action for event {}", filterForCommand, value);
            return true;
        } else {
            LOG.info("It's a miss for the {} action for event {}", filterForCommand, value);
            return false;
        }
    }
}
```
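The two null checks in `filter()` can be folded into one null-safe comparison by calling `equals()` on the (non-null) command instead of on the event's action. The sketch below expresses that same decision as a plain `java.util.function.Predicate` so it can be checked without Flink; `ActionLogic`, `Ev` and `actionIs` are hypothetical names, and it assumes `filterForCommand` is never null.

```java
import java.util.function.Predicate;

/** Same decision as ActionCondition.filter(), expressed without Flink types. */
class ActionLogic {
    /** Minimal stand-in for Event with only the field the condition reads. */
    static final class Ev {
        private final String action;

        Ev(String action) {
            this.action = action;
        }

        String getAction() {
            return action;
        }
    }

    /**
     * True iff the event is non-null and its action equals filterForCommand.
     * Calling equals() on the non-null command turns a null action into a miss.
     */
    static Predicate<Ev> actionIs(String filterForCommand) {
        return value -> value != null && filterForCommand.equals(value.getAction());
    }
}
```

Inside the real `ActionCondition`, the body of `filter()` could return the same single expression, keeping the hit/miss logging around it if desired.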
Unfortunately, when I start the job and send in events that should match the pattern, they are received and partitioned correctly, but the CEP pattern does not match.
As an example, I send in the following events:
- action1
- action2
- action3
In the log output of the Flink job I can see that the events correctly pass through `MyKeySelector.getKey()`, since I added logging there. So the events seem to arrive in the stream correctly, but unfortunately they are not matched by the pattern.
The logging output looks like this:

```
FilterEmptyAndInvalidEvents - Letting event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 through
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 through
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 through
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
```
The `TimeCharacteristic` is set to `EventTime` via

```java
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

and the events contain correct timestamps.
If I now send in another three events with the same actions (but with new timestamps etc.)
- action1
- action2
- action3
the pattern is matched, but for the first set of events. I know this because, for debugging purposes, I tagged every event with a GUID and print it for the matched ones.
When I send in a third, fourth, ... set of these three events, it is always the previous set that gets matched. So there seems to be a kind of "offset" in the pattern detection. It does not seem to be a timing issue, though, since the first set of events is still not matched even if I wait a long time after sending it (and after seeing the events being partitioned by Flink).
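In event time, the CEP operator does not evaluate an element the moment it arrives; it buffers elements per key and feeds them to the pattern in timestamp order only once a watermark at or past their timestamp arrives. The deliberately simplified plain-Java model below combines that buffering with the arithmetic of `BoundedOutOfOrdernessGenerator` for a single key. It is a sketch, not Flink's operator code, and `EventTimeBufferModel`, `onElement` and `onPeriodicWatermark` are hypothetical names. Assuming the second batch is sent more than 5.5 seconds after the first, it reproduces the one-batch lag described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Simplified model of event-time buffering in front of a CEP pattern (one key). */
class EventTimeBufferModel {
    private static final long MAX_OUT_OF_ORDERNESS = 5500; // as in BoundedOutOfOrdernessGenerator

    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // earliest timestamp first
    private long currentMaxTimestamp = 0;

    /** An element arrives: it only advances the max timestamp and is buffered, not matched. */
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        buffer.add(timestamp);
    }

    /** A periodic watermark fires: release, in timestamp order, everything at or before it. */
    List<Long> onPeriodicWatermark() {
        long watermark = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            released.add(buffer.poll());
        }
        return released; // the elements the pattern would now get to see
    }
}
```

In this model, waiting does not help: only a new element can raise `currentMaxTimestamp`, so the first batch stays buffered until the second batch pushes the watermark past it, and the second batch then waits for the third.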
Is there anything wrong with my code, or why does Flink always match only the previous set of events with the pattern?