5

So I'm evaluating Kafka. In our use case would have to create new topics containing "time elapsed" from one event to the next, essentially since a sensor will report as "on" or "off" into Kafka. So having the timestamp, sensorname and state, create new topics with duration of the "on" and "off" state.

  1. Is that doable in KSQL, and how?
  2. Or should one really leave this to consumers or stream processors to figure out?

My data is something like this:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on} 

to get result

{ 2019:02:15 00:30:00, sensor1, off, 30sec }. 

Essentially have to combine states of multiple sensors to determine combined state of a machine. Hundreds if not eventually thousands of sensors in a factory

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • Can you give an example of the stream of data that you're envisaging and an expected output? It's hard to determine from what you describe if KSQL can currently do this. It's built on Kafka Streams so if you're using stream processors chances are you can do it in KSQL. – Robin Moffatt Feb 15 '19 at 18:00
  • something like this: { 2019:02:15 00:00:00, sensor1, off} then next record { 2019:02:15 00:00:30, sensor1, on} to get result { 2019:02:15 00:30:00, sensor1, off, 30sec }. Essentially have to combine states of multiple sensors to determine combined state of a machine. Hundreds if not eventually thousands of sensors in a factory – Thinus Marloth Feb 15 '19 at 18:28
  • Sounds like you want a sessionized window on a combination of the sensor name + state? Open and close a session based on off/on? – OneCricketeer Feb 17 '19 at 20:29
  • Firstly thanks for the edit. Yes, essentially. Would a sessionized window be time based or record based? A stream of incoming data has a mix of sensors, but a sensor state change could me either seconds or hours in frequncy. – Thinus Marloth Feb 18 '19 at 07:11
  • The question could be rephrased as: How to implement a LAG functionality with kafka/ksql: `SELECT sensor, difference = value - lag(value) FROM bla GROUP BY sensor`? – Holger Brandl Nov 13 '19 at 14:16
  • @RobinMoffatt If ksql would support also absolute event counts as window sizes for hopping windows, this potentially could be potentially solved with a custom aggregation function. Still, have a proper `LAG` would be more intuitive. – Holger Brandl Nov 13 '19 at 14:21
  • @HolgerBrandl Maybe you could comment on https://github.com/confluentinc/ksql/issues/2562 – OneCricketeer Nov 14 '19 at 01:02

2 Answers2

2

This is pretty easy in Kafka Streams, so I would opt for 2.

First you have to to model your input data properly. Your example uses local time, which makes it impossible to calculate durations between two timestamps. Use something like epoch time.

Start with a source data model like

interface SensorState {
  String getId();
  Instant getTime();
  State getState();
  enum State {
    OFF,
    ON
  }
}

and a target of

interface SensorStateWithDurationX {
  SensorState getEvent();
  Duration getDuration();
}

Now that you have defined you input and output stream (but see “Data Types and Serialization”) you just need to transform the values (“Applying processors and transformers”) by simply defining a ValueTransformer.

It has to do 2 things:

  1. Check the state store for historical data for the sensor and update it with new data, if necessary

  2. When historical data is available, calculate the difference between the timestamps and emit the data together with the calculated duration

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  KeyValueStore<String, SensorState> store;

  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore("SensorStates");
  }

  public SensorStateWithDuration transform(SensorState sensorState) {
    // Nothing to do
    if (sensorState == null) {
      return null;
    }

    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);

    // When we have historical data, return duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The Sensor ID is our index
    var index = sensorState.getId();

    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (neetToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Check if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean neetToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wrap the old state with a duration how log it lasted.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}

Putting everything together (“Connecting Processors and State Stores”) in a simple Topology:

var builder = new StreamsBuilder();

// Our state store
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);

// Register the store builder
builder.addStateStore(storeBuilder);

builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));

var topology = builder.build();

A full application is at github.com/melsicon/kafka-sensors.

eik
  • 2,104
  • 12
  • 15
0

Following up on the idea from https://github.com/confluentinc/ksql/issues/2562 to use a self join, I came up with the following solution:

  1. Create the data
#kafka-topics --bootstrap-server localhost:9092  --delete --topic temptest
echo '{"temp": 3.0, "counter": 1}' | kafkacat -b localhost -t temptest
echo '{"temp": 4.0, "counter": 2}' | kafkacat -b localhost -t temptest
echo '{"temp": 6.0, "counter": 3}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.0, "counter": 4}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 6}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 5}' | kafkacat -b localhost -t temptest

Here we assume that consecutive events have a counter property already. Such a counter could also be added with ksql by simply aggregating the events counts over time.

  1. Differentiate the function
-- import the topic into ksql
CREATE STREAM temp_json (ingesttime BIGINT, row VARCHAR, temp DOUBLE, counter INTEGER) WITH (kafka_topic='temptest', value_format='JSON', KEY='counter');

--- change format to avro and repartion
CREATE STREAM temp WITH (VALUE_FORMAT='AVRO') AS SELECT temp, counter, CAST(counter AS VARCHAR) as counter_key FROM temp_json PARTITION BY counter_key;

--- create second stream with shifted counter
CREATE STREAM temp_shift AS SELECT temp, counter as counter_orig, counter+ 1 as counter from temp PARTITION BY counter;

-- join the streams by counter
CREATE STREAM temp_diff AS SELECT
  prev.temp-cur.temp as temp_difference, cur.temp as temp,  prev.temp as prev_temp, cur.counter as counter
  FROM temp cur
  LEFT JOIN temp_shift prev WITHIN 2 HOURS
  ON cur.counter = prev.counter;

Test it

ksql> SELECT * FROM temp_diff LIMIT 4;
1574321370281 | 1 | null | 3.0 | null | 1
1574321372307 | 2 | -1.0 | 4.0 | 3.0 | 2
1574321372319 | 3 | -2.0 | 6.0 | 4.0 | 3
1574321372331 | 4 | 3.0 | 3.0 | 6.0 | 4

The sensor itself is left out to keep the solution simple. However, it could be easily added by using a compound key for the partitioning as described in https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key

Holger Brandl
  • 10,634
  • 3
  • 64
  • 63
  • You are not using the input format from the question, but something you made up - a “counter” (which does not exist and you have to provide as part of the solution) and you assume that there is only one sensor, I can't see how it is “easily” adopted. Also, using a double for time calculations open this up for all kinds of rounding errors. – eik Nov 26 '19 at 12:13
  • Thanks for the feedback. Indeed format is different. Both the counter and the lack of grouping by sensor were addressed in the surrounding description. double is not used for time but for sensor reading (temperature) calculations, so I'd expect rounding errors to be negligible. – Holger Brandl Nov 27 '19 at 06:48