
Similar to "How do I copy the contents of one stream to another?"

But my understanding of sourceStream.CopyTo(destStream) means it'll read the entire sourceStream from start to end (chunks or whatever) in order to copy it, and then the consumer goes back through and reads the stream again (its copy), resulting in O(2n) rather than O(n), right? And if destStream is a temporary copy (i.e. MemoryStream), then I will also end up loading the entire source stream into memory for each copy.

Is there a way to do it so that it's only copied as destStream is consumed/read?

Specifically, in .NET C# I need to make a copy of an input stream and write it to multiple "destinations" (via various helper libraries, some of which dispose of the stream they're given). The input could be very large, and is usually actually a FileStream, so I'd rather not load the entire file into memory when I can rewind it and buffer it from the disk.

Example Scenario:

void WriteToMany(Stream sourceStream, IEnumerable<ICanPutStream> destinations) {
    foreach(var endpoint in destinations) {
        // <-- I need to make a copy of `sourceStream` here because...
        endpoint.PutStream(sourceStream); // ...some endpoints automatically dispose the stream
    }
}

If I make a copy before PutStream is called, it's going to read through the source stream. I can live with that, but if I copy it to a MemoryStream it also loads it into memory for each endpoint (with the added weirdness of trying to dispose of something that may/not be disposed already). Ideally it would only be during the internal workings of PutStream that the original stream gets copied/read.

drzaus
  • Just use the linked solution for .NET 3.5 but with multiple destinations instead of just one – Julián Urbano Oct 13 '15 at 22:49
  • Note: `Stream.CopyTo` does not read the entire source into memory. – Alexei Levenkov Oct 13 '15 at 22:56
  • As far as I know the underlying code for CopyTo performs a buffered copy and doesn't load the entire contents in memory unless your destination stream is an in-memory stream, in which case it won't matter how you do it. – JamieSee Oct 13 '15 at 23:01
  • Note: Your question title contradicts the last paragraph in the question body. (Do you want to copy into a single or into several target streams? That's not the same thing at all!) If you want to copy into a single stream on-demand (as your title suggests), then simply attach the final consumer to `sourceStream` instead of `destStream`. `destStream` would otherwise act as a completely superfluous data tunnel/channel. If you want to copy into many streams you could implement an `IObservable` adapter/bridge for `Stream`, to which you can attach as many consumers as you want. – stakx - no longer contributing Oct 13 '15 at 23:13
  • Clarified the question/scenario to address comments. As +JamieSee mentioned, `CopyTo` does read the whole source into memory when copied to a `MemoryStream`, which is the only 'temp stream' I can think of. Copying to multiple 'temp streams' means I'm still reading the entire source stream to make the copy, and then each copy gets read again when written to its destination. Trying to avoid as much of both as possible. I'm interested in @stakx's solution, or at the very least [michael-petito's answer](http://stackoverflow.com/a/33114051/1037948). – drzaus Oct 14 '15 at 13:50
  • @stakx I see where the slight difference in title and final sentence may cause confusion (they don't contradict each other, though, since I mainly care about not doing the extra work of reading the whole thing to make a copy when it's going to be read anyway in order to write to destination). In my clarified scenario, how would I use an `IObservable`? – drzaus Oct 14 '15 at 14:29

2 Answers


Unless you can seek back to the beginning of the stream, you must copy the entire stream to memory to have multiple consumers. Otherwise the stream data is only available to the first consumer.

If you have a seekable stream (like FileStream) and you want to pass it to multiple consumers without having it disposed, you could implement a Stream proxy which delegates all members to the underlying stream except for Dispose. It would look something like:

class StreamProxy : Stream
{
    private readonly Stream _stream;

    public StreamProxy(Stream stream)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        _stream = stream;
    }

    protected override void Dispose(bool disposing)
    {
        //don't dispose inner stream
    }

    public override void Flush()
    {
        _stream.Flush();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        return _stream.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        _stream.SetLength(value);
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return _stream.Read(buffer, offset, count);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        _stream.Write(buffer, offset, count);
    }

    public override bool CanRead
    {
        get { return _stream.CanRead; }
    }

    public override bool CanSeek
    {
        get { return _stream.CanSeek; }
    }

    public override bool CanWrite
    {
        get { return _stream.CanWrite; }
    }

    public override long Length
    {
        get { return _stream.Length; }
    }

    public override long Position
    {
        get { return _stream.Position; }
        set { _stream.Position = value; }
    }
}

This way, each consumer can dispose their "copy" of the stream (instance of StreamProxy), without disposing the underlying stream. Once the consumer is done, seek the underlying stream back to the beginning and pass the proxy to another consumer.
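As a rough sketch of that rewind-and-reuse loop applied to the question's `WriteToMany`: the `ICanPutStream` shape is assumed from the question, and `NonDisposingStream` and `StreamFanOut` are hypothetical names, the former being a compact stand-in for the `StreamProxy` above. It rewinds to the position the source had on entry rather than to offset 0, which is a design choice and not something the question specifies.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Assumed shape of the interface used in the question.
interface ICanPutStream { void PutStream(Stream stream); }

// Compact stand-in for StreamProxy: forwards every member except Dispose.
sealed class NonDisposingStream : Stream
{
    private readonly Stream _inner;
    public NonDisposingStream(Stream inner) { _inner = inner ?? throw new ArgumentNullException(nameof(inner)); }
    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => _inner.CanSeek;
    public override bool CanWrite => _inner.CanWrite;
    public override long Length => _inner.Length;
    public override long Position { get => _inner.Position; set => _inner.Position = value; }
    public override void Flush() => _inner.Flush();
    public override long Seek(long offset, SeekOrigin origin) => _inner.Seek(offset, origin);
    public override void SetLength(long value) => _inner.SetLength(value);
    public override int Read(byte[] buffer, int offset, int count) => _inner.Read(buffer, offset, count);
    public override void Write(byte[] buffer, int offset, int count) => _inner.Write(buffer, offset, count);
    protected override void Dispose(bool disposing) { /* leave the inner stream open */ }
}

static class StreamFanOut
{
    public static void WriteToMany(Stream sourceStream, IEnumerable<ICanPutStream> destinations)
    {
        if (!sourceStream.CanSeek)
            throw new NotSupportedException("The source must be seekable so it can be rewound.");

        long start = sourceStream.Position; // rewind target for every endpoint
        foreach (var endpoint in destinations)
        {
            sourceStream.Seek(start, SeekOrigin.Begin);
            // The endpoint may dispose the proxy; the source itself stays open.
            using (var proxy = new NonDisposingStream(sourceStream))
            {
                endpoint.PutStream(proxy);
            }
        }
    }
}
```

The source is still read once per endpoint, but each read happens inside `PutStream` and nothing is buffered beyond what the endpoint itself requests.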

Regarding your question: Is there a way to do it so that it's only copied as destStream is consumed/read? You could augment the above StreamProxy class so that it keeps track of its own position within the inner stream. On each read operation, StreamProxy would then seek the inner stream to that position and read the next chunk. This way, each consumer receives its own instance of StreamProxy and can read from the inner stream at an independent position.

I can't see any advantage to this approach over the initially proposed StreamProxy, unless your consumers are running in parallel. If they are, you'll also need a synchronization mechanism in StreamProxy so that reads are not overlapped, since the inner stream can only be at one position at a time. This effectively serializes the consumers (which is an inherent limitation of starting with a single Stream and not copying its contents to memory), and makes the approach overall less efficient (unless there is a huge disparity between the read performance of the inner stream and the write performance of the consumers).

Your new read method might look like:

public override int Read(byte[] buffer, int offset, int count)
{
    lock(_stream)
    {
        //position the inner stream to end of last read (another consumer may have moved it)
        _stream.Seek(Position, SeekOrigin.Begin);

        //read the bytes, up to count
        var bytesRead = _stream.Read(buffer, offset, count);

        //update the next read position
        Position += bytesRead;

        return bytesRead;
    }
}

public override long Position{get;set;}
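
For context, here is one way the position-tracking idea could be assembled into a complete read-only class. This is only a sketch under assumptions: `IndependentReader` is a hypothetical name, write support is dropped for brevity, and the seekability check in the constructor is an addition. Like the `Read` method above, it locks on the shared inner stream, which only works if every reader of that stream goes through this class.

```csharp
using System;
using System.IO;

// Hypothetical read-only proxy: each instance keeps its own offset into a shared seekable stream.
sealed class IndependentReader : Stream
{
    private readonly Stream _inner;
    private long _position; // this reader's own offset into _inner

    public IndependentReader(Stream inner)
    {
        if (inner == null) throw new ArgumentNullException(nameof(inner));
        if (!inner.CanSeek) throw new ArgumentException("Inner stream must be seekable.", nameof(inner));
        _inner = inner;
    }

    public override bool CanRead => true;
    public override bool CanSeek => true;
    public override bool CanWrite => false;
    public override long Length => _inner.Length;
    public override long Position { get => _position; set => _position = value; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        lock (_inner) // readers share one inner stream, so reads are serialized
        {
            // Another reader may have moved the inner stream; restore our offset first.
            _inner.Seek(_position, SeekOrigin.Begin);
            int bytesRead = _inner.Read(buffer, offset, count);
            _position += bytesRead;
            return bytesRead;
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        // Only this reader's private offset moves; the inner stream is untouched.
        switch (origin)
        {
            case SeekOrigin.Begin: _position = offset; break;
            case SeekOrigin.Current: _position += offset; break;
            default: _position = Length + offset; break;
        }
        return _position;
    }

    public override void Flush() { }
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    protected override void Dispose(bool disposing) { /* leave the shared inner stream open */ }
}
```

Two instances created over the same `FileStream` can then be handed to different consumers, and each one sees the data from its own starting offset regardless of what the other has read.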
Michael Petito
  • The first paragraph of your answer is somewhat inaccurate. Even with a non-seekable stream, you can have several consumers without the need for a complete in-memory copy IF you can live with one additional constraint. Namely that consumers can only forward-consume the stream starting from the position where they "attached" to it; similar to hot/cold observables in Rx. – stakx - no longer contributing Oct 13 '15 at 23:46
  • @stakx The OP states "I need to make a copy of an input stream and write it to multiple destinations", so presumably he means to have each consumer read a copy of the entire stream. – Michael Petito Oct 13 '15 at 23:55
  • @MichaelPetito you are correct, I need A to go to B, C, and D with the least "effort". If B, C, and D can forward-consume all of A at the same time, great. But I like this answer, because I can even make `StreamProxy` autorewind the underlying stream when disposed. – drzaus Oct 14 '15 at 13:53
  • @drzaus Yes, you could autorewind the stream, and also add checks to make sure the inner stream `CanSeek`. This is probably as good as you can get without copying the stream content to memory, but I've updated my answer to include an alternative approach for concurrent readers. – Michael Petito Oct 15 '15 at 16:23

Stream.CopyTo uses an internal buffer (81920 bytes, unless you pass a different size to the overload that accepts one). Its implementation is very simple, so you could just alter it and use it like so:

void ConsumeStream(Stream source, Stream destination, int bufferSize)
{
    byte[] buffer = new byte[bufferSize];
    int count;
    while ((count = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, count);
        //Other stuff
    }
}
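
If the destinations can be exposed as writable streams, the same loop could be extended to several of them, so the source is read only once. The sketch below is an assumption-laden variation on the method above: `StreamCopy` and `CopyToMany` are hypothetical names, and it does not cover endpoints that pull from a stream they are given, as `PutStream` in the question does.

```csharp
using System.Collections.Generic;
using System.IO;

static class StreamCopy
{
    // Reads the source once and writes each chunk to every destination in turn.
    public static void CopyToMany(Stream source, IReadOnlyList<Stream> destinations, int bufferSize = 81920)
    {
        var buffer = new byte[bufferSize];
        int count;
        while ((count = source.Read(buffer, 0, buffer.Length)) != 0)
        {
            foreach (var destination in destinations)
            {
                // Only the bytes actually read in this pass are forwarded.
                destination.Write(buffer, 0, count);
            }
        }
    }
}
```

Memory use stays at one buffer regardless of the source size, at the cost of every destination advancing at the pace of the slowest one.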
wingerse
  • Would be more appropriate for my purposes with `IEnumerable destinations`, but I'm actually trying to avoid reading the whole `source` until `destination` is consumed (i.e. read). – drzaus Oct 14 '15 at 13:59