I have an Rx pipeline whose goal is to calculate a simple "checksum" of the contents of a set of .zip files.
To do it, I have set up an observable that:
- takes all files in a given folder
- reads the contents of each file (as a ZipArchive)
- for each entry in each file, calculates the checksum
To illustrate it, I have created the example below. Note the use of AsyncContext.Run (https://stackoverflow.com/a/9212343/1025407) so that the Main method can wait for GetChecksums, since this is a console application.

    namespace DisposePoC
    {
        using System.Collections.Generic;
        using System.IO;
        using System.IO.Compression;
        using System.Reactive.Linq;
        using Nito.AsyncEx;
        using System.Linq;
        using System.Threading.Tasks;

        class Program
        {
            private static void Main()
            {
                AsyncContext.Run(GetChecksums);
            }

            private static async Task<IList<byte>> GetChecksums()
            {
                var bytes = Directory.EnumerateFiles("FolderWithZips")
                    .ToObservable()
                    .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
                    .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));

                return await bytes.ToList();
            }

            private static ZipArchive CreateZipArchive(string path)
            {
                return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
            }

            private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
            {
                var bytes = await GetBytesFromStream(stream, entryLength);
                return bytes.Aggregate((b1, b2) => (byte) (b1 ^ b2));
            }

            private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
            {
                byte[] bytes = new byte[entryLength];
                await stream.ReadAsync(bytes, 0, (int)entryLength);
                return bytes;
            }
        }
    }

Running the application, I get all kinds of errors:
- 'System.IO.InvalidDataException': A local file header is corrupt.
- 'System.NotSupportedException': Stream does not support reading.
- 'System.ObjectDisposedException': Cannot access a disposed object.
- 'System.IO.InvalidDataException': Block length does not match with its complement.
What am I doing wrong?
Is there a problem with the observable itself, or is it because ZipArchive isn't thread-safe? If it isn't thread-safe, how do I make the code work?
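
For reference, here is a minimal sequential sketch (no Rx) of the result I am after, with only one archive and one entry stream open at a time. The class name SequentialChecksums and the helpers Xor and ReadExactly are made up for this illustration and are not part of my real code. It also differs from the example above in two ways that I am assuming are harmless: it loops on Stream.Read until the whole entry has been read, and it returns 0 for an empty entry instead of throwing.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

// Hypothetical helper class, for illustration only.
public static class SequentialChecksums
{
    // XOR of all bytes; an empty array yields 0.
    public static byte Xor(byte[] bytes)
    {
        byte result = 0;
        foreach (var b in bytes)
        {
            result ^= b;
        }
        return result;
    }

    // Reads exactly `length` bytes. A single Stream.Read call may return
    // fewer bytes than requested, so keep reading until the buffer is full.
    public static byte[] ReadExactly(Stream stream, long length)
    {
        var buffer = new byte[length];
        var offset = 0;
        while (offset < buffer.Length)
        {
            var read = stream.Read(buffer, offset, buffer.Length - offset);
            if (read == 0)
            {
                throw new EndOfStreamException("Stream ended before the expected length.");
            }
            offset += read;
        }
        return buffer;
    }

    // One checksum per entry, across all files in the folder.
    public static IList<byte> GetChecksums(string folder)
    {
        var checksums = new List<byte>();
        foreach (var path in Directory.EnumerateFiles(folder))
        {
            // Disposing the archive also disposes the underlying FileStream.
            using (var archive = new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read)))
            {
                foreach (var entry in archive.Entries)
                {
                    // The entry stream is closed before the next one is opened.
                    using (var stream = entry.Open())
                    {
                        checksums.Add(Xor(ReadExactly(stream, entry.Length)));
                    }
                }
            }
        }
        return checksums;
    }
}
```

Calling SequentialChecksums.GetChecksums("FolderWithZips") is what I would expect the observable version to be equivalent to, apart from the order of the results.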