0

I'm working on a data flow to ingest XML files and trying to pull a deeply nested node's value up into another part of the flow. E.g.

<payload>
  <header>
    <customerID>1234</customerID>
    <createdDate>1/2/2023T15:33:22</createdDate>
    ....
  </header>
  <body>
    ...
    <compartments>
      <compartment>
        <compartmentNo>1</compartmentNo>
        <product>2</product>
        <quantity>5000</quantity>
        ...
        <references>
          <reference>
            <referenceType>ShipmentID</referenceType>
            <referenceValue>23434</referenceValue>
          </reference>
          <reference>
            ...
          </reference>
        </reference>
      </compartment>
    </compartments
  </body>
</payload>

Note: This XML is not complete and also not a sensible structure but it's what we've got from the vendor.

The file is ingested into 2 tables: Shipments and ShipmentCompartments however the ShipmentID belongs in the Shipments table.

I'm using a flattening activity to get all the compartments and then flattening references, but I'm unsure of how to get the shipment ID up to the Shipments Sinc activity especially since it is part of an array so I would need to get the correct Reference node (by filtering the referenceType by Shipment ID) and then extracting the value from the adjacent referenceValue node.

Source: XML File from a Blob storage container Target: Azure SQL Server (split into multiple tables)

Table structure where data is currently being landed: enter image description here

Any help would be appreciated.

Jacques
  • 6,936
  • 8
  • 43
  • 102
  • What is the source? What is the target? Azure SQL DB / SQL Server is very capable with XML (including deeply nested XML with CROSS APPLY) so a better pattern might be to land the XML in a table and process it there. – wBob Jun 29 '23 at 08:35
  • How does the data look like after flatten transformation? COuld you share the sample data of that? – Aswin Jun 29 '23 at 08:50
  • @wBob I've added the source/targets and we're not allowed to store XML in the DB, we have to load, clean, transform, etc. – Jacques Jun 29 '23 at 09:48
  • @Aswin I've added a diagram showing where data is currently landed. Currently the ShipmentID is flattened and stored in the ShipmentReferences table. I need this moved to the Shipments table – Jacques Jun 29 '23 at 09:49
  • Hey @Jacques, are CustomerID and createdDate properties inside compartment? From where do you get these values – Saideep Arikontham Jun 29 '23 at 10:04
  • @SaideepArikontham they come from the
    node in the XML document.
    – Jacques Jun 29 '23 at 10:13
  • So how do you know what is the customerID for a particular shipmentID? Does the above XML have information only about one row belonging to Shipments table? Or are they same for all the compartments in this file? – Saideep Arikontham Jun 29 '23 at 10:14
  • @SaideepArikontham I've just updated the XML fragment/example. One shipment is for a single customer, but may have multiple products - on product per compartment. – Jacques Jun 29 '23 at 10:16
  • So, the above XML is for one customer who might have multiple shipments? – Saideep Arikontham Jun 29 '23 at 10:17
  • I’m on holiday at the moment but will work up an answer on Sunday. – wBob Jun 29 '23 at 10:32
  • @SaideepArikontham it's one customer, one shipment, but multiple products. This is related to petroleum products so you might have ULP93 in one compartment and Diesel in another but it's all headed to one customer. If the truck happens to be carrying another shipment I think it will have a different shipment document. That's my understanding at this point. – Jacques Jun 29 '23 at 10:43

2 Answers2

0
  • I have used the following xml data as my source for demonstration:
<payload>
    <header>
    <customerID>1234</customerID>
    <createdDate>1/2/2023T15:33:22</createdDate>
  </header>
  <body>
    <compartments>
      <compartment>
        <compartmentNo>1</compartmentNo>
        <product>2</product>
        <references>
          <reference>
            <referenceType>ShipmentID</referenceType>
            <referenceValue>23434</referenceValue>
          </reference>
        </references>
      </compartment>
      <compartment>
        <compartmentNo>2</compartmentNo>
        <product>22</product>
        <references>
          <reference>
            <referenceType>ShipmentID</referenceType>
            <referenceValue>123434</referenceValue>
          </reference>
        </references>
      </compartment>
    </compartments>
  </body>
</payload>
  • I have used select transformation with rule based mapping to unpack the header property and a fixed mapping to select body.

enter image description here

  • I have used flatten transformation unrolling by body.compartments.compartment with the following configurations.

enter image description here

  • I have used another select transformation to select only customerID and createdDate as fixed mapping and a rule-based mapping to unpack the hierarchy references.reference:

enter image description here

  • The data preview after the above operations would be as shown in the below image:

enter image description here

  • If you have other referenceType values other than ShipmentID, you can use the filter transformation with condition referenceType=='ShipmentID'

enter image description here

  • Now, you can select the sink table (shipments) and manually map the values to respective columns. The following is the entire dataflow JSON:
{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Xml1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "select1"
                },
                {
                    "name": "flatten1"
                },
                {
                    "name": "select2"
                },
                {
                    "name": "filter1"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          payload as (body as (compartments as (compartment as (compartmentNo as short, product as short, references as (reference as (referenceType as string, referenceValue as integer)))[])), header as (createdDate as string, customerID as short))",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     validationMode: 'none',",
                "     namespaces: true) ~> source1",
                "source1 select(mapColumn(",
                "          each(payload.header,match(true())),",
                "          body = payload.body",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1 foldDown(unroll(body.compartments.compartment),",
                "     mapColumn(",
                "          customerID,",
                "          createdDate,",
                "          references = body.compartments.compartment.references",
                "     ),",
                "     skipDuplicateMapInputs: false,",
                "     skipDuplicateMapOutputs: false) ~> flatten1",
                "flatten1 select(mapColumn(",
                "          customerID,",
                "          createdDate,",
                "          each(references.reference,match(true()))",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select2",
                "select2 filter(referenceType=='ShipmentID') ~> filter1",
                "filter1 sink(validateSchema: false,",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     store: 'cache',",
                "     format: 'inline',",
                "     output: false,",
                "     saveOrder: 1,",
                "     mapColumn(",
                "          customerID,",
                "          createdDate,",
                "          ShipmentID = referenceValue",
                "     )) ~> sink1"
            ]
        }
    }
}
Saideep Arikontham
  • 5,558
  • 2
  • 3
  • 11
0

Azure SQL DB can read directly from blob store reading OPENROWSET. A simple example:

CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'super$trongPassw0rd!';

CREATE DATABASE SCOPED CREDENTIAL cred_datalake
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2020-02-10the rest of your SAS key-no question mark at the front';

CREATE EXTERNAL DATA SOURCE ds_datalake
WITH (
    TYPE = BLOB_STORAGE,
    LOCATION = 'https://yourStorageAccount.blob.core.windows.net/datalake',
    CREDENTIAL = cred_datalake
);
GO

DECLARE @xml XML;

SELECT @xml = BulkColumn 
FROM OPENROWSET(
    BULK 'raw/xml/payload.xml', 
    DATA_SOURCE = 'ds_datalake', 
    SINGLE_BLOB
) AS x;

SELECT @xml;

--INSERT INTO ...
SELECT
    p.c.value('(header/customerID/text())[1]', 'INT') AS customerID,
    p.c.value('(header/createdDate/text())[1]', 'VARCHAR(20)') AS createdDate,

    c.c.value('(compartmentNo/text())[1]', 'INT') AS compartmentNo,
    c.c.value('(product/text())[1]', 'INT') AS product,
    c.c.value('(quantity/text())[1]', 'INT') AS quantity,

    r.c.value('(referenceType/text())[1]', 'VARCHAR(20)') AS referenceType,
    r.c.value('(referenceValue/text())[1]', 'VARCHAR(20)') AS referenceValue,

    p.c.query('.') payload
FROM @xml.nodes('payload') p(c)
    CROSS APPLY p.c.nodes('body/compartments/compartment') c(c)
        CROSS APPLY c.c.nodes('references/reference') r(c);

Other setup required:

  • Enable the System assigned Managed identity for the SQL server which hosts your
  • Grant that identity the Storage Blob Data Reader RBAC role on the storage account

Adapt this example for your requirement. Use ADF for orchestration and make use of the existing compute you have in the Azure SQL DB.

wBob
  • 13,710
  • 3
  • 20
  • 37