
I have a stored procedure that returns a result set consisting of a single column containing millions of unprocessed rows. I need to transfer this data to another server using SqlBulkCopy, but the problem is that I can't simply do the following:

using (var con = new SqlConnection(sqlConnectionStringSource))
{
    using (var cmd = new SqlCommand("usp_GetUnprocessedData", con))
    {
        cmd.CommandType = CommandType.StoredProcedure;
        con.Open();
        using (var reader = cmd.ExecuteReader())
        {
            using (var sqlBulk = new SqlBulkCopy(sqlConnectionStringDestination))
            {
                sqlBulk.DestinationTableName = "BulkCopy";
                sqlBulk.BulkCopyTimeout = 0;
                sqlBulk.BatchSize = 200000;
                sqlBulk.WriteToServer(reader);
            }
        }
    }
}

because the data won't be processed at all.

In my case, the nth row of the result set looks like this:

value1_n,value2_n,value3_n

where n is just a subscript I've introduced to distinguish between the various rows.

In the destination table, which I've named BulkCopy, I'd like to have:

╔══════════╦══════════╦══════════╗
║  Field1  ║  Field2  ║  Field3  ║
╠══════════╬══════════╬══════════╣
║ Value1_1 ║ Value2_1 ║ Value3_1 ║
║ Value1_2 ║ Value2_2 ║ Value3_2 ║
║ ...      ║ ...      ║ ...      ║
║ Value1_n ║ Value2_n ║ Value3_n ║
╚══════════╩══════════╩══════════╝

I was told to use a custom DataReader, i.e. an implementation of the IDataReader interface, in order to process the data row by row before SqlBulkCopy copies it, with the EnableStreaming property set to true to ensure that only a small amount of data is in memory at any time, but I have no idea where to start. Can you help me, please?

J. Doe
  • 63
  • 1
  • 5
  • You have to start by comparing the column names (and data types) of the table in the original database with the column names (and data types) in the destination table. BulkCopy defaults to the names and types being the same, so if there are differences you have to create a mapping for the BulkCopy to work correctly. – jdweng Apr 10 '19 at 09:56
  • The problem is that the source table has only a single column (of type `nvarchar(max)`), which contains the comma-separated values for each row. – J. Doe Apr 10 '19 at 09:59
  • I've never done it before. See: https://learn.microsoft.com/en-us/dotnet/api/system.data.sqlclient.sqlbulkcopy?view=netframework-4.7.2. You need to use the WriteToServer(IDataReader) method, where the IDataReader is the source database. The IDataReader would use the Split method to take the single column and return an array of the values value1_n, value2_n, value3_n – jdweng Apr 10 '19 at 10:17
  • @J.Doe you're asking how to *transform* the data, not how to insert them. How much data is there and do you *really* need to stream the data? You could load them in a DataTable for example, perform any transformations you need and then use `WriteToServer(DataTable)`. Or you could use Dapper or any other ORM to load the data with LINQ, transform the objects and use [FastMember](https://github.com/mgravell/fast-member)'s `ObjectReader` to convert the IEnumerable<> result to an IDataReader. – Panagiotis Kanavos Apr 10 '19 at 10:21
  • @J.Doe a rather complex solution would be to create your own `IDataReader` implementation that delegates to the original reader but overrides `GetValue`, `GetName`, `GetOrdinal`, `GetSchemaTable` etc to return the extra columns and transformed values. Delegating to a wrapped `IDataReader` isn't hard, there's even a VS 2019 refactoring, `Implement interface using inner` that does that. Getting the column names and transformations right though, is a bit more involved – Panagiotis Kanavos Apr 10 '19 at 10:24
  • @PanagiotisKanavos I can't use a DataTable, because I would easily saturate the RAM (we are talking about 50 million records). The problem is I wasn't able to find anywhere an example showing how the custom `IDataReader` implementation (which would in theory be the fastest solution) could transform the input data in such a way as to obtain the result I want. – J. Doe Apr 10 '19 at 10:51
  • @J.Doe that's an interesting case. Why pull the data to the *client* at all, instead of using SSIS? Yes, it can be annoying and doesn't play well with version control, but it does the job well. It already handles several of the problems you'd encounter in an ETL job. You can't just use an IDataReader with 50M rows for example, what if the *target* is slow? What if the source is slow? You'd have to buffer data but what if one of the transformations takes too long and its input buffer starts growing? – Panagiotis Kanavos Apr 10 '19 at 11:15
  • @J.Doe let's reverse the problem. Why does *SQL Server* return results as a CSV value? Why not proper columns, or at least something that's easy to parse like JSON? Or is that how the data is stored, e.g. in a staging table? I suspect you already know how many fields there are? – Panagiotis Kanavos Apr 10 '19 at 12:17

2 Answers


Let's reverse the problem. Instead of finding a generic solution, create one specific to this problem. Having spent days creating an IDataReader wrapper, I know it's not that trivial.

We know how many fields there are and we don't care about any other fields in the results. Instead of trying to correctly implement an IDataReader wrapper, we can create an iterator method that splits the data and returns the records one by one in a streaming fashion. FastMember's ObjectReader can wrap an IDataReader interface over any IEnumerable:

class MyDTO
{
    public string Field1 { get; set; }
    public string Field2 { get; set; }
    public string Field3 { get; set; }
}

public IEnumerable<MyDTO> ReaderToStream(IDataReader reader)
{
    while (reader.Read())
    {
        var line = reader.GetString(0);
        var fields = line.Split(',');
        yield return new MyDTO { Field1 = fields[0], Field2 = fields[1], Field3 = fields[2] };
    }
}

The importing method could change to:

using (var con = new SqlConnection(sqlConnectionStringSource))
{
    ...
    using (var reader = cmd.ExecuteReader())
    {
        var recordStream = ReaderToStream(reader);
        using (var rd = ObjectReader.Create(recordStream, "Field1", "Field2", "Field3"))
        using (var sqlBulk = new SqlBulkCopy(sqlConnectionStringDestination))
        {
            ...
            sqlBulk.WriteToServer(rd);
        }
    }
}

The iterator calls Read() only when SqlBulkCopy requests a new record, so we don't end up loading everything in memory.
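
The SqlBulkCopy settings themselves can stay the same as in the question. A minimal sketch with EnableStreaming turned on and optional name mappings from the DTO properties to the destination columns (SqlBulkCopy maps by ordinal when no mappings are specified):

using (var sqlBulk = new SqlBulkCopy(sqlConnectionStringDestination))
{
    sqlBulk.DestinationTableName = "BulkCopy";
    sqlBulk.EnableStreaming = true;   //stream records from the reader instead of buffering them all
    sqlBulk.BulkCopyTimeout = 0;
    sqlBulk.BatchSize = 200000;
    //Optional: map the DTO property names exposed by ObjectReader to the destination columns
    sqlBulk.ColumnMappings.Add("Field1", "Field1");
    sqlBulk.ColumnMappings.Add("Field2", "Field2");
    sqlBulk.ColumnMappings.Add("Field3", "Field3");
    sqlBulk.WriteToServer(rd);
}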

And the IDataReader wrapper

Resharper and Visual Studio 2019 offer to implement an interface by delegating calls to a wrapped class. In Visual Studio 2019 this is called Implement interface through 'field_name'.

Starting with this code:

class ReaderWrapper:IDataReader
{
    private readonly IDataReader _inner ;
    public ReaderWrapper(IDataReader inner)
    {
        _inner = inner;
    }
}

Applying the refactoring gives:

class ReaderWrapper:IDataReader
{
    private readonly IDataReader _inner ;
    public ReaderWrapper(IDataReader inner)
    {
        _inner = inner;
    }

    public object this[int i] => _inner[i];

    public object this[string name] => _inner[name];

    public int Depth => _inner.Depth;

    public bool IsClosed => _inner.IsClosed;

    public int RecordsAffected => _inner.RecordsAffected;

    public int FieldCount => _inner.FieldCount;

    public void Close() => _inner.Close();
    public void Dispose() => _inner.Dispose();
    public bool GetBoolean(int i) => _inner.GetBoolean(i);
    public byte GetByte(int i) => _inner.GetByte(i);
    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) => _inner.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
    public char GetChar(int i) => _inner.GetChar(i);
    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) => _inner.GetChars(i, fieldoffset, buffer, bufferoffset, length);
    public IDataReader GetData(int i) => _inner.GetData(i);
    public string GetDataTypeName(int i) => _inner.GetDataTypeName(i);
    public DateTime GetDateTime(int i) => _inner.GetDateTime(i);
    public decimal GetDecimal(int i) => _inner.GetDecimal(i);
    public double GetDouble(int i) => _inner.GetDouble(i);
    public Type GetFieldType(int i) => _inner.GetFieldType(i);
    public float GetFloat(int i) => _inner.GetFloat(i);
    public Guid GetGuid(int i) => _inner.GetGuid(i);
    public short GetInt16(int i) => _inner.GetInt16(i);
    public int GetInt32(int i) => _inner.GetInt32(i);
    public long GetInt64(int i) => _inner.GetInt64(i);
    public string GetName(int i) => _inner.GetName(i);
    public int GetOrdinal(string name) => _inner.GetOrdinal(name);
    public DataTable GetSchemaTable() => _inner.GetSchemaTable();
    public string GetString(int i) => _inner.GetString(i);
    public object GetValue(int i) => _inner.GetValue(i);
    public int GetValues(object[] values) => _inner.GetValues(values);
    public bool IsDBNull(int i) => _inner.IsDBNull(i);
    public bool NextResult() => _inner.NextResult();
    public bool Read() => _inner.Read();
}

To create a splitting wrapper, we need to replace Read() with our own version:

    private string[] _values = new string[0];  //start empty so FieldCount works before the first Read()

    public bool Read()
    {
        var ok = _inner.Read();
        if (ok)
        {
            //It *could* be null
            if (_inner.IsDBNull(0))
            {
                //What to do? Store an empty array for now
                _values = new string[0];
            }
            else
            {
                var fieldValue = _inner.GetString(0);
                _values = fieldValue.Split(',');
            }
        }
        return ok;
    }

This splits the CSV values and stores them in a string array. It also shows why implementing the wrapper is a bit of a bother - we need to handle quite a few things and decide what to do in unexpected situations like nulls, empty strings etc.

After that, we need to add our own implementations for the methods called by SqlBulkCopy. GetValue() is definitely called, and so is FieldCount. Other members are called depending on the column mapping types, by name or by ordinal.

public int FieldCount => _values.Length;

public string GetString(int ordinal) => _values[ordinal];

public object GetValue(int ordinal)=> _values[ordinal];

//What if we have more values than expected?
public int GetValues(object[] values)
{
    if (_values.Length > 0)
    {
        Array.Copy(_values, values,_values.Length);
        return _values.Length;
    }
    return 0;
}

And now the "funny" parts. What about GetName()? Probably:

public string GetName(int ordinal) => $"Field{ordinal + 1}"; //the destination columns start at Field1

GetOrdinal? It may be called when mapping by name. Getting tricky:

public int GetOrdinal(string name) => int.Parse(name.Substring(5)) - 1; //"Field1" maps back to ordinal 0

Let's hope this works.
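
A quick sanity check (just a sketch, with a hypothetical wrapper instance) that the two methods stay consistent with each other:

//Hypothetical check: GetOrdinal should invert GetName for the three expected columns
var wrapper = new ReaderWrapper(reader);
for (var i = 0; i < 3; i++)
{
    System.Diagnostics.Debug.Assert(wrapper.GetOrdinal(wrapper.GetName(i)) == i);
}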

We also need to override the indexers:

    public object this[string name] => _values[GetOrdinal(name)];

    public object this[int i] => _values[i];

What did I forget? ... Still need to handle arbitrary numbers of values. Need to handle nulls. There's no GetSchemaTable, which probably means the column mappings will have to be specified explicitly, probably by ordinal.
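
A sketch of what those explicit mappings could look like, assuming the three-column BulkCopy table from the question:

//With no GetSchemaTable, tell SqlBulkCopy how source ordinals map to destination ordinals
sqlBulk.ColumnMappings.Add(0, 0);
sqlBulk.ColumnMappings.Add(1, 1);
sqlBulk.ColumnMappings.Add(2, 2);
sqlBulk.WriteToServer(new ReaderWrapper(reader));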

A quick & dirty IsDBNull implementation could be:

public bool IsDBNull(int i)
{
    //Covers the "null" case too, when `Length` is 0
    if (i > _values.Length - 1)
    {
        return true;
    }
    //The split values are plain strings, so they're never DBNull
    return false;
}

GetSchemaTable is harder because we don't really know how many values are in each record. The table has 20+ columns so I'd rather not write that code until I see that it's needed.

public DataTable GetSchemaTable() => throw new NotImplementedException();

I'll leave it as an exercise to the reader, as they say.

PPS: Default interface implementations, because why not

All this is probably a nice, if convoluted, case where C# 8's default interface methods could be used to create a wrapped reader trait. By default, defer to the wrapped inner reader. This would eliminate all the delegating calls in the implementation.

interface IReaderWrapper:IDataReader
{
    //Gives access to the wrapped reader in the concrete classes
    abstract IDataReader Inner();

    override object this[int i] => Inner()[i];

    override object this[string name] => Inner()[name];

    override int Depth => Inner().Depth;

    override bool IsClosed => Inner().IsClosed;
    ...
}

class SplitterWrapper:IReaderWrapper
{

    private readonly IDataReader _inner ;
    public SplitterWrapper(IDataReader inner)
    {
        _inner = inner;
    }

    IDataReader Inner()=> _inner;

    string[] _values;
    public object this[int i] => _values[i];
    ...
}

This feature doesn't work in the C# 8 compiler that came with VS 2019 and somehow crashes Sharplab.io. No idea if it will compile or if the overrides are really needed.

Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236

I found the following CodeProject article: https://www.codeproject.com/script/Articles/ViewDownloads.aspx?aid=1095790. It looks like you have to take the CSV data and split it into objects. I modified the CodeProject code as shown below. There are a lot of methods not implemented and you may need to implement some additional ones. I'm also not sure what type the result values should be.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;



namespace ConsoleApplication108
{
    class Program
    {
        static void Main(string[] args)
        {

        }
    }
    public class MyDataReader : IDataReader 
    {
        private SqlConnection conn { get; set; }
        private SqlCommand cmd { get; set; }
        private SqlDataReader reader { get; set; }
        private DataTable schemaTable { get; set; }

        private string data { get; set; }
        private object[] arrayData { get; set; }
        private IEnumerator<object> m_dataEnumerator { get; set; }


        public MyDataReader(string commandText, string connectionString, List<KeyValuePair<string, Type>> columns)
        {
            conn = new SqlConnection(connectionString);
            conn.Open();
            cmd = new SqlCommand(commandText, conn);
            reader = cmd.ExecuteReader();

            schemaTable = new DataTable();
            foreach(KeyValuePair<string,Type> col in columns)
            {
                schemaTable.Columns.Add(col.Key, col.Value);
            }
        }
        public Boolean NextResult()
        {
            return reader.Read();
        }
        public int RecordsAffected
        {
            get { return -1; }
        }
        public int Depth
        {
            get { return -1; }
        }
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (m_dataEnumerator != null)
                {
                    m_dataEnumerator.Dispose();
                    m_dataEnumerator = null;
                }
            }
        }

        public Boolean IsClosed {
            get { return reader.IsClosed; }
        }
        public Boolean Read()
        {

            if (IsClosed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
            else
            {
                arrayData = reader.GetString(0).Split(new char[] { ',' }).ToArray();
            }
            return m_dataEnumerator.MoveNext();

        }
        public DataTable GetSchemaTable()
        {
            return schemaTable;
        }
        public void Close()
        {
            Dispose();
        }


        public object this[string name]
        {
            get { throw new NotImplementedException(); }

        }

        public object this[int i]
        {
            get { return arrayData[i]; }
        }
        public int FieldCount
        {
            get { return arrayData.Length; }
        }
        public bool IsDBNull(int i)
        {
              throw new NotImplementedException();
        }
        public bool GetBoolean(int i)
        {
            throw new NotImplementedException();
        }

        public byte GetByte(int i)
        {
            throw new NotImplementedException();
        }

        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        public char GetChar(int i)
        {
            throw new NotImplementedException();
        }

        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }

        public IDataReader GetData(int i)
        {
            throw new NotImplementedException();
        }

        public string GetDataTypeName(int i)
        {
            throw new NotImplementedException();
        }

        public DateTime GetDateTime(int i)
        {
            throw new NotImplementedException();
        }

        public decimal GetDecimal(int i)
        {
            throw new NotImplementedException();
        }

        public double GetDouble(int i)
        {
            throw new NotImplementedException();
        }

        public Type GetFieldType(int i)
        {
            throw new NotImplementedException();
        }

        public float GetFloat(int i)
        {
            throw new NotImplementedException();
        }

        public Guid GetGuid(int i)
        {
            throw new NotImplementedException();
        }

        public short GetInt16(int i)
        {
            throw new NotImplementedException();
        }

        public int GetInt32(int i)
        {
            throw new NotImplementedException();
        }

        public long GetInt64(int i)
        {
            throw new NotImplementedException();
        }

        public string GetName(int i)
        {
            throw new NotImplementedException();
        }

        public string GetString(int i)
        {
            throw new NotImplementedException();
        }

        public int GetValues(object[] values)
        {
            values = arrayData;

            return arrayData.Length;
        }
        public int GetOrdinal(string name)
        {
            throw new NotImplementedException();
        }

        public object GetValue(int i)
        {
            return arrayData[i];
        }



    }
}
jdweng
  • 33,250
  • 2
  • 15
  • 20
  • Visual Studio 2019 at least can implement the interface correctly with a simple code fix. There are many wrong calls, like `NextResult()` calling `Read()`, throwing on perfectly valid methods, the split being attempted in `Read` instead of `GetValue` or `GetString`, and methods that *are* called by SqlBulkCopy simply throwing, like `GetSchemaTable` or `GetName` – Panagiotis Kanavos Apr 10 '19 at 12:11
  • I disagree. The Get methods should be reading the arrayData. The split should only be done once, in the Read() method. This way you split to a string[] and then convert string[i] to the required type. The bulk copy is expecting an array of objects. The issue is that if the types are not all the same, your method would not work. Suppose each record contained: int, string, decimal. – jdweng Apr 10 '19 at 12:19
  • I was trying to create such a wrapper just last week, which is why I looked for the pain points immediately. SqlBulkCopy *does* call `GetSchemaTable()` when mapping by name. There's no reason to create the reader in the constructor either, just pass any IDataReader as a constructor parameter. `Read` itself should call `reader.Read()` instead of *assuming* there aren't multiple results, and check the response instead of checking `IsClosed` – Panagiotis Kanavos Apr 10 '19 at 12:37
  • I agree. The implementation is up to the user. In this case the GetSchemaTable() would come from the destination database. – jdweng Apr 10 '19 at 12:41
  • I just posted an implementation and you'll probably spot missing functionality immediately. What about nulls, empty fields, too many values, *changing* field values, what are the field names, etc.? – Panagiotis Kanavos Apr 10 '19 at 13:10