0

I want to scrape a website that has many pages of interesting data, but because the source is very large I want to multithread the work while limiting the load. I use a Parallel.ForEach to start each chunk of 10 tasks, and in the main for loop I wait until the number of active threads drops below a threshold. For that I use a counter of active threads, which I increment when starting a new thread with a WebClient and decrement when the WebClient's DownloadStringCompleted event is triggered.

Originally the question was how to use DownloadStringTaskAsync instead of DownloadString and wait until each of the threads started in the Parallel.ForEach has completed. This has been solved with a workaround: a counter (activeThreads) and a Thread.Sleep in the main for loop.

Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve the speed at all, by freeing a thread while waiting for the DownloadString data to arrive?

And to get back to the original question: is there a way to do this more elegantly with the TPL, without the counter workaround?

private static volatile int activeThreads = 0;

public static void RecordData()
{
  var groupSize = 10;
  var source = db.ListOfUrls; // Thousands of URLs
  var iterations = source.Length / groupSize; 
  for (int i = 0; i < iterations; i++)
  {
    var subList = source.Skip(groupSize* i).Take(groupSize);
    Parallel.ForEach(subList, (item) => RecordUri(item)); 
    // I want to wait here before processing further data, to avoid overload
    while (activeThreads > 30) Thread.Sleep(100);
  }
}

private static async Task RecordUri(Uri uri)
{
   using (WebClient wc = new WebClient())
   {
      Interlocked.Increment(ref activeThreads);
      wc.DownloadStringCompleted += (sender, e) => Interlocked.Decrement(ref activeThreads);
      var jsonData = await wc.DownloadStringTaskAsync(uri);
      var root = JsonConvert.DeserializeObject<RootObject>(jsonData);
      RecordData(root);
    }
}
sofsntp

2 Answers

3

If you want an elegant solution you should use Microsoft's Reactive Framework. It's dead simple:

var source = db.ListOfUrls; // Thousands urls

var query =
    from uri in source.ToObservable()
    from jsonData in Observable.Using(
        () => new WebClient(),
        wc => Observable.FromAsync(() => wc.DownloadStringTaskAsync(uri)))
    select new { uri, json = JsonConvert.DeserializeObject<RootObject>(jsonData) };

IDisposable subscription =
    query.Subscribe(x =>
    {
        /* Do something with x.uri && x.json */
    });

That's the entire code. It's nicely multi-threaded and it's kept under control.

Just NuGet "System.Reactive" to get the bits.

Enigmativity
  • I wanted something without an extra extension, but this is interesting. I'm not sure why, but it doesn't work as it is. It gives me an empty json – sofsntp Sep 20 '17 at 13:59
  • @sofsntp - It worked fine for me when I tested it. Did you check that in the subscribe method that you got the URI, but not the JSON? Perhaps try changing the code so that it doesn't deserialize the JSON until inside the subscribe. – Enigmativity Sep 20 '17 at 23:37
  • OK, yes, it works, but what's the point of doing this? What makes it elegant? – sofsntp Sep 25 '17 at 08:32
  • @sofsntp - It becomes a one-liner way of asynchronously multi-threading, with all of the disposable clean-ups and it uses LINQ. It couldn't be much more elegant if it tried. – Enigmativity Sep 25 '17 at 09:40
-1
Parallel.ForEach

Partitions the source enumerable and runs the function for each item on a number of tasks chosen by the scheduler (typically on the order of ProcessorCount). It takes care that there are not too many tasks, and it waits until all items and tasks have been executed.

Task.WhenAll

Only awaits the given tasks; it does not execute them. It is up to you to start them properly, and not too many at once.
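One possible way to "not start too many at once" with Task.WhenAll is to gate each operation with a SemaphoreSlim. This is only a sketch: SemaphoreSlim is not mentioned in the answer, and the names Throttled, ForEachAsync, maxConcurrency and work are hypothetical helpers introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Throttled
{
    // Hypothetical helper: runs `work` for every item, with at most
    // `maxConcurrency` operations in flight at any moment.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            // ToList() forces the projection to run now, so every task exists
            // (most of them parked on the semaphore) before WhenAll is called.
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync();        // wait for a free slot without blocking a thread
                try { await work(item); }
                finally { gate.Release(); }    // free the slot even if `work` throws
            }).ToList();

            // Completes only when every item has been processed.
            await Task.WhenAll(tasks);
        }
    }
}
```

Assuming RecordUri keeps the signature from the question, a call could look like `await Throttled.ForEachAsync(source, 10, RecordUri);`, which would replace both the activeThreads counter and the Thread.Sleep loop.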

But there is a flaw in your code. RecordUri returns a task that has to be awaited; otherwise the ForEach will just start more and more downloads, because it never knows when the current task has completed. It is also problematic that you create a task inside a task, where the outer task does nothing other than wait for the inner one.

You might also want to take a look at this overload of Parallel.ForEach https://msdn.microsoft.com/en-us/library/dd782934(v=vs.110).aspx

Edit

Is using await DownloadStringTaskAsync instead of DownloadString supposed to improve the speed at all, by freeing a thread while waiting for the DownloadString data to arrive?

No. When a task is waiting on an external resource, it enters a suspended state (handled by the Windows API, not by some old, dirty polling loop), so there is not much difference. What does differ is the overhead the compiler generates for your async code. DownloadStringTaskAsync creates a task that wraps the long-running operation. If you await it, you attach yourself to that task (as with ContinueWith), so you just create one Task to await another. This is the overhead I was talking about above.

My approach would be: use the synchronous method inside your Parallel.ForEach. The threading will be handled by the TPL and you are free to go on.
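A minimal sketch of that approach, assuming you also want an explicit cap on concurrency: ParallelOptions.MaxDegreeOfParallelism is an addition here, not something the answer spells out, and BlockingScraper, ProcessAll, Download and maxParallel are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;

public static class BlockingScraper
{
    // Hypothetical helper: runs `processOne` for every URI on at most
    // `maxParallel` concurrent tasks and returns once all URIs are done.
    public static void ProcessAll(
        IEnumerable<Uri> uris, int maxParallel, Action<Uri> processOne)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        // Parallel.ForEach blocks the caller until every item has been handled,
        // so no counter and no Thread.Sleep loop are needed.
        Parallel.ForEach(uris, options, processOne);
    }

    // Synchronous download: the worker thread blocks until the body arrives.
    public static string Download(Uri uri)
    {
        using (var wc = new WebClient())
        {
            return wc.DownloadString(uri);
        }
    }
}
```

With the types from the question, the loop body could be something like `BlockingScraper.ProcessAll(source, 10, uri => RecordData(JsonConvert.DeserializeObject<RootObject>(BlockingScraper.Download(uri))));`. The trade-off is that each in-flight download occupies a thread while it waits.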

Remember "KISS"

Venson
  • Thank you, but I have read the relevant resources. I have edited my question for more clarity. My issue is that using DownloadStringTaskAsync implies that the method becomes async. Shall I use .Wait() in the Parallel.ForEach()? I have read that this should be avoided – sofsntp Sep 19 '17 at 20:22
  • @sofsntp added Edit – Venson Sep 20 '17 at 14:37
  • Yes, the concerns about scalability are absolutely right. But that was not the question. Are you going to scale up in the future? The problem with his answer will appear when using a LOT more URLs, as you are limited when using the HttpClient: https://stackoverflow.com/questions/21558109/httpclient-c-fails-on-many-asynchronous-requests If you just create more and more requests without a limit on your side, this will affect performance negatively. – Venson Sep 20 '17 at 15:27
  • I do not plan to scale. I am just interested in knowing the best practices and understanding how it works behind. – sofsntp Sep 20 '17 at 16:18