I have an Azure Stream Analytics job that combines the results of multiple queries and outputs them to the same sink. To do this, I define my queries within a WITH statement, then combine them using UNION and then write them to my sink. However, unfortunately I only get an output to my sink whenever all of my queries actually have an output, and this is where it goes wrong.
I have some queries that continuously (every 5 minutes) give an output, but I also have some queries that rare give an output (maybe a few times per day). This causes the output to not get any results, until the queries all have something to return. Does anyone know why this is and how I can fix it? Shouldn't UNION also give results when set A has results, but set B doesn't? I'm running this locally in VS Code, with a live connection to Event Hub by the way.
Here is a simplified example of 2 queries (one with frequent output, one with infrequent output) that goes wrong:
WITH
HarmonizedMeasurements AS (
SELECT
CAST(EHHARM.TimeStamp AS datetime) AS "TimeStamp",
CAST(EHHARM.ValueNumber AS float) AS "ValueNumber",
EHHARM.ValueBit AS "ValueBit",
EHHARM.MeasurementName,
EHHARM.PartName,
EHHARM.ElementId,
EHHARM.ElementName,
EHHARM.ObjectName,
EHHARM.TranslationTableId
FROM EventHubHarmonizedMeasurements AS EHHARM TIMESTAMP BY "TimeStamp"
PARTITION BY TranslationTableId
),
ToerenAandrijvingCategoriesMeasurements AS (
SELECT
AANDRCAT.TimeStamp AS "TimeStamp",
AANDRCAT.ValueNumber AS "ValueNumber",
AANDRCAT.ValueBit AS "ValueBit",
AANDRCAT.MeasurementName AS "MeasurementName",
AANDRCAT.PartName AS "PartName",
AANDRCAT.ElementId AS "ElementId",
AANDRCAT.ElementName AS "ElementName",
AANDRCAT.ObjectName AS "ObjectName",
AANDRCAT.TranslationTableId AS "TranslationTableId",
CASE
WHEN (-5000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -1000) THEN 'Dalen'
WHEN (-1000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -200) THEN 'Dalen Retarderen'
WHEN (-200 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 0) THEN 'Stilstand'
WHEN (0 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 250) THEN 'Nivelleren'
WHEN (250 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 400) THEN 'Heffen Retarderen'
WHEN (400 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 5000) THEN 'Heffen'
ELSE 'NoCategory'
END AS "Category"
FROM HarmonizedMeasurements AS AANDRCAT
WHERE
AANDRCAT.ObjectName LIKE 'Schutsluis%' AND
AANDRCAT.MeasurementName = 'Motortoerental terugkoppeling' AND
AANDRCAT.ValueNumber <> 0
),
AandrijvingCatStartMeasurements AS (
SELECT
AANDRCAT.TimeStamp AS "StartTime",
AANDRCAT.Category AS "Category",
AANDRCAT.ElementId AS "ElementId",
AANDRCAT.TranslationTableId AS "TranslationTableId"
FROM ToerenAandrijvingCategoriesMeasurements AS AANDRCAT
WHERE
LAG(Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) <> Category
),
AandrijvingCatEndMeasurements AS (
SELECT
AANDRST.StartTime AS "EndTime",
LAG(AANDRST.StartTime, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "StartTime",
LAG(AANDRST.Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "Category",
AANDRST.ElementId AS "ElementId",
AANDRST.TranslationTableId AS "TranslationTableId"
FROM AandrijvingCatStartMeasurements AS AANDRST
),
VermogenAandrijvingMeasurements AS (
SELECT
AANDRVER.TimeStamp AS "TimeStamp",
AANDRVER.ValueNumber AS "ValueNumber",
AANDRVER.ValueBit AS "ValueBit",
CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category) AS "MeasurementName",
AANDRVER.PartName AS "PartName",
AANDRVER.ElementId AS "ElementId",
AANDRVER.ElementName AS "ElementName",
AANDRVER.ObjectName AS "ObjectName",
AANDRVER.TranslationTableId AS "TranslationTableId"
FROM HarmonizedMeasurements AS AANDRVER
LEFT JOIN AandrijvingCatEndMeasurements AS AANDREN
ON DATEDIFF(minute, AANDRVER, AANDREN) BETWEEN 0 AND 30 AND
AANDRVER.TimeStamp >= AANDREN.StartTime AND
AANDRVER.Timestamp < AANDREN.EndTime AND
AANDRVER.ElementId = AANDREN.ElementId AND
AANDRVER.TranslationTableId = AANDREN.TranslationTableId
INNER JOIN SQLMeasurementType AS MEAS
ON MEAS.Name = CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category)
WHERE
AANDRVER.ObjectName LIKE 'Schutsluis%' AND
AANDRVER.MeasurementName = 'Vermogen'
),
LockDoorTop AS (
SELECT
Lock.TimeStamp AS "TimeStamp",
Lock.ValueNumber AS "ValueNumber",
Lock.ValueBit AS "ValueBit",
Lock.MeasurementName,
Lock.PartName,
Lock.ElementId,
Lock.ElementName,
Lock.ObjectName,
Lock.TranslationTableId
FROM HarmonizedMeasurements AS Lock
WHERE
Lock.MeasurementName = 'Sluisdeur open' AND
Lock.ElementName = 'Deur sluiskolk 1' AND
Lock.PartName = 'Bovenhoofd' AND
Lock.ValueBit = '1'
),
WaterLTop AS (
SELECT
WaterTop.TimeStamp AS "TimeStamp",
WaterTop.ValueNumber AS "ValueNumber",
WaterTop.ValueBit AS "ValueBit",
WaterTop.MeasurementName,
WaterTop.PartName,
WaterTop.ElementId,
WaterTop.ElementName,
WaterTop.ObjectName,
WaterTop.TranslationTableId
FROM HarmonizedMeasurements AS WaterTop
WHERE
WaterTop.MeasurementName = 'Waterniveaumeting' AND
WaterTop.ElementName = 'Sluiskolk 1' AND
WaterTop.PartName = 'Opvaartzijde'
),
WaterLLock AS (
SELECT
WaterLock.TimeStamp AS "TimeStamp",
WaterLock.ValueNumber AS "ValueNumber",
WaterLock.ValueBit AS "ValueBit",
WaterLock.MeasurementName,
WaterLock.PartName,
WaterLock.ElementId,
WaterLock.ElementName,
WaterLock.ObjectName,
WaterLock.TranslationTableId
FROM HarmonizedMeasurements AS WaterLock
WHERE
WaterLock.MeasurementName = 'Waterniveaumeting' AND
WaterLock.ElementName = 'Sluiskolk 1' AND
WaterLock.PartName = 'Sluiskamer'
),
WaterLevelTopMeasurements AS (
SELECT
LockDoor.TimeStamp AS "TimeStamp",
CAST(ROUND((WaterLevelLock.ValueNumber - WaterLevelTop.ValueNumber), 2) AS float) AS "ValueNumber",
null AS "ValueBit",
MEAS.Name AS "MeasurementName",
LockDoor.PartName AS "PartName",
LockDoor.ElementId AS "ElementId",
LockDoor.ElementName AS "ElementName",
LockDoor.ObjectName AS "ObjectName",
LockDoor.TranslationTableId AS "TranslationTableId"
FROM LockDoorTop AS LockDoor
JOIN WaterLTop AS WaterLevelTop
ON DATEDIFF(minute, LockDoor, WaterLevelTop) BETWEEN 0 AND 1 AND
LockDoor.ObjectName = WaterLevelTop.ObjectName
JOIN WaterLLock AS WaterLevelLock
ON DATEDIFF(minute, LockDoor, WaterLevelLock) BETWEEN 0 AND 1 AND
LockDoor.ObjectName = WaterLevelLock.ObjectName
INNER JOIN SQLMeasurementType AS MEAS
ON MEAS.Name = 'Waterniveauverschil'
),
-- Combine queries
DatalakeCombinedMeasurements AS (
SELECT * FROM VermogenAandrijvingMeasurements
UNION
SELECT * FROM WaterLevelTopMeasurements
)
-- Output data
SELECT *
INTO DatalakeHarmonizedMeasurements
FROM DatalakeCombinedMeasurements
PARTITION BY TranslationTableId