This is pretty easy in Kafka Streams, so I would opt for 2.
First, you have to model your input data properly. Your example uses local time, which makes it impossible to reliably calculate the duration between two timestamps (for example across midnight or a daylight-saving change). Use an absolute timestamp such as epoch time instead.
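To illustrate why an absolute timestamp matters: a LocalTime carries no date and no offset, so a sensor that switches on at 23:50 and off at 00:10 seems to have run for a negative duration, while the same events given as epoch milliseconds and converted to Instant yield the expected 20 minutes. This is a minimal sketch; the class name EpochTimeDemo and the sample times are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;

class EpochTimeDemo {
    // Wall-clock times without a date: a span crossing midnight comes out negative
    static Duration betweenLocal(LocalTime from, LocalTime to) {
        return Duration.between(from, to);
    }

    // Absolute points in time given as epoch milliseconds: unambiguous
    static Duration betweenEpochMillis(long fromMillis, long toMillis) {
        return Duration.between(Instant.ofEpochMilli(fromMillis), Instant.ofEpochMilli(toMillis));
    }

    public static void main(String[] args) {
        // Hypothetical readings: sensor turned ON at 23:50 and OFF at 00:10 the next day
        System.out.println(betweenLocal(LocalTime.of(23, 50), LocalTime.of(0, 10)));
        long on = Instant.parse("2020-01-01T23:50:00Z").toEpochMilli();
        long off = Instant.parse("2020-01-02T00:10:00Z").toEpochMilli();
        System.out.println(betweenEpochMillis(on, off)); // PT20M
    }
}
```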
Start with a source data model like
```java
import java.time.Instant;

interface SensorState {
    String getId();

    Instant getTime();

    State getState();

    enum State {
        OFF,
        ON
    }
}
```
and a target of
```java
import java.time.Duration;

interface SensorStateWithDuration {
    SensorState getEvent();

    Duration getDuration();
}
```
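The transformer creates its output with SensorStateWithDuration.builder().setEvent(...).setDuration(...).build(), which the bare interfaces do not provide. One way to satisfy those calls, sketched here as an assumption (the linked repository may generate these classes differently, for example with an annotation processor), is a static builder() on the interface plus a simple record implementing SensorState. The names Builder and SimpleSensorState are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

interface SensorState {
    String getId();
    Instant getTime();
    State getState();

    enum State { OFF, ON }
}

// Hypothetical immutable implementation of SensorState
record SimpleSensorState(String id, Instant time, SensorState.State state) implements SensorState {
    @Override public String getId() { return id; }
    @Override public Instant getTime() { return time; }
    @Override public SensorState.State getState() { return state; }
}

interface SensorStateWithDuration {
    SensorState getEvent();
    Duration getDuration();

    // Hypothetical factory matching the builder calls used in DurationProcessor
    static Builder builder() {
        return new Builder();
    }

    final class Builder {
        private SensorState event;
        private Duration duration;

        Builder setEvent(SensorState event) { this.event = event; return this; }

        Builder setDuration(Duration duration) { this.duration = duration; return this; }

        SensorStateWithDuration build() {
            // Both fields are required; capture them so the result is immutable
            SensorState e = Objects.requireNonNull(event, "event");
            Duration d = Objects.requireNonNull(duration, "duration");
            return new SensorStateWithDuration() {
                @Override public SensorState getEvent() { return e; }
                @Override public Duration getDuration() { return d; }
            };
        }
    }
}
```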
Now that you have defined your input and output streams (but see “Data Types and Serialization” for how to serialize them), you just need to transform the values (“Applying processors and transformers”) by defining a ValueTransformer.
It has to do two things:
1. Check the state store for historical data for the sensor and update it with the new data, if necessary.
2. When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration.
```java
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
    /** Name of the state store that keeps the current state per sensor. */
    static final String SENSOR_STATES = "SensorStates";

    private KeyValueStore<String, SensorState> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.store = (KeyValueStore<String, SensorState>) context.getStateStore(SENSOR_STATES);
    }

    @Override
    public SensorStateWithDuration transform(SensorState sensorState) {
        // Nothing to do
        if (sensorState == null) {
            return null;
        }
        // Check for the previous state, update if necessary
        var oldState = checkAndUpdateSensorState(sensorState);
        // When we have historical data, return the duration so far. Otherwise return null,
        // which transformValues() forwards downstream as a record with a null value
        return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
    }

    @Override
    public void close() {}

    /**
     * Checks the state store for historical state based on the sensor ID and updates it, if necessary.
     *
     * @param sensorState The new sensor state
     * @return The old sensor state, or empty if there is none
     */
    Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
        // The sensor ID is our index
        var index = sensorState.getId();
        // Get the historical state (might be null)
        var oldState = store.get(index);
        if (needToUpdate(oldState, sensorState)) {
            // Update the state store to the new state
            store.put(index, sensorState);
        }
        return Optional.ofNullable(oldState);
    }

    /**
     * Checks whether we need to update the state in the state store.
     *
     * <p>Either we have no historical data, or the state has changed.
     *
     * @param oldState The old sensor state (may be null)
     * @param sensorState The new sensor state
     * @return Whether we need to update
     */
    boolean needToUpdate(SensorState oldState, SensorState sensorState) {
        return oldState == null || oldState.getState() != sensorState.getState();
    }

    /**
     * Wraps the old state with the duration it has lasted.
     *
     * @param oldState The state of the sensor so far
     * @param sensorState The new state of the sensor
     * @return The old state wrapped with its duration
     */
    SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
        var duration = Duration.between(oldState.getTime(), sensorState.getTime());
        return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
    }
}
```
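The transformer's rules can be tried out without a Kafka cluster. The following is a simulation sketch, not part of the original answer: a HashMap stands in for the SensorStates store, and the hypothetical records Event and Emitted stand in for SensorState and SensorStateWithDuration. Feeding ON, ON, OFF for one sensor shows both kinds of output: the duration of an ongoing state, and the final duration once the state changes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class DurationSimulation {
    enum State { OFF, ON }

    // Hypothetical stand-ins for SensorState and SensorStateWithDuration
    record Event(String id, Instant time, State state) {}
    record Emitted(Event event, Duration duration) {}

    // Stand-in for the "SensorStates" key-value store, keyed by sensor ID
    private final Map<String, Event> store = new HashMap<>();

    Optional<Emitted> process(Event event) {
        Event old = store.get(event.id());
        // Step 1: remember the event if there is no history or the state changed
        if (old == null || old.state() != event.state()) {
            store.put(event.id(), event);
        }
        // Step 2: with history, emit how long the stored state has lasted so far
        return Optional.ofNullable(old)
                .map(o -> new Emitted(o, Duration.between(o.time(), event.time())));
    }

    static List<Emitted> run(List<Event> events) {
        var sim = new DurationSimulation();
        var out = new ArrayList<Emitted>();
        for (Event e : events) {
            sim.process(e).ifPresent(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-01-01T00:00:00Z");
        var out = run(List.of(
                new Event("s1", t0, State.ON),
                new Event("s1", t0.plusSeconds(30), State.ON),
                new Event("s1", t0.plusSeconds(90), State.OFF)));
        out.forEach(System.out::println);
    }
}
```

The repeated ON reading does not overwrite the stored event, so both results measure from the first ON: 30 seconds while it is still on, then 90 seconds once it switches off.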
Putting everything together (“Connecting Processors and State Stores”) in a simple Topology:
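```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

var builder = new StreamsBuilder();
// Our state store; storeSerde, inputSerde and resultSerde are the Serdes for the
// value types (see "Data Types and Serialization") and are assumed to be defined elsewhere
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(DurationProcessor.SENSOR_STATES),
        Serdes.String(),
        storeSerde);
// Register the store builder
builder.addStateStore(storeBuilder);
builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    // transformValues() forwards null results as null values; drop them
    .filter((sensorId, withDuration) -> withDuration != null)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));
var topology = builder.build();
```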
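The topology assumes that storeSerde, inputSerde and resultSerde already exist. As one possible approach (an assumption; the linked repository may use a different format such as JSON or Avro), a sensor state could be encoded as a compact binary record with plain java.io streams. The sketch covers only the byte encoding; SensorStateCodec, the Sensor record and the wire layout are made up. The two methods could then be wrapped into a Kafka Serde, e.g. with Serdes.serdeFrom(serializer, deserializer), where the serializer and deserializer delegate to encode and decode.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;

class SensorStateCodec {
    enum State { OFF, ON }

    // Hypothetical value type mirroring the SensorState interface
    record Sensor(String id, Instant time, State state) {}

    // Assumed layout: id (modified UTF-8), epoch seconds, nanos, state name
    static byte[] encode(Sensor sensor) {
        if (sensor == null) {
            return null; // keep null values (tombstones) as null
        }
        var bytes = new ByteArrayOutputStream();
        try (var out = new DataOutputStream(bytes)) {
            out.writeUTF(sensor.id());
            out.writeLong(sensor.time().getEpochSecond());
            out.writeInt(sensor.time().getNano());
            out.writeUTF(sensor.state().name());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Sensor decode(byte[] data) {
        if (data == null) {
            return null; // Kafka passes null for null values (tombstones)
        }
        try (var in = new DataInputStream(new ByteArrayInputStream(data))) {
            String id = in.readUTF();
            Instant time = Instant.ofEpochSecond(in.readLong(), in.readInt());
            State state = State.valueOf(in.readUTF());
            return new Sensor(id, time, state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writing the enum by name rather than by ordinal keeps stored records readable if the constants are ever reordered, at the cost of a few extra bytes per record.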
A full application is at github.com/melsicon/kafka-sensors.