As I commented above, the fix is to call Flush on the CsvWriter prior to copying the MemoryStream. The issue is that there is pending data still sitting in the CsvWriter's internal buffers that doesn't get copied to the MemoryStream until you Flush it. That should get things working for you.
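Here is a minimal sketch of what I mean, assuming CsvHelper's CsvWriter over a StreamWriter; records stands in for one of your batches:

using System.Globalization;
using System.IO;
using CsvHelper;

using var ms = new MemoryStream();
using (var writer = new StreamWriter(ms, leaveOpen: true))
using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
{
    csv.WriteRecords(records); // records = one of your batches
    csv.Flush();    // move the CsvWriter's buffered data into the StreamWriter
    writer.Flush(); // move the StreamWriter's buffered data into the MemoryStream
}
ms.Position = 0; // rewind before copying or uploading the stream

Disposing the writers flushes them too, so the essential point is just that both flushes happen before you read the MemoryStream back.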
However, I have deeper feedback for your scenario. It appears that you are reading the entire 700K file into a List<CsvRow> before you process the batches. A better way would be to stream the CSV data from Azure and, as you read it, send the smaller batches back to Azure.
In this example I'm going to use my own library (Sylvan.Data.Csv), but I'm sure CsvHelper provides similar capabilities.
using Sylvan.Data.Csv;
...
string name = "MyMassiveCsv";
TextReader reader = File.OpenText(name + ".csv");
// replace above with however you access your Azure blob streams.
CsvDataReader csv = await CsvDataReader.CreateAsync(reader);
RangeDataReader r;
int i = 0;
do
{
    // Each pass reads at most the next 30,000 rows from the shared csv reader.
    r = new RangeDataReader(csv, 30000);
    i++;
    using var writer = File.CreateText(name + i + ".csv");
    // using var writer = new StreamWriter(CreateAzureBlob("batch" + i));
    using var w = CsvDataWriter.Create(writer);
    await w.WriteAsync(r);
} while (!r.AtEndOfData);
This way you only need to hold a small amount of the CSV file in memory at a time, and you start sending batches back immediately instead of having to download the entire CSV first.
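For completeness, here is roughly what the Azure ends of that pipeline might look like, assuming the Azure.Storage.Blobs package; the connection string, container, and blob names are placeholders:

using System.IO;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;

var container = new BlobContainerClient(connectionString, "my-container");

// Read side: stream the massive CSV down rather than downloading it whole.
Stream download = await container.GetBlobClient("MyMassiveCsv.csv").OpenReadAsync();
TextReader reader = new StreamReader(download);

// Write side: stream each batch back up as it is produced.
Stream upload = await container.GetBlockBlobClient("batch1.csv").OpenWriteAsync(overwrite: true);
TextWriter writer = new StreamWriter(upload);

Both sides give you ordinary Streams, so the rest of the pipeline doesn't change.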
RangeDataReader is a DbDataReader implementation that wraps another DbDataReader and limits the number of rows it reads from the underlying reader. The implementation is the following:
using System;
using System.Collections;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
class RangeDataReader : DbDataReader
{
    readonly DbDataReader reader;
    readonly int count;
    int row; // number of rows already read from this range

    public RangeDataReader(DbDataReader dataReader, int count)
    {
        this.reader = dataReader;
        this.count = count;
    }

    // True only when the underlying reader has run out of rows,
    // as opposed to this range merely hitting its row limit.
    public bool AtEndOfData { get; private set; }
    public override async Task<bool> ReadAsync(CancellationToken cancellationToken)
    {
        // Stop after `count` rows, leaving the underlying reader positioned
        // so that the next RangeDataReader picks up where this one left off.
        if (row < count)
        {
            row++;
            var r = await reader.ReadAsync(cancellationToken);
            if (!r)
            {
                // The underlying reader itself is exhausted, not just this range.
                AtEndOfData = true;
            }
            return r;
        }
        return false;
    }

    public override bool Read()
    {
        if (row < count)
        {
            row++;
            var r = reader.Read();
            if (!r)
            {
                AtEndOfData = true;
            }
            return r;
        }
        return false;
    }
    public override object this[int ordinal] => this.GetValue(ordinal);
    public override object this[string name] => this.GetValue(GetOrdinal(name));

    public override int Depth => 0;
    public override int FieldCount => reader.FieldCount;
    public override bool HasRows => reader.HasRows;
    public override bool IsClosed => reader.IsClosed;
    public override int RecordsAffected => reader.RecordsAffected;

    // Everything below simply delegates to the wrapped reader.
    public override bool GetBoolean(int ordinal) => reader.GetBoolean(ordinal);
    public override byte GetByte(int ordinal) => reader.GetByte(ordinal);
    public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length) => reader.GetBytes(ordinal, dataOffset, buffer, bufferOffset, length);
    public override char GetChar(int ordinal) => reader.GetChar(ordinal);
    public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length) => reader.GetChars(ordinal, dataOffset, buffer, bufferOffset, length);
    public override string GetDataTypeName(int ordinal) => reader.GetDataTypeName(ordinal);
    public override DateTime GetDateTime(int ordinal) => reader.GetDateTime(ordinal);
    public override decimal GetDecimal(int ordinal) => reader.GetDecimal(ordinal);
    public override double GetDouble(int ordinal) => reader.GetDouble(ordinal);
    public override IEnumerator GetEnumerator() => new DbEnumerator(this);
    public override Type GetFieldType(int ordinal) => reader.GetFieldType(ordinal);
    public override float GetFloat(int ordinal) => reader.GetFloat(ordinal);
    public override Guid GetGuid(int ordinal) => reader.GetGuid(ordinal);
    public override short GetInt16(int ordinal) => reader.GetInt16(ordinal);
    public override int GetInt32(int ordinal) => reader.GetInt32(ordinal);
    public override long GetInt64(int ordinal) => reader.GetInt64(ordinal);
    public override string GetName(int ordinal) => reader.GetName(ordinal);
    public override int GetOrdinal(string name) => reader.GetOrdinal(name);
    public override string GetString(int ordinal) => reader.GetString(ordinal);
    public override object GetValue(int ordinal) => reader.GetValue(ordinal);
    public override int GetValues(object[] values) => reader.GetValues(values);
    public override bool IsDBNull(int ordinal) => reader.IsDBNull(ordinal);

    // Multiple result sets don't apply to a single CSV stream.
    public override bool NextResult() => throw new NotSupportedException();
}
Pretty much everything just delegates to the internal data reader. The only interesting bit is Read/ReadAsync, which is where the row limit is enforced: the wrapper stops returning rows after count reads, but only sets AtEndOfData when the underlying reader itself runs dry. I haven't tested this code thoroughly, so double-check the row accounting at the batch boundaries.
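If you want to sanity-check that accounting, a quick test along these lines (the inline CSV and names are mine, not part of the library) should print 2:

using System;
using System.IO;
using Sylvan.Data.Csv;

// A header plus five data rows.
var data = "id\n1\n2\n3\n4\n5\n";
using var csv = await CsvDataReader.CreateAsync(new StringReader(data));

var range = new RangeDataReader(csv, 2);
int rows = 0;
while (await range.ReadAsync())
{
    rows++;
}
Console.WriteLine(rows); // 2; rows 3-5 remain for the next RangeDataReader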
Finally, now that I've illustrated how you can stream-process the CSV data, perhaps the need for splitting evaporates and you can simply stream-process the file instead? Hard to know without more detail about why you feel you need to split it.
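For example, if the per-row work can happen as you read, a sketch like this (the column access is purely illustrative) avoids both the List<CsvRow> and the intermediate batch files entirely:

using System.IO;
using Sylvan.Data.Csv;

using var reader = File.OpenText("MyMassiveCsv.csv");
using var csv = await CsvDataReader.CreateAsync(reader);

while (await csv.ReadAsync())
{
    // Only the current row is buffered; process it and move on.
    var firstField = csv.GetString(0);
    // ... your per-row processing here ...
}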