
I am trying to add my task to a custom concurrent queue, but it keeps starting. How can I just add the task object to the queue without starting it, so I can start it later in the code? Basically, what it's supposed to do for each piece is: get the request stream, stream it to a file, and concurrently start the next piece.

My custom concurrent queue:

public sealed class EventfulConcurrentQueue<T>
{
    // The wrapped queue; exposed publicly so consumers can enumerate it directly.
    public ConcurrentQueue<T> Queue;

    public EventfulConcurrentQueue()
    {
        Queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        Queue.Enqueue(item);
        OnItemEnqueued();
    }

    public int Count => Queue.Count;


    public bool TryDequeue(out T result)
    {
        var success = Queue.TryDequeue(out result);

        if (success)
        {
            OnItemDequeued(result);
        }
        return success;
    }

    public event EventHandler ItemEnqueued;
    public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;

    void OnItemEnqueued()
    {
        ItemEnqueued?.Invoke(this, EventArgs.Empty);
    }

    void OnItemDequeued(T item)
    {
        ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs<T> { Item = item });
    }
}

public sealed class ItemDequeuedEventArgs<T> : EventArgs
{
    public T Item { get; set; }
}
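
For reference, a minimal sketch of how the two events can be consumed, e.g. to report progress as items move through the queue (the int payload and the console output are just illustrative, not code from my project):

var queue = new EventfulConcurrentQueue<int>();

// The handlers run synchronously on whichever thread calls Enqueue/TryDequeue
queue.ItemEnqueued += (s, e) => Console.WriteLine($"Enqueued; count is now {queue.Count}");
queue.ItemDequeued += (s, e) => Console.WriteLine($"Dequeued item {e.Item}");

queue.Enqueue(42);

if (queue.TryDequeue(out var item))
{
    Console.WriteLine($"Processed {item}");
}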

The code I'm using to add the task to the Queue:

Parallel.ForEach(pieces, piece =>
{
    //Open a http request with the range
    var request = new HttpRequestMessage { RequestUri = new Uri(url) };
    request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);

    //Send the request
    var downloadTask = client.SendAsync(request).Result;

    //Use interlocked to increment Tasks done by one
    Interlocked.Add(ref OctaneEngine.TasksDone, 1);

    //Add the task to the queue along with the start and end value
    asyncTasks.Enqueue(new Tuple<Task, FileChunk>(
        downloadTask.Content.ReadAsStreamAsync().ContinueWith(
        task =>
        {
            using (var fs = new FileStream(piece._tempfilename,
                FileMode.OpenOrCreate, FileAccess.Write))
            {
                task.Result.CopyTo(fs);
            }
        }), piece));
});

The code I am using to later start the tasks:

Parallel.ForEach(asyncTasks.Queue, async (task, state) =>
{
    if (asyncTasks.Count > 0)
    {
        await Task.Run(() => task);
        asyncTasks.TryDequeue(out task);
        Interlocked.Add(ref TasksDone, 1);
    }
});

I'm not sure what is going on so any help would be greatly appreciated! Thank you!

  • There is a major flaw with your code that makes any attempt to answer the question futile IMHO. The `Parallel.ForEach` [is not async-friendly](https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda). The lambda passed is [async void](https://learn.microsoft.com/en-us/archive/msdn-magazine/2013/march/async-await-best-practices-in-asynchronous-programming#avoid-async-void). Not only are all the tasks going to be started instantly, but the `Parallel.ForEach` will also complete instantly, before the tasks are completed (see the sketch after these comments). – Theodor Zoulias Jul 09 '20 at 06:29
  • What is the problem you are actually trying to solve? Download data on multiple threads, but write the data to disk on a single thread? Both network and disk are IO systems, so there is usually not much to gain by multithreading as long as you use "real" async methods to do the IO. – JonasH Jul 09 '20 at 07:25
  • @JonasH I'm basically trying to concurrently stream different pieces of a file to temporary files. When they are done, I am trying to join them into the main file. This is [my project](https://github.com/gregyjames/OctaneDownloader) in case you need to see what I'm trying to do. – 0xSingularity Jul 10 '20 at 00:27
  • @GregJ Have you tested this concept and observed any significant performance increase? This sounds like a de-optimization to me. The slowest part of the system is probably the network, the next slowest is usually the disk. This method would triple the amount of disk access, and that does not usually help performance. – JonasH Jul 10 '20 at 06:39
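
To make the first comment's point concrete, here is a minimal, self-contained sketch (not code from the project) showing that `Parallel.ForEach` returns as soon as the async void lambdas hit their first await, well before the work inside them has finished:

using System;
using System.Threading.Tasks;

class ParallelForEachAsyncVoidDemo
{
    static void Main()
    {
        // Each async lambda is compiled to an async void delegate, so
        // Parallel.ForEach only waits for the synchronous part up to the first await.
        Parallel.ForEach(new[] { 1, 2, 3 }, async item =>
        {
            await Task.Delay(1000);                     // the "work" starts immediately...
            Console.WriteLine($"Finished item {item}"); // ...but completes after ForEach has returned
        });

        Console.WriteLine("Parallel.ForEach returned"); // this prints first
        Console.ReadLine(); // keep the process alive long enough to observe the late output
    }
}

The same applies to the question's code: client.SendAsync and ContinueWith both return already-started ("hot") tasks, so enqueuing them cannot defer the work.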

1 Answer

It seems to me that you're over-complicating things here.

I'm not sure you need any queue at all.

This approach allows the network access work to occur concurrently:

using var semaphore = new SemaphoreSlim(1);
var tasks = pieces.Select(async piece =>
{
    var request = new HttpRequestMessage { RequestUri = new Uri(url) };
    request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);

    //Send the request
    var download = await client.SendAsync(request);

    //Use interlocked to increment Tasks done by one
    Interlocked.Increment(ref OctaneEngine.TasksDone);

    var stream = await download.Content.ReadAsStreamAsync();

    await semaphore.WaitAsync(); // Only allow one concurrent file write
    try
    {
        using (var fs = new FileStream(piece._tempfilename, FileMode.OpenOrCreate,
            FileAccess.Write))
        {
            await stream.CopyToAsync(fs);
        }
    }
    finally
    {
        semaphore.Release(); // Always release, even if the copy throws
    }

    Interlocked.Increment(ref TasksDone);
});

await Task.WhenAll(tasks);

Because your work is I/O-bound, there is little benefit in using multiple threads via Parallel.ForEach; awaiting the async methods releases threads back to the pool to process other work while the I/O is in flight.
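
To make that last point concrete, here is a small sketch (the class and method names are illustrative only) contrasting the blocking call from the question with the awaited call used above:

using System.Net.Http;
using System.Threading.Tasks;

static class SendExamples
{
    // Blocking, as in the question's first loop: the calling thread is held
    // idle until the response arrives.
    public static HttpResponseMessage SendBlocking(HttpClient client, HttpRequestMessage request)
        => client.SendAsync(request).Result;

    // Awaiting, as in the answer: the thread returns to the pool while the
    // request is in flight, and the method resumes when the response arrives.
    public static async Task<HttpResponseMessage> SendAwaiting(HttpClient client, HttpRequestMessage request)
        => await client.SendAsync(request);
}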

Johnathan Barclay
  • Is this efficient though? It seems to me that having 10 (or 100) tasks trying to write to the filesystem concurrently will be slower than executing the same tasks sequentially. – Theodor Zoulias Jul 09 '20 at 08:49
  • I am using the enqueue and dequeue events as a kind of callback for progress/status updates. [My project](https://github.com/gregyjames/OctaneDownloader) is here in case you need to see how I'm doing it. I don't really know how else to track the progress. – 0xSingularity Jul 10 '20 at 00:24
  • @TheodorZoulias Yes, depending on the storage it may degrade performance. You could use a semaphore to allow the HTTP requests to occur concurrently, but serialise the file writes. I've updated my answer. – Johnathan Barclay Jul 10 '20 at 07:55