
I have a pipeline of observable streams whose goal is to calculate a simple "checksum" of the contents of a set of .zip files.

To do it, I have set up an observable that:

  1. takes all files in a given folder
  2. reads the contents of each file (opening it as a ZipArchive)
  3. for each entry in each file, calculates its checksum

To illustrate, I have created this example:

Notice the use of AsyncContext.Run (https://stackoverflow.com/a/9212343/1025407) to let the Main method await GetChecksums, since this is a console application.

namespace DisposePoC
{
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;
    using System.Reactive.Linq;
    using Nito.AsyncEx;
    using System.Linq;
    using System.Threading.Tasks;


    class Program
    {
        private static void Main()
        {
            AsyncContext.Run(GetChecksums);
        }

        private static async Task<IList<byte>> GetChecksums()
        {
            var bytes = Directory.EnumerateFiles("FolderWithZips")
                .ToObservable()
                .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
                .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));

            return await bytes.ToList();
        }

        private static ZipArchive CreateZipArchive(string path)
        {
            return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
        }

        private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
        {
            var bytes = await GetBytesFromStream(stream, entryLength);
            // Seed with 0 so that empty entries (e.g. directories) don't make Aggregate throw
            return bytes.Aggregate((byte) 0, (acc, b) => (byte) (acc ^ b));
        }

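        private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
        {
            var bytes = new byte[entryLength];
            int offset = 0, read;
            // ReadAsync may return fewer bytes than requested, so loop until the buffer is full or the stream ends
            while (offset < bytes.Length && (read = await stream.ReadAsync(bytes, offset, bytes.Length - offset)) > 0)
                offset += read;
            return bytes;
        }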
    }
}

Running the application, I get all kinds of errors:

  • System.IO.InvalidDataException: A local file header is corrupt.
  • System.NotSupportedException: Stream does not support reading.
  • System.ObjectDisposedException: Cannot access a disposed object.
  • System.IO.InvalidDataException: Block length does not match with its complement.

What am I doing wrong?

Is there a problem with the observable itself, or is it because ZipArchive isn't thread-safe? If it's the latter, how do I make the code work?

SuperJMN
  • I'll make this a comment since I can't verify it in code at the moment, but I suspect the problem is that the ZipArchives created in the first SelectMany are disposed of by the Using statement before you can read the entry streams in the next line; essentially, the disposable scoping is wrong. I would move the logic from the second SelectMany into the first. I would also verify that your test data is not corrupted, as the first exception suggests. – Andrew Nov 26 '16 at 02:39
  • I think I see your point. But if the scope is wrong, how do I modify the code so that each ZipArchive isn't disposed until all of its entries are processed? Is that even possible? – SuperJMN Nov 26 '16 at 11:05
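Following Andrew's suggestion, here is a minimal, untested sketch of one way to restructure the pipeline, assuming the System.Reactive (Rx.NET) package is referenced. The per-entry work is nested inside the archive's Observable.Using, so each archive is disposed only after its inner sequence terminates, and Concat is used instead of SelectMany so that entries (and archives) are processed one at a time rather than concurrently. ZipChecksums, GetChecksums and the calculateChecksum delegate are hypothetical names; the delegate stands in for the real (third-party) async checksum method and is assumed to have the shape of CalculateChecksum(Stream, long).

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class ZipChecksums
{
    // Emits one checksum per entry across every file in `folder`.
    // Hypothetical helper: `calculateChecksum` is assumed to match CalculateChecksum(Stream, long).
    public static IObservable<byte> GetChecksums(
        string folder, Func<Stream, long, Task<byte>> calculateChecksum)
    {
        return Directory.EnumerateFiles(folder)
            .ToObservable()
            .Select(path => Observable.Using(
                () => new ZipArchive(File.OpenRead(path), ZipArchiveMode.Read),
                // The entry work lives inside the archive's Using, so the archive
                // is disposed only after this inner sequence terminates.
                archive => archive.Entries
                    .Select(entry => Observable.Using(
                        () => entry.Open(),
                        stream => Observable.FromAsync(() => calculateChecksum(stream, entry.Length))))
                    .Concat()))   // subscribe to one entry at a time
            .Concat();            // process one archive at a time
    }
}
```

Replacing the outer Concat with Merge would let several archives run at once while still reading each archive's entries sequentially; keeping both as Concat is the simplest way to rule out concurrent access to a single ZipArchive.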

2 Answers


Rx is probably not the best fit for this. Honestly, you can even do it without async.

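// Assumes a hypothetical synchronous CalculateChecksum(string zipPath) overload
// that opens the archive and checksums its entries.
var checksums = Directory.EnumerateFiles("FolderWithZips")
         .AsParallel()
         .Select(path => CalculateChecksum(path))
         .ToList();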
Asti
  • Well, CalculateChecksum is just an example to simplify the question. In my real-life problem it's an async method that I cannot modify (it's third-party). How does its being async change your approach? – SuperJMN Nov 26 '16 at 11:01
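Since the real checksum method is async, here is a minimal sketch of Asti's parallel idea that stays async and drops Rx entirely. It is an assumption-laden illustration, not Asti's code: each archive is processed on its own task, so files are handled in parallel, while the entries inside one archive are awaited one after another, because ZipArchive instance members are not guaranteed to be thread-safe. ParallelZipChecksums, GetChecksumsAsync and calculateChecksumAsync are hypothetical names; the delegate is assumed to have the shape of the question's CalculateChecksum(Stream, long).

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;

static class ParallelZipChecksums
{
    // One task per archive (parallel across files); entries within an archive are sequential.
    public static async Task<IList<byte>> GetChecksumsAsync(
        string folder, Func<Stream, long, Task<byte>> calculateChecksumAsync)
    {
        var perArchive = Directory.EnumerateFiles(folder)
            .Select(path => Task.Run(async () =>
            {
                var checksums = new List<byte>();
                // The archive stays open until every entry has been awaited.
                using (var archive = new ZipArchive(File.OpenRead(path), ZipArchiveMode.Read))
                {
                    foreach (var entry in archive.Entries)
                    {
                        using (var stream = entry.Open())
                        {
                            checksums.Add(await calculateChecksumAsync(stream, entry.Length));
                        }
                    }
                }
                return checksums;
            }))
            .ToList();

        // WhenAll keeps the results in the same order as the enumerated files.
        var results = await Task.WhenAll(perArchive);
        return results.SelectMany(c => c).ToList();
    }
}
```

The design choice is to parallelize only at the file level, where each task owns its own FileStream and ZipArchive, so no archive is ever touched by two readers at once.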

There appears to be nothing "Rx" about your problem.

If you rewrite the whole thing as an imperative set of loops, it works fine:

private static async Task<IList<byte>> GetChecksums()
{
    var bytes = new List<byte>();
    foreach (var path in Directory.EnumerateFiles("FolderWithZips"))
    {
        using (var archive = CreateZipArchive(path))
        {
            foreach (var entry in archive.Entries)
            {
                using (var stream = entry.Open())
                {
                    var checksum = await CalculateChecksum(stream, entry.Length);
                    bytes.Add(checksum);
                }
            }
        }
    }

    return bytes;
}

So I would imagine you have race conditions (concurrency) and/or out-of-order disposal issues.

Lee Campbell
  • I thought Observable.Using would dispose of the streams in the correct order, so I wouldn't get ObjectDisposedExceptions. Am I using it wrong, or is the problem intrinsic to what I'm doing (reading from a ZipArchive concurrently)? – SuperJMN Nov 26 '16 at 10:58
  • Observable.Using will dispose of the resource created by the provided factory when the sequence terminates (dispose/error/complete). But it is all academic, because you are forcing Rx onto a problem that is not reactive. That is the main issue I see. Peripheral issues are the unnecessary introduction of threading by not providing an IScheduler to the two (unnecessary) ToObservable() calls. – Lee Campbell Nov 26 '16 at 11:54