4

We have a need to greatly reduce the bandwidth our back-end services use when pulling and pushing data to sql. The TDS Stream used by SqlClient is fairly bloated. For years, people have requested a compression option when pulling from sql, but Microsoft has not added it.

I wanted to see if anyone had any thought on the best way to handle this. Here is what I've tried so far:

  1. I modified https://github.com/MindFlavor/TDSBridge to add compression and decompression right at the socket layer. Because the payload is SSL encrypted, it didn't make much difference.

  2. Next I took the IDataReader to Protobuf library found: https://github.com/dotarj/protobuf-net-data and TCP framework found at https://github.com/jchristn/WatsonTcp in an attempt to create a client server proxy of sorts to stream the IDataReader over the wire by converting it to protobuf, then compressing this stream, and doing the opposite on the other end.

I got a proof of concept to work here, and actually got between 84% and 98% reduction in raw bytes over the network vs plain TDS Streams. The downside is the WatsonTcp wants you to pass in the content length when assigning the stream. But there is no way to know that until you create the whole protobuf stream. We sometimes transfer hundreds of gigs in one swoop, so that won't work.

I didn't see how protobuf-net-data could stream over grpc, and even if it could, I fear the granular nature of the records in an IAsyncEnumerable may slow down a large transfer.

No doubt I can sit and write a fully custom raw socket compressed protobuf over TCP streaming implementation with the surface area for the clients being close to the SqlCommand, I just know this is notoriously difficult to get right.

Any time saving ideas? If not maybe I'll make an open source project out of it.

David Thompson
  • 301
  • 1
  • 3
  • 8
  • Have you considered simply standing up a HTTP Web API close to the SQL Server that supports returning results using compressed, efficient formats? Simple binary serialization of a System.Data.DataSet is pretty good. https://learn.microsoft.com/en-us/archive/msdn-magazine/2004/october/cutting-edge-binary-serialization-of-datasets – David Browne - Microsoft Jun 12 '20 at 00:23
  • Thanks @DavidBrowne-Microsoft. The challenge we have is some of our datasets are 10s of gigs. We solve this by streaming from idatareader of one server to bulkcopy of the other one. Data storage is compressed on each side. I'm not sure how I would start to partially fill a dataset, compress and begin transmission, and do the opposite on the other side in a stream format. My understanding of System.Data.Dataset is the need to manage this in memory all at once? – David Thompson Jun 12 '20 at 01:19
  • Just thinking aloud: I wouldn't worry about the overhead of the streaming API - just make each a batch of some suitable size that isn't 1; it could be 10, 100, 10k - but either way: it'll make any overhead here irrelevant - then unroll each batch into simulated rows on the way out? i.e. write a method that takes an `IAsyncEnumerable` and returns either an `IDataReader` (implementing just the async parts) or an `IAsyncEnumerable` (multiple records per `RowBatch`) ? – Marc Gravell Jun 12 '20 at 07:53
  • re "the granular nature of the records in an `IAsyncEnumerable`" - the trick, then, is: don't make `T` a record; make it a *page* of N records - for some number N (could be 10, 100, even 1000) - that way you get the advantages of incremental batches, without paying any significant "per row" overheads – Marc Gravell Nov 24 '20 at 07:34

2 Answers2

0

Here's a pattern you can use to take a large query and transmit it as a series of batches, where each batch is a compressed, binary-serialized DataTable. After transmission and deserialization each DataTable can be directly used by SqlBulk Copy. The same pattern can work with other formats, but would require an additional converter before passing to SqlBulkCopy.

using System.Data.SqlClient;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

namespace BatchingDataReader
{
    class BatchingDataReader : IDataReader
    {
        private int batchSize;
        private IDataReader rdr;
        private int rowsRead;
        private bool atEnd;
        private int batchesRead;

        public BatchingDataReader(IDataReader rdr, int batchSize)
        {
            this.batchSize = batchSize;
            this.rdr = rdr;
        }

        public object this[int i] => rdr[i];

        public object this[string name] => rdr[name];

        public int Depth => rdr.Depth;

        public bool IsClosed => rdr.IsClosed;

        public int RecordsAffected => rdr.RecordsAffected;

        public int FieldCount => rdr.FieldCount;

        public void Close()
        {
            if (!atEnd)
                return;
            rdr.Close();
        }

        public void Dispose()
        {
            if (!atEnd)
                return;

            rdr.Dispose();
        }

        public bool GetBoolean(int i)
        {
            return rdr.GetBoolean(i);
        }

        public byte GetByte(int i)
        {
            return rdr.GetByte(i);
        }

        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            return rdr.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
        }

        public char GetChar(int i)
        {
            return rdr.GetChar(i);
        }

        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            return rdr.GetChars(i, fieldoffset, buffer, bufferoffset, length);
        }

        public IDataReader GetData(int i)
        {
            return rdr.GetData(i);
        }

        public string GetDataTypeName(int i)
        {
            return rdr.GetDataTypeName(i);
        }

        public DateTime GetDateTime(int i)
        {
            return rdr.GetDateTime(i);
        }

        public decimal GetDecimal(int i)
        {
            return rdr.GetDecimal(i);
        }

        public double GetDouble(int i)
        {
            return rdr.GetDouble(i);
        }

        public Type GetFieldType(int i)
        {
            return rdr.GetFieldType(i);
        }

        public float GetFloat(int i)
        {
            return rdr.GetFloat(i);
        }

        public Guid GetGuid(int i)
        {
            return rdr.GetGuid(i);
        }

        public short GetInt16(int i)
        {
            return rdr.GetInt16(i);
        }

        public int GetInt32(int i)
        {
            return rdr.GetInt32(i);
        }

        public long GetInt64(int i)
        {
            return rdr.GetInt64(i);
        }

        public string GetName(int i)
        {
            return rdr.GetName(i);
        }

        public int GetOrdinal(string name)
        {
            return rdr.GetOrdinal(name);
        }

        public DataTable GetSchemaTable()
        {
            return rdr.GetSchemaTable();
        }

        public string GetString(int i)
        {
            return rdr.GetString(i);
        }

        public object GetValue(int i)
        {
            return rdr.GetValue(i);
        }

        public int GetValues(object[] values)
        {
            return rdr.GetValues(values);
        }

        public bool IsDBNull(int i)
        {
            return rdr.IsDBNull(i);
        }

        public bool NextResult()
        {
            if (!atEnd)
            {
                batchesRead += 1;
                rowsRead = 0;
                return true;
            }

            if (IsClosed)
                return false;

            return rdr.NextResult();
        }

        public bool Read()
        {
            if (rowsRead >= batchSize)
                return false;
            rowsRead += 1;

            atEnd = !rdr.Read();
            return !atEnd;

        }

        public static IEnumerable<DataTable> Read(SqlDataReader r, int batchSize)
        {
            var rdr = new BatchingDataReader(r, batchSize);
            do
            {
                var dt = new DataTable();
                dt.TableName = "table";
                dt.Load(rdr);
                yield return dt;
            } while (rdr.NextResult());
        }
    }
    class Program
    {

        static void Main(string[] args)
        {
            var constr = "server=localhost;database=master;integrated security=true";
            var outfile = "c:\\temp\\out.bin";

            if (File.Exists(outfile))
                File.Delete(outfile);

            using (var con = new SqlConnection(constr))
            {
                //322,162,200  TDS raw
                //235,355,311  binary uncompressed out.bin
                // 52,755,181  binary GZ Fastest
                // 43,061,121  binary GZ optimal
                // 65,282,624  XML GZ fastest
                // 41,892,056  binary GZ optimal 100,000 row batches

                con.Open();

                var bin = new BinaryFormatter();

                var cmd = new SqlCommand("select top (1000000) * from sys.messages m, sys.objects o", con);
                using (SqlDataReader rdr = cmd.ExecuteReader())
                using (var destFile = File.OpenWrite(outfile))
                using (var zipStream = new System.IO.Compression.GZipStream(destFile,System.IO.Compression.CompressionLevel.Optimal))
                {
                    foreach (var dt in BatchingDataReader.Read(rdr, 10000))
                    {
                        Console.WriteLine(dt.Rows.Count);

                        dt.RemotingFormat = SerializationFormat.Binary;
                        bin.Serialize(zipStream, dt);
                    }
                }
            }
        }
    }

}
David Browne - Microsoft
  • 80,331
  • 6
  • 39
  • 67
0

You can use this technique to get SQL Server to format the results as gzipped csv (adjust lines-per-result in the group by - 1000 is about where the gzip overhead diminishes):

with csv as (
    select n = row_number() over (order by (select null)),
        line = convert(nvarchar(max), concat(
            message_id, ',', language_id, ',', severity, ',',
            is_event_logged, ',', '"' + replace([text], '"', '""') + '"'))
    from sys.messages)

select compress(string_agg(line, char(13)) within group (order by n))
from csv group by n / 1000

..this should help if you're facing an actual egress bottleneck at your SQL Server. It would be interesting to implement this as a TDSBridge that rewrites the query and then transforms the results back to what the client was expecting.

gordy
  • 9,360
  • 1
  • 31
  • 43