
I have an Azure Stream Analytics job that combines the results of multiple queries and writes them to the same sink. To do this, I define my queries in a WITH statement, combine them using UNION, and then write the result to my sink. Unfortunately, I only get output in my sink when all of my queries actually produce output, and this is where it goes wrong.

Some of my queries produce output continuously (every 5 minutes), but others rarely produce output (maybe a few times per day). As a result, nothing reaches the output until all of the queries have something to return. Does anyone know why this happens and how I can fix it? Shouldn't UNION also return results when set A has results but set B doesn't? By the way, I'm running this locally in VS Code with a live connection to Event Hub.
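For comparison, in a batch SQL engine such as SQLite or PostgreSQL, a UNION whose second branch is empty still returns the rows of the first branch. This standard-SQL query (not Stream Analytics syntax, which needs a FROM input) returns a single row with `src = 'A'`:

```sql
-- Branch A always matches; branch B never matches (empty set).
SELECT 'A' AS src WHERE 1 = 1
UNION
SELECT 'B' AS src WHERE 1 = 0;
```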

Here is a simplified example with two queries (one with frequent output, one with infrequent output) that shows the problem:

```sql
WITH
HarmonizedMeasurements AS (
    SELECT
        CAST(EHHARM.TimeStamp AS datetime) AS "TimeStamp",
        CAST(EHHARM.ValueNumber AS float) AS "ValueNumber",
        EHHARM.ValueBit AS "ValueBit",
        EHHARM.MeasurementName,
        EHHARM.PartName,
        EHHARM.ElementId,
        EHHARM.ElementName,
        EHHARM.ObjectName,
        EHHARM.TranslationTableId
    FROM EventHubHarmonizedMeasurements AS EHHARM TIMESTAMP BY "TimeStamp"
    PARTITION BY TranslationTableId
),

ToerenAandrijvingCategoriesMeasurements AS (
    SELECT
        AANDRCAT.TimeStamp AS "TimeStamp",
        AANDRCAT.ValueNumber AS "ValueNumber",
        AANDRCAT.ValueBit AS "ValueBit",
        AANDRCAT.MeasurementName AS "MeasurementName",
        AANDRCAT.PartName AS "PartName",
        AANDRCAT.ElementId AS "ElementId",
        AANDRCAT.ElementName AS "ElementName",
        AANDRCAT.ObjectName AS "ObjectName",
        AANDRCAT.TranslationTableId AS "TranslationTableId",
        CASE 
            WHEN (-5000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -1000) THEN 'Dalen'
            WHEN (-1000 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= -200) THEN 'Dalen Retarderen'
            WHEN (-200 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 0) THEN 'Stilstand'
            WHEN (0 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 250) THEN 'Nivelleren'
            WHEN (250 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 400) THEN 'Heffen Retarderen'
            WHEN (400 < AANDRCAT.ValueNumber AND AANDRCAT.ValueNumber <= 5000) THEN 'Heffen'
            ELSE 'NoCategory'
        END AS "Category"
    FROM HarmonizedMeasurements AS AANDRCAT
    WHERE
        AANDRCAT.ObjectName LIKE 'Schutsluis%' AND
        AANDRCAT.MeasurementName = 'Motortoerental terugkoppeling' AND
        AANDRCAT.ValueNumber <> 0
),
-- Keep only events whose category differs from the previous event for the same ElementId
AandrijvingCatStartMeasurements AS (
    SELECT
        AANDRCAT.TimeStamp AS "StartTime",
        AANDRCAT.Category AS "Category",
        AANDRCAT.ElementId AS "ElementId",
        AANDRCAT.TranslationTableId AS "TranslationTableId"
    FROM ToerenAandrijvingCategoriesMeasurements AS AANDRCAT
    WHERE
        LAG(Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) <> Category
),
-- Pair each category change with the previous one: [StartTime, EndTime) with the previous category
AandrijvingCatEndMeasurements AS (
    SELECT
        AANDRST.StartTime AS "EndTime",
        LAG(AANDRST.StartTime, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "StartTime",
        LAG(AANDRST.Category, 1) OVER (PARTITION BY ElementId LIMIT DURATION(day, 1)) AS "Category",
        AANDRST.ElementId AS "ElementId",
        AANDRST.TranslationTableId AS "TranslationTableId"
    FROM AandrijvingCatStartMeasurements AS AANDRST
),
-- Power measurements, labelled with the drive category whose interval contains their timestamp
VermogenAandrijvingMeasurements AS (
    SELECT
        AANDRVER.TimeStamp AS "TimeStamp",
        AANDRVER.ValueNumber AS "ValueNumber",
        AANDRVER.ValueBit AS "ValueBit",
        CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category) AS "MeasurementName",
        AANDRVER.PartName AS "PartName",
        AANDRVER.ElementId AS "ElementId",
        AANDRVER.ElementName AS "ElementName",
        AANDRVER.ObjectName AS "ObjectName",
        AANDRVER.TranslationTableId AS "TranslationTableId"
    FROM HarmonizedMeasurements AS AANDRVER
    LEFT JOIN AandrijvingCatEndMeasurements AS AANDREN
    ON DATEDIFF(minute, AANDRVER, AANDREN) BETWEEN 0 AND 30 AND
        AANDRVER.TimeStamp >= AANDREN.StartTime AND
        AANDRVER.TimeStamp < AANDREN.EndTime AND
        AANDRVER.ElementId = AANDREN.ElementId AND
        AANDRVER.TranslationTableId = AANDREN.TranslationTableId
    INNER JOIN SQLMeasurementType AS MEAS
    ON MEAS.Name = CONCAT(AANDRVER.MeasurementName, ' ', AANDREN.Category)
    WHERE
        AANDRVER.ObjectName LIKE 'Schutsluis%' AND
        AANDRVER.MeasurementName = 'Vermogen'
),
LockDoorTop AS (
    SELECT
        Lock.TimeStamp AS "TimeStamp",
        Lock.ValueNumber AS "ValueNumber",
        Lock.ValueBit AS "ValueBit",
        Lock.MeasurementName,
        Lock.PartName,
        Lock.ElementId,
        Lock.ElementName,
        Lock.ObjectName,
        Lock.TranslationTableId
    FROM HarmonizedMeasurements AS Lock
    WHERE
        Lock.MeasurementName = 'Sluisdeur open' AND
        Lock.ElementName = 'Deur sluiskolk 1' AND
        Lock.PartName = 'Bovenhoofd' AND
        Lock.ValueBit = '1'
),
WaterLTop AS (
    SELECT
        WaterTop.TimeStamp AS "TimeStamp",
        WaterTop.ValueNumber AS "ValueNumber",
        WaterTop.ValueBit AS "ValueBit",
        WaterTop.MeasurementName,
        WaterTop.PartName,
        WaterTop.ElementId,
        WaterTop.ElementName,
        WaterTop.ObjectName,
        WaterTop.TranslationTableId
    FROM HarmonizedMeasurements AS WaterTop
    WHERE
        WaterTop.MeasurementName = 'Waterniveaumeting' AND
        WaterTop.ElementName = 'Sluiskolk 1' AND
        WaterTop.PartName = 'Opvaartzijde'
),
WaterLLock AS (
    SELECT
        WaterLock.TimeStamp AS "TimeStamp",
        WaterLock.ValueNumber AS "ValueNumber",
        WaterLock.ValueBit AS "ValueBit",
        WaterLock.MeasurementName,
        WaterLock.PartName,
        WaterLock.ElementId,
        WaterLock.ElementName,
        WaterLock.ObjectName,
        WaterLock.TranslationTableId
    FROM HarmonizedMeasurements AS WaterLock
    WHERE
        WaterLock.MeasurementName = 'Waterniveaumeting' AND
        WaterLock.ElementName = 'Sluiskolk 1' AND
        WaterLock.PartName = 'Sluiskamer'
),
-- Water level difference (Sluiskamer minus Opvaartzijde) measured within a minute after the top lock door opens
WaterLevelTopMeasurements AS (
    SELECT
        LockDoor.TimeStamp AS "TimeStamp",
        CAST(ROUND((WaterLevelLock.ValueNumber - WaterLevelTop.ValueNumber), 2) AS float) AS "ValueNumber",
        null AS "ValueBit",
        MEAS.Name AS "MeasurementName",
        LockDoor.PartName AS "PartName",
        LockDoor.ElementId AS "ElementId",
        LockDoor.ElementName AS "ElementName",
        LockDoor.ObjectName AS "ObjectName",
        LockDoor.TranslationTableId AS "TranslationTableId"
    FROM LockDoorTop AS LockDoor
    JOIN WaterLTop AS WaterLevelTop
    ON  DATEDIFF(minute, LockDoor, WaterLevelTop) BETWEEN 0 AND 1 AND
        LockDoor.ObjectName = WaterLevelTop.ObjectName
    JOIN WaterLLock AS WaterLevelLock
    ON  DATEDIFF(minute, LockDoor, WaterLevelLock) BETWEEN 0 AND 1 AND
        LockDoor.ObjectName = WaterLevelLock.ObjectName
    INNER JOIN SQLMeasurementType AS MEAS
    ON MEAS.Name = 'Waterniveauverschil'
),

-- Combine queries
DatalakeCombinedMeasurements AS (
    SELECT * FROM VermogenAandrijvingMeasurements
    UNION
    SELECT * FROM WaterLevelTopMeasurements
)

-- Output data
SELECT *
INTO DatalakeHarmonizedMeasurements
FROM DatalakeCombinedMeasurements
PARTITION BY TranslationTableId
```
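Stripped down to its shape, the job is a frequent branch and a rare branch, both read from the same step and merged with UNION. A reduced version along these lines could help isolate whether UNION on its own, or the joins upstream of it, cause the stall. The filters are placeholders (the rare one mimics the door-open condition from `LockDoorTop`), the input and output aliases are reused from the query above, and whether this reduced form shows the same behaviour is untested:

```sql
WITH
HarmonizedMeasurements AS (
    SELECT *
    FROM EventHubHarmonizedMeasurements TIMESTAMP BY "TimeStamp"
),
-- Placeholder frequent branch: expected to match every few minutes
FrequentBranch AS (
    SELECT * FROM HarmonizedMeasurements
    WHERE MeasurementName = 'Vermogen'
),
-- Placeholder rare branch: expected to match only a few times per day
RareBranch AS (
    SELECT * FROM HarmonizedMeasurements
    WHERE MeasurementName = 'Sluisdeur open' AND ValueBit = '1'
),
-- Same merge step as in the full query, without any joins upstream
Combined AS (
    SELECT * FROM FrequentBranch
    UNION
    SELECT * FROM RareBranch
)

SELECT *
INTO DatalakeHarmonizedMeasurements
FROM Combined
```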
Leon Cullens
  • I'll try to look at it today. If possible, sharing samples of your inputs would make things easier. You can do that in VSCode fairly easily. – Florian Eiden Nov 03 '21 at 18:09
  • I had a quick look, and I'm wondering if the job is exhausting readers on the consumer group of that Event Hub (see [here](https://learn.microsoft.com/en-us/azure/stream-analytics/stream-analytics-troubleshoot-input#readers-per-partition-exceeds-event-hubs-limit)). Did you see anything along those lines in the logs? Could you try creating 2 inputs in ASA, each with a different consumer group, for each path you have here? – Florian Eiden Nov 03 '21 at 18:35
  • Hi @FlorianEiden, thanks for looking into it. I will get back with a data sample tomorrow. Regarding exhausting consumers: the first query after the WITH statement should prevent this from happening, right? In all my queries I only refer to "HarmonizedMeasurements"; I don't retrieve anything from "EventHubHarmonizedMeasurements" after that point. – Leon Cullens Nov 03 '21 at 22:22
  • That approach works if you have multiple outputs on that same input: by using WITH, you force a single reader across those outputs. It's not applicable here, as the exhaustion would come from the UNION and the many self joins. Note that I'm still not 100% sure that's the reason here, even though it looks very much like it, because you should get warnings, if not errors, for consumer exhaustion. – Florian Eiden Nov 03 '21 at 23:17
  • I checked your query topology and you're right, it doesn't look like consumer exhaustion should happen. I'm looking into timeline shenanigans now. Will keep you updated. – Florian Eiden Nov 04 '21 at 19:31
  • Hi @LeonCullens, sorry this is taking more time than expected, I should get back to you by next week. – Florian Eiden Nov 10 '21 at 16:51
  • @FlorianEiden thanks a bunch for looking into it! – Leon Cullens Nov 10 '21 at 20:44
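The "multiple outputs on the same input" pattern described in the comments, where one WITH step reads from the Event Hub input and every output reads from that step, looks roughly like this. `Output1`, `Output2` and the filters are hypothetical placeholders:

```sql
-- One step reads from the Event Hub input...
WITH HarmonizedMeasurements AS (
    SELECT *
    FROM EventHubHarmonizedMeasurements TIMESTAMP BY "TimeStamp"
)

-- ...and each output reads from that step rather than from the input directly.
SELECT *
INTO Output1
FROM HarmonizedMeasurements
WHERE MeasurementName = 'Vermogen'

SELECT *
INTO Output2
FROM HarmonizedMeasurements
WHERE MeasurementName = 'Waterniveaumeting'
```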
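The suggestion to give each path its own input could be sketched as follows. `EventHubHarmonizedA` and `EventHubHarmonizedB` are hypothetical input aliases that both point at the same Event Hub but are configured with different consumer groups. The consumer group is set in each input's definition, not in the query. The two paths are abbreviated to placeholder filters:

```sql
WITH
-- Path A reads through its own input (and therefore its own consumer group)
PathA AS (
    SELECT *
    FROM EventHubHarmonizedA TIMESTAMP BY "TimeStamp"
    WHERE MeasurementName = 'Vermogen'
),
-- Path B reads the same Event Hub through a second input with a different consumer group
PathB AS (
    SELECT *
    FROM EventHubHarmonizedB TIMESTAMP BY "TimeStamp"
    WHERE MeasurementName = 'Waterniveaumeting'
),
Combined AS (
    SELECT * FROM PathA
    UNION
    SELECT * FROM PathB
)

SELECT *
INTO DatalakeHarmonizedMeasurements
FROM Combined
```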
