My application hangs (and is eventually terminated by Polly's circuit breaker or timeout) when trying to concurrently receive and deserialize very large JSON files, each containing over 12,000,000 characters.

using System.Text.Json;

Parallel.For(0, 1000000, async (i, state) =>
{
    var strategy = GetPollyResilienceStrategy(); // see the method implementation in the following.
    await strategy.ExecuteAsync(async () =>
    {
        var stream = await httpClient.GetStreamAsync(
            GetEndPoint(i), cancellationToken);
        var foo = await JsonSerializer.DeserializeAsync<Foo>(
            stream, cancellationToken: cancellationToken);

        // Process may require API calls to the client, 
        // but no other API calls and JSON deserialization 
        // is required after processedFoo is obtained.
        var processedFoo = Process(foo); // Long running CPU and IO bound task, that may involve additional API calls and JSON deserialization.
        queue.Add(processedFoo); // A background task, implementation in the following.
    });
});

I use different resilience strategies: one from GetPollyResilienceStrategy to process item i, and one for the HttpClient. The former is more relevant here, so I'm sharing that, but I'm happy to share the strategy used for the HttpClient too if needed.

AsyncPolicyWrap GetPollyResilienceStrategy()
{
    var retry = Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
            retryCount: 3,
            medianFirstRetryDelay: TimeSpan.FromMinutes(1)));

    var timeout = Policy.TimeoutAsync(timeout: TimeSpan.FromMinutes(5));

    var circuitBreaker = Policy
        .Handle<Exception>()
        .AdvancedCircuitBreakerAsync(
            failureThreshold: 0.5,
            samplingDuration: TimeSpan.FromMinutes(10),
            minimumThroughput: 2,
            durationOfBreak: TimeSpan.FromMinutes(1));

    var strategy = Policy.WrapAsync(retry, timeout, circuitBreaker);

    return strategy;
}

And the background task is implemented as follows.

var queue = new BlockingCollection<Foo>();

var listener = Task.Factory.StartNew(() =>
{
    while (true)
    {
        Foo foo;
        try
        {
            foo = queue.Take(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            break;
        }

        LongRunningCpuBoundTask(foo);
        SerializeToCsv(foo);
    }
},
creationOptions: TaskCreationOptions.LongRunning);

Context

My .NET console application receives very large JSON files over HTTP and deserializes each one by reading the fields the application needs and ignoring the others. A successful run is expected to take a few days, but the program "hangs" after a few hours. After extensive debugging, it turns out that an increasing number of threads get stuck trying to deserialize the JSON, and the program hangs (i.e., the Parallel.For does not start another iteration) once ~5 threads are stuck. It gets stuck at a different i every time, and since the JSON objects are very large, it is not feasible to log every received JSON for debugging.

  1. Why does it get stuck? Is there any built-in max capacity in JsonSerializer that is reached? e.g., buffer size?

  2. Is it possible that GetStreamAsync is reading corrupt data, and hence JsonSerializer is stuck in some corner case trying to deserialize corrupt JSON?

I found this thread relevant, though I'm not sure whether there was a resolution other than "fixed in a newer version": https://github.com/dotnet/runtime/issues/41604

The program eventually exits, but only as a result of either the circuit breaker or the timeout. I have set very long intervals in the resilience strategy, e.g., giving the process 20 min to try deserializing the JSON before retrying.

Dr. Strangelove
  • I am not familiar with Polly, but does adding `.ConfigureAwait(false);` to `GetStreamAsync()` and `DeserializeAsync()` help? – dbc Sep 28 '22 at 05:18
  • You can use server gc to reduce GC overhead. You are mostly bound by GC when deserializing large amounts of data concurrently. – Alois Kraus Sep 28 '22 at 05:36
  • `using var stream`? Is this related to http client limiting the number of connections per server? – Jeremy Lakeman Sep 28 '22 at 06:09
  • What if you split up receiving the whole json string and deserializing into two methods? If you then set up a mock that contains 3 of such jsons and it generates an endless enumerable of distinct streams of these jsons. What happens if you parallelize to deserialize these? Does it hang or not? And the other way around. What happens if you don't deserialize the streams and only return strings? Does it hang? Just a first step to find out whom of both is really blocking. – Oliver Sep 28 '22 at 06:21
  • @JeremyLakeman - `await using var stream` might be better. – dbc Sep 28 '22 at 06:26
  • Could you please share with us the definition of your resiliency strategy? – Peter Csala Sep 28 '22 at 06:35
  • Its not surprising that concurrently processing ~12TB of data is problematic. Two thoughts: Do you need to deserialize the whole of each request, perhaps you need only part of the stream? Reduce the concurrency in one of the ways @JonasH suggests. Personally, in the past, I have found TPL DataFlow very useful. Either way, you need to break the work down. – Jodrell Sep 28 '22 at 09:51
  • @JeremyLakeman, @ Theodor Zoulias, and @ dbc these are all very good suggestions, and they helped! Thank you! I think I had multiple issues (as you mentioned) so it is partially resolved now. – Dr. Strangelove Sep 29 '22 at 02:50
  • @Jodrell it is 12M chars, which in my case it translates to 10s to 100s of MB top, which is well below your 12TB guess :) – Dr. Strangelove Sep 29 '22 at 02:51
  • 12M chars will be somewhere between 11.45MB and 45.78 MB, depending on encoding. However, if you process 1M different requests of that size, concurrently ... – Jodrell Sep 29 '22 at 06:38
  • @Jodrell I see your point. I guess `Parallel.For` and `Parallel.Foreach` would not parallelize all the entries at the same time; in my case, I usually have between `5` to `10` items processed in parallel. – Dr. Strangelove Sep 29 '22 at 12:58
  • Related: [Nesting await in Parallel.ForEach](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach). – Theodor Zoulias Sep 29 '22 at 13:00
  • @TheodorZoulias would not `await Parallel.ForEachAsync` (as suggested by JonasH) addresses concerns similar to those in the question you linked? – Dr. Strangelove Sep 29 '22 at 13:09
  • Sure, the `Parallel.ForEachAsync` API is the solution of choice for these kinds of problems, provided that you are targeting .NET 6 or later. – Theodor Zoulias Sep 29 '22 at 13:22
  • Thanks, yes, I switched to using that and it seems it solved some of the issues I had. – Dr. Strangelove Sep 29 '22 at 13:32
  • Yes, I missed that! Please see the updated question. – Dr. Strangelove Sep 29 '22 at 13:58
  • Just wanted to say "I wish I was able to choose both of the excellent answers as the correct answers, but I cannot." :) I can remove that if it is against the norms. – Dr. Strangelove Sep 29 '22 at 17:15

2 Answers


Parallel.For does not play well with async. The async lambda compiles to an async void-style delegate, which Parallel.For cannot await, so it will start just about all the tasks at the same time. Eventually this will likely lead to bad things, like thread pool exhaustion.

I would suggest using another pattern for your work

  1. If using .NET 6, use Parallel.ForEachAsync

  2. Keep using Parallel.For but make your worker method synchronous

  3. Use something like LimitedConcurrencyTaskScheduler (see example) to limit the number of concurrent tasks.

  4. Use Dataflow, but I'm not very familiar with it, so I cannot advise exactly how it should be used.

  5. Manually split your work into chunks, and run the chunks in parallel.
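As a minimal sketch, suggestion 1 could look like the following (assuming .NET 6+ and the `httpClient`, `GetEndPoint`, `GetPollyResilienceStrategy`, `Process`, and `queue` members from the question; the concurrency limit of 8 is an arbitrary placeholder):

```csharp
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 8, // bound the concurrency explicitly
    CancellationToken = cancellationToken
};

await Parallel.ForEachAsync(Enumerable.Range(0, 1_000_000), options, async (i, ct) =>
{
    var strategy = GetPollyResilienceStrategy();
    await strategy.ExecuteAsync(async () =>
    {
        // The loop's token (ct) flows into both the download and the deserialization.
        await using var stream = await httpClient.GetStreamAsync(GetEndPoint(i), ct);
        var foo = await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: ct);
        queue.Add(Process(foo));
    });
});
```

Unlike `Parallel.For` with an `async` lambda, `ForEachAsync` actually awaits each body, so at most `MaxDegreeOfParallelism` downloads are in flight at once.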

JonasH
  • You could also mention .NET 6's `Parallel.ForeachAsync` – Peter Csala Sep 28 '22 at 08:38
  • This was very useful! Thank you! I think my approach had multiple issues, and with your suggestion and those in the comments, they are partially resolved, though I still have issues with the resiliency strategy. – Dr. Strangelove Sep 29 '22 at 02:53
  • TPL Dataflow was mentioned a few times on this question, so a quick comment: the whole pipeline was originally implemented in TPL, but in our use case that was very performant for large loads (starting from larger JSON files), but slow for the small ones. But we could have had missed some implementation details. So we decided to roll a simpler solution (as discussed here) that performs equally well for large and small loads. But we can revisit that decision if TPL is the ultimate solution. – Dr. Strangelove Sep 29 '22 at 15:00
  • @Dr.Strangelove in your original TPL Dataflow implementation, had you configured all the dataflow blocks with the [`BoundedCapacity`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.dataflowblockoptions.boundedcapacity) option? – Theodor Zoulias Sep 29 '22 at 17:38
  • I have a vague recollection we did. – Dr. Strangelove Sep 29 '22 at 22:33

Even without knowing how you set up your resiliency strategy, it seems like you want to kill two birds with one stone:

  • Add resilient behaviour for the HTTP-based communication
  • Add resilient behaviour for the stream parsing

I would recommend separating these two concerns.

GetStreamAsync

The GetStreamAsync call returns a Task<Stream>, which does not allow you to access the underlying HttpResponseMessage.

But if you issue your request for the stream like this:

var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead);
using var stream = await response.Content.ReadAsStreamAsync();

then you would be able to decorate the GetAsync call with an HTTP-based Polly policy definition.
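For example (a sketch; `httpPolicy` is an assumed `IAsyncPolicy<HttpResponseMessage>` built for transient HTTP errors, and `url`/`cancellationToken` come from the surrounding code):

```csharp
var response = await httpPolicy.ExecuteAsync(
    // The policy retries the request itself, not the later stream parsing.
    ct => httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct),
    cancellationToken);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
```

With `ResponseHeadersRead`, the policy's verdict is based on the status line and headers only; the (large) body is then streamed outside the HTTP policy's scope.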

DeserializeAsync

Looking at this problem purely from a resilience perspective, it would make sense to use a combination of CancellationTokenSources to enforce a timeout, like this:

CancellationTokenSource userCancellation = ...;
var timeoutSignal = new CancellationTokenSource(TimeSpan.FromMinutes(20));
var combinedCancellation = CancellationTokenSource.CreateLinkedTokenSource(userCancellation.Token, timeoutSignal.Token);
... 

var foo = await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: combinedCancellation.Token);

But you could achieve the same with an optimistic timeout policy:

var foo = await timeoutPolicy.ExecuteAsync(
  async token => await JsonSerializer.DeserializeAsync<Foo>(stream, cancellationToken: token), ct);

UPDATE #1

My understanding is the strategy used for every item is independent from the other, correct? So if the circuit breaks, why the parallel.ForEachAsync does not continue with the others?

The timeout and retry policies are stateless, whereas the circuit breaker maintains state that is shared between executions. Here I have detailed some internals, if you are interested in how it works under the hood.

Also, if the loop is broken, why no exception?

If the threshold of successive/subsequent failed requests is reached, the CB transitions from Closed to Open and throws the original exception (if it was triggered by an exception). In the Open state, if you try to perform a new request, it will short-circuit the execution with a BrokenCircuitException.

So, back to your question: yes, there should be an exception, but because you used Parallel.ForEach, which does not support async, the exception is swallowed. If you had used await Parallel.ForEachAsync, it would throw the exception.
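A sketch of the difference (the `ProcessItemAsync` helper is hypothetical; `strategy` is the shared resilience strategy):

```csharp
try
{
    // ForEachAsync awaits every body, so a failing iteration faults the loop...
    await Parallel.ForEachAsync(Enumerable.Range(0, 1_000_000), async (i, ct) =>
    {
        await strategy.ExecuteAsync(() => ProcessItemAsync(i, ct));
    });
}
catch (BrokenCircuitException)
{
    // ...and once the shared circuit breaker opens, further executions are
    // short-circuited and the exception surfaces here instead of being lost.
}
```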


UPDATE #2

After assessing your GetPollyResilienceStrategy code I have two more pieces of advice:

  • Please change the method's return type from AsyncPolicyWrap to IAsyncPolicy
    • AsyncPolicyWrap is an implementation detail and should not be exposed
  • Please change the order of the timeout and circuit breaker policies
    • Policy.WrapAsync(retry, circuitBreaker, timeout);
    • In your setup the CB will not break for timeouts
    • In my suggested setup the CB could break for TimeoutRejectedException as well
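Putting both suggestions together, the method could be sketched like this (same policy parameters as in the question):

```csharp
IAsyncPolicy GetPollyResilienceStrategy()
{
    var retry = Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(Backoff.DecorrelatedJitterBackoffV2(
            retryCount: 3,
            medianFirstRetryDelay: TimeSpan.FromMinutes(1)));

    var circuitBreaker = Policy
        .Handle<Exception>()
        .AdvancedCircuitBreakerAsync(
            failureThreshold: 0.5,
            samplingDuration: TimeSpan.FromMinutes(10),
            minimumThroughput: 2,
            durationOfBreak: TimeSpan.FromMinutes(1));

    var timeout = Policy.TimeoutAsync(TimeSpan.FromMinutes(5));

    // Timeout is innermost, so the circuit breaker observes
    // TimeoutRejectedException as a failure as well.
    return Policy.WrapAsync(retry, circuitBreaker, timeout);
}
```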
Peter Csala
  • You guessed it well! After improving based on other suggestions, the program ran to process much more items than ever before, but stopped again! I realized it many `retry` and `circuit breaks` (related to the resiliency policy) happening, and the program stops after a few circuit breaks. My understanding is the strategy used for every item is independent from the other, correct? So if the circuit breaks, why the parallel.ForEachAsync does not continue with the others? Also, if the loop is broken, why no exception? – Dr. Strangelove Sep 29 '22 at 02:57
  • If it helps, after a few hours the program has stopped working, I sent cancellation token, and this is what I see in the logs: `on timeout`, `on retry`, `on break`, `on retry` (Those messages are printed from the `onRetry` delegates); was it waiting somewhere?! – Dr. Strangelove Sep 29 '22 at 02:59
  • @Dr.Strangelove I have replied your first comment's questions in my adjusted post. Please check them. I've read your second comment several times and I can not fully understand your question there. Could you please rephrase it? – Peter Csala Sep 29 '22 at 10:00
  • @Dr.Strangelove I've also added some suggestions for your `GetPollyResilienceStrategy` method. Please check my amended post. – Peter Csala Sep 29 '22 at 10:04
  • These are all very good suggestions! The exception was swallowed in a different place! :) https://github.com/dotnet/command-line-api/issues/796 – Dr. Strangelove Sep 29 '22 at 14:51