
What's the best way to do parallel processing in C# when some of the methods involved are async? Let me explain with some simple code.

Example scenario: We have a person and 1000 text files from them. We want to check that the text files do not contain sensitive keywords, and if one of the files does contain a sensitive keyword, we mark the person as untrusted. The method that performs the check is async, and as soon as a sensitive keyword is found, further processing is not required and the checking loop should be broken for that person.

For the best performance we want to use parallel processing. Simple pseudocode:

bool sensitiveDetected = false;
Parallel.ForEach(textFileCollection, async (textFile, parallelLoopState) =>
{
    if (await HasSensitiveAsync(textFile))
    {
        sensitiveDetected = true;
        parallelLoopState.Break();
    }
});

if (sensitiveDetected)
    MarkUntrusted(person);

The problem is that Parallel.ForEach doesn't wait for the async tasks to complete (the async lambda is compiled to an async void delegate, so each iteration is considered finished at its first await), which means the statement `if (sensitiveDetected)` runs as soon as the tasks have been created. I read other questions such as "write parallel.for with async" and "async/await and Parallel.For" and lots of other pages. Those topics are useful when you need the results of the async methods to be collected and used later, but in my scenario the loop should end as soon as possible.


Update: Sample code:

bool detected = false;
Parallel.ForEach(UrlList, async (url, pls) =>
{
    using (HttpClient hc = new HttpClient())
    {
        var result = await hc.GetAsync(url);

        if ((await result.Content.ReadAsStringAsync()).Contains("sensitive"))
        {
            detected = true;
            pls.Break();
        }
    }
});

if (detected)
    Console.WriteLine("WARNING");

    Short version: don't mix `Parallel.For()` with `async` methods. Your question is really too broad as stated; you haven't included a good [mcve], and there are lots of ways your pseudo-code could be interpreted in terms of what each part of it is doing in real code. But you can just start tasks instead of using `Parallel`, using `CancellationTokenSource` as needed to communicate to those tasks if they need to be interrupted, and of course to the loop that creates them. Please improve the question if you need more specific advice. – Peter Duniho May 09 '17 at 05:39
  • If you really must use `Parallel.For()`, and you really must use an `async` method that determines whether the loop should continue, and that method is the only awaitable expression in your `Parallel.For()` delegate, then you could just use it synchronously, i.e. `if (hassensitiveasync(textfile).Result)`. As implementation detail goes, this is a poor practice, but if you paint yourself into a corner, sometimes you have to leave footprints. – Peter Duniho May 09 '17 at 05:42
  • I think the main problem is not the implementation or exact detailed code. There may be no synchronous version of a library method, so I'm stuck with the async version. I like to use parallel processing to make the code as fast as possible! If you want to know why you might want to mix `Parallel.For` with async, then read this: [link](https://github.com/tyrotoxin/AsyncEnumerable). I think that this `AsyncEnumerator` probably solves my problem, but I don't know how. Indeed, if you need more exact code I'll provide it in the updated question. @PeterDuniho – X X May 09 '17 at 06:33
  • @XX Why are you even using multi-threading and parallel processing? Your QUESTION is wrong. In C# (>4.5) you don't use Thread (which is an expensive construct) for everything, we have a more evolved paradigm of using `Task`s. – Aron May 09 '17 at 07:06
  • Please read this, and you may see why: [link](https://github.com/tyrotoxin/AsyncEnumerable) @Aron – X X May 09 '17 at 07:11
  • @XX Your question is clearly I/O bound rather than CPU bound. The cost of marshalling data between threads, waking/sleeping and handling extra stacks will make threading pointless. But more to the point, threading is always dangerous. – Aron May 09 '17 at 07:17
    @XX You are using the wrong class to do the wrong thing. Parallel.Foreach is meant for *data* parallelism, ie processing a lot of data. It's *not* meant for asynchronous operations, or responding to events. It's not used to *wait* for responses from *already* asynchronous events either, that's what `Task.WhenAll` is for. You could start 100 `GetAsync` calls, put them in an array and await all of them. – Panagiotis Kanavos May 09 '17 at 07:17
    @XX You need to read [Stephen Cleary's There is no thread](https://blog.stephencleary.com/2013/11/there-is-no-thread.html) – Aron May 09 '17 at 07:17
  • @XX data parallelism means that you have lots of data. `Parallel.For/ForEach` will partition the data and use one task to process each partition. When you do that, you *don't* want or need asynchronous operations. – Panagiotis Kanavos May 09 '17 at 07:18
  • @PanagiotisKanavos In both cases it is data parallelism. What you need to draw a distinction to is that Threads are a metaphor for CPUs. Throwing more CPUs at your network card won't make it faster. – Aron May 09 '17 at 07:19
  • @Aron not data parallelism in the way that most people mean it, or how it's used to explain the various scenarios covered by TPL and co. There is no data to process here, just network calls to make. The OP is generating a lot of *IO calls* and wants to wait until one of them returns a certain value. – Panagiotis Kanavos May 09 '17 at 07:20
  • @PanagiotisKanavos You are right, man. I can use `Task.WhenAll`, but why should I wait for all tasks to complete? One task returning the desired result could cancel all the other uncompleted tasks. How do I do this? (see the sketch after these comments) – X X May 09 '17 at 07:27
    @XX Which is why you want to use stream processing. There are two libraries for stream processing, Reactive Extensions and TPL Dataflow. I have below an example of how you can achieve this using Reactive Extensions. – Aron May 09 '17 at 07:41
  • @Aron I hadn't heard about these two libraries. I must go and see how to work with them and their usefulness :) thanks – X X May 09 '17 at 08:03
  • @XX If you want to limit the number of in flight http requests, you can use `SemaphoreSlim`. – Aron May 10 '17 at 03:04
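
A minimal sketch of the approach suggested in these comments: start plain tasks instead of `Parallel.ForEach`, and use a `CancellationTokenSource` plus `Task.WhenAll` so the first hit stops the remaining requests. The helper name `AnySensitiveAsync` is made up for illustration; `UrlList` and the "sensitive" keyword come from the question's sample code.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

static async Task<bool> AnySensitiveAsync(IEnumerable<string> urlList)
{
    using (var cts = new CancellationTokenSource())
    using (var hc = new HttpClient())
    {
        var tasks = urlList.Select(async url =>
        {
            // Passing the token lets requests that are still pending be
            // aborted once one task finds a sensitive result and cancels.
            var response = await hc.GetAsync(url, cts.Token);
            var body = await response.Content.ReadAsStringAsync();
            if (body.Contains("sensitive"))
                cts.Cancel();
            return body.Contains("sensitive");
        }).ToList();

        try
        {
            // Wait for all tasks; cancellation only ever happens after a hit,
            // so a cancelled request can be treated as a positive result.
            var results = await Task.WhenAll(tasks);
            return results.Any(found => found);
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}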

1 Answer


The simplest way to achieve what you need (and not what you want, because threading is evil) is to use Reactive Extensions.

// Requires the System.Reactive (Rx) package:
//   using System.Reactive.Linq;              // ToObservable() on IEnumerable, Where, FirstOrDefaultAsync
//   using System.Reactive.Threading.Tasks;   // ToObservable() on Task<T>
var firstSensitive = await UrlList
                     .ToObservable()
                     .Select(async url =>
                     {
                         using (var http = new HttpClient())
                         {
                             var result = await http.GetAsync(url);
                             return await result.Content.ReadAsStringAsync();
                         }
                     })
                     .SelectMany(downloadTask => downloadTask.ToObservable())
                     .Where(result => result.Contains("sensitive"))
                     .FirstOrDefaultAsync();

if (firstSensitive != null)
    Console.WriteLine("WARNING");
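
Note: `SelectMany` merges the download tasks into a single observable stream as each one completes, and `FirstOrDefaultAsync` completes the `await` as soon as the first matching body arrives (or with `null` if none match), so the remaining downloads are not waited on; requests already in flight are left to finish rather than being cancelled.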

To limit the number of concurrent HTTP requests:

const int concurrentRequestLimit = 4;
var semaphore = new SemaphoreSlim(concurrentRequestLimit);

var firstSensitive = await UrlList
                     .ToObservable()
                     .Select(async url =>
                     {
                         // The semaphore keeps at most concurrentRequestLimit
                         // downloads in flight at any time.
                         await semaphore.WaitAsync();
                         try
                         {
                             using (var http = new HttpClient())
                             {
                                 var result = await http.GetAsync(url);
                                 return await result.Content.ReadAsStringAsync();
                             }
                         }
                         finally
                         {
                             semaphore.Release();
                         }
                     })
                     .SelectMany(downloadTask => downloadTask.ToObservable())
                     .Where(result => result.Contains("sensitive"))
                     .FirstOrDefaultAsync();

if (firstSensitive != null)
    Console.WriteLine("WARNING");
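
The comments above also mention TPL Dataflow as the other stream-processing option; here is a rough sketch of the same idea using an `ActionBlock` (the degree of parallelism of 4 is an arbitrary choice, and it assumes the System.Threading.Tasks.Dataflow package):

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var detected = false;
var cts = new CancellationTokenSource();

var scanner = new ActionBlock<string>(async url =>
{
    using (var http = new HttpClient())
    {
        var response = await http.GetAsync(url, cts.Token);
        var body = await response.Content.ReadAsStringAsync();
        if (body.Contains("sensitive"))
        {
            detected = true;
            cts.Cancel();   // stop processing the remaining URLs
        }
    }
},
new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,
    CancellationToken = cts.Token
});

foreach (var url in UrlList)
    scanner.Post(url);
scanner.Complete();

try
{
    await scanner.Completion;
}
catch (OperationCanceledException)
{
    // Expected when a sensitive result triggered cancellation.
}

if (detected)
    Console.WriteLine("WARNING");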