4

The Problem

I need to be able to save and read a very big data structure using C#. The structure itself is rather simple; it's a very long array of a simple structs of a constant size.

Just an example for clarity:

struct st {
UInt32 a;
UInt16 b;
//etc.
} completion ports

st[] data = new st[1024*1024*100]

I want to be able to save and load these to files as fast and efficient as possible.

General Direction

My idea so far, is to cut the data into segments, conceptually of course, assign those segments to tasks and just write them into the file asynchronously. FileStream.WriteAsync appears to be perfect for this.

My problem is with the reading, From the FileStream.ReadAsync API it seems completely reasonable that the results can be cut in the middle of each structure, halfway across a primitive in fact. Of course I can work around this, but I'm not sure what would be the best way, and how much will I interfere with the OS's buffering mechanism.

Eventually I plan to create a MemoryStream from each buffer with MemoryStream.MemoryStream(byte[]) and read each into the struct's with a binary reader.

The Question

So what would be the best way to solve this? Is my direction good? Are there any better solutions? Code examples and links would be appreciated...

Conclusions

After doing performance testing I found that reading a file with BinaryReader, or using multiple readers with FileStream.ReadAsync, gives approximately the same performance.

Soo.... the question is pointless.

Peter Mortensen
  • 30,738
  • 21
  • 105
  • 131
AK_
  • 7,981
  • 7
  • 46
  • 78
  • Well explained question. Maybe from here you can get some ideas: http://stackoverflow.com/questions/2161895/reading-large-text-files-with-streams-in-c-sharp – Javiere Apr 26 '13 at 09:35

3 Answers3

3

Your biggest bottleneck is going to be IO, which has to be performed with exclusive access to the file. The actual byte-crunching for this will be fast - you are going to do just as well writing it directly to the file (noting that the FileStream itself has a buffer, or you can add an extra layer with BufferedStream) than you would by serializing different parts in-memory and then copying each in-memory part to the stream separately.

My advice: just write the data in a single thread. Frankly I'm not sure I'd even bother with async (hint: async code adds overhead), especially if the buffer is keeping up. I also wouldn't use BiaryWriter / BinaryReader - I'd just write it raw. One tricky you could do is to use some unsafe code to copy the data in blocks, to avoid having to even look at individual objects, but that is at the harder end of things... I'll try to do an example.

Here's an example of read/write, noting performance first:

Write: 2012ms
Read: 1089ms
File: 838,860,804 bytes

Code:

[DllImport("msvcrt.dll", EntryPoint = "memcpy", CallingConvention = CallingConvention.Cdecl, SetLastError = false)]
public static extern IntPtr memcpy(IntPtr dest, IntPtr src, UIntPtr count);

