I have the following Azure Stream Analytics input:

{ "ID":"DEV-001-Test",
  "TMSMUTC":"2021-10-14T14:00:00.000",
  "MSGTYP":"TELEMETRY",
  "THING":[
           {
            "TMSDUTC":"2021-10-14T13:00:00.000",
            "DATA":[
                {
                  "TAGID":"TAGB",
                  "VALUE":30
                },
                {
                  "TAGID":"TAGX",
                  "VALUE":[30.34,245.65,30.34,245.65,245.65,30.34]
                }
               ]
           }
          ]
}

Here the array of values for "TAGX" holds one sensor reading every 10 minutes, covering one hour starting at the timestamp "TMSDUTC":"2021-10-14T13:00:00.000". How can I write a query that produces output like the following?

[screenshot: table of the expected output]

My main doubts are how to generate the sequence of 10-minute timestamps from TMSDUTC and how to CROSS APPLY the values onto it.

DerStoffel

1 Answer

That's a good one! I highly recommend using VS Code with the ASA extension when working on these queries. The developer experience is much nicer than in the portal thanks to local testing, and you can also unit test your query via the npm package.

I made the following assumptions:

  • THING is an array of a single record. Let me know if that's not the case.
  • [edited] TMSDUTC needs to be incremented by 10 minutes according to the position of each item in the array when applicable (TAGX)

With that, here is the query. It's split into multiple code blocks to explain the flow, but I also pasted it whole in the last code block.

First we bring all the required fields to the first level. This makes the query easier to read, and it is also needed for a technical reason: GetArrayElements needs an array to CROSS APPLY, but GetArrayElement (singular) doesn't return the type at compile time. Using an intermediate query step solves that.

WITH things AS (
    SELECT
        ID,
        GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
        MSGTYP AS MessageType,
        GetArrayElement(THING,0).DATA AS DATA
    FROM [input]
),

Then we expand DATA:

dataAll AS (
    SELECT
        T.ID,
        T.TMSDUTC,
        T.MessageType,
        D.ArrayValue.TAGID AS Tag,
        D.ArrayValue.Value AS [Value]
    FROM things T
    CROSS APPLY GetArrayElements(T.DATA) AS D
),
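
To make the shape of dataAll concrete, here is a minimal Python sketch (not ASA code) that mimics the two steps above on the sample event from the question. The helper name expand_data and the dict-based rows are illustrative assumptions only; in the job itself the work is done by GetArrayElement and CROSS APPLY GetArrayElements.

```python
# Sample event, copied from the question.
event = {
    "ID": "DEV-001-Test",
    "TMSMUTC": "2021-10-14T14:00:00.000",
    "MSGTYP": "TELEMETRY",
    "THING": [
        {
            "TMSDUTC": "2021-10-14T13:00:00.000",
            "DATA": [
                {"TAGID": "TAGB", "VALUE": 30},
                {"TAGID": "TAGX",
                 "VALUE": [30.34, 245.65, 30.34, 245.65, 245.65, 30.34]},
            ],
        }
    ],
}


def expand_data(evt):
    """Hypothetical helper: one row per element of THING[0].DATA."""
    thing = evt["THING"][0]  # assumption: THING holds a single record
    return [
        {
            "ID": evt["ID"],
            "TMSDUTC": thing["TMSDUTC"],
            "MessageType": evt["MSGTYP"],
            "Tag": item["TAGID"],
            "Value": item["VALUE"],  # still a scalar or a list at this point
        }
        for item in thing["DATA"]
    ]


rows = expand_data(event)
for row in rows:
    print(row["Tag"], row["Value"])
```

For the sample event this gives two rows, one for TAGB with a scalar value and one for TAGX whose value is still the full array.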

Then we create a subset for records that have a VALUE of type array (TAGX in your example). Here I avoid hard-coding per tag by detecting the type at runtime. These records will need another round of array processing in the following step.

dataArrays AS (
    SELECT
        A.ID,
        A.TMSDUTC,
        A.MessageType,
        A.Tag,
        A.[Value]
    FROM dataAll A
    WHERE GetType(A.[Value]) = 'array'
),
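
As a rough analogue of the runtime type check, the following Python sketch splits rows by whether their value is a list. It only illustrates the idea behind GetType(...) = 'array'; the row dicts are made up to match the sample data and are not part of the job.

```python
# Rows shaped like the output of the previous step (illustrative data).
rows = [
    {"Tag": "TAGB", "Value": 30},
    {"Tag": "TAGX", "Value": [30.34, 245.65, 30.34, 245.65, 245.65, 30.34]},
]

# Rows whose value is an array need a second expansion pass.
array_rows = [r for r in rows if isinstance(r["Value"], list)]
# Rows with a scalar value can go to the output unchanged.
scalar_rows = [r for r in rows if not isinstance(r["Value"], list)]

print([r["Tag"] for r in array_rows])
print([r["Tag"] for r in scalar_rows])
```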

Now we can focus on expanding VALUE for those records. Note that we could not do that in a single pass (filter on arrays above and CROSS APPLY below), as GetArrayElements checks types before filtering is done.

[edited] To increment TMSDUTC, we use DATEADD on the index of each item in its array (ArrayIndex/ArrayValue are both returned from the array expansion, see doc below).

dataArraysExpanded AS (
    SELECT
        A.ID,
        DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
        A.MessageType,
        A.Tag,
        V.ArrayValue AS [Value]
    FROM dataArrays A
    CROSS APPLY GetArrayElements(A.[Value]) AS V
),
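
The timestamp arithmetic in that step can be checked outside of ASA. Below is a small Python sketch, under the assumption that each array position is exactly 10 minutes after the previous one; expand_with_offsets is a hypothetical helper name, and enumerate plays the role of ArrayIndex/ArrayValue.

```python
from datetime import datetime, timedelta


def expand_with_offsets(base_ts, values, step_minutes=10):
    """Pair each value with base_ts + step_minutes * its array index."""
    base = datetime.strptime(base_ts, "%Y-%m-%dT%H:%M:%S.%f")
    return [
        (base + timedelta(minutes=step_minutes * index), value)
        for index, value in enumerate(values)  # index ~ ArrayIndex
    ]


expanded = expand_with_offsets(
    "2021-10-14T13:00:00.000",
    [30.34, 245.65, 30.34, 245.65, 245.65, 30.34],
)
for ts, value in expanded:
    print(ts.isoformat(), value)
```

With the sample data, the six TAGX values land on 13:00, 13:10, 13:20, 13:30, 13:40 and 13:50, with index 0 keeping the original TMSDUTC.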

We union everything back together:

newSchema AS (
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
        UNION
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)

And finally insert everything into the destination:

SELECT
    *
INTO myOutput
FROM newSchema

[edited] Please note that the only order guaranteed on a result set is the one defined by the timestamp. If multiple records occur on the same timestamp, no order is guaranteed by default. Here, at the end of the query, all of the newly created events are still timestamped on the timestamp of the original event. If you now need to apply time logic on the newly generated TMSDUTC, you will need to output these records to Event Hub, and load them in another job using TIMESTAMP BY TMSDUTC. Currently the timestamp can only be changed directly at the very first step of a query.

What is used here:

  • GetArrayElement (singular): doc
  • WITH, aka Common Table Expression (CTE): doc
  • CROSS APPLY + GetArrayElements: doc and doc, plus very good ref
  • GetType: doc

The entire thing for easier copy/pasting:

WITH things AS (
    SELECT
        ID,
        GetArrayElement(THING,0).TMSDUTC AS TMSDUTC,
        MSGTYP AS MessageType,
        GetArrayElement(THING,0).DATA AS DATA
    FROM [input]
),
dataAll AS (
    SELECT
        T.ID,
        T.TMSDUTC,
        T.MessageType,
        D.ArrayValue.TAGID AS Tag,
        D.ArrayValue.Value AS [Value]
    FROM things T
    CROSS APPLY GetArrayElements(T.DATA) AS D
),
dataArrays AS (
    SELECT
        A.ID,
        A.TMSDUTC,
        A.MessageType,
        A.Tag,
        A.[Value]
    FROM dataAll A
    WHERE GetType(A.[Value]) = 'array'
),
dataArraysExpanded AS (
    SELECT
        A.ID,
        DATEADD(minute,10*V.ArrayIndex,A.TMSDUTC) AS TMSDUTC,
        A.MessageType,
        A.Tag,
        V.ArrayValue AS [Value]
    FROM dataArrays A
    CROSS APPLY GetArrayElements(A.[Value]) AS V
),
newSchema AS (
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataAll WHERE GetType([Value]) != 'array'
        UNION
    SELECT ID, TMSDUTC, MessageType, Tag, [Value] FROM dataArraysExpanded
)
SELECT
    *
INTO myOutput
FROM newSchema
Florian Eiden
  • The explanation and the implementation are super clear, and your assumptions are correct. The only part missing is at the dataArraysExpanded level: is there a way to take the timestamp for this array value, TMSDUTC, and add 10 minutes for each new value so that each one gets its real timestamp, as in the table screenshot I posted for TAGX? Thank you very much for the easy explanation, appreciated :) – Ahmed Essam Oct 15 '21 at 08:23
  • I would say something like the ROW_NUMBER() function in SQL followed by a DATEADD, but there is no ROW_NUMBER() in Stream Analytics! – Ahmed Essam Oct 15 '21 at 10:05
  • Sorry I missed that! Yes you can access each item position via ArrayIndex in return of the CROSS APPLY. Then you can add that times 10 minutes to TMSDUTC via DATEADD. I’ll edit the answer later today to illustrate. – Florian Eiden Oct 15 '21 at 14:40
  • The answer is edited! – Florian Eiden Oct 15 '21 at 16:59
  • Thank you, I had missed the ArrayIndex, appreciated :) – Ahmed Essam Oct 17 '21 at 11:53