
I have a Windows service that reads data from the database and processes this data using multiple REST API calls.

Originally, this service ran on a timer: it would read unprocessed data from the database and process it using multiple threads, throttled with a SemaphoreSlim. This worked well, except that the database read had to wait for all processing to finish before reading again.

ServicePointManager.DefaultConnectionLimit = 10;

Original code that works:

// Runs every 5 seconds on a timer
private void ProcessTimer_Elapsed(object sender, ElapsedEventArgs e)
{
    var hasLock = false;
    try
    {
        Monitor.TryEnter(timerLock, ref hasLock);
        if (hasLock)
        {
            ProcessNewData();
        }
        else
        {
            log.Info("Failed to acquire lock for timer."); // This happens all of the time
        }
    }
    finally
    {
        if (hasLock)
        {
            Monitor.Exit(timerLock);
        }
    }
}

public void ProcessNewData()
{
    var unprocessedItems = GetDatabaseItems();

    if (unprocessedItems.Count > 0)
    {
        var downloadTasks = new Task[unprocessedItems.Count];
        var maxThreads = new SemaphoreSlim(semaphoreSlimMinMax, semaphoreSlimMinMax); // semaphoreSlimMinMax = 10 is max threads

        for (var i = 0; i < unprocessedItems.Count; i++)
        {
            maxThreads.Wait();
            var iClosure = i; // capture the loop variable for the lambda
            downloadTasks[i] =
            Task.Run(async () =>
                {
                    try
                    {
                        await ProcessItemsAsync(unprocessedItems[iClosure]);
                    }
                    }
                    catch (Exception ex)
                    {
                        // handle exception
                    }
                    finally
                    {
                        maxThreads.Release();
                    }
                });
        }

        Task.WaitAll(downloadTasks);
    }
}

To improve efficiency, I rewrote the service to run GetDatabaseItems on a separate thread from the rest, with a ConcurrentDictionary of unprocessed items between them: GetDatabaseItems fills it and ProcessNewData empties it.

The problem is that while 10 unprocessed items are sent to ProcessItemsAsync, they are processed two at a time instead of all 10.

The delay occurs inside ProcessItemsAsync, at the line var response = await client.SendAsync(request);. All 10 threads make it to this line but come out of it two at a time. None of this code changed between the old version and the new one.
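
For reference, here is a simplified, hypothetical sketch of the relevant part of ProcessItemsAsync (the request setup and apiUrl are placeholders; only the SendAsync call is quoted from the actual code):

private static readonly HttpClient client = new HttpClient();

private async Task ProcessItemsAsync(Item item)
{
    // Build the REST request for this item (details omitted; apiUrl is illustrative)
    var request = new HttpRequestMessage(HttpMethod.Post, apiUrl);

    // All 10 tasks reach this line, but the responses come back two at a time
    var response = await client.SendAsync(request);

    // ... handle the response ...
}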

Here is the code in the new version that did change:

public void Start()
{
    ServicePointManager.DefaultConnectionLimit = maxSimultaneousThreads;  // 10

    // Start getting unprocessed data
    getUnprocessedDataTimer.Interval = getUnprocessedDataInterval; // 5 seconds
    getUnprocessedDataTimer.Elapsed += GetUnprocessedData; // writes data into a ConcurrentDictionary
    getUnprocessedDataTimer.Start();

    cancellationTokenSource = new CancellationTokenSource();

    // Create a new thread to process data
    Task.Factory.StartNew(() =>
       {
           try
           {
               ProcessNewData(cancellationTokenSource.Token);
           }
           catch (Exception ex)
           {
               // error handling
           }
       }, TaskCreationOptions.LongRunning
    );

}

private void ProcessNewData(CancellationToken token)
{
    // Check if task has been canceled.
    while (!token.IsCancellationRequested)
    {
        if (unprocessedDictionary.Count > 0)
        {
            try
            {
                var throttler = new SemaphoreSlim(maxSimultaneousThreads, maxSimultaneousThreads); // maxSimultaneousThreads = 10
                var tasks = unprocessedDictionary.Select(async item =>
                {
                    await throttler.WaitAsync(token);
                    try
                    {
                        if (unprocessedDictionary.TryRemove(item.Key, out var unprocessedItem))
                        {
                            await ProcessItemsAsync(unprocessedItem);
                        }
                    }
                    catch (Exception ex)
                    {
                        // handle error
                    }
                    finally
                    {
                        throttler.Release();
                    }
                });
                Task.WhenAll(tasks).GetAwaiter().GetResult(); // wait for this batch to finish before looping
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }

        Thread.Sleep(1000);
    }
}

Environment

  • .NET Framework 4.7.1
  • Windows Server 2016
  • Visual Studio 2019

Attempts to fix:

I tried the following, with the same bad result (two await client.SendAsync(request) calls completing at a time):

  • Set max threads and ServicePointManager.DefaultConnectionLimit to 30
  • Manually create threads using Thread.Start()
  • Replace async/await pattern with sync HttpClient calls
  • Call the data processing using Task.Run(async () => ...) and Task.WaitAll(downloadTasks), as in the original version
  • Replace the new long-running thread for ProcessNewData with a timer

What I want is to run GetUnprocessedData and ProcessNewData concurrently with an HttpClient connection limit of 10 (set in config) so that 10 requests are processed at the same time.

Note: the issue is similar to "HttpClient.GetAsync executes only 2 requests at a time?", but here the DefaultConnectionLimit is increased and the service runs on Windows Server. The original code also creates more than 2 connections when it runs.

Update

I went back to the original project to make sure it still worked; it did. I added a new timer to perform some unrelated operations and the HttpClient issue came back. I removed the timer and everything worked. I added a new thread to do parallel processing and the problem came back.

eglease

  • Doubt that this will fix anything but I would recommend to change the enumeration to `unprocessedDictionary.Keys.ToList().Select(...)` – Guru Stron Dec 20 '22 at 17:39
  • Also how do you check this - _"they are processed two at a time instead of all 10."_? – Guru Stron Dec 20 '22 at 17:40
  • Are you open to radically new approaches of implementing this service (like using the [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) library), or do you want to stick with the `Timer` and the `ConcurrentDictionary`? – Theodor Zoulias Dec 20 '22 at 18:09
  • @GuruStron I log when new data is sent to `ProcessItemsAsync` and when it gets back from `await client.SendAsync(request)`. The test REST server delays all replies by 10 seconds, so I can see how many items are processed every 10 seconds. I can include the logs, but they are full of debug data and the question is already very long. – eglease Dec 20 '22 at 18:19
  • @TheodorZoulias I am hoping to fix my current implementation. If not, I can look at other approaches. – eglease Dec 20 '22 at 18:28
  • @GuruStron No change in performance with the `Keys.ToList().Select`, but the code looks cleaner. Thank you for the suggestions. – eglease Dec 20 '22 at 18:32
  • Please note that Task.Run was designed to run CPU-bound operations, not I/O-bound ones. – Peter Csala Dec 21 '22 at 09:57
  • @PeterCsala `Task.Run` supports both sync and async delegates ([unlike](https://devblogs.microsoft.com/pfxteam/task-run-vs-task-factory-startnew/) `Task.Factory.StartNew`). Async delegates are typically I/O-bound. – Theodor Zoulias Dec 21 '22 at 12:22
  • Some ideas: Does the problem persist if you target a different server with `client.SendAsync`? Does the problem persist if you switch from .NET Framework to .NET 7? Does the problem persist if you switch from Windows Server to Windows 10 or 11? Is it possible to create a minimal example that reproduces the issue, using public URLs that don't require authentication, so that we can approach the problem experimentally? – Theodor Zoulias Dec 24 '22 at 02:07
  • Thank you very much for the suggestions. So far, I have only tried with the test server we always use and .NET Framework. I can try .NET 7, but the application will have to be released on .NET Framework 4.7.1. Windows 10 has a limitation on the number of simultaneous connections that Windows Server does not. I will do more testing this week after the holidays. – eglease Dec 25 '22 at 17:11
  • Looks like I found the root of the problem. `HttpClient` sends all items (10) at the same time if the whole object that does the processing (starts all threads, reads the database, calls `HttpClient`, etc.) is created a second time when the service starts. I do not know why this is, but it works. I still need to do some testing and will write an answer once I have everything working. – eglease Dec 28 '22 at 16:21
  • Also, testing on Windows 10 limits me to two connections at a time. I can reproduce the problem with a public REST endpoint; it is just harder to see without the built-in delay. The same issue happens with .NET Framework 4.8. I cannot test on .NET 7 because it is not installed on the server I am testing on. – eglease Dec 28 '22 at 16:23
  • Why the downvote? What is wrong with this question? – eglease Jul 11 '23 at 20:37

2 Answers


The issue turned out to be the place where ServicePointManager.DefaultConnectionLimit is set.

In the version where HttpClient was only doing two requests at a time, ServicePointManager.DefaultConnectionLimit was being set before the threads were being created but after the HttpClient was initialized.

Once I moved it into the constructor before the HttpClient is initialized, everything started working.

Thank you very much to @Theodor Zoulias for the help.

TL;DR: Set ServicePointManager.DefaultConnectionLimit before initializing the HttpClient.
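
A minimal sketch of the working order of operations (class and member names are illustrative, not the actual service code):

public class DataProcessor
{
    private readonly HttpClient client;

    public DataProcessor(int maxSimultaneousThreads)
    {
        // Set the limit first, before the HttpClient (and its underlying
        // ServicePoint) exists; set afterwards, the default limit of 2 keeps applying.
        ServicePointManager.DefaultConnectionLimit = maxSimultaneousThreads;

        client = new HttpClient();
    }
}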

eglease

This is not a direct answer to your question, but a suggestion for simplifying your service that could make the debugging of any problem easier. My suggestion is to implement the producer-consumer pattern using an iterator for producing the unprocessed items, and a parallel loop for consuming them. Ideally the parallel loop would have async delegates, but since you are targeting the .NET Framework you don't have access to the .NET 6 method Parallel.ForEachAsync. So I will suggest the slightly wasteful approach of using a synchronous parallel loop that blocks threads. You could use either the Parallel.ForEach method, or the PLINQ like in the example below:

private IEnumerable<Item> Iterator(CancellationToken token)
{
    while (true)
    {
        Task delayTask = Task.Delay(5000, token);
        foreach (Item item in GetDatabaseItems()) yield return item;
        delayTask.GetAwaiter().GetResult();
    }
}

public void Start()
{
    //...

    ThreadPool.SetMinThreads(degreeOfParallelism, Environment.ProcessorCount);

    new Thread(() =>
    {
        try
        {
            Partitioner
                .Create(Iterator(token), EnumerablePartitionerOptions.NoBuffering)
                .AsParallel()
                .WithDegreeOfParallelism(degreeOfParallelism)
                .WithCancellation(token)
                .ForAll(item => ProcessItemAsync(item).GetAwaiter().GetResult());
        }
        catch (OperationCanceledException) { } // Ignore
    }).Start();
}

Online demo.

The Iterator fetches unprocessed items from the database in batches, and yields them one by one. The database won't be hit more frequently than once every 5 seconds.

The PLINQ query is going to fetch a new item from the Iterator each time it has a worker available, according to the WithDegreeOfParallelism policy. The setting EnumerablePartitionerOptions.NoBuffering ensures that it won't try to fetch more items in advance.

The ThreadPool.SetMinThreads is used in order to boost the availability of ThreadPool threads, since the PLINQ is going to use lots of them. Without it the ThreadPool will not be able to satisfy the demand immediately, although it will gradually inject more threads and eventually will catch up. But since you already know how many threads you'll need, you can configure the ThreadPool from the start.

In case you dislike the idea of blocking threads, you can find a simple substitute of the Parallel.ForEachAsync here, based on the TPL Dataflow library. It requires installing a NuGet package.
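
For illustration, here is a rough sketch of what such a substitute could look like, using an ActionBlock from the System.Threading.Tasks.Dataflow package (my simplified version, not the linked implementation):

public static async Task ForEachAsync<T>(
    IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
{
    var block = new ActionBlock<T>(body, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = degreeOfParallelism,
        BoundedCapacity = degreeOfParallelism // don't buffer items ahead of the workers
    });
    foreach (T item in source)
        await block.SendAsync(item); // waits asynchronously while the block is full
    block.Complete();
    await block.Completion; // propagates any exception thrown by the body
}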

Theodor Zoulias

  • Thank you for the suggestion. When I try to run it, the Iterator never executes. The new thread is created but it does not seem to do anything. Also, `ProcessItemsAsync` takes a single item and returns `void`. I created another function to call it in a `foreach` loop, but I am not sure it is doing anything or is the right way of doing it, since the Iterator does not execute. – eglease Dec 21 '22 at 15:48
  • @eglease I forgot to start the thread. :-) – Theodor Zoulias Dec 21 '22 at 16:10
  • Thank you very much for your help. I can't believe I missed it ;). It works, but the original issue is still there. The items are processed two at a time. – eglease Dec 21 '22 at 16:27
  • @eglease I changed the return value of the `Iterator` from an `IEnumerable` of batches to an `IEnumerable` of individual items, so that they can be processed by the thread pool workers more efficiently. I've also included a link to a runnable toy example. Since this code reproduces the original problem, you can be quite sure that the bottleneck is not caused by the parallelization mechanism. Most likely it is related to the configuration of the `HttpClient`. – Theodor Zoulias Dec 21 '22 at 16:33
  • That is the same conclusion I came to. I am going to go back to the original code and make one change at a time to see where things break. What does not make sense is that `ProcessItemsAsync` and the code that calls the `HttpClient` have not changed at all between the working and non-working versions. Thank you again for your help. – eglease Dec 21 '22 at 16:44
  • @eglease I noticed that when I switched from `ProcessItemsAsync(items)` (plural) to `ProcessItemAsync(item)` (singular), the parallelization characteristics changed. With the plural version it was possible for one worker to process one item at a time while the other workers had no work to do. The singular version balances the work nicely across all workers. So it's not impossible that the bottleneck is related to the mechanism after all, and not to the `HttpClient`. – Theodor Zoulias Dec 21 '22 at 17:30
  • I am getting two items at a time when I make any thread-related change. I added a new timer for an unrelated operation and it started happening. I added your code and the behavior changed to processing two items at a time. There is something weird going on with `HttpClient`. – eglease Dec 21 '22 at 21:36