I am trying to add my task to a custom concurrent queue but it keep starting. How can I just add the task object to the queue without starting it so I can start it later in the code? Basically what it's supposed to do for each piece is get the request stream, stream it to a file, concurrently start the next piece.
My custom concurrent queue:
public sealed class EventfulConcurrentQueue<T> : ConcurrentQueue<T>
{
public ConcurrentQueue<T> Queue;
public EventfulConcurrentQueue()
{
Queue = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
Queue.Enqueue(item);
OnItemEnqueued();
}
public int Count => Queue.Count;
public bool TryDequeue(out T result)
{
var success = Queue.TryDequeue(out result);
if (success)
{
OnItemDequeued(result);
}
return success;
}
public event EventHandler ItemEnqueued;
public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;
void OnItemEnqueued()
{
ItemEnqueued?.Invoke(this, EventArgs.Empty);
}
void OnItemDequeued(T item)
{
ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs<T> { Item = item });
}
}
public sealed class ItemDequeuedEventArgs<T> : EventArgs
{
public T Item { get; set; }
}
The code I'm using to add the task to the Queue:
Parallel.ForEach(pieces, piece =>
{
//Open a http request with the range
var request = new HttpRequestMessage { RequestUri = new Uri(url) };
request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);
//Send the request
var downloadTask = client.SendAsync(request).Result;
//Use interlocked to increment Tasks done by one
Interlocked.Add(ref OctaneEngine.TasksDone, 1);
//Add the task to the queue along with the start and end value
asyncTasks.Enqueue(new Tuple<Task, FileChunk>(
downloadTask.Content.ReadAsStreamAsync().ContinueWith(
task =>
{
using (var fs = new FileStream(piece._tempfilename,
FileMode.OpenOrCreate, FileAccess.Write))
{
task.Result.CopyTo(fs);
}
}), piece));
});
The code I am using to later start the tasks:
Parallel.ForEach(asyncTasks.Queue, async (task, state) =>
{
if (asyncTasks.Count > 0)
{
await Task.Run(() => task);
asyncTasks.TryDequeue(out task);
Interlocked.Add(ref TasksDone, 1);
}
});
I'm not sure what is going on so any help would be greatly appreciated! Thank you!