1

So... I've got an SSIS Package that creates and runs SSIS Packages in memory. The "meta" package creation happens inside of a DataFlow Script Component that is set up as a Destination. The basic gist is to pull data from one or more Oracle sources and place it into a SQL Server Staging Database (used for BI). I've had great success with the package except for when I try to pull data from an Oracle source using a DB Link (a.k.a., SCHEMA.TABLE_NAME@OTHER).

The Oracle Connection Manager is created no differently than if I wasn't using a DB Link. The Select statement I use (that contains the DB Link syntax) is created well before I even run the SSIS Package.

When I attempt to run a "meta" package that contains one of these select statements, I get errors revolving around using syntax that the OLE DB SQL Server doesn't recognize. Meaning, that instead of using the Oracle-ness of my Source Connection Manager, it's using either my SQL Server Destination Connection manager or forgetting it's an Oracle Connection Manager.

Again, the ONLY difference is the DB Link syntax in the SELECT statement. That is literally it.

I've tried rearranging my code so that the connection managers are created right before they are needed. But that doesn't fix the problem (it just introduces more). I've tried using the connection managers from the base, outer package. But that has its own challenges too. I've even attempted to "reinitialize" the connection managers right before they are needed, which doesn't seem to do anything.

I'm using SSIS 2017. The Oracle databases vary in versions but are all 11 and above.

Has anyone ever experienced this, and if so, how did you fix it?

I'm including a chunk of my code below; I've had to trim things out to fit in this post.

The main part of the script (public override void Input0_ProcessInputRow) is broken up into three main sections: one that scrutinizes the size of the table and determines whether or not to just Truncate and Load; a section that (once it's determined not to be a trunc and load) determines whether or not a Deletes comparison is needed; and a section that determines whether or not updates/inserts need to be processed. I've included the Truncate and Load section pretty much in whole which demonstrates the logic pretty much throughout.

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using Microsoft.SqlServer.Dts.Pipeline;
using System.Data.SqlClient;
#endregion

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    private bool fireAgain = true;
    private bool continueProcessing = true;
    private string processingErrorMessage;
    private string pathName;

    private ConnectionManager srcConMgr;
    private ConnectionManager dstConMgr;

    /// <summary>
    /// Calls the base pre-execute logic and then fires an informational event
    /// announcing that pre-execute has begun.
    /// </summary>
    public override void PreExecute()
    {
        base.PreExecute();

        // FireInformation takes its last argument by reference (the runtime may clear it to
        // suppress further events), so fireAgain must be passed with 'ref' as it is elsewhere
        // in this class.
        ComponentMetaData.FireInformation(10, $"Begining Pre Execute of Threaded Path", "Begining Pre Execute of First Threaded Path", "", 0, ref fireAgain);
    }

    /// <summary>
    /// Calls the base post-execute logic and then fires an informational event for
    /// <c>pathName</c>. When <c>continueProcessing</c> is false, the event description
    /// carries <c>processingErrorMessage</c> and states that the table was skipped.
    /// </summary>
    public override void PostExecute()
    {
        base.PostExecute();

        // FireInformation takes its last argument by reference, so fireAgain is passed with 'ref'
        // to match the calls made in Input0_ProcessInputRow.
        if (continueProcessing)
        {
            ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"Begining Post Execute of {pathName}", "", 0, ref fireAgain);
        }
        else
        {
            ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"There were Processing Errors along {pathName}:  {processingErrorMessage}.  Table was skipped for processing.", "", 0, ref fireAgain);
        }
    }

    /// <summary>
    /// Reads the entire contents of a blob column and decodes the bytes with
    /// <see cref="System.Text.Encoding.Unicode"/>.
    /// </summary>
    /// <param name="blobColumn">The blob column to read.</param>
    /// <returns>The decoded text, or an empty string when the column is null.</returns>
    string BlobColumnToString(BlobColumn blobColumn)
    {
        if (!blobColumn.IsNull)
        {
            // The blob length is narrowed to int because GetBlobData is called with an int count.
            int byteCount = Convert.ToInt32(blobColumn.Length);
            var rawBytes = blobColumn.GetBlobData(0, byteCount);

            return System.Text.Encoding.Unicode.GetString(rawBytes);
        }

        return string.Empty;
    }

    /// <summary>
    /// Adds an "OLEDB" connection manager to the package, configures it, and acquires a
    /// connection from it once.
    /// </summary>
    /// <param name="package">The package whose connection collection receives the new manager.</param>
    /// <param name="conString">Connection string assigned to the manager.</param>
    /// <param name="conName">Name assigned to the manager.</param>
    /// <param name="conDescription">Description assigned to the manager.</param>
    /// <returns>The newly added connection manager.</returns>
    ConnectionManager CreateConnectionManager(Microsoft.SqlServer.Dts.Runtime.Package package, string conString, string conName, string conDescription)
    {
        ConnectionManager oleDbManager = package.Connections.Add("OLEDB");

        oleDbManager.ConnectionString = conString;
        oleDbManager.Name = conName;
        oleDbManager.Description = conDescription;

        // The object returned by AcquireConnection is intentionally not kept.
        oleDbManager.AcquireConnection(null);

        return oleDbManager;
    }

    /// <summary>
    /// Re-applies a connection string to an existing connection manager and acquires a
    /// connection from it again.
    /// </summary>
    /// <param name="conMgr">The connection manager to update.</param>
    /// <param name="conString">The connection string to assign.</param>
    static void ReInitializeConnectionManager(ConnectionManager conMgr, string conString)
    {
        conMgr.ConnectionString = conString;

        // The acquired connection object is not kept.
        conMgr.AcquireConnection(null);
    }

    /// <summary>
    /// Adds a new executable of the given type to the package.
    /// </summary>
    /// <param name="package">The package that receives the executable.</param>
    /// <param name="exType">The executable moniker passed to <c>Executables.Add</c> (callers in this file use values such as "STOCK:PipelineTask").</param>
    /// <returns>The executable that was added.</returns>
    Executable CreateExecutable(Microsoft.SqlServer.Dts.Runtime.Package package, string exType)
    {
        return package.Executables.Add(exType);
    }

    /// <summary>
    /// Names the task host behind <paramref name="executable"/>, extracts its data flow pipeline,
    /// and turns on <c>AutoAdjustBufferSize</c> for that pipeline.
    /// </summary>
    /// <param name="executable">An executable expected to be a <c>TaskHost</c> whose inner object is a <c>MainPipe</c>.</param>
    /// <param name="dfName">The name to give the task host.</param>
    /// <returns>The data flow pipeline hosted by the executable.</returns>
    /// <exception cref="ArgumentException">
    /// The executable is not a task host, or its inner object is not a data flow pipeline.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// The pipeline does not expose <c>IDTSPipeline130</c>, so <c>AutoAdjustBufferSize</c> cannot be set.
    /// </exception>
    MainPipe CreateDataFlowTask(Executable executable, string dfName)
    {
        // Each 'as' cast is checked so a wrong executable type yields a descriptive error
        // rather than a bare NullReferenceException.
        var taskHost = executable as Microsoft.SqlServer.Dts.Runtime.TaskHost;
        if (taskHost == null)
        {
            throw new ArgumentException("The executable is not a TaskHost.", nameof(executable));
        }

        taskHost.Name = dfName;

        var dataFlowTask = taskHost.InnerObject as MainPipe;
        if (dataFlowTask == null)
        {
            throw new ArgumentException("The executable's inner object is not a data flow pipeline (MainPipe).", nameof(executable));
        }

        var pipeline130 = dataFlowTask as IDTSPipeline130;
        if (pipeline130 == null)
        {
            throw new InvalidOperationException("The data flow pipeline does not support IDTSPipeline130; AutoAdjustBufferSize cannot be set.");
        }

        pipeline130.AutoAdjustBufferSize = true;

        return dataFlowTask;
    }

    /// <summary>
    /// Adds a new component to the data flow task and sets its class to either the
    /// "OLE DB Source" or the "OLE DB Destination" pipeline component.
    /// </summary>
    /// <param name="app">Application used to look up the pipeline component's creation name.</param>
    /// <param name="dataFlowTask">The data flow pipeline that receives the component.</param>
    /// <param name="componentName">Name assigned to the new component.</param>
    /// <param name="createSource">True to create an OLE DB Source; false to create an OLE DB Destination.</param>
    /// <returns>The metadata object of the new component.</returns>
    IDTSComponentMetaData100 CreateOLEDBComponent(Microsoft.SqlServer.Dts.Runtime.Application app, MainPipe dataFlowTask, string componentName, bool createSource)
    {
        // Add an empty component to the pipeline and name it.
        IDTSComponentMetaData100 component = dataFlowTask.ComponentMetaDataCollection.New();
        component.Name = componentName;

        // Pick the registered pipeline component that matches the requested role.
        string componentKind = createSource ? "OLE DB Source" : "OLE DB Destination";
        component.ComponentClassID = app.PipelineComponentInfos[componentKind].CreationName;

        return component;
    }

    /// <summary>
    /// Instantiates the design-time wrapper for an OLE DB source component, binds it to the
    /// supplied connection manager, configures it to run a SQL command, and refreshes its metadata.
    /// </summary>
    /// <param name="package">The package that owns the component. Kept for signature compatibility; the connection is taken from <paramref name="sourceConnectionManager"/>.</param>
    /// <param name="source">Metadata of the OLE DB source component.</param>
    /// <param name="sourceConnectionManager">The connection manager the source must use.</param>
    /// <param name="sourceSQL">The SQL text assigned to the component's "SqlCommand" property.</param>
    /// <returns>The configured design-time instance.</returns>
    CManagedComponentWrapper CreateOLEDBSourceDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 source, ConnectionManager sourceConnectionManager, string sourceSQL)
    {
        //Get the design-time instance of the component.
        CManagedComponentWrapper srcDesignTime = source.Instantiate();

        //Initialize the component
        srcDesignTime.ProvideComponentProperties();

        //Map the component to a connection manager.
        // Both the ID and the connection object come from the SAME manager that was passed in.
        // Previously the object was fetched as package.Connections[0], which only matched the ID
        // if the source manager happened to sit at index 0 of the package's collection.
        var runtimeConnection = source.RuntimeConnectionCollection[0];
        runtimeConnection.ConnectionManagerID = sourceConnectionManager.ID;
        runtimeConnection.ConnectionManager = DtsConvert.GetExtendedInterface(sourceConnectionManager);

        //Set the OLE DB Source properties
        srcDesignTime.SetComponentProperty("AccessMode", 2);
        srcDesignTime.SetComponentProperty("SqlCommand", sourceSQL);

        // Reinitialize the metadata; always release the connection, even if the refresh fails.
        srcDesignTime.AcquireConnections(null);
        try
        {
            srcDesignTime.ReinitializeMetaData();
        }
        finally
        {
            srcDesignTime.ReleaseConnections();
        }

        return srcDesignTime;
    }

    /// <summary>
    /// Instantiates the design-time wrapper for an OLE DB destination component, binds it to the
    /// supplied connection manager, and configures it to load into the given table.
    /// </summary>
    /// <param name="package">The package that owns the component. Kept for signature compatibility; the connection is taken from <paramref name="destinationConnectionManager"/>.</param>
    /// <param name="destination">Metadata of the OLE DB destination component.</param>
    /// <param name="destinationConnectionManager">The connection manager the destination must use.</param>
    /// <param name="stagingAlias">The value assigned to the component's "OpenRowset" property.</param>
    /// <returns>The configured design-time instance. Metadata is not reinitialized here; the caller does that.</returns>
    CManagedComponentWrapper CreateOLEDBDestinationDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 destination, ConnectionManager destinationConnectionManager, string stagingAlias)
    {
        CManagedComponentWrapper destDesignTime = destination.Instantiate();
        destDesignTime.ProvideComponentProperties();

        // Both the ID and the connection object come from the SAME manager that was passed in.
        // Previously the object was fetched as package.Connections[1], which only matched the ID
        // if the destination manager happened to sit at index 1 of the package's collection.
        var runtimeConnection = destination.RuntimeConnectionCollection[0];
        runtimeConnection.ConnectionManagerID = destinationConnectionManager.ID;
        runtimeConnection.ConnectionManager = DtsConvert.GetExtendedInterface(destinationConnectionManager);

        destDesignTime.SetComponentProperty("AccessMode", 3);
        destDesignTime.SetComponentProperty("FastLoadOptions", "TABLOCK");
        destDesignTime.SetComponentProperty("OpenRowset", stagingAlias);

        return destDesignTime;
    }


    /// <summary>
    /// Configures an executable as an Execute SQL task by setting its "Connection", "Name",
    /// and "SqlStatementSource" properties.
    /// </summary>
    /// <param name="e">The executable to configure; it is treated as a <c>TaskHost</c>.</param>
    /// <param name="connectionManager">The connection manager whose ID is assigned to the task.</param>
    /// <param name="taskName">The name assigned to the task.</param>
    /// <param name="sqlStatement">The SQL statement assigned to the task.</param>
    /// <returns>The configured task host.</returns>
    Microsoft.SqlServer.Dts.Runtime.TaskHost CreateExecuteSQLTask(Executable e, ConnectionManager connectionManager, string taskName, string sqlStatement)
    {
        var sqlTaskHost = e as Microsoft.SqlServer.Dts.Runtime.TaskHost;

        // Properties are set through the task host's property collection, in this order.
        sqlTaskHost.Properties["Connection"].SetValue(sqlTaskHost, connectionManager.ID);
        sqlTaskHost.Properties["Name"].SetValue(sqlTaskHost, taskName);
        sqlTaskHost.Properties["SqlStatementSource"].SetValue(sqlTaskHost, sqlStatement);

        return sqlTaskHost;
    }


    /// <summary>
    /// Executes <c>Staging.meta.spUpdateInsertsUpdatesDeletesAfterProcessing</c> for a table,
    /// passing the current server time (GETDATE()) as the processed timestamp.
    /// </summary>
    /// <param name="connectionString">SQL Server connection string used to open the connection.</param>
    /// <param name="tableId">Value passed as the procedure's <c>@tableID</c>.</param>
    /// <param name="processedItems">Value passed as the procedure's <c>@processedItems</c>.</param>
    /// <param name="fromMaxTimeStampString">Text passed as the procedure's <c>@fromMaxTimeStamp</c>; null is sent as an empty string.</param>
    static void UpdateStagingMetaDataPostProcess(string connectionString, int tableId, int processedItems, string fromMaxTimeStampString)
    {
        // Values are bound as parameters instead of being spliced into the SQL text, so a quote
        // in fromMaxTimeStampString can neither break the statement nor inject SQL.
        // The parameters carry a 'p' prefix to keep them distinct from the procedure's own names.
        const string sqlCommand =
            "DECLARE @procTime datetime = GETDATE(); " +
            "EXEC Staging.meta.spUpdateInsertsUpdatesDeletesAfterProcessing @tableID = @pTableId, " +
            "@processedItems = @pProcessedItems, " +
            "@processedTimeStamp = @procTime, " +
            "@fromMaxTimeStamp = @pFromMaxTimeStamp;";

        using (SqlConnection connection = new SqlConnection(connectionString))
        using (SqlCommand command = new SqlCommand(sqlCommand, connection))
        {
            command.Parameters.Add("@pTableId", SqlDbType.Int).Value = tableId;
            command.Parameters.Add("@pProcessedItems", SqlDbType.Int).Value = processedItems;

            // Sent as varchar text, as the original quoted literal was; a null string becomes ''
            // to match what string interpolation produced before.
            command.Parameters.Add("@pFromMaxTimeStamp", SqlDbType.VarChar, -1).Value = fromMaxTimeStampString ?? string.Empty;

            connection.Open();
            command.ExecuteNonQuery();
        }
    }

    /// <summary>
    /// Opens a SQL Server connection and executes the given statement as a non-query.
    /// </summary>
    /// <param name="connectionString">SQL Server connection string used to open the connection.</param>
    /// <param name="sqlCommand">
    /// The SQL text to execute. It is run verbatim, so callers must not build it from untrusted input.
    /// </param>
    static void ExecuteStagingSQLCommand(string connectionString, string sqlCommand)
    {
        // Both the connection and the command are IDisposable; dispose both deterministically.
        using (SqlConnection connection = new SqlConnection(connectionString))
        using (SqlCommand command = new SqlCommand(sqlCommand, connection))
        {
            connection.Open();
            command.ExecuteNonQuery();
        }
    }


    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        //removed a bunch of code to fit into post on stack overflow...

        if (continueProcessing)
        {
            string serverName = this.Variables.ServerName;
            string stagingConnectionString = @"server=" + serverName + imNotGivingYouMyConnectionInformation;
            int processedItems = 0;

            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the deletesExecutable and insertsUpdatesExecutable variables and assign null for {stagingAlias}", string.Empty, 0, ref fireAgain);
            Executable deletesExecutable = null;
            Executable insertsUpdatesExecutable = null;

            //Create the Application and Package
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Application and Package", string.Empty, 0, ref fireAgain);
            Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
            Microsoft.SqlServer.Dts.Runtime.Package package = new Microsoft.SqlServer.Dts.Runtime.Package();


            Microsoft.SqlServer.Dts.Runtime.Connections pkgConns = package.Connections;

            //Setup the Source Connection Manager
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Setup Source Connection Manager for {stagingAlias}", string.Empty, 0, ref fireAgain);
            srcConMgr = CreateConnectionManager(package, connectionDetails,
                                            connectionName + " OLEDB Connection Manager",
                                            "Connection Manager for " + connectionName);
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Source Connection Manager for {stagingAlias}:  {package.Connections[$"{connectionName} OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);


            //Setup the Destination Connection Manager
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Setup Destination Connection Manager", string.Empty, 0, ref fireAgain);
            dstConMgr = CreateConnectionManager(package, destinationConnectionDetails,
                                            "Staging OLEDB Connection Manager",
                                            "Connection Manager for Staging.");
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Destination Connection Manager for {stagingAlias}:  {package.Connections["Staging OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);

            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Connection Manager Count for {stagingAlias}:  {package.Connections.Count}", string.Empty, 0, ref fireAgain);


            //Determine what kind of staging transaction this will be (Trunc and Load or Update/Insert and Delete)
            if (stagingClassification == "S" || this.Variables.ForceTruncateAndLoad == true) //this is a small, trunc and load table OR the user has elected to trunc and load everything that needs trunc'd and loaded yo
            {
                try
                {
                    processedItems = 14; //we're updating inserts, updates, and deletes

                    //we're going to trunc and load here
                    //Create the Load pipeline, a.k.a., DataFlow task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Trunc and Load pipeline, a.k.a., DataFlow task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Executable tl_e = CreateExecutable(package, "STOCK:PipelineTask");
                    MainPipe tl_dataFlowTask = CreateDataFlowTask(tl_e, "Trunc And Load");

                    //Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution", string.Empty, 0, ref fireAgain);
                    ComponentEventHandler tl_events = new ComponentEventHandler();
                    tl_dataFlowTask.Events = DtsConvert.GetExtendedInterface(tl_events as IDTSComponentEvents);

                    ReInitializeConnectionManager(srcConMgr, connectionDetails);
                    //Create the OLEDB Source DataFlow Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the OLEDB Source DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSComponentMetaData100 tl_source = CreateOLEDBComponent(app, tl_dataFlowTask, "OLEDBSource", true);
                    CManagedComponentWrapper tl_srcDesignTime = CreateOLEDBSourceDesignTimeInstance(package, tl_source, srcConMgr, tableSQL);


                    ReInitializeConnectionManager(dstConMgr, destinationConnectionDetails);
                    //Create the OLEDB destination DataFlow Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the OLEDB destination DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSComponentMetaData100 tl_destination = CreateOLEDBComponent(app, tl_dataFlowTask, "OleDBDestination", false);
                    CManagedComponentWrapper tl_destDesignTime = CreateOLEDBDestinationDesignTimeInstance(package, tl_destination, dstConMgr, $"dbo.{stagingAlias}");


                    //Create the path between the two DataFlow Tasks
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the path between the two DataFlow Tasks for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSPath100 tl_path = tl_dataFlowTask.PathCollection.New();
                    tl_path.AttachPathAndPropagateNotifications(tl_source.OutputCollection[0], tl_destination.InputCollection[0]);


                    //Configure the Destination's Meta Data
                    //############################################################
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Get the destination's default input and virtual input
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Get the destination's default input and virtual input", string.Empty, 0, ref fireAgain);
                    IDTSInput100 tl_input = tl_destination.InputCollection[0];
                    IDTSVirtualInput100 tl_vInput = tl_input.GetVirtualInput();
                    IDTSVirtualInputColumnCollection100 tl_vInputColumns = tl_vInput.VirtualInputColumnCollection;

                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Initialize the destination dataflow
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Initialize the destination dataflow", string.Empty, 0, ref fireAgain);
                    tl_destDesignTime.AcquireConnections(null);
                    tl_destDesignTime.ReinitializeMetaData();
                    tl_destDesignTime.ReleaseConnections();

                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Iterate through the virtual input column collection and map to destination
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Iterate through the virtual input column collection and map to destination for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    foreach (IDTSVirtualInputColumn100 tl_vColumn in tl_vInputColumns)
                    {
                        var inputColumn = tl_destDesignTime.SetUsageType(tl_input.ID, tl_vInput, tl_vColumn.LineageID, DTSUsageType.UT_READONLY);
                        var externalColumn = tl_input.ExternalMetadataColumnCollection[inputColumn.Name];
                        tl_destDesignTime.MapInputColumn(tl_input.ID, inputColumn.ID, externalColumn.ID);
                    }
                    //############################################################


                    //Create the Truncate Execute SQL Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Truncation Execute SQL Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Executable trunc_e = CreateExecutable(package, "STOCK:SQLTask");
                    Microsoft.SqlServer.Dts.Runtime.TaskHost thTruncate = CreateExecuteSQLTask(trunc_e, dstConMgr, $"TRUNCATE {stagingAlias}", $"TRUNCATE TABLE dbo.{stagingAlias}");

                    //Create the Precedence Constraint between the Execute SQL Task and the Pipeline Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the precedence constraint between Execute SQL Task and DataFlow for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Microsoft.SqlServer.Dts.Runtime.PrecedenceConstraint tl_Constraint = package.PrecedenceConstraints.Add(trunc_e, tl_e);
                }
                catch (Exception tl_exc)
                {

                    ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}:  Trunc and Load Package Creation Failure for {stagingAlias} Custom Component Event Type:  {CustomComponentEvent.type}, Sub Component:  {CustomComponentEvent.subComponent}, Description:  {CustomComponentEvent.description}", string.Empty, 0);
                    ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}:  Trunc and Load Package Creation Failure for {stagingAlias} Error Code:  {tl_exc.HResult}, Error Message:  {tl_exc.Message}, Source Table SQL:  {tableSQL}, Source Connection Details:  {srcConMgr.ConnectionString}", string.Empty, 0);
                    continueProcessing = false;
                }


            }
            else
            {
                //removed a bunch of code for pasting into stack overlfow

Thank you so much for your help!

Brandon Meyer
  • 121
  • 1
  • 6

0 Answers0