I have 3 streams:

TStream<Double> tempReadings = topology.poll(tempSensor, 10, TimeUnit.SECONDS);
TStream<Double> co2Readings = topology.poll(co2Sensor, 10, TimeUnit.SECONDS);
TStream<Boolean> stationaryReadings = topology.poll(stationarySensor, 10, TimeUnit.SECONDS);

I currently create 3 separate device events from 3 JSON objects:

TStream<JsonObject> tempJson = tempReadings.map(tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("Temperature", tuple);
    return json;
});
TStream<JsonObject> co2Json = co2Readings.map(tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("C02Level", tuple);
    return json;
});
TStream<JsonObject> sensoryJson = stationaryReadings.map(tuple -> {
    JsonObject json = new JsonObject();
    json.addProperty("isStationary", tuple);
    return json;
});

Instead, I would like to create a single event by joining these streams into one JSON object with three properties (Temperature, C02Level and isStationary).

approxiblue
E Shindler
  • You can [union](https://edgent.apache.org/docs/streaming-concepts#union) the streams, but that would just put one item after the other. If you want to read all 3 at once, you could make a new sensor that returns a "readings" object containing all 3 properties. – approxiblue Dec 25 '16 at 04:45
  • @approxiblue That's true, thanks. I was just wondering whether there was a neater way to do it, but if not, then not. If you'd like me to accept your answer, please post it as an answer rather than a comment. – E Shindler Dec 25 '16 at 07:05

2 Answers

You can union the streams, but that only interleaves their tuples one after another, and all of the streams must have the same tuple type.

If you want to read all 3 properties at once, you could create a sensor that returns a "readings" object:

class Reading {
    Double temperature;
    Double c02Level;
    Boolean isStationary;
}
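
A minimal sketch of what such a combined sensor could look like, using only the standard library so it runs on its own. The names `CombinedSensor`, `combine` and `toProperties` are hypothetical, the constructor on `Reading` is an addition, and a plain `Map` stands in for the `JsonObject` used in the question:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch: wrap three sensor suppliers in one that returns all readings together. */
final class CombinedSensor {

    /** Same fields as the Reading class above, plus a constructor (an addition). */
    static final class Reading {
        final Double temperature;
        final Double c02Level;
        final Boolean isStationary;

        Reading(Double temperature, Double c02Level, Boolean isStationary) {
            this.temperature = temperature;
            this.c02Level = c02Level;
            this.isStationary = isStationary;
        }
    }

    /** Returns a supplier that samples all three sensors each time it is called. */
    static Supplier<Reading> combine(Supplier<Double> temp,
                                     Supplier<Double> co2,
                                     Supplier<Boolean> stationary) {
        return () -> new Reading(temp.get(), co2.get(), stationary.get());
    }

    /** Flattens a Reading into the three property names used in the question. */
    static Map<String, Object> toProperties(Reading r) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("Temperature", r.temperature);
        props.put("C02Level", r.c02Level);
        props.put("isStationary", r.isStationary);
        return props;
    }

    public static void main(String[] args) {
        // Stand-in sensors with fixed values; real ones would read hardware.
        Supplier<Reading> sensor = combine(() -> 21.5, () -> 400.0, () -> true);
        System.out.println(toProperties(sensor.get()));
    }
}
```

In an Edgent topology, the idea would be to poll this one supplier instead of three and map each `Reading` to a single `JsonObject` with three `addProperty` calls. Edgent declares its own `Supplier` interface, so a small lambda adapter such as `() -> sensor.get()` would likely be needed; that wiring is not shown or tested here.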
approxiblue

In this case the "single poll combined Reading tuple" approach is probably best.

More generally, PlumbingStreams.barrier() can be used to merge the corresponding tuples of multiple streams. Something like:

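import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.edgent.topology.plumbing.PlumbingStreams;

// barrier() emits one List per round, holding one tuple from each input stream.
TStream<JsonObject> combinedReadings =
    PlumbingStreams.barrier(Arrays.asList(tempJson, co2Json, sensoryJson))
    .map(list -> combineTuples(list));


// Copies every property of every JsonObject in the list into one JsonObject.
static JsonObject combineTuples(List<JsonObject> list) {
  JsonObject jo = new JsonObject();
  for (JsonObject j : list) {
    for (Entry<String,JsonElement> e : j.entrySet()) {
      // add() takes a JsonElement; addProperty() only takes String, Number, Boolean or Character.
      jo.add(e.getKey(), e.getValue());
    }
  }
  return jo;
}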
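
To make "merge the corresponding tuples" concrete, here is a toy, standard-library-only model of the barrier idea. It is not Edgent's implementation, and `ToyBarrier` and `accept` are hypothetical names. It assumes the behaviour described above: tuples queue per input, and a combined list is released only once every input has one waiting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a barrier: releases one list per round, in input order. */
final class ToyBarrier<T> {
    // One FIFO queue per input stream.
    private final List<Deque<T>> queues = new ArrayList<>();

    ToyBarrier(int inputs) {
        for (int i = 0; i < inputs; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    /**
     * Accepts a tuple on the given input. Returns the merged list if every
     * input now has a tuple waiting, otherwise null.
     */
    List<T> accept(int input, T tuple) {
        queues.get(input).add(tuple);
        for (Deque<T> q : queues) {
            if (q.isEmpty()) {
                return null; // at least one input has not caught up yet
            }
        }
        // Take the oldest tuple from each input, preserving input order.
        List<T> round = new ArrayList<>();
        for (Deque<T> q : queues) {
            round.add(q.remove());
        }
        return round;
    }
}
```

Because the three polls in the question use the same 10-second period, each round should pair readings taken at roughly the same time. If one stream runs ahead, its extra tuples simply wait in its queue in this model.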
dlaboss