-1

I have a LINQ query that should NOT be enumerated more than once, and I want to avoid enumerating it twice by mistake. Is there any extension method I can use to ensure that I am protected from such a mistake? I am thinking about something like this:

var numbers = Enumerable.Range(1, 10).OnlyOnce();
Console.WriteLine(numbers.Count()); // shows 10
Console.WriteLine(numbers.Count()); // throws InvalidOperationException: The query cannot be enumerated more than once.

The reason I want this functionality is that I have an enumerable of tasks that is intended to instantiate and run the tasks progressively, while it is enumerated slowly under control. I have already made the mistake of running the tasks twice, because I forgot that it's a deferred enumerable and not an array.

var tasks = Enumerable.Range(1, 10).Select(n => Task.Run(() => Console.WriteLine(n)));
Task.WaitAll(tasks.ToArray()); // Let's wait for the tasks to finish...
Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id))); // Let's see the completed task IDs...
// Oops! A new set of tasks started running!
Theodor Zoulias
  • Introduce a custom class that uses a Queue under the hood. Implement `GetEnumerator` so that it removes an item from the queue on every iteration. That way you can safely iterate your class as many times as you want without executing the tasks more than once, because the tasks are removed during the first iteration. Use `ImmutableQueue` to get a thread-safe class. – Fabio Apr 08 '19 at 03:41
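
The queue-based idea from the comment above could be sketched roughly as follows. `DrainingEnumerable` is a hypothetical name chosen for illustration, and this is only one possible reading of the suggestion, assuming single-threaded use:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical class name; a sketch of the "drain a queue while iterating" idea.
public sealed class DrainingEnumerable<T> : IEnumerable<T>
{
    private readonly Queue<T> queue;

    public DrainingEnumerable(IEnumerable<T> source)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        // The source is enumerated exactly once, here, and copied into the queue.
        queue = new Queue<T>(source);
    }

    public IEnumerator<T> GetEnumerator()
    {
        // Each item is removed as it is yielded, so a later enumeration
        // only sees whatever an earlier one left behind (possibly nothing).
        while (queue.Count > 0)
            yield return queue.Dequeue();
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

Note that copying into the queue enumerates the source eagerly. If the source is a `Select` that calls `Task.Run`, all tasks would start in the constructor, so for the scenario in the question the queue would probably need to hold task factories (for example `Func<Task>`) rather than the tasks themselves. A second enumeration here yields nothing instead of throwing.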

3 Answers

3

I want to avoid enumerating it twice by mistake.

You can wrap the collection with a collection that throws if it's enumerated twice.

For example:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

namespace ConsoleApp8
{
    public static class EnumExtension
    {
        class OnceEnumerable<T> : IEnumerable<T>
        {
            IEnumerable<T> col;
            bool hasBeenEnumerated = false;
            public OnceEnumerable(IEnumerable<T> col)
            {
                this.col = col;
            }

            public IEnumerator<T> GetEnumerator()
            {
                if (hasBeenEnumerated)
                {
                    throw new InvalidOperationException("This collection has already been enumerated.");
                }
                this.hasBeenEnumerated = true;
                return col.GetEnumerator();
            }

            IEnumerator IEnumerable.GetEnumerator()
            {
                return GetEnumerator();
            }
        }

        public static IEnumerable<T> OnlyOnce<T>(this IEnumerable<T> col)
        {
            return new OnceEnumerable<T>(col);
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            var col = Enumerable.Range(1, 10).OnlyOnce();

            var colCount = col.Count(); // first enumeration: succeeds
            foreach (var c in col) // second enumeration: throws InvalidOperationException
            {
                Console.WriteLine(c);
            }
        }
    }
}
David Browne - Microsoft
  • Nice answer, upvote. However, I feel we are stuck in an XY problem. – TheGeneral Apr 08 '19 at 03:36
  • @Fabio I know how to make it thread safe if need be. But currently my enumerable is used in a single flow of execution, so I'll use David's solution as is. – Theodor Zoulias Apr 08 '19 at 03:54
  • @Michael Randall it is an XY problem, but X is wider than Y. My problem with the tasks could be solved by using a more robust implementation, but my initial request for an enumerable that can be enumerated only once could have other applications as well. It is nice to have David Browne's `OnceEnumerable` class in my toolbox. :-) – Theodor Zoulias Apr 08 '19 at 05:10
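
Since thread safety comes up in the comments, here is a minimal sketch of how the already-enumerated check might be made atomic. The names `ThreadSafeOnceEnumerable` and `OnlyOnceThreadSafe` are hypothetical, and this is not code from the answer, only a variation on it that swaps the `bool` flag for `Interlocked.Exchange`:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

public static class OnceEnumerableExtensions
{
    // Hypothetical variant of the answer's OnceEnumerable with an atomic flag.
    private sealed class ThreadSafeOnceEnumerable<T> : IEnumerable<T>
    {
        private readonly IEnumerable<T> source;
        private int enumerated; // 0 = not yet enumerated, 1 = enumerator already handed out

        public ThreadSafeOnceEnumerable(IEnumerable<T> source)
        {
            this.source = source ?? throw new ArgumentNullException(nameof(source));
        }

        public IEnumerator<T> GetEnumerator()
        {
            // Exchange returns the previous value, so only one caller ever sees 0.
            if (Interlocked.Exchange(ref enumerated, 1) == 1)
                throw new InvalidOperationException("This collection has already been enumerated.");
            return source.GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    }

    public static IEnumerable<T> OnlyOnceThreadSafe<T>(this IEnumerable<T> source)
    {
        return new ThreadSafeOnceEnumerable<T>(source);
    }
}
```

This only guarantees that a single enumerator is ever handed out, even if several threads call `GetEnumerator` at the same moment. The enumerator itself is still the source's own and is not made safe for concurrent `MoveNext` calls.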
1

Enumerables enumerate, end of story. You just need to call ToList or ToArray to materialize the query once:

// this will enumerate and start the tasks
var tasks = Enumerable.Range(1, 10)
                      .Select(n => Task.Run(() => Console.WriteLine(n)))
                      .ToList();

// wait for them all to finish
Task.WaitAll(tasks.ToArray());
Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id)));

If you want parallelism:

Parallel.For(0, 100, index => Console.WriteLine(index));

Or, if you are using the async/await pattern:

// Requires the TPL Dataflow library: using System.Threading.Tasks.Dataflow;
public static async Task DoWorkLoads(IEnumerable<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result);

   block.Complete();
   await block.Completion;

}

...

// Static, so that the static DoWorkLoads method above can reference it directly.
public static async Task MyMethodAsync(Something result)
{
   await SomethingAsync(result);
}

Update: since you are after a way to control the maximum degree of concurrency, you could use this:

// Requires using System.Collections.Concurrent; must be declared inside a static class.
public static async Task<IEnumerable<Task>> ExecuteInParallel<T>(
   this IEnumerable<T> collection,
   Func<T, Task> callback,
   int degreeOfParallelism)
{
   var queue = new ConcurrentQueue<T>(collection);

   var tasks = Enumerable.Range(0, degreeOfParallelism)
                         .Select(async _ =>
                          {
                             while (queue.TryDequeue(out var item))
                                await callback(item);
                          })
                         .ToArray();

   await Task.WhenAll(tasks);

   return tasks;
}
TheGeneral
  • I don't want to call `ToArray`, because then all tasks will start running at once. I want to enumerate the tasks slowly (to achieve a max degree of parallelism). – Theodor Zoulias Apr 08 '19 at 03:19
  • `Parallel.For` [cannot be combined with await](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach) – Theodor Zoulias Apr 08 '19 at 03:25
  • @TheodorZoulias then use an ActionBlock or reactive extensions – TheGeneral Apr 08 '19 at 03:26
  • These have a learning curve. I'll certainly use them after I study them. Currently I know pretty well the Task Parallel Library, and I would prefer a solution that I can apply immediately with my current level of knowledge. – Theodor Zoulias Apr 08 '19 at 03:30
  • @TheodorZoulias please explain what this means `I want to enumerate the tasks slowly` – TheGeneral Apr 08 '19 at 03:31
  • I am using @Ohad Schneider's method from [here](https://stackoverflow.com/a/25877042/11178549). He keeps a list of active tasks, stops enumerating when enough tasks are running, and then waits for some to complete. – Theodor Zoulias Apr 08 '19 at 03:36
  • @TheodorZoulias the correct wording you need to use is "you need to control the max degree of parallelism" – TheGeneral Apr 08 '19 at 03:38
  • Thanks Michael! Your `ExecuteInParallel` method is pretty neat! I modified it a bit because I want the task results to be returned. I think your method is preferable to Ohad Schneider's method, because waiting for all active tasks with `Task.WhenAny` in a loop should have overhead, and your method avoids that. – Theodor Zoulias Apr 08 '19 at 04:49
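
The "keep a list of active tasks and stop enumerating while enough are running" approach described in the comments might look roughly like the sketch below. It is not the linked code; `ThrottleSketch` and `RunThrottledAsync` are hypothetical names, and the input is assumed to be a deferred sequence whose tasks start only when they are pulled:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical helper: pulls tasks from a deferred sequence one at a time,
    // keeping at most maxConcurrent of them in flight.
    public static async Task<List<Task>> RunThrottledAsync(
        IEnumerable<Task> deferredTasks, int maxConcurrent)
    {
        if (deferredTasks == null) throw new ArgumentNullException(nameof(deferredTasks));
        if (maxConcurrent < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrent));

        var all = new List<Task>();
        var active = new List<Task>();

        // This foreach is the one and only enumeration of the sequence;
        // each step creates (and thereby starts) exactly one task.
        foreach (var task in deferredTasks)
        {
            all.Add(task);
            active.Add(task);

            if (active.Count >= maxConcurrent)
            {
                // Pause enumeration until at least one running task completes.
                var finished = await Task.WhenAny(active);
                active.Remove(finished);
            }
        }

        // Wait for whatever is still running after the sequence is exhausted.
        await Task.WhenAll(active);
        return all;
    }
}
```

Returning the materialized list lets the caller inspect task IDs or results without touching the deferred sequence again, which is exactly the mistake described in the question. Wrapping the input with an enumerate-once guard would turn any accidental second pass into an exception. As noted in the comment above, calling `Task.WhenAny` in a loop has some overhead compared with the queue-based `ExecuteInParallel` approach.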
1

Rx certainly is an option to control parallelism.

var query =
    Observable
        .Range(1, 10)
        .Select(n => Observable.FromAsync(() => Task.Run(() => new { Id = n })));

var tasks = query.Merge(maxConcurrent: 3).ToArray().Wait();

Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id)));
Enigmativity