5

We have an ETL pipeline that runs for each CSV uploaded into an storage account (Azure). It runs some transformations on the CSV and writes the outputs to another location, also as CSV, and calls a stored procedure on the database (SQL Azure) which ingests (BULK INSERT) this resulting CSV into a staging table.

This pipeline can have concurrent executions as multiple resources can be uploading files to the storage. Hence, the staging table is getting data inserted pretty often.

Then, we have an scheduled SQL job (Elastic Job) that triggers an SP that moves the data from the staging table into the final table. At this point, we would want to truncate/empty the staging table so that we do not re-insert them in the next execution of the job.

Problem is, we cannot be sure that between the load from the staging table to the final table and the truncate command, there has not been any new data written into the staging table that could be truncated without first being inserted in to the final table.

Is there a way to lock the staging table while we're copying the data into the final table so that the SP (called from the ETL pipeline) trying to write to it will just wait until the lock is release? Is this achievable by using transactions or maybe some manual lock commands?

If not, what's the best approach to handle this?

empz
  • 11,509
  • 16
  • 65
  • 106
  • "Truncate" is a bit different than "Delete". Truncate requires special rights as it does not store the deleted information in the Transaction Log. How much data are you working with? Temp tables could be an option. You could also script the dropping and recreation of the tables. – Jason Geiger Feb 06 '20 at 14:14
  • the answers thus far have introduced some manner of "waiting" for one file to finish before another can run. The sp_getapplck solution only loads one file at a time. The "load" and "stage" table solution only expands your "bandwidth" of file load to 2 files at a time... but still only 1 file is being inserted at a time into your final table. Is that acceptable? Reading your question, it did not seem that loading a single file at a time was acceptable. If it is, then I dont see why a system that ID's each incoming file wouldn't work. all records are associated with the file they came from. – jamie Feb 08 '20 at 00:00

5 Answers5

1

I like the sp_getapplock and use this method myself in few places for its flexibility and that you have full control over the locking logic and wait times.

The only problem that I see is that in your case concurrent processes are not all equal.

You have SP1 that moves data from the staging table into the main table. Your system never tries to run several instances of this SP.

Another SP2 that inserts data into the staging table can be run several times simultaneously and it is fine to do it.

It is easy to implement the locking that would prevent any concurrent run of any combination of SP1 or SP2. In other words, it is easy if the locking logic is the same for SP1 and SP2 and they are treated equal. But, then you can't have several instances of SP2 running simultaneously.

It is not obvious how to implement the locking that would prevent concurrent run of SP1 and SP2, while allowing several instances of SP2 to run simultaneously.


There is another approach that doesn't attempt to prevent concurrent run of SPs, but embraces and expects that simultaneous runs are possible.

One way to do it is to add an IDENTITY column to the staging table. Or an automatically populated datetime if you can guarantee that it is unique and never decreases, which can be tricky. Or rowversion column.

The logic inside SP2 that inserts data into the staging table doesn't change.

The logic inside SP1 that moves data from the staging table into the main table needs to use these identity values.

At first read the current maximum value of identity from the staging table and remember it in a variable, say, @MaxID. All subsequent SELECTs, UPDATEs and DELETEs from the staging table in that SP1 should include a filter WHERE ID <= @MaxID.

This would ensure that if there happen to be a new row added to the staging table while SP1 is running, that row would not be processed and would remain in the staging table until the next run of SP1.

The drawback of this approach is that you can't use TRUNCATE, you need to use DELETE with WHERE ID <= @MaxID.


If you are OK with several instances of SP2 waiting for each other (and SP1), then you can use sp_getapplock similar to the following. I have this code in my stored procedure. You should put this logic into both SP1 and SP2.

I'm not calling sp_releaseapplock explicitly here, because the lock owner is set to Transaction and engine will release the lock automatically when transaction ends.

You don't have to put retry logic in the stored procedure, it can be within external code that runs these stored procedures. In any case, your code should be ready to retry.

CREATE PROCEDURE SP2  -- or SP1
AS
BEGIN
    SET NOCOUNT ON;
    SET XACT_ABORT ON;

    BEGIN TRANSACTION;
    BEGIN TRY
        -- Maximum number of retries
        DECLARE @VarCount int = 10;

        WHILE (@VarCount > 0)
        BEGIN
            SET @VarCount = @VarCount - 1;

            DECLARE @VarLockResult int;
            EXEC @VarLockResult = sp_getapplock
                @Resource = 'StagingTable_app_lock',
                -- this resource name should be the same in SP1 and SP2
                @LockMode = 'Exclusive',
                @LockOwner = 'Transaction',
                @LockTimeout = 60000,
                -- I'd set this timeout to be about twice the time
                -- you expect SP to run normally
                @DbPrincipal = 'public';

            IF @VarLockResult >= 0
            BEGIN
                -- Acquired the lock

                -- for SP2
                -- INSERT INTO StagingTable ...

                -- for SP1
                -- SELECT FROM StagingTable ...
                -- TRUNCATE StagingTable ...

                -- don't retry any more
                BREAK;
            END ELSE BEGIN
                -- wait for 5 seconds and retry
                WAITFOR DELAY '00:00:05';
            END;
        END;

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION;
        -- log error
    END CATCH;

END

This code guarantees that only one procedure is working with the staging table at any given moment. There is no concurrency. All other instances will wait.

Obviously, if you try to access the staging table not through these SP1 or SP2 (which try to acquire the lock first), then such access will not be blocked.

Vladimir Baranov
  • 31,799
  • 5
  • 53
  • 90
  • This is a nice alternative. Unfortunately,both the staging and the final tables are clustered column store tables with millions and millions of rows. The `DELETE FROM` the staging table takes a long time compared to the instant TRUNCATE. The SP that inserts data into the staging table is actually pretty fast, less than a second. So I guess we're okay if those can't run concurrently. they will just wait for the lock to be released and then run one after another, right? – empz Feb 05 '20 at 16:35
  • @emzero, yes, please see the code example that I added to the answer – Vladimir Baranov Feb 05 '20 at 23:59
1

I would propose solution with two identical staging tables. Lets name them StageLoading and StageProcessing.
Load process would have following steps:
1. At the beginning both tables are empty.
2. We load some data into StageLoading table (I assume each load is a transaction).
3. When Elastic job starts it will do:
- ALTER TABLE SWITCH to move all data from StageLoading to StageProcessing. It will make StageLoading empty and ready for next loads. It is a metadata operation, so takes miliseconds and it is fully blocking, so will be done between loads.
- load the data from StageProcessing to final tables.
- truncate table StageProcessing.
4. Now we are ready for next Elastic job.

If we try to do SWITCH when StageProcessing is not empty, ALTER will fail and it will mean that last load process failed.

Piotr Palka
  • 3,086
  • 1
  • 9
  • 17
  • +1 Umm this is certainly interesting if it works as it supposed to be. I didn't know about ALTER TABLE SWITCH. I'll do some testing and let you know. – empz Feb 06 '20 at 10:22
  • 1
    There is nice explanation how it works: https://stackoverflow.com/questions/41348523/how-does-alter-table-switch-works-on-sql-server. One note, you can use to switch one whole non-partitioned table with another whole non-partitioned table. – Piotr Palka Feb 06 '20 at 15:43
  • Does this really allow multiple files to be loaded at once? in the end, there is still only 1 file being inserted into the final table. This being the case, what is the benefit of having 2 files in "load/staging" set of tables if only 1 file is being applied at a given time. – jamie Feb 08 '20 at 00:07
  • It allow you to have one or multiple processes loading into stage table and at the same time Elastic job consuming data from stage table. So the load processes don't have to wait for the final processing to finish. As for multiple processes inserting into the stage file at the same time it depends how the stage table is build (heap/rowstore or columnstore). Still, you can have only one process consuming from stage table, still you can extend the solution to have multiple consumers too. – Piotr Palka Feb 08 '20 at 15:54
  • 1
    This worked great. It allows multiple processes to load into the staging table and then, a simple ALTER TABLE SWITCH command to move all the data to the other table which will be used to fill the final tables, while still allowing other processes to keep loading into the staging table. Thanks! – empz Feb 11 '20 at 18:23
0

Is there a way to lock the staging table while we're copying the data into the final table so that the SP (called from the ETL pipeline) trying to write to it will just wait until the lock is release? Is this achievable by using transactions or maybe some manual lock commands?

It looks you are searching for a mechanism that is wider than a transaction level. SQL Server/Azure SQL DB has one and it is called application lock:

sp_getapplock

Places a lock on an application resource.

Locks placed on a resource are associated with either the current transaction or the current session. Locks associated with the current transaction are released when the transaction commits or rolls back.Locks associated with the session are released when the session is logged out. When the server shuts down for any reason, all locks are released.

Locks can be explicitly released with sp_releaseapplock. When an application calls sp_getapplock multiple times for the same lock resource, sp_releaseapplock must be called the same number of times to release the lock. When a lock is opened with the Transaction lock owner, that lock is released when the transaction is committed or rolled back.

It basically means that your ETL Tool should open single session to DB, acquire the lock and release when finished. Other sessions before trying to do anything should try to acquire the lock(they cannot because it already taken), wait until when it released and continue to work.

Community
  • 1
  • 1
Lukasz Szozda
  • 162,964
  • 23
  • 234
  • 275
0

Assuming you have a single outbound job

  • Add an OutboundProcessing BIT DEFAULT 0 to the table
  • In the job, SET OutboundProcessing = 1 WHERE OutboundProcessing = 0 (claim the rows)
  • For the ETL, incorporate WHERE OutboundProcessing = 1 in the query that sources the data (transfer the rows)
  • After the ETL, DELETE FROM TABLE WHERE OutboundProcessing = 1 (remove the rows you transferred)
  • If the ETL fails, SET OutboundProcessing = 0 WHERE OutboundProcessing = 1
Martin K
  • 183
  • 9
  • I've already thought about this, but I'd rather use a solution with TRUNCATE instead of DELETE FROM. The latter takes a really long time. – empz Feb 07 '20 at 16:59
  • You might have fallen victim to premature performance optimization. The data is already loaded to the destination when the delete runs. – Martin K Feb 07 '20 at 19:46
  • why does a "delete" take too long? it seems the files are small and I assume the lifespan of the process form file to final table is quick... so why would there be millions of records in the load table? They should be getting deleted, so I'd expect there would be few records in the load table at any point. – jamie Feb 08 '20 at 00:03
0

I always prefer to "ID" each file I receive. If you can do this, you can associate the records from a given file throughout your load process. You haven't called out a need for this, but jus sayin.

However, with each file having an identity (just a int/bigint identity value should do) you can then dynamically create as many load tables as you like from a "template" load table.

  1. When a file arrives, create a new load table named with the ID of the file.
  2. Process your data from load to final table.
  3. drop the load table for the file being processed.

This is somewhat similar to the other solution about using 2 tables (load and stage) but even in that solution you are still limited to having 2 files "loaded" (your still only applying one file to the final table though?)

Last, it is not clear if your "Elastic Job" is detached from the actual "load" pipeline/processing or if it is included. Being a job, I assume it is not included, if a job, you can only run a single instance at time? So its not clear why it's important to load multiple files at once if you can only move one from load to final at a time. Why the rush to get files into load?

jamie
  • 745
  • 4
  • 11
  • I don't think this could work because the job that loads the data from the staging table to the final table is "detached" from the pipeline. It just knows that it needs to move data from the staging table to the final table. It doesn't know which file the records belong to. All the records in the staging table can be from several different source files. – empz Feb 10 '20 at 14:31