unsafe static st[] Read(string path)
{
    using (var file = File.OpenRead(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (byte* bufferPtr = buffer)
        {
            Fill(file, buffer, 0, 4);
            int len = ((int*)bufferPtr)[0];

            st[] result = new st[len];
            fixed (st* dataPtr = result)
            {
                st* rawPtr = dataPtr;
                IntPtr source= new IntPtr(bufferPtr);
                while (len >= BLOCK_SIZE)
                {
                    Fill(file, buffer, 0, buffer.Length);
                    memcpy(new IntPtr(rawPtr), source, bufferLen);
                    len -= BLOCK_SIZE;
                    rawPtr += BLOCK_SIZE;
                }
                if (len > 0)
                {
                    Fill(file, buffer, 0, len * size);
                    memcpy(new IntPtr(rawPtr), source, new UIntPtr((uint)(len * size)));
                }
            }
            return result;
        }
    }


}
static void Fill(Stream source, byte[] buffer, int offset, int count)
{
    int read;
    while (count > 0 && (read = source.Read(buffer, offset, count)) > 0)
    {
        offset += read;
        count -= read;
    }
    if (count > 0) throw new EndOfStreamException();
}

unsafe static void Write(st[] data, string path)
{
    using (var file = File.Create(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        int len = data.Length;
        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (st* dataPtr = data)
        fixed (byte* bufferPtr = buffer)
        {
            // write the number of elements
            ((int*)bufferPtr)[0] = data.Length;
            file.Write(buffer, 0, 4);

            st* rawPtr = dataPtr;
            IntPtr destination = new IntPtr(bufferPtr);
            // write complete blocks of BLOCK_SIZE
            while (len >= BLOCK_SIZE)
            {
                memcpy(destination, new IntPtr(rawPtr), bufferLen);
                len -= BLOCK_SIZE;
                rawPtr += BLOCK_SIZE;
                file.Write(buffer, 0, buffer.Length);
            }
            if (len > 0)
            {   // write an incomplete block, if necessary
                memcpy(destination, new IntPtr(rawPtr), new UIntPtr((uint)(len * size)));
                file.Write(buffer, 0, len * size);
            }
        }
    }
}
Marc Gravell
  • 1,026,079
  • 266
  • 2,566
  • 2,900
  • I think writing in a single thread would be fine too, I'm more concerned about the reading... why would I need to lock anything? Isn't the FileStream.SomethingAsync use Completion ports? – AK_ Apr 26 '13 at 09:43
  • @AK_ a file can only be at one position; you need to know that you're reading/writing contiguous parts of the file; added full read/write implementation, btw – Marc Gravell Apr 26 '13 at 10:06
  • @AK_ oops, had some bugs... fixed now hopefully – Marc Gravell Apr 26 '13 at 10:21
  • @MarcGravell In fact I dont need to know I'm reading (or writing) contiguous parts of the file. I only need to know the position of the buffer I've read, and the according position in my data... – AK_ Apr 26 '13 at 10:23
  • @AK_ again, your missing my key point: no matter how you do this, your main overhead is IO. In the above, I am *literally* just doing IO and memcpy - the latter being really fast. There is nothing to be gained *whatsoever* from slicing/dicing this into concurrent tasks, because there is **no work to be done**. All you can achieve by adding concurrency is to add: seek times; the need for multiple buffers; the need for exclusive access (yes, you need this) while reading/writing... all overheads. – Marc Gravell Apr 26 '13 at 10:27
  • @AK_ when I say contiguous - I probably used the wrong word; I don't mean that each block needs to be read in sequence: I mean that *while processing each block*, it will need multiple reads - and during **each** of those reads (seprately) it needs to know that nobody is repositioning the file at the same time; so you would need `lock { SetPosition(); Read(); }` and `lock { SetPosition(); Write(); }` - otherwise all bets are off. – Marc Gravell Apr 26 '13 at 10:30
  • @MarcGravell marc, are you familiar with completion ports or overlapped IO? – AK_ Apr 26 '13 at 11:37
  • @AK_ yes, but that doesn't change anything here; even with overlapped IO, you'd still want to wait on the completion callback before doing the next operation. Actually, most of my async IO is on sockets, not files, but the principles are basically the same. In this particular scenario, moving to overlapped is not likely to help you much - you don't have anything useful to do after sending each fragment. All you are doing is adding more context switches. – Marc Gravell Apr 26 '13 at 11:55
  • @AK_ just to "validate myself", my async IO work includes things like a massively-concurrent async pipelined redis multiplexer (which drives all caching, inboxes, and a few other things on this very web-site), and the custom web-socket server that drives live updates on this very web-site (currently processing 135,388 concurrent connections over a very low number of nodes). The key difference is that *in these cases*, there is purpose in going async. In your example - nothing you've said suggests any such reason. – Marc Gravell Apr 26 '13 at 11:57
  • I just wanted to know your experience with this stuff... I meant no disrespect :-) – AK_ Apr 26 '13 at 12:01
  • @MarcGravell I've made a performance test, please notice the edits. – AK_ Apr 26 '13 at 13:06
  • @MarcGravell I've also tested your suggestion. I converted it in the simplest way possible to `bytes` and it takes around a second – AK_ Apr 26 '13 at 13:23
  • @MarcGravell Please notice the edit at the end. I'm accepting your answer because the your insisting on using a single thread approach. – AK_ Apr 26 '13 at 15:19
3

[EDIT] I have updated this post to include a complete compilable sample, and also to address the issues raised by @Daniel in his comments below. As a result, this code no longer uses any "dangerous" methods and has no Code Analysis warnings. [/EDIT]

There is a way you can speed things up a little if your structs contain ONLY blittable types.

You can use marshaling to read the data directly into an array without making additional copies, like so (complete compilable example):

using System;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace ConsoleApplication1
{
    internal class Program
    {
        struct TestStruct // Mutable for brevity; real structs should be immutable.
        {
            public byte   ByteValue;
            public short  ShortValue;
            public int    IntValue;
            public long   LongValue;
            public float  FloatValue;
            public double DoubleValue;
        }

        static void Main()
        {
            var array = new TestStruct[10];

            for (byte i = 0; i < array.Length; ++i)
            {
                array[i].ByteValue   = i;
                array[i].ShortValue  = i;
                array[i].IntValue    = i;
                array[i].LongValue   = i;
                array[i].FloatValue  = i;
                array[i].DoubleValue = i;
            }

            Directory.CreateDirectory("C:\\TEST");

            using (var output = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Create))
                FastWrite(output, array, 0, array.Length);

            using (var input = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Open))
                array = FastRead<TestStruct>(input, array.Length);

            for (byte i = 0; i < array.Length; ++i)
            {
                Trace.Assert(array[i].ByteValue   == i);
                Trace.Assert(array[i].ShortValue  == i);
                Trace.Assert(array[i].IntValue    == i);
                Trace.Assert(array[i].LongValue   == i);
                Trace.Assert(array[i].FloatValue  == i);
                Trace.Assert(array[i].DoubleValue == i);
            }
        }

        /// <summary>
        /// Writes a part of an array to a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream to which to write.</param>
        /// <param name="array">The array containing the data to write.</param>
        /// <param name="offset">The offset of the start of the data in the array to write.</param>
        /// <param name="count">The number of array elements to write.</param>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static void FastWrite<T>(FileStream fs, T[] array, int offset, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));
            GCHandle gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned);

            try
            {
                uint bytesWritten;
                uint bytesToWrite = (uint)(count * sizeOfT);

                if
                (
                    !WriteFile
                    (
                        fs.SafeFileHandle,
                        new IntPtr(gcHandle.AddrOfPinnedObject().ToInt64() + (offset*sizeOfT)),
                        bytesToWrite,
                        out bytesWritten,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to write file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesWritten == bytesToWrite);
            }

            finally
            {
                gcHandle.Free();
            }
        }

        /// <summary>
        /// Reads array data from a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream from which to read.</param>
        /// <param name="count">The number of elements to read.</param>
        /// <returns>
        /// The array of elements that was read. This may be less than the number that was
        /// requested if the end of the file was reached. It may even be empty.
        /// NOTE: There may still be data left in the file, even if not all the requested
        /// elements were returned - this happens if the number of bytes remaining in the
        /// file is less than the size of the array elements.
        /// </returns>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static T[] FastRead<T>(FileStream fs, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));

            long bytesRemaining  = fs.Length - fs.Position;
            long wantedBytes     = count * sizeOfT;
            long bytesAvailable  = Math.Min(bytesRemaining, wantedBytes);
            long availableValues = bytesAvailable / sizeOfT;
            long bytesToRead     = (availableValues * sizeOfT);

            if ((bytesRemaining < wantedBytes) && ((bytesRemaining - bytesToRead) > 0))
            {
                Debug.WriteLine("Requested data exceeds available data and partial data remains in the file.", "Dmr.Common.IO.Arrays.FastRead(fs,count)");
            }

            T[] result = new T[availableValues];

            if (availableValues == 0)
                return result;

            GCHandle gcHandle = GCHandle.Alloc(result, GCHandleType.Pinned);

            try
            {
                uint bytesRead;

                if
                (
                    !ReadFile
                    (
                        fs.SafeFileHandle,
                        gcHandle.AddrOfPinnedObject(),
                        (uint)bytesToRead,
                        out bytesRead,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to read file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesRead == bytesToRead);
            }

            finally
            {
                gcHandle.Free();
            }

            return result;
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool WriteFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToWrite,
            out uint             lpNumberOfBytesWritten,
            IntPtr               lpOverlapped
        );

        /// <summary>See the Windows API documentation for details.</summary>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool ReadFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToRead,
            out uint             lpNumberOfBytesRead,
            IntPtr               lpOverlapped
        );
    }
}

Then you could create a BlockingCollection to store the incoming data and use one thread to populate it and a separate thread to consume it.

The thread that read data into the queue could look like this:

public void ReadIntoQueue<T>(FileStream fs, BlockingCollection<T[]> queue, int blockSize) where T: struct
{
    while (true)
    {
        var data = FastRead<T>(fs, blockSize);

        if (data.Length == 0)
        {
            queue.CompleteAdding();
            break;
        }

        queue.Add(data);
    }
}

And the consuming thread would remove stuff from the queue like so:

public void ProcessDataFromQueue<T>(BlockingCollection<T[]> queue) where T : struct
{
    foreach (var array in queue.GetConsumingEnumerable())
    {
        // Do something with 'array'
    }
}
Matthew Watson
  • 104,400
  • 10
  • 158
  • 276
  • wouldn't it be faster and simpler just to use unsafe code to directly populate or read the data? – AK_ Apr 26 '13 at 09:48
  • @AK_ If you are allowed to use unsafe code, then probably it would be, but this approach avoids unsafe code spreading around projects. As soon as one assembly uses it, you'd have to enable unsafe code on all the projects that directly or indirectly used it - which is unacceptable for some sites (such as ours). It's also probably not as easy to do as you might think! – Matthew Watson Apr 26 '13 at 09:59
  • 1
    @MatthewWatson agreed on the last point; it took me 3 goes to get my example (below) right, and I'm pretty familiar with both serialization at working with raw memory... – Marc Gravell Apr 26 '13 at 10:18
  • I would like to avoid unsafe code, and what you're suggesting might be safe according to .Net, but it's still kinda dangerous :-) – AK_ Apr 26 '13 at 10:21
  • 1
    @AK_ It's no more dangerous than accidentally specifying an array index that's out-of-bounds, or trying to read beyond the end of a file stream. It's impossible to corrupt memory or anything using the code I posted. The marshaller would throw an exception. – Matthew Watson Apr 26 '13 at 10:23
  • 2
    @AK_ as always, it is a trade-off. You question states "as fast and efficient as possible" - and I would respond that *for that* the `unsafe` / `memcpy` approach is probably (you'd need to measure) going to be the best you can do. However, if you *really* hate the idea of `unsafe`, then you can't do that, in which case using the marshaller is a good compromise. In either event, frankly I see no benefit whatsoever in using concurrent tasks for this, and here I'm speaking as someone who does a **lot** of IO, a **lot** of serialization, and a **lot** of concurrency. – Marc Gravell Apr 26 '13 at 10:35
  • Don't just suppress `CA2001:AvoidCallingProblematicMethods` - `DangerousGetHandle` is called dangerous for a reason; your code will crash if the GC finalizes the file stream during a `ReadFile` call. You need a `GC.KeepAlive` call, or better, use the `SafeHandle` properly without dangerous calls. – Daniel Apr 29 '13 at 20:14
  • @Daniel: That code was originally written for .Net 1.x and then patially updated - you're right; it needs fully updating. I'll fix it later. Our usage pattern is that we always hold on to a reference to the FileStream outside the method, so it will never get GCed while the method is executing, but if you did `new FileStream()` inside the method call I guess it could - not sure who would close or dispose it then though (antipattern). In proper use it could never get GCed. Nevertheless, I'll fix it anyway ;) – Matthew Watson Apr 30 '13 at 06:21
  • @AK_ My example no longer uses "DangerousGetHandle" so you might consider it somewhat safer now. – Matthew Watson Apr 30 '13 at 08:42
  • @MatthewWatson the name doesnt scare me :-) – AK_ Apr 30 '13 at 11:23
  • @MatthewWatson I gave you the bounty because I ended up using your example as a basis to implement this. – AK_ Apr 30 '13 at 11:25
  • @AK_ You might have noticed I've been doing some major investigation of all this; in particular, my most recent question seems to show that it is much faster than I thought... unless I'm made a cockup somewhere (entirely possibly ;) – Matthew Watson Apr 30 '13 at 11:34
  • @AK_ Final update! I fixed a problem I had inadvertently introduced this morning while fixing a Code Analysis warning. This will now work fine for multiple calls for the same FileStream. – Matthew Watson Apr 30 '13 at 14:29
1

As far as I know of, the fastest way to read or write a file is a single forward-only process. Otherwise the disk would have to move back and forth through the file in addition to the mandatory reading/writing.

Of course this does not mean you cannot handle the data in multiple concurrent threads.

If the segments are large enough, the overhead of the disk moving would probably not be noticable.

C.Evenhuis
  • 25,996
  • 2
  • 58
  • 72
  • "done right", the processing of each segment will just be a raw memcpy anyway ... not much to do, so no need to do it concurrently – Marc Gravell Apr 26 '13 at 10:17