Given some code like so:

public class CustomCollectionClass : Collection<CustomData> {}
public class CustomData
{
    public string name;
    public bool finished;
    public string result;
}

public async Task DoWorkInParallel(CustomCollectionClass collection)
{
    // collection can be retrieved from a DB, may not exist.
    if (collection == null)
    {
        collection = new CustomCollectionClass();
        foreach (var data in myData)
        {
            collection.Add(new CustomData()
            {
                name = data.Name
            });
        }
    }

    // This part doesn't feel safe. Not sure what to do here.
    var processTasks = myData.Select(o =>
        this.DoWorkOnItemInCollection(collection.Single(d => d.name == o.Name))).ToArray();

    await Task.WhenAll(processTasks);

    await SaveModifedCollection(collection);
}

public async Task DoWorkOnItemInCollection(CustomData data)
{
    await DoABunchOfWorkElsewhere();
    // This doesn't feel safe either. Lock here?
    data.finished = true;
    data.result = "Parallel";
}

As I noted in a couple of inline comments, doing the above doesn't feel safe to me, but I'm not sure. I have a collection of elements, and I'd like to assign a unique element to each parallel task and have that task modify only its own element based on the work it does. The end result is that I want to save the collection after different individual elements have been modified in parallel. If this isn't a safe way to do it, how would I best go about this?

Tom

3 Answers

Your code is the right way to do this, assuming that starting DoABunchOfWorkElsewhere() multiple times is itself safe.

You don't need to worry about your LINQ query, because it doesn't actually run in parallel. All it does is invoke DoWorkOnItemInCollection() multiple times, one call after another. Those invocations may run in parallel (or not, depending on your synchronization context and the implementation of DoABunchOfWorkElsewhere()), but the code you showed is safe.
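To make that concrete, here is a minimal sketch with stand-in types mirroring the question's. The names `SelectIsSequentialDemo` and `RunAsync`, and the 10 ms `Task.Delay` standing in for `DoABunchOfWorkElsewhere()`, are hypothetical. It records whether every `Single()` lookup ran on the calling thread, then waits for all tasks with `Task.WhenAll`:

```csharp
using System;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;

// Stand-ins for the question's types (public fields so the initializer compiles).
public class CustomData
{
    public string name;
    public bool finished;
    public string result;
}

public class CustomCollectionClass : Collection<CustomData> { }

public static class SelectIsSequentialDemo
{
    // Hypothetical stand-in for DoABunchOfWorkElsewhere(): a short asynchronous delay.
    private static Task DoABunchOfWorkElsewhere() => Task.Delay(10);

    private static async Task DoWorkOnItemInCollection(CustomData data)
    {
        await DoABunchOfWorkElsewhere();
        // Continuations may run concurrently on pool threads,
        // but each one writes only to the element it was given.
        data.finished = true;
        data.result = "Parallel";
    }

    public static async Task<(CustomCollectionClass Collection, bool LookupsOnCallerThread)> RunAsync(string[] names)
    {
        var collection = new CustomCollectionClass();
        foreach (var n in names)
            collection.Add(new CustomData { name = n });

        int callerThread = Environment.CurrentManagedThreadId;
        bool lookupsOnCallerThread = true;

        // ToArray() enumerates the Select synchronously: the lambda (and Single())
        // runs once per name, one after another, on the calling thread.
        Task[] tasks = names.Select(n =>
        {
            lookupsOnCallerThread &= Environment.CurrentManagedThreadId == callerThread;
            return DoWorkOnItemInCollection(collection.Single(d => d.name == n));
        }).ToArray();

        await Task.WhenAll(tasks);
        return (collection, lookupsOnCallerThread);
    }
}
```

The only concurrent part is the code after each `await`, and each continuation writes to a different `CustomData` instance, so no two tasks touch the same object and the collection itself is never modified.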

svick

Your code above should work without issue. You are passing off one item to each worker thread. I'm not so sure about the async modifier, though. You could just return a Task, and then in your method do:

public Task DoWorkOnItemInCollection(CustomData data)
{
    return Task.Run(() => {
        DoABunchOfWorkElsewhere().Wait();
        data.finished = true;
        data.result = "Parallel";
    });
}

You might want to be careful: with a large number of items, you could start far more background work than you have threads for, which can be difficult to debug later.
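If the concern is how many operations are in flight at once, one common alternative (not what this answer proposes) is to cap concurrency with `SemaphoreSlim` and keep the work asynchronous, so waiting items do not each hold a thread. A minimal sketch; the `Throttle.ForEachAsync` helper name is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttle
{
    // Starts work(item) for every item but lets at most maxConcurrency of them
    // be in progress at once; the rest wait asynchronously on the semaphore.
    public static async Task ForEachAsync<T>(IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency))
        {
            Task[] tasks = items.Select(async item =>
            {
                await gate.WaitAsync();
                try
                {
                    await work(item);
                }
                finally
                {
                    // Always free the slot, even if work(item) throws.
                    gate.Release();
                }
            }).ToArray();

            // Exceptions from work() surface here, as with Task.WhenAll in the question.
            await Task.WhenAll(tasks);
        }
    }
}
```

With the question's types this could be called as something like `await Throttle.ForEachAsync(collection, 4, DoWorkOnItemInCollection);` before saving the collection.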

I have done this before. Instead of handing the whole collection to some magic LINQ, it might be easier to treat this as a classic producer/consumer problem:

using System;
using System.Collections.Generic;
using System.Threading;

class ParallelWorker<T>
{
    private Action<T> Action;
    private Queue<T> Queue = new Queue<T>();
    private object QueueLock = new object();

    // Each worker thread pulls items until the shared queue is empty.
    private void DoWork()
    {
        while (true)
        {
            T item;
            lock (this.QueueLock)
            {
                if (this.Queue.Count == 0) return; // exit thread
                item = this.Queue.Dequeue();
            }

            try { this.Action(item); }
            catch { /*...*/ }
        }
    }

    public void DoParallelWork(IEnumerable<T> items, int maxDegreesOfParallelism, Action<T> action)
    {
        this.Action = action;

        // Queue<T> has no AddRange, so enqueue the items one by one.
        this.Queue.Clear();
        foreach (T item in items)
        {
            this.Queue.Enqueue(item);
        }

        // Start one thread per degree of parallelism (not one per item).
        List<Thread> threads = new List<Thread>();
        for (int i = 0; i < maxDegreesOfParallelism; i++)
        {
            Thread thread = new Thread(DoWork);
            thread.Start();
            threads.Add(thread);
        }

        // Block until every worker has drained the queue and exited.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }
}

This was done IDE free, so there may be typos.
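For comparison, the framework already provides this bounded-consumer pattern for synchronous work: `Parallel.ForEach` with `ParallelOptions.MaxDegreeOfParallelism` spreads the items over a limited number of workers and blocks until they finish, much like the `Join()` loop above. A sketch keeping the same parameter shape (the `BuiltInWorkers` name is hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BuiltInWorkers
{
    // Runs action on every item with at most maxDegreeOfParallelism
    // concurrent invocations; returns only after all items are processed.
    public static void DoParallelWork<T>(IEnumerable<T> items, int maxDegreeOfParallelism, Action<T> action)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        // Exceptions thrown by action are collected and rethrown as an AggregateException.
        Parallel.ForEach(items, options, action);
    }
}
```

Unlike the empty `catch` above, failures are not silently swallowed. `Parallel.ForEach` is meant for synchronous delegates; passing an `async` lambda turns it into `async void`, so for asynchronous work like `DoABunchOfWorkElsewhere()` a `Task`-based approach fits better.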

ohmusama
  • There is no advantage to using `Wait()` when you can `await`. – svick Apr 29 '15 at 17:04
  • You can't use await if your method isn't marked async; I removed async because I don't think LINQ supports that. – ohmusama Apr 29 '15 at 17:48
  • I'm talking about the code inside `Task.Run()` (using that is itself a questionable choice here). That doesn't have anything to do with LINQ and can certainly be made `async`. – svick Apr 29 '15 at 18:40
  • And I didn't even notice your second suggestion of explicitly using `Thread`s. That's even worse. – svick Apr 29 '15 at 18:41
  • I guess what was concerning me is that I was calling Single() on my collection, within a Select, to find the custom data associated with my data, and then modifying the custom data, which is a bit more parallelization than I'm used to, since none of these collections are thread-safe ([as discussed here](http://stackoverflow.com/questions/11103779/are-ienumerable-linq-methods-thread-safe)). But maybe I'm overthinking this? Either way, thanks for the confirmation. – Tom Apr 29 '15 at 18:42
  • As long as you are not updating the collection itself, just fields of a member of a collection, you should have no issue. – ohmusama Apr 29 '15 at 18:45
  • @svick, you might need that depending on your solution. You can't abort Tasks for example. – ohmusama Apr 29 '15 at 18:54
  • @ohmusama You really, really shouldn't abort threads, it's incredibly unsafe. On the other hand, Tasks do support safe cancellation. – svick Apr 29 '15 at 19:22
  • @svick, I agree, in general principle, but how would you do `File.Exists()` with a timeout using just a `Task`? – ohmusama Apr 29 '15 at 19:26
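One way to get a timeout without aborting anything is to race the operation against `Task.Delay` using `Task.WhenAny`. This sketch assumes that simply giving up on the result is acceptable: the `File.Exists` call is not cancelled and keeps running on a pool thread after a timeout. `TimedFileCheck.ExistsWithTimeoutAsync` is a hypothetical name:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class TimedFileCheck
{
    // Returns the File.Exists result, or null if it did not finish within timeout.
    // On timeout the check is abandoned (its result ignored), not aborted.
    public static async Task<bool?> ExistsWithTimeoutAsync(string path, TimeSpan timeout)
    {
        Task<bool> check = Task.Run(() => File.Exists(path));
        Task winner = await Task.WhenAny(check, Task.Delay(timeout));
        if (winner == check)
            return await check;
        return null;
    }
}
```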

I'm going to make the suggestion that you use Microsoft's Reactive Framework (NuGet "Rx-Main") to do this task.

Here's the code:

public void DoWorkInParallel(CustomCollectionClass collection)
{
    var query =
        from x in collection.ToObservable()
        from r in Observable.FromAsync(() => DoWorkOnItemInCollection(x))
        select x;

    query.Subscribe(x => { }, ex => { }, async () =>
    {
        await SaveModifedCollection(collection);
    });
}

Done. That's it. Nothing more.

I have to say, though, that when I tried to get your code to run it was full of bugs and issues. I suspect that the code you posted isn't your production code, but an example you wrote specifically for this question. I suggest that you try to make a compilable, runnable example before posting.

Nevertheless, my suggestion should work for you with a little tweaking.

It is multi-threaded and thread-safe, and it does cleanly save the modified collection when done.

Enigmativity
  • So, you're suggesting to use the "fire and forget" approach? That doesn't sound like a good choice to me here. – svick Apr 29 '15 at 17:07
  • @svick - Why is this "fire and forget"? What do you mean by that? – Enigmativity Apr 30 '15 at 00:54
  • You set up the observable, then you set up what happens when it completes, and then you immediately return. So `DoWorkInParallel()` will very likely return before `DoWorkOnItemInCollection()` or `SaveModifedCollection()` complete and any exceptions from those methods won't propagate to the caller. – svick Apr 30 '15 at 00:57
  • @svick - That's a fair call. It wouldn't take much to fix this, but I didn't want to over complicate the solution. I still think that Rx is a way better way to do this kind of thing. – Enigmativity Apr 30 '15 at 00:59
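A sketch of the fix discussed in these comments, assuming the System.Reactive NuGet package (the successor to Rx-Main): return a `Task` and await the observable, so completion and exceptions reach the caller. Awaiting an `IObservable` uses the `GetAwaiter` extension in `System.Reactive.Linq`, and `ToArray()` makes the sequence produce exactly one value even when the collection is empty (awaiting an empty sequence would throw). The generic helper and its `work`/`save` parameters are hypothetical stand-ins for `DoWorkOnItemInCollection` and `SaveModifedCollection`:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RxAwaitedSketch
{
    // work runs once per item; save runs once, only after every item has finished.
    public static async Task DoWorkInParallelAsync<T>(
        IEnumerable<T> collection, Func<T, Task> work, Func<Task> save)
    {
        // Exceptions from work() propagate out of this await instead of being lost.
        await collection.ToObservable()
            .SelectMany(x => Observable.FromAsync(() => work(x)))
            .ToArray();

        await save();
    }
}
```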