6

When I try to write a very large amount of data (a list with 300,000 rows or more) to a memory stream using CsvHelper, it throws the exception "System.IO.IOException: Stream was too long.".

The data class is rather big and has ~30 properties; consequently, each record in the file has ~30 columns.

This is the actual writing code where the exception is thrown (by the way, this code is based on that answer from the author of the CsvHelper library):

using (var memoryStream = new MemoryStream())
{
    using (var streamWriter = new StreamWriter(memoryStream, encoding ?? Encoding.ASCII))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data); //data is IEnumerable<T> and has more than 300k records

        streamWriter.Flush();
        return memoryStream.ToArray();
    }
}

Then I save the resulting byte array to a file.

File.WriteAllBytes(filePath, resultedBytesArray); 

Please note that the same code works well when I write 100,000 records to the file (in that case the file is about 1 GB). By the way, my goal is to write more than 600,000 data records.

This is the relevant part of the stack trace related to this issue.

Stream was too long.|System.IO.IOException: Stream was too long.
at System.IO.MemoryStream.Write(Byte[] buffer, Int32 offset, Int32 count) 
at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder) 
at System.IO.StreamWriter.Write(Char[] buffer, Int32 index, Int32 count) 
at CsvHelper.CsvWriter.NextRecord() in C:\Users\Josh\Projects\CsvHelper\src\CsvHelper\CsvWriter.cs:line 290 
at CsvHelper.CsvWriter.WriteRecords(IEnumerable records) in C:\Users\Josh\Projects\CsvHelper\src\CsvHelper\CsvWriter.cs:line 490 
at FileExport.Csv.CsvDocument.Create[T](IEnumerable`1 data, String delimiter, Encoding encoding, Type mappingClassType, IDictionary`2 mappingActions) in d:\Dev\DrugDevExport\FileExport\Csv\CsvDocument.cs:line 33 

As far as I can see, the basic way to achieve my goal and avoid this issue is to split the written data into several parts and concatenate them afterwards, but maybe there is an obvious and easy solution that doesn't require significant refactoring (like increasing the default stream/buffer size, etc.)?
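
For illustration, here's a rough sketch of the chunked approach I'd like to avoid (purely hypothetical code reusing the same encoding, delimiter, mappingClassType and mappingActions from my method above; it assumes System.Linq and would also need extra handling so the header row isn't repeated for every chunk):

const int chunkSize = 50000; // hypothetical chunk size

using (var fileStream = File.Create(filePath))
{
    // Group the records into chunks and serialize each chunk separately,
    // so no single MemoryStream ever has to hold the whole file.
    foreach (var chunk in data.Select((record, index) => new { record, index })
                              .GroupBy(x => x.index / chunkSize, x => x.record))
    {
        using (var memoryStream = new MemoryStream())
        {
            using (var streamWriter = new StreamWriter(memoryStream, encoding ?? Encoding.ASCII, 1024, leaveOpen: true))
            {
                var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
                csvWriter.WriteRecords(chunk); // would also need to suppress the header for all chunks after the first
            }

            var bytes = memoryStream.ToArray();
            fileStream.Write(bytes, 0, bytes.Length);
        }
    }
}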

Also keep in mind that I've already applied two possible solutions in order to prevent an "Out Of Memory" exception.

Thanks in advance.

Artyom Pranovich
    Why are you writing to a MemoryStream? Did you need to have the stream entirely in memory? You talk about files, but use a MemoryStream... Replace it with a FileStream and see what happens... – spender Oct 03 '16 at 15:04
  • Have you tried reading a limited amount of data and writing it to the stream in a loop? i.e. not all at once. You could perhaps try a similar method of chunking to this post http://stackoverflow.com/questions/2819081/memorystream-and-large-object-heap – Paul Zahra Oct 03 '16 at 15:17
  • @PaulZahra, I mentioned in my question that this way (splitting up the whole bunch of data) would very likely work, and it already works with 100k data records, but is there any other solution that doesn't involve splitting? – Artyom Pranovich Oct 03 '16 at 15:21
  • @ArtyomPranovich I think it is more logical / safe / future proof to chunk, else you rely too much on the machine.... you could try and define your own buffer (giving it a size) but you will most likely run into issues where the memory must be contiguous) p.s. as per 'that' post: streamWriter will be automatically flushed when you leave its using statement, which is fine because you return inside the using (so remove your flush) – Paul Zahra Oct 03 '16 at 15:35
  • You might want to have a read of this... sounds the business... http://www.codeproject.com/Articles/685310/Simple-and-fast-CSV-library-in-Csharp – Paul Zahra Oct 03 '16 at 15:58
  • I agree with @spender - you seem to be jumping through a whole lot of hoops for no reason. You write your list to one stream, then read the entire stream into an array, then write the array into a second stream. Just write it straight into the second stream to begin with. As it is, you're creating three different representations of the same data in memory (the list, the underlying storage of the MemoryStream, and the byte[] which is not just a reference to MemoryStream's buffer). I think the pretty obvious solution is not to store big data thrice in memory. – PMV Oct 04 '16 at 02:24

2 Answers

15

You can work around this 2 GB limitation by writing your own MemoryStream:

    class HugeMemoryStream : Stream
    {
        #region Fields

        // Data is stored in fixed-size pages (~1 MB each) so that no single
        // allocation approaches the 2 GB array limit.
        private const int PAGE_SIZE = 1024000;
        // The page table grows by this many page slots at a time.
        private const int ALLOC_STEP = 1024;

        private byte[][] _streamBuffers;

        private int _pageCount = 0;
        private long _allocatedBytes = 0;

        private long _position = 0;
        private long _length = 0;

        #endregion Fields

        #region Internals

        private int GetPageCount(long length)
        {
            int pageCount = (int)(length / PAGE_SIZE) + 1;

            if ((length % PAGE_SIZE) == 0)
                pageCount--;

            return pageCount;
        }

        private void ExtendPages()
        {
            if (_streamBuffers == null)
            {
                _streamBuffers = new byte[ALLOC_STEP][];
            }
            else
            {
                byte[][] streamBuffers = new byte[_streamBuffers.Length + ALLOC_STEP][];

                Array.Copy(_streamBuffers, streamBuffers, _streamBuffers.Length);

                _streamBuffers = streamBuffers;
            }

            _pageCount = _streamBuffers.Length;
        }

        private void AllocSpaceIfNeeded(long value)
        {
            if (value < 0)
                throw new InvalidOperationException("AllocSpaceIfNeeded < 0");

            if (value == 0)
                return;

            int currentPageCount = GetPageCount(_allocatedBytes);
            int neededPageCount = GetPageCount(value);

            while (currentPageCount < neededPageCount)
            {
                if (currentPageCount == _pageCount)
                    ExtendPages();

                _streamBuffers[currentPageCount++] = new byte[PAGE_SIZE];
            }

            _allocatedBytes = (long)currentPageCount * PAGE_SIZE;

            value = Math.Max(value, _length);

            if (_position > (_length = value))
                _position = _length;
        }

        #endregion Internals

        #region Stream

        public override bool CanRead => true;

        public override bool CanSeek => true;

        public override bool CanWrite => true;

        public override long Length => _length;

        public override long Position
        {
            get { return _position; }
            set
            {
                if (value > _length)
                    throw new InvalidOperationException("Position > Length");
                else if (value < 0)
                    throw new InvalidOperationException("Position < 0");
                else
                    _position = value;
            }
        }

        public override void Flush() { }

        public override int Read(byte[] buffer, int offset, int count)
        {
            int currentPage = (int)(_position / PAGE_SIZE);
            int currentOffset = (int)(_position % PAGE_SIZE);
            int currentLength = PAGE_SIZE - currentOffset;

            long startPosition = _position;

            if (startPosition + count > _length)
                count = (int)(_length - startPosition);

            while (count != 0 && _position < _length)
            {
                if (currentLength > count)
                    currentLength = count;

                Array.Copy(_streamBuffers[currentPage++], currentOffset, buffer, offset, currentLength);

                offset += currentLength;
                _position += currentLength;
                count -= currentLength;

                currentOffset = 0;
                currentLength = PAGE_SIZE;
            }

            return (int)(_position - startPosition);
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            switch (origin)
            {
                case SeekOrigin.Begin:
                    break;

                case SeekOrigin.Current:
                    offset += _position;
                    break;

                case SeekOrigin.End:
                    offset = _length - offset;
                    break;

                default:
                    throw new ArgumentOutOfRangeException("origin");
            }

            return Position = offset;
        }

        public override void SetLength(long value)
        {
            if (value < 0)
                throw new InvalidOperationException("SetLength < 0");

            if (value == 0)
            {
                _streamBuffers = null;
                _allocatedBytes = _position = _length = 0;
                _pageCount = 0;
                return;
            }

            int currentPageCount = GetPageCount(_allocatedBytes);
            int neededPageCount = GetPageCount(value);

            // Removes unused buffers if decreasing stream length
            while (currentPageCount > neededPageCount)
                _streamBuffers[--currentPageCount] = null;

            AllocSpaceIfNeeded(value);

            if (_position > (_length = value))
                _position = _length;
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            int currentPage = (int)(_position / PAGE_SIZE);
            int currentOffset = (int)(_position % PAGE_SIZE);
            int currentLength = PAGE_SIZE - currentOffset;

            long startPosition = _position;

            AllocSpaceIfNeeded(_position + count);

            while (count != 0)
            {
                if (currentLength > count)
                    currentLength = count;

                Array.Copy(buffer, offset, _streamBuffers[currentPage++], currentOffset, currentLength);

                offset += currentLength;
                _position += currentLength;
                count -= currentLength;

                currentOffset = 0;
                currentLength = PAGE_SIZE;
            }
        }

        #endregion Stream
    }
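
And here is a quick round-trip test of the stream (the gzip file name is just a placeholder; SharpZipLib is used to read a gzipped text file, copy it line by line into a HugeMemoryStream, and then compare the copy against the original):
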
using ICSharpCode.SharpZipLib.GZip;
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

            // HugeMemoryStream Test

            string filename = @"gzip-filename.gz";

            HugeMemoryStream ms = new HugeMemoryStream();

            using (StreamWriter sw = new StreamWriter(ms, Encoding.UTF8, 16384, true))
            using (FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read))
            using (GZipInputStream gzipStream = new GZipInputStream(fs))
            using (StreamReader sr = new StreamReader(gzipStream, Encoding.UTF8, false, 16384, true))
            {
                for (string line = sr.ReadLine(); line != null; line = sr.ReadLine())
                    sw.WriteLine(line);
            }

            ms.Seek(0, SeekOrigin.Begin);

            using (StreamReader srm = new StreamReader(ms, Encoding.UTF8, false, 16384, true))
            using (FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read))
            using (GZipInputStream gzipStream = new GZipInputStream(fs))
            using (StreamReader sr = new StreamReader(gzipStream, Encoding.UTF8, false, 16384, true))
            {
                for (string line1 = sr.ReadLine(), line2 = srm.ReadLine(); line1 != null; line1 = sr.ReadLine(), line2 = srm.ReadLine())
                {
                    if (line1 != line2)
                        throw new InvalidDataException();
                }
            }
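
Applied to the original question, the HugeMemoryStream could presumably be dropped in where the MemoryStream was used. A minimal sketch (reusing the question's encoding, delimiter, mappingClassType, mappingActions and filePath, and copying the stream to the file instead of calling ToArray(), since a single byte[] would itself be limited to 2 GB):

using (var memoryStream = new HugeMemoryStream())
{
    using (var streamWriter = new StreamWriter(memoryStream, encoding ?? Encoding.ASCII, 16384, true))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data);
    }

    // Rewind and stream the CSV bytes to disk without materializing a byte array.
    memoryStream.Seek(0, SeekOrigin.Begin);

    using (var fileStream = File.Create(filePath))
        memoryStream.CopyTo(fileStream);
}
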
Luc Vaillant
  • there is a bug in the `Seek` method in the `SeekOrigin.End` case: the offset should be added to the `_length` (not subtracted), as it's expected to be negative when passed from the End, so that Position does not become greater than Length! (See the corrected snippet below.) – sritmak Jan 09 '23 at 17:13
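
Applying that comment, the corrected SeekOrigin.End case would presumably look like this (offset is expected to be negative or zero when seeking from the end):

                case SeekOrigin.End:
                    // A negative offset measured from the end maps to _length + offset.
                    offset = _length + offset;
                    break;
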
13

Many thanks to Spender. As he mentioned in the comment below the question, the issue was fixed by replacing the MemoryStream with a FileStream and writing the data directly to the file.

In my case it was completely pointless to write the data to a MemoryStream and then copy it into the file again. Thanks to him again for opening my eyes to that fact.

My fixed code is below.

using (var fileStream = File.Create(path))
{
    using (var streamWriter = new StreamWriter(fileStream, encoding ?? Encoding.ASCII))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data);
    }
}

Now it works with any amount of input data, since the records are streamed straight to disk instead of being buffered in memory first.

Artyom Pranovich