I have a feeling this answer may be overkill, but I think it's all useful stuff when processing a large volume ETL transfer from Oracle to SQL Server.
For this example, I'm going to be pulling invoice details. These are the line item records for invoices (i.e. general sales) for a large company. The table I'm pulling from in Oracle has about 1 billion rows, and I need around 300 million records each time. After I pull them over, I compare them to what I currently have in SQL Server and then make any UPDATEs, INSERTs, or DELETEs that are needed. And I do this multiple times per day, and it now only takes about 20 minutes for the entire process. But in the beginning the whole process took several hours.
To pull the raw data from Oracle, I use a fairly simple package that generates the query to use in Oracle, Truncates my SQL Server import staging table (I have a few more staging tables later), pulls the data from Oracle, and then calls a proc that performs the merge (not an actual MERGE
as we'll see). The Control Flow looks like this:

The first important step is limiting the data as much as possible on the Oracle side. I don't want to pull all 1 billion rows each time this package runs. So I limit the records to only the records where the Invoice Date
is greater than or equal to January 1st two years ago. But because date formats between SQL Server and Oracle aren't very compatible (the best technique I've found to to convert both to the same string format), I have to do some formatting, and because I want the date range to always be correct based on the day I'm running on, I have to do quite a bit of inline date math.
SELECT
CAST("Data Source Code" AS VARCHAR2(3)) AS "DataSourceCode"
,CAST("Order#" AS VARCHAR2(11)) AS "OrderNum"
,CAST("Invoice#" AS VARCHAR2(15)) AS "InvoiceNum"
,CAST("Item#" AS VARCHAR2(10)) AS "ItemNumber"
,CAST("Order Line Type" AS VARCHAR2(10)) AS "OrderLineType"
,CAST("Order Status" AS VARCHAR2(1)) AS "OrderStatus"
,"Order Date Time" AS "OrderDate"
,CAST("Fiscal Invoice Period" AS VARCHAR2(6)) AS "FiscalInvoicePeriod"
,"Invoice Date" AS "InvoiceDate"
,CAST("Ship To Cust#" AS VARCHAR2(8)) AS "ShipToCustNum"
,CAST("Billing Account #" AS VARCHAR2(8)) AS "BillingAccountNumber"
,CAST("Sales Branch Number" AS VARCHAR2(4)) AS "SalesBranchNumber"
,CAST("Price Branch Number" AS VARCHAR2(4)) AS "PriceBranchNumber"
,CAST("Ship Branch Number" AS VARCHAR2(4)) AS "ShippingBranchNumber"
,"Sold Qty" AS "SoldQty"
,"Unit Price" AS "UnitPrice"
,"Sales Amount" AS "SalesAmount"
,"Handling Amount" AS "HandlingAmount"
,"Freight Amount" AS "FreightAmount"
,"Unit Cogs Amount" AS "UnitCogsAmount"
,"Cogs Amount" AS "CogsAmount"
,"Unit Commcost Amount" AS "UnitCommcostAmount"
,"Commcost Amount" AS "CommcostAmount"
,CAST("GL Period" AS VARCHAR2(6)) AS "GLPeriod"
,"Order Qty" AS "OrderQty"
,"Margin %" AS "MarginPct"
,CONVERT("PO Number",'AL32UTF8','WE8MSWIN1252') AS "PONumber"
,CAST("Branch Id" AS VARCHAR2(4)) AS "BranchId"
,CAST("Outside Salesrep SSO" AS VARCHAR2(20)) AS "OutsideSalesrepSSO"
,CAST("Inside Salesrep SSO" AS VARCHAR2(20)) AS "InsideSalesrepSSO"
,CAST("Order Writer SSO" AS VARCHAR2(20)) AS "OrderWriterSSO"
,"Line Number" AS "LineNumber"
,CAST(UPPER(RAWTOHEX(SYS.DBMS_OBFUSCATION_TOOLKIT.MD5(input_string =>
COALESCE(CAST("Data Source Code" AS VARCHAR2(4)),'') || '|' ||
COALESCE(CAST("Order#" AS VARCHAR2(40)),'') || '|' ||
COALESCE(CAST("Invoice#" AS VARCHAR2(15)),'') || '|' ||
COALESCE(CAST("Item#" AS VARCHAR2(10)),'') || '|' ||
COALESCE(CAST("Order Line Type" AS VARCHAR2(6)),'') || '|' ||
COALESCE(CAST("Order Status" AS VARCHAR2(3)),'') || '|' ||
COALESCE(CAST("Order Date" AS VARCHAR2(19)),'') || '|' ||
COALESCE(CAST("Invoice Date" AS VARCHAR2(19)),'') || '|' ||
COALESCE(CAST("Ship To Cust#" AS VARCHAR2(10)),'') || '|' ||
COALESCE(CAST("Billing Account #" AS VARCHAR2(10)),'') || '|' ||
COALESCE(CAST("Sales Branch Number" AS VARCHAR2(4)),'') || '|' ||
COALESCE(CAST("Price Branch Number" AS VARCHAR2(4)),'') || '|' ||
COALESCE(CAST("Ship Branch Number" AS VARCHAR2(4)),'') || '|' ||
COALESCE(CAST("Sold Qty" AS VARCHAR2(20)),'') || '|' ||
COALESCE(CAST(CAST("Unit Price" AS NUMBER(18,2)) AS VARCHAR2(20)),'') || '|' ||
COALESCE(CAST(CAST("Sales Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'') || '|' ||
COALESCE(CAST(CAST("Handling Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'') || '|' ||
COALESCE(CAST(CAST("Freight Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'') || '|' ||
COALESCE(CAST(TO_DATE('01-' || "Fiscal Invoice Period", 'DD-MON-YY') AS VARCHAR2(19)),'') || '|' ||
COALESCE(CAST(TO_DATE('01-' || "GL Period", 'DD-MON-YY') AS VARCHAR2(19)),'') || '|' ||
COALESCE(CAST("Order Qty" AS VARCHAR2(20)),'') || '|' ||
COALESCE(CAST(CAST("Commcost Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'') || '|' ||
COALESCE(CAST("Order Writer SSO" AS VARCHAR2(36)),'') || '|' ||
COALESCE(CAST("Line Number" AS VARCHAR2(10)),'')
))) AS VARCHAR2(32)) AS "HashVal"
FROM MY_SCHEMA.MY_INVOICE_TABLE
WHERE "Invoice Date" >= TO_DATE( CONVERT(VARCHAR(10), DATEADD(YEAR,-2,DATEADD(MONTH,1-(DATEPART(MONTH,DATEADD(DAY,1-(DATEPART(DAY,GETDATE())),GETDATE()))),DATEADD(DAY,1-(DATEPART(DAY,GETDATE())),GETDATE()))), 111), 'yyyy/mm/dd')
You'll also notice that I calculate an MD5 Hash value as well. This is a technique I picked up from the very awesome Andy Leonard. The Hash function takes a string and applies a one-way cryptographic encoding of the string, based on the algorithm you choose. MD5 is the strongest option to DBMS_OBFUSCATION_TOOLKIT
I have available with my limited permissions on the Oracle server. SQL server also happens to be able to generate the same Hash values in MD5, which is very helpful.
So the general idea is that you can convert all the fields of a record that you care about tracking into strings and concatenate them together, apply the hashing algorithm to get the HashVal, then join a source record to a destination record, and compare the HashVals. If the HashVals don't match, you know the source record has changed and you need to update the record. If the HashVal hasn't changed, then the source record probably hasn't changed. I say "probably" because even with the separators inserted into the concatenated string (so that we can distinguish between '1a2b' + '3c' + '4d'
and '1a' + '2b3c' + '4d'
; without separator: '1a2b3c4b'
vs '1a2b3c4b'
; with separator: '1a2|b3c|4d'
vs. '1a|2b3c|4d'
) it's still possible that the MD5 hash of two different string could be the same.
But we want to dynamically generate this query, so we have to stick it into an Execute SQL Task
and assign it to a variable, and then map that variable to an output parameter. The string-ified query then becomes:
SELECT SourceQuery =
'SELECT
CAST("Data Source Code" AS VARCHAR2(3)) AS "DataSourceCode"
,CAST("Order#" AS VARCHAR2(11)) AS "OrderNum"
,CAST("Invoice#" AS VARCHAR2(15)) AS "InvoiceNum"
,CAST("Item#" AS VARCHAR2(10)) AS "ItemNumber"
,CAST("Order Line Type" AS VARCHAR2(10)) AS "OrderLineType"
,CAST("Order Status" AS VARCHAR2(1)) AS "OrderStatus"
,"Order Date Time" AS "OrderDate"
,CAST("Fiscal Invoice Period" AS VARCHAR2(6)) AS "FiscalInvoicePeriod"
,"Invoice Date" AS "InvoiceDate"
,CAST("Ship To Cust#" AS VARCHAR2(8)) AS "ShipToCustNum"
,CAST("Billing Account #" AS VARCHAR2(8)) AS "BillingAccountNumber"
,CAST("Sales Branch Number" AS VARCHAR2(4)) AS "SalesBranchNumber"
,CAST("Price Branch Number" AS VARCHAR2(4)) AS "PriceBranchNumber"
,CAST("Ship Branch Number" AS VARCHAR2(4)) AS "ShippingBranchNumber"
,"Sold Qty" AS "SoldQty"
,"Unit Price" AS "UnitPrice"
,"Sales Amount" AS "SalesAmount"
,"Handling Amount" AS "HandlingAmount"
,"Freight Amount" AS "FreightAmount"
,"Unit Cogs Amount" AS "UnitCogsAmount"
,"Cogs Amount" AS "CogsAmount"
,"Unit Commcost Amount" AS "UnitCommcostAmount"
,"Commcost Amount" AS "CommcostAmount"
,CAST("GL Period" AS VARCHAR2(6)) AS "GLPeriod"
,"Order Qty" AS "OrderQty"
,"Margin %" AS "MarginPct"
,CONVERT("PO Number",''AL32UTF8'',''WE8MSWIN1252'') AS "PONumber"
,CAST("Branch Id" AS VARCHAR2(4)) AS "BranchId"
,CAST("Outside Salesrep SSO" AS VARCHAR2(20)) AS "OutsideSalesrepSSO"
,CAST("Inside Salesrep SSO" AS VARCHAR2(20)) AS "InsideSalesrepSSO"
,CAST("Order Writer SSO" AS VARCHAR2(20)) AS "OrderWriterSSO"
,"Line Number" AS "LineNumber"
,CAST(UPPER(RAWTOHEX(SYS.DBMS_OBFUSCATION_TOOLKIT.MD5(input_string =>
COALESCE(CAST("Data Source Code" AS VARCHAR2(4)),'''') || ''|'' ||
COALESCE(CAST("Order#" AS VARCHAR2(40)),'''') || ''|'' ||
COALESCE(CAST("Invoice#" AS VARCHAR2(15)),'''') || ''|'' ||
COALESCE(CAST("Item#" AS VARCHAR2(10)),'''') || ''|'' ||
COALESCE(CAST("Order Line Type" AS VARCHAR2(6)),'''') || ''|'' ||
COALESCE(CAST("Order Status" AS VARCHAR2(3)),'''') || ''|'' ||
COALESCE(CAST("Order Date" AS VARCHAR2(19)),'''') || ''|'' ||
COALESCE(CAST("Invoice Date" AS VARCHAR2(19)),'''') || ''|'' ||
COALESCE(CAST("Ship To Cust#" AS VARCHAR2(10)),'''') || ''|'' ||
COALESCE(CAST("Billing Account #" AS VARCHAR2(10)),'''') || ''|'' ||
COALESCE(CAST("Sales Branch Number" AS VARCHAR2(4)),'''') || ''|'' ||
COALESCE(CAST("Price Branch Number" AS VARCHAR2(4)),'''') || ''|'' ||
COALESCE(CAST("Ship Branch Number" AS VARCHAR2(4)),'''') || ''|'' ||
COALESCE(CAST("Sold Qty" AS VARCHAR2(20)),'''') || ''|'' ||
COALESCE(CAST(CAST("Unit Price" AS NUMBER(18,2)) AS VARCHAR2(20)),'''') || ''|'' ||
COALESCE(CAST(CAST("Sales Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'''') || ''|'' ||
COALESCE(CAST(CAST("Handling Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'''') || ''|'' ||
COALESCE(CAST(CAST("Freight Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'''') || ''|'' ||
COALESCE(CAST(TO_DATE(''01-'' || "Fiscal Invoice Period", ''DD-MON-YY'') AS VARCHAR2(19)),'''') || ''|'' ||
COALESCE(CAST(TO_DATE(''01-'' || "GL Period", ''DD-MON-YY'') AS VARCHAR2(19)),'''') || ''|'' ||
COALESCE(CAST("Order Qty" AS VARCHAR2(20)),'''') || ''|'' ||
COALESCE(CAST(CAST("Commcost Amount" AS NUMBER(18,2)) AS VARCHAR2(20)),'''') || ''|'' ||
COALESCE(CAST("Order Writer SSO" AS VARCHAR2(36)),'''') || ''|'' ||
COALESCE(CAST("Line Number" AS VARCHAR2(10)),'''')
))) AS VARCHAR2(32)) AS "HashVal"
FROM MY_SCHEMA.MY_INVOICE_TABLE
WHERE "Invoice Date" >= TO_DATE('''+CONVERT(VARCHAR(10), DATEADD(YEAR,-2,DATEADD(MONTH,1-(DATEPART(MONTH,DATEADD(DAY,1-(DATEPART(DAY,GETDATE())),GETDATE()))),DATEADD(DAY,1-(DATEPART(DAY,GETDATE())),GETDATE()))), 111)+''', ''yyyy/mm/dd'')'
In the Data Flow I have an Oracle Source
and an OLE DB Destination
. I first had to "prime" the Oracle Source
, though, by configuring it to use a SQL Command as the input, and then inputting the query with a hard-coded date comparison. Once I finished building the Data Flow, I went back to the Control Flow, opened the Properties of the Data Flow Task
and created a new Expressions entry. I mapped my SSIS source query variable to the [Oracle Source].[SqlCommand]
.

It's also very important to tune the BatchSize
and buffer size. By default, the Attunity Oracle Source uses a BatchSize
of 100. By contrast, I found that this particular transfer works well with a BatchSize
of 120,000.
The gist of the stored proc that merges the data is this:
--This is an intermediate work table
TRUNCATE TABLE dbo.InvoiceWorkTable
--It gets populated from the SQL Server staging table where the raw Oracle data is stored
INSERT INTO dbo.InvoiceWorkTable
(
...
)
SELECT
...
FROM dbo.InvoiceTable_Staging
--Update records where the unique key matches and the HashVals don't match
UPDATE psd
SET
psd.* = iw.*
FROM dbo.ProductionSalesData psd
INNER JOIN dbo.InvoiceWorkTable iw
ON psd.DataSourceCode = iw.DataSourceCode
AND psd.InvoiceNumber = iw.InvoiceNum
AND psd.ItemNumber = iw.ItemNumber
AND psd.LineNumber = iw.LineNumber
AND psd.IsOpen = iw.IsOpen
WHERE iw.DataSourceCode = 'ABC'
AND iw.HashVal <> iw.HashVal
--Insert new records (i.e. those with unique keys not found in
INSERT INTO dbo.ProductionSalesData
(
...
)
SELECT
iw.*
FROM dbo.InvoiceWorkTable iw
LEFT JOIN dbo.ProductionSalesData psd
ON psd.DataSourceCode = iw.DataSourceCode
AND psd.InvoiceNumber = iw.InvoiceNum
AND psd.ItemNumber = iw.ItemNumber
AND psd.LineNumber = iw.LineNumber
AND psd.IsOpen = iw.IsOpen
WHERE iw.DataSourceCode = 'ECL'
AND psd.InvoiceNumber IS NULL
I don't delete anything here, because that's performed by a month-end cleanup job that runs separately. I just perform an UPDATE
on matched-but-changed records, INSERT
the new ones, and anything unchanged is left alone.
By filtering the Oracle data and using a hashed encoding of the record to compare with instead of comparing each column, I avoid the very memory hungry option of using a Lookup
with hundreds of millions of values. Nor do I have to try to transfer those ids between servers (and platforms!).
I wish I could say that was the end of this particular workflow, but it's not. This is a very resource-hungry process, and we can't afford to have it run on our production OLTP server. So we run all our ETL processes on a completely different server. But we also need that data from Oracle to be integrated into the OLTP server as soon as it's available. But what's the point in having a fast transfer of hundres of millions of records between Oracle and the ETL server, if it still takes hours to move the data to a different SQL Server? Doing a "kill-and-fill" wouldn't work because the insert takes too long, and the application is requesting data from it too often. I can transfer the data between servers, but getting the new data into the production table on the OLTP server wasn't working well.
So after too many hours of trial and error, I found a tutorial on how to use ALTER TABLE
with SWITCH TO
. I'm not saying this is a perfect solution in every case, as there are security/permissions considerations as well as some other potential caveats. If you're interested, you should definitely read more from better sources, but basically, you want to make sure your source and destination tables have the same number of columns with the same names, as well as equivalent indexes. Our ProductionSalesData
table could have different indexes between different runs, so I had to create this proc to drop and recreate indexes dynamically.
CREATE PROCEDURE [dbo].[proc_SwitchOutTables]
AS
DECLARE @CreateStatement NVARCHAR(MAX)
,@DropStatement NVARCHAR(MAX)
--this table will hold the current production data as a precaution
TRUNCATE TABLE dbo.ProductionSalesData_Old
--drop indexes on ProductionSalesData_Old
BEGIN
SELECT
DropStatement = 'DROP INDEX ' + QUOTENAME(i.name) + ' ON ' + QUOTENAME(t.name)
INTO #Drops
FROM sys.tables AS t
INNER JOIN sys.indexes AS i ON t.object_id = i.object_id
LEFT JOIN sys.dm_db_index_usage_stats AS u ON i.object_id = u.object_id AND i.index_id = u.index_id
WHERE t.is_ms_shipped = 0
AND i.type <> 0
AND t.name = 'ProductionSalesData_Old'
ORDER BY QUOTENAME(t.name), is_primary_key DESC
IF EXISTS (SELECT TOP 1 1 FROM #Drops)
BEGIN
DECLARE drop_cursor CURSOR FOR
SELECT DropStatement FROM #Drops
OPEN drop_cursor
FETCH NEXT FROM drop_cursor INTO @DropStatement
WHILE @@FETCH_STATUS = 0
BEGIN
EXEC(@DropStatement)
FETCH NEXT FROM drop_cursor INTO @DropStatement
END
--end loop
--clean up
CLOSE drop_cursor
DEALLOCATE drop_cursor
END
DROP TABLE #Drops
END
--recreate indexes on ProductionSalesData_Old based on indexes on ProductionSalesData
BEGIN
SELECT
CreateStatement =
'CREATE '
+ CASE WHEN i.type_desc = 'CLUSTERED' THEN 'CLUSTERED'
WHEN i.type_desc = 'NONCLUSTERED' AND is_unique=1 THEN 'UNIQUE NONCLUSTERED'
WHEN i.type_desc = 'NONCLUSTERED' AND is_unique=0 THEN 'NONCLUSTERED'
END
+ ' INDEX '
+ QUOTENAME(i.name)
+ ' ON '
+ QUOTENAME(t.name+'_Old')
+ ' ( '
+ STUFF(REPLACE(REPLACE((
SELECT QUOTENAME(c.name) + CASE WHEN ic.is_descending_key = 1 THEN ' DESC' ELSE '' END AS [data()]
FROM sys.index_columns AS ic
INNER JOIN sys.columns AS c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id AND ic.is_included_column = 0
ORDER BY ic.key_ordinal
FOR XML PATH
), '<row>', ', '), '</row>', ''), 1, 2, '') + ' ) ' -- keycols
+ COALESCE(' INCLUDE ( ' +
STUFF(REPLACE(REPLACE((
SELECT QUOTENAME(c.name) AS [data()]
FROM sys.index_columns AS ic
INNER JOIN sys.columns AS c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
WHERE ic.object_id = i.object_id AND ic.index_id = i.index_id AND ic.is_included_column = 1
ORDER BY ic.index_column_id
FOR XML PATH
), '<row>', ', '), '</row>', ''), 1, 2, '') + ' ) ', -- included cols
'')
+ COALESCE(' WHERE ' +
STUFF(REPLACE(REPLACE((
--SELECT QUOTENAME(c.name) AS [data()]
SELECT ic.filter_definition AS [data()]
FROM sys.indexes AS ic
WHERE ic.index_id = i.index_id
AND ic.object_id = i.object_id
AND ic.has_filter = 1
ORDER BY ic.index_id
FOR XML PATH
), '<row>', ', '), '</row>', ''), 1, 2, ''), -- filter
'')
INTO #Creates
FROM sys.tables AS t
INNER JOIN sys.indexes AS i ON t.object_id = i.object_id
LEFT JOIN sys.dm_db_index_usage_stats AS u ON i.object_id = u.object_id AND i.index_id = u.index_id
WHERE t.is_ms_shipped = 0
AND i.type <> 0
AND t.name = 'ProductionSalesData'
ORDER BY QUOTENAME(t.name)
,is_primary_key DESC
IF EXISTS (SELECT TOP 1 1 FROM #Creates)
BEGIN
DECLARE create_cursor CURSOR FOR
SELECT CreateStatement FROM #Creates
OPEN create_cursor
FETCH NEXT FROM create_cursor INTO @CreateStatement
WHILE @@FETCH_STATUS = 0
BEGIN
--PRINT @CreateStatement
EXEC(@CreateStatement)
FETCH NEXT FROM create_cursor INTO @CreateStatement
END
--end loop
--clean up
CLOSE create_cursor
DEALLOCATE create_cursor
END
DROP TABLE #Creates
END
--proc continues below
The last bit of the proc below does the actual SWITCH TO
. We use WITH ( WAIT_AT_LOW_PRIORITY()
to wait until there are no pending requests against our production table. Once the coast is clear, the current production table becomes the _Old
version, and the _Staging
version we pulled over from the ETL server becomes the new production table. The SWITCH TO
happens in a matter of milliseconds. All SQL Server is doing is updating the table metadata, specifically the memory address of the first leaf of the table.
BEGIN TRAN
ALTER TABLE dbo.ProductionSalesData
SWITCH TO dbo.ProductionSalesData_Old
WITH ( WAIT_AT_LOW_PRIORITY ( MAX_DURATION = 1 MINUTES, ABORT_AFTER_WAIT = BLOCKERS ));
--Anyone who tries to query the table after the switch has happened and before
--the transaction commits will be blocked: we've got a schema mod lock on the table
ALTER TABLE dbo.ProductionSalesData_Staging
SWITCH TO dbo.ProductionSalesData;
COMMIT
GO