As I understand from the comments, you're only looking to convert your logic into asynchronous code. Below is how this might be done without explicit separate threads (besides using Task.Run
for the whole process).
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
public class Worker
{
class Item
{
public string Data { get; set; }
}
const int SAVE_AFTER = 2;
string _fileName;
List<Item> _items;
int _savedItemsCount = 0;
CancellationToken _token;
Task _processTask;
Task _pendingSaveTask = null;
// get next item
async Task<Item> GetNextItemAsync()
{
await Task.Delay(500); // delay for testing
return new Item { Data = "Item from " + DateTime.Now.ToString() };
}
// write
async Task SaveItemsAsync(Item[] items)
{
if (_pendingSaveTask != null)
await _pendingSaveTask; // await the previous save
var text = items.Aggregate(String.Empty, (a, b) => a + b.Data + Environment.NewLine);
using (var writer = new System.IO.StreamWriter(_fileName, append: false))
{
await writer.WriteAsync(text);
}
}
// main process
async Task ProcessAsync()
{
while (true)
{
_token.ThrowIfCancellationRequested();
// start getting the next item
var getNextItemTask = GetNextItemAsync();
// save the snapshot if needed
if (_items.Count >= _savedItemsCount + SAVE_AFTER)
{
var snapshot = _items.ToArray();
_savedItemsCount = snapshot.Length;
_pendingSaveTask = SaveItemsAsync(snapshot);
}
// await the next item
var item = await getNextItemTask;
_items.Add(item);
}
}
// start
public void Start(CancellationToken token)
{
_token = token;
_fileName = System.IO.Path.GetTempFileName();
_items = new List<Item>();
_processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
}
// stop
public void Stop()
{
if (_pendingSaveTask != null)
_pendingSaveTask.Wait();
try
{
_processTask.Wait(); // wait for the task to complete
}
catch (Exception ex)
{
// rethrow if anything but OperationCanceledException
if (!(ex is OperationCanceledException))
{
var aggEx = ex as AggregateException;
if (aggEx == null || !(aggEx.InnerException is OperationCanceledException))
throw;
}
}
}
}
class Program
{
public static void Main()
{
var cts = new CancellationTokenSource();
var worker = new Worker();
Console.WriteLine("Start process");
worker.Start(cts.Token);
Thread.Sleep(10000);
Console.WriteLine("Stop process");
cts.Cancel();
worker.Stop();
Console.WriteLine("Press Enter to exit...");
Console.ReadLine();
}
}
}
Note that if new items (GetNextItemAsync
) arrive faster than SaveItemsAsync
finishes saving the last snapshot, this implementation may end up with a growing chain of pending SaveItemsAsync
calls. If this is a problem, you could deal with it by limiting SaveItemsAsync
task to only one pending instance and using BlockingCollection
to queue new snapshots.
[UPDATE] Here is a slightly improved version which eliminate redundant writes if updates are coming faster than saving. It doesn't use BlockingCollection
but adds some extra cancellation logic to SaveItemsAsync
instead. It's a console app, feel free to try it out to see what's going on. Try calling _saveTask = SaveItemsAsync(snapshot)
a few times in a row.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
public class Worker
{
class Item
{
public string Data { get; set; }
}
const int SAVE_AFTER = 2;
string _fileName;
List<Item> _items;
int _savedItemsCount = 0;
CancellationToken _token;
Task _processTask;
Task _saveTask;
CancellationTokenSource _saveTaskCts;
// get next item
async Task<Item> GetNextItemAsync()
{
Console.WriteLine("Enter GetNextItemAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
await Task.Delay(500); // delay for testing
return new Item { Data = "Item from " + DateTime.Now.ToString() };
}
// save items
async Task SaveItemsAsync(Item[] items)
{
// avoid multiple pending SaveItemsAsync tasks
Console.WriteLine("Enter SaveItemsAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
var oldSaveTaskCts = _saveTaskCts;
var oldSaveTask = _saveTask;
var thisSaveTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_token);
_saveTaskCts = thisSaveTaskCts;
_saveTask = null;
// cancel the previous pending SaveItemsAsync, if any
if (oldSaveTaskCts != null)
{
oldSaveTaskCts.Cancel();
if (oldSaveTask != null)
await oldSaveTask.WaitObservingCancellationAsync();
}
// another SaveItemsAsync call should lead to cancelling this one
thisSaveTaskCts.Token.ThrowIfCancellationRequested();
// execute the save logic on a pool thread,
// Task.Run automatically unwraps the nested Task<Task>
await Task.Run(async () =>
{
// do the CPU-bound work: create textual representation of data
var text = items.Aggregate(String.Empty, (agg, item) => agg + item.Data + Environment.NewLine);
// write asynchronously
Console.WriteLine("Write, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
// StreamWriter doesn't support cancellation, so do it in two stages with MemoryStream
using (var memoryStream = new MemoryStream())
{
// write to a memory stream first
using (var writer = new StreamWriter(
memoryStream,
encoding: System.Text.Encoding.UTF8,
bufferSize: Environment.SystemPageSize,
leaveOpen: true))
{
await writer.WriteAsync(text);
}
thisSaveTaskCts.Token.ThrowIfCancellationRequested();
// copy the memory stream to the file
using (var fileStream = new FileStream(_fileName, FileMode.Create, FileAccess.Write))
{
// copy with possible cancellation
memoryStream.Seek(0, SeekOrigin.Begin);
await memoryStream.CopyToAsync(fileStream, Environment.SystemPageSize, thisSaveTaskCts.Token);
}
}
}, thisSaveTaskCts.Token);
}
// main process
async Task ProcessAsync()
{
while (true)
{
// handle cancellation
if (_token.IsCancellationRequested)
{
// await the pending save if any, before throwing
if (_saveTask != null)
await _saveTask.WaitObservingCancellationAsync();
_token.ThrowIfCancellationRequested();
}
// handle last save errors if any
if (_saveTask != null && _saveTask.IsFaulted)
await _saveTask.WaitObservingCancellationAsync();
// start getting the next item
var getNextItemTask = GetNextItemAsync();
// save the snapshot if needed
if (_items.Count >= _savedItemsCount + SAVE_AFTER)
{
var snapshot = _items.ToArray();
_savedItemsCount = snapshot.Length;
_saveTask = SaveItemsAsync(snapshot);
}
// await the next item
var item = await getNextItemTask;
_items.Add(item);
}
}
// start
public void Start(CancellationToken token)
{
_token = token;
_fileName = System.IO.Path.GetTempFileName();
_items = new List<Item>();
_processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
}
// stop
public void Stop()
{
_processTask.WaitObservingCancellation();
}
}
// Main
class Program
{
public static void Main()
{
var cts = new CancellationTokenSource();
var worker = new Worker();
Console.WriteLine("Start process");
worker.Start(cts.Token);
Thread.Sleep(10000);
Console.WriteLine("Stop process");
cts.Cancel();
worker.Stop();
Console.WriteLine("Press Enter to exit...");
Console.ReadLine();
}
}
// Useful extensions
public static class Extras
{
// check if exception is OperationCanceledException
public static bool IsOperationCanceledException(this Exception ex)
{
if (ex is OperationCanceledException)
return true;
var aggEx = ex as AggregateException;
return aggEx != null && aggEx.InnerException is OperationCanceledException;
}
public static async Task WaitObservingCancellationAsync(this Task task)
{
try
{
await task; // await the task to complete
}
catch (Exception ex)
{
// rethrow if anything but OperationCanceledException
if (!ex.IsOperationCanceledException())
throw;
}
}
// a helper to wait for the task to complete and observe exceptions
public static void WaitObservingCancellation(this Task task)
{
try
{
task.Wait(); // wait for the task to complete
}
catch (Exception ex)
{
// rethrow if anything but OperationCanceledException
if (!ex.IsOperationCanceledException())
throw;
}
}
}
}