1

I have a requirement to poll a database and create a Parquet file for each table. These tasks should run in parallel, with a configurable limit on the number of threads created.
Once the tasks start running, I need a timer so that any query taking longer than a specified time is canceled and its thread is closed. If more tasks remain in the collection, the next one should then start.
I am using CancellationTokenSource, but it is not working as expected.

public async static Task CreateExtractionAndUpload(ConfigurationDetails config)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    cts.CancelAfter(10000);
    
    List<Task> getData = new List<Task>();
    foreach (var query in config.Root.Queries)
    {
        getData.Add(Task.Run(() => DoExtractionFromQueries(query, dbManager, cts.Token)));
    }

    await Task.WhenAll(getData);
}

private async static void DoExtractionFromQueries(ExtractionQueries query, DBManager dBManager, CancellationToken cancelToken)
{
    try
    {
        while (!cancelToken.IsCancellationRequested)
        {
            Thread.Sleep(20000);
            var dataTable = dBManager.GetDataTable(query.Query, System.Data.CommandType.Text);
            //ParaquetFileHandler.GenerateParquetFile(dataTable);
        }
    }
    catch (TimeoutException ex)
    {
        Logger.Error("Query taking longer than expected time!", ex);
    }
    catch (Exception ex)
    {
        Logger.Error("Exception in running query!", ex);
    }
}

What am I doing wrong, and how can I fix it?
How can I limit the number of threads?
I can limit the threads with Parallel.ForEach, but can I then cancel a task after a timeout?

Peter Csala
  • The cancellation token is only checked here: `cancelToken.IsCancellationRequested`. Because `Thread.Sleep(); var dataTable = dBManager.GetDataTable` blocks for a number of seconds, the token will not be checked again until after these two lines have executed. – Ben Dec 17 '20 at 11:51
  • The code looks odd. You set the task to be canceled after 10s, but the first thing you do is sleep for 20s. So each task should call the database exactly once. And cancelling is cooperative, i.e. `GetDataTable` will always run to completion. – JonasH Dec 17 '20 at 11:51
  • Related: [Why is the task is not cancelled when I call CancellationTokenSource's Cancel method in async method?](https://stackoverflow.com/questions/30975590/why-is-the-task-is-not-cancelled-when-i-call-cancellationtokensources-cancel-me) – Theodor Zoulias Dec 17 '20 at 12:14
  • @Ben Yes, that's what is happening. I want to exit that thread. How can I exit it? – Pamela Roy Chowdhury Dec 17 '20 at 14:29
  • @JonasH In my actual scenario I want to cancel the thread if the query takes too long to execute. To replicate the issue I added this sleep, thinking the task would exit, but it does not. What do I do to fix this? – Pamela Roy Chowdhury Dec 17 '20 at 14:31
  • Does your `dbManager` have an async version of `GetDataTable`? – Peter Csala Dec 17 '20 at 15:51

2 Answers

2

I am using CancellationTokenSource but it is not working as expected.

Cancellation in .NET is cooperative. So the code must check whether it is canceled. If it doesn't check, it's not canceled.

In this case, you'll probably need to modify dBManager.GetDataTable to make it cancel-aware: add a CancellationToken parameter to that method, and pass it to any long-running methods that it calls.
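As a rough illustration of what "cancel-aware" could look like, here is a minimal sketch. `GetDataTableAsync` and `RunWithTimeoutAsync` are hypothetical names, not part of the asker's `DBManager`, and `Task.Delay` merely stands in for the real query. The point is only that the token is handed all the way down to the awaited operation, and that each query gets its own timeout instead of sharing one `CancelAfter` across all of them.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeCancellationSketch
{
    // Hypothetical stand-in for a cancel-aware GetDataTable.
    // Task.Delay simulates the long-running query and observes the token.
    public static async Task<string> GetDataTableAsync(
        string query, TimeSpan duration, CancellationToken token)
    {
        // Throws an OperationCanceledException if the token is canceled first.
        await Task.Delay(duration, token);
        return "rows for " + query;
    }

    // Returns true if the query finished, false if it hit the timeout.
    public static async Task<bool> RunWithTimeoutAsync(
        string query, TimeSpan duration, TimeSpan timeout)
    {
        // One CancellationTokenSource per query, so each gets its own timer.
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                string result = await GetDataTableAsync(query, duration, cts.Token);
                Console.WriteLine("Completed: " + result);
                return true;
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Timed out: " + query);
                return false;
            }
        }
    }

    public static async Task Main()
    {
        await RunWithTimeoutAsync("fast query",
            TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(2));
        await RunWithTimeoutAsync("slow query",
            TimeSpan.FromSeconds(30), TimeSpan.FromMilliseconds(200));
    }
}
```

In real data-access code, the token would presumably be passed to an ADO.NET call such as `DbCommand.ExecuteReaderAsync(CancellationToken)`; whether the query is actually aborted on the server then depends on the provider.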

Stephen Cleary
-2

If you cannot make GetDataTable async (and have it accept a cancellation token), one way to achieve this is to configure a timeout on your database connection and then run the queries with:

Parallel.ForEach(
    source: config.Root.Queries,
    parallelOptions: new ParallelOptions { MaxDegreeOfParallelism = NUMBER },
    body: (query) => DoExtractionFromQueries(query, dBManager));

Please note that if you go the recommended way of making GetDataTable async, then you move from parallelism to concurrency. There is no trivial way to set a "max level of concurrency" when working with Tasks. You will need to do a search or two to find an alternative to Task.WhenAll which allows setting a limit on the number of concurrent tasks.
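One commonly used approach for such a limit is to gate the tasks with a `SemaphoreSlim`. The following is only a sketch under assumptions: `RunThrottledAsync` is a hypothetical helper, and the `Task.Delay` in `Main` is a placeholder for the real async extraction. The helper also records the peak number of operations running at once, purely so the limit can be observed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledWhenAllSketch
{
    // Runs work(item) for every item, with at most maxConcurrency running at once.
    // Returns the highest number of simultaneously running operations observed.
    public static async Task<int> RunThrottledAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        int running = 0, peak = 0;
        var gate = new object();

        using (var throttle = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                // Wait (without blocking a thread) until a slot is free.
                await throttle.WaitAsync();
                try
                {
                    lock (gate) { running++; peak = Math.Max(peak, running); }
                    await work(item);
                }
                finally
                {
                    lock (gate) { running--; }
                    throttle.Release(); // free the slot for the next item
                }
            }).ToList(); // ToList starts every task; the semaphore does the throttling

            await Task.WhenAll(tasks);
        }
        return peak;
    }

    public static async Task Main()
    {
        var queries = Enumerable.Range(1, 10).Select(i => "query " + i);
        int peak = await RunThrottledAsync(queries, 3, async q =>
        {
            await Task.Delay(100); // placeholder for the real async extraction
        });
        Console.WriteLine("Peak concurrency: " + peak);
    }
}
```

The delegate passed as `work` could create its own timeout token per query, which would combine the concurrency limit with per-query cancellation.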

tymtam
  • While I tend to agree on using Parallel.ForEach instead of tasks, this answer does not address the core of the issue, i.e. aborting if a query takes longer than some timeout. – JonasH Dec 18 '20 at 10:06
  • @JonasH You're 100% correct, but Stephen Cleary's answer doesn't tackle the "configurable limit on the number of threads to be created" aspect :) I've rewritten the answer to address the timeout and added a note about parallelism/concurrency. – tymtam Dec 18 '20 at 11:07