4

I want to limit the total number of queries that I submit to my database server across all Dataflow blocks to 30. In the following scenario, the throttling of 30 concurrent tasks is per block so it always hits 60 concurrent tasks during execution. Obviously I could limit my parallelism to 15 per block to achieve a system wide total of 30 but this wouldn't be optimal.

How do I make this work? Do I limit (and block) my awaits using SemaphoreSlim, etc, or is there an intrinsic Dataflow approach that works better?

public class TPLTest
{
    private long AsyncCount = 0;
    private long MaxAsyncCount = 0;
    private long TaskId = 0;
    private object MetricsLock = new object();

    public async Task Start()
    {
        ExecutionDataflowBlockOptions execOption
            = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
        DataflowLinkOptions linkOption = new DataflowLinkOptions()
            { PropagateCompletion = true };

        var doFirstIOWorkAsync = new TransformBlock<Data, Data>(
            async data => await DoIOBoundWorkAsync(data), execOption);
        var doCPUWork = new TransformBlock<Data, Data>(
            data => DoCPUBoundWork(data));
        var doSecondIOWorkAsync = new TransformBlock<Data, Data>(
            async data => await DoIOBoundWorkAsync(data), execOption);
        var doProcess = new TransformBlock<Data, string>(
            i => $"Task finished, ID = : {i.TaskId}");
        var doPrint = new ActionBlock<string>(
            s => Debug.WriteLine(s));

        doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
        doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
        doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
        doProcess.LinkTo(doPrint, linkOption);

        int taskCount = 150;
        for (int i = 0; i < taskCount; i++)
        {
            await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
        }
        doFirstIOWorkAsync.Complete();

        await doPrint.Completion;
        Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
    }

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        lock(MetricsLock)
        {
            AsyncCount++;
            if (AsyncCount > MaxAsyncCount)
                MaxAsyncCount = AsyncCount;
        }

        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        await Task.Delay(data.Delay);

        lock (MetricsLock)
            AsyncCount--;

        return data;
    }

    private Data DoCPUBoundWork(Data data)
    {
        data.Step = 1;
        return data;
    }
}

Data Class:

public class Data
{
    public int Delay { get; set; }
    public long TaskId { get; set; }
    public int Step { get; set; }
}

Starting point:

TPLTest tpl = new TPLTest();
await tpl.Start();
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
NPCampbell
  • 51
  • 3
  • It's probably better to use a semaphore as it allows you to lock around *just* the executing of each query, and not around any of the rest of the work involved in that block. Create an abstraction that you use to run queries (or alter your existing one, if you have one) so that the abstraction takes out the semaphore when querying so you don't have to manually do it each time. – Servy Jan 15 '19 at 20:31
  • I agree with @Servy. I recommend `SemaphoreSlim` to limit the access to the database. In this way you will have a single point of control over how the db is accessed, and not rely on details in DataFlow. – Nick Jan 17 '19 at 08:53
  • Instead of trying to throttle all blocks, why not use *one* block to write to the database? Parallel *database* access is no better than parallel *disk* access - you still write to the same storage so *more* connections result in *worse* performance. The canonical way (ie used in SSIS and all ETL tools) is to batch data in big enough chunks and then send it to the database using whatever bulk import functionality is available, eg `SqlBulkCopy` – Panagiotis Kanavos Jan 17 '19 at 14:55
  • What are your blocks doing and why do they use a DOP of 30? It *matters*. Instead of having 30 tasks trying to insert single rows over 30 connections, it's far better to send records to a batchblock and then send the batch to an ActionBlock with a SqlBulkCopy instance that pumps them to the target table using miminal logging and **no** cross-connection blocking. A single task like that can easily end up being 30 times faster – Panagiotis Kanavos Jan 17 '19 at 14:59
  • If any blocks perform lookups, it's better to preload and cache the lookup values, just like SSIS does. If the blocks perform multiple activites, break them apart to allow each block to do one job at a time without blocking for too long. – Panagiotis Kanavos Jan 17 '19 at 15:02
  • @PanagiotisKanavos My database server has 40 cores using a SAN with many SSD spindles. The limitation of 30 concurrent queries was chosen because my testing identified this was the most before other users were adversely affected. I provided an abbreviated example of my process. I have 8 I/O blocks that are chained sequentially with CPU bound processing in between each. The initial data is pre-loaded and consists of 10,000+ records but generates around 25 million downstream records as it propagates through each chained I/O block. – NPCampbell Jan 17 '19 at 19:35
  • @NPCampbell so does mine. So does Paulo Morgado's who won the Iron Architect contest at Teched a decade back while working at a big bank. 25M records is a *small* number for a data mart or data warehouse. What you describe is a typical ETL scenario like those covered by SQL Server's built-in SSIS system, especially the *dataflow* components. This [monster](http://www.tpc.org/tpce/results/tpce_result_detail.asp?id=117110101) that tops the TPC-E benchmar of cores and it *still* uses connection pooling, throttling and batching to get those high numbers. – Panagiotis Kanavos Jan 18 '19 at 08:23
  • @NPCampbell the reason is that the cores and SAN don't matter all that much. Each operation is a separate network roundtrip, a separate connection, separate buffers, and far worse, separate log records and separate *locks* that block each other. The very fact that you're looking for ways to throttle your writes means you've already hit blocking issues. Those concurrent writes may end up causing separate file reallocations. If you batch the records and write them out in a single SqlBulkCopy though, you get *one* roundtrip, minimal logging, no blocking. – Panagiotis Kanavos Jan 18 '19 at 08:23
  • @NPCampbell right now, you still haven't explaind what you do. `CPU bound` and `IO bound` don't say anything in an ETL scenario - all operations in an ETL dataflow are IO bound, most are chained sequentially and very few are ever CPU bound because typically, the database can crunch data better than code. Storing the data in the database and using queries to produce combinations can be a lot faster than moving the data out to SSIS or another dataflow. – Panagiotis Kanavos Jan 18 '19 at 08:28
  • @NPCambpell to explain my "so does mine". I work for one of the biggest online travel agencies in Europe. That's where the millions of records (tickets) come from. I need to download audit records from various GDSs (the intermediaries used to find, book flights) and parse them (that's a pair of heavy IO and CPU work). Then I need to request each individual ticket from each file, each office (several thousands per office), parse those *big* XML files and finally add the records to the database. That's the *light* step. Then those are matched against invoice records, other systems for reporting – Panagiotis Kanavos Jan 18 '19 at 08:33
  • @NPCampbell the heavy lifting comes afterwards using SSIS and database views. In many cases it's *far faster* to store data in intemediate tables than create big views or load big lookup tables. Checking to see whether 1M tickets exist in another system one by one is *slow*. A 1M*1M join on indexed tables is orders of magnitude faster. – Panagiotis Kanavos Jan 18 '19 at 08:36
  • @PanagiotisKanavos I apologize, being a noob, I didn't see your comments after the "more comments" link. Our basic process is we have to read 25M very wide records from 50 tables, transform the data using business logic, verticalize the data into new narrow tables with around 5B records, verify the old vs new data using an external validation tool and then do lots of DDL to clean up. All code runs on servers that share the same network backbone as the database servers. This approach is around a 200x performance improvement vs our previous unvalidated single threaded SQL only approach. – NPCampbell Jan 25 '19 at 17:16
  • @PanagiotisKanavos To expand on the throttling issue ... when this ran as a single threaded process there was no contention on the server whatsoever and other users could connect to any database on the cluster with no issues. Our process was just slow (8 days). However, when I spam the server with 8-10k queries at a time I can bring the server to its knees. It's been 10+ years since I got my SQL DBA cert so I'm a little out of touch with current methods, Other than client side throttling side I'm not sure what is being suggested to alleviate that. – NPCampbell Jan 25 '19 at 17:33

3 Answers3

3

Why don't you marshal everything to an action block that has the actual limitation?

var count = 0;
var ab1 = new TransformBlock<int, string>(l => $"1:{l}");
var ab2 = new TransformBlock<int, string>(l => $"2:{l}");
var doPrint = new ActionBlock<string>(
    async s =>
    {
        var c = Interlocked.Increment(ref count);
        Console.WriteLine($"{c}:{s}");
        await Task.Delay(5);
        Interlocked.Decrement(ref count);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 15 });

ab1.LinkTo(doPrint);
ab2.LinkTo(doPrint);

for (var i = 100; i > 0; i--)
{
    if (i % 3 == 0) await ab1.SendAsync(i);
    if (i % 5 == 0) await ab2.SendAsync(i);
}

ab1.Complete();
ab2.Complete();

await ab1.Completion;
await ab2.Completion;
Paulo Morgado
  • 14,111
  • 3
  • 31
  • 59
  • 1
    Whoever downvoted this, why? That's actually the *correct* way of handling ETL. 30 blocked connections, even with semaphores, are far worse than *1* block pumping data as fast as it can – Panagiotis Kanavos Jan 17 '19 at 14:52
  • I provided a weak example. My I/O blocks take as an input, the output from the previous I/O block with some minor processing. The blocks aren't actually the same. Some perform lookups, some retrieve datasets and some execute stored procs. I was able to use this approach using a generic block that returned a DataSet but not for all my I/O block types. – NPCampbell Jan 17 '19 at 19:44
  • 2
    @NPCampbell, that shouldn't be a problem. The `doPrint` block might even do nothing other than limiting the number of simultaneous invocations. This is also a weak example that shows the path to the solution. – Paulo Morgado Jan 17 '19 at 21:18
  • @PauloMorgado I'm interested in your doPrint block comment. I have around 10,000 elements that run through my pipeline of 8 I/O blocks. My understanding is that a dummy doPrint block can throttle the number of elements passing that point in the pipeline but it won't throttle database activity upstream or downstream of the throttling block. Is my understanding correct? – NPCampbell Jan 18 '19 at 12:55
  • It will throttle everything downstream because only that number of operations will be executing. I meant "dumb" not as "doing nothing" but as "not doing something cleaver". – Paulo Morgado Jan 19 '19 at 23:26
0

This is the solution I ended up going with (unless I can figure out how to use a single generic DataFlow block for marshalling every type of database access):

I defined a SemaphoreSlim at the class level:

private SemaphoreSlim ThrottleDatabaseQuerySemaphore = new SemaphoreSlim(30, 30);

I modified the I/O class to call a throttling class:

    private async Task<Data> DoIOBoundWorkAsync(Data data)
    {
        if (data.TaskId <= 0)
            data.TaskId = Interlocked.Increment(ref TaskId);

        Task t = Task.Delay(data.Delay); ;
        await ThrottleDatabaseQueryAsync(t);

        return data;
    }

The throttling class: (I also have a generic version of the throttling routine because I couldn't figure out how to write one routine to handle both Task and Task<TResult>)

    private async Task ThrottleDatabaseQueryAsync(Task task)
    {
        await ThrottleDatabaseQuerySemaphore.WaitAsync();
        try
        {
            lock (MetricsLock)
            {
                AsyncCount++;
                if (AsyncCount > MaxAsyncCount)
                    MaxAsyncCount = AsyncCount;
            }

            await task;
        }
        finally
        {
            ThrottleDatabaseQuerySemaphore.Release();

            lock (MetricsLock)
                AsyncCount--;
        }
    }
}
NPCampbell
  • 51
  • 3
  • This moves the database concurrency issues you have to the client's side. It doesn't solve them. Using a "generic" data import step is easy - create an ActionBlock that accepts a record array and writes it out to a table with a SqlBulkCopy class. Create one such block per target table to keep things simple. Add a BatchBlock before each one to batch records into eg 5000 or 10000 arrays. – Panagiotis Kanavos Jan 18 '19 at 08:40
  • 1
    A more advanced version would be to use a custom block to batch records together and add them to a DTO that contains the records *and* the target table name. Pass them to an ActionBlock that uses the table name to configure a SqlBulkCopy instance and write the records out. That would give you a single db writer but the "grouping" block would have to handle different batch sizes per target table, to account for fast and slow tables. – Panagiotis Kanavos Jan 18 '19 at 08:41
  • Furthermore, DataFlow, Reactive Extensions and tasks can be composed together. Source and target blocks can be observables and observers. You could use Rx operations like `Group By` and `Buffer` to separate records by target and batch them by count *and* time – Panagiotis Kanavos Jan 18 '19 at 08:45
  • I had to throttle on the client side because my application was behaving like a denial of service attack. I currently batch my records on the input side to prevent running out of memory during processing. 80% of my performance bottlenecks are on the read side. On the write side, I may be able to save some time using SqlBulkCopy but dropping all indices on the target tables makes it pretty efficient. I'll take your advice on looking at Reactive Extensions. I bought Stephen Clearys book but haven't got there yet. – NPCampbell Jan 18 '19 at 13:57
  • `I had to throttle on the client side because my application was behaving like a denial of service attack` that's what I was talking about. This isn't a parallel processing problem. It's a data engineering problem – Panagiotis Kanavos Jan 18 '19 at 13:58
  • The `ThrottleDatabaseQueryAsync` is throttling the *awaiting* of the `task`. That's not what you want. You want to limit the number of tasks that are currently running. You should have a parameter `Func taskFactory`, not a `Task task`. – Theodor Zoulias Feb 05 '23 at 08:47
0

The simplest solution to this problem is to configure all your blocks with a limited-concurrency TaskScheduler:

TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair(
    TaskScheduler.Default, maxConcurrencyLevel: 30).ConcurrentScheduler;

ExecutionDataflowBlockOptions execOption = new()
{
    TaskScheduler = scheduler,
    MaxDegreeOfParallelism = scheduler.MaximumConcurrencyLevel,
};

TaskSchedulers can only limit the concurrency of work done on threads. They can't throttle asynchronous operations that are not running on threads. So in order to enforce the MaximumConcurrencyLevel policy, unfortunately you must pass synchronous delegates to all the Dataflow blocks. For example:

TransformBlock<Data, Data> doFirstIOWorkAsync = new(data =>
{
    return DoIOBoundWorkAsync(data).GetAwaiter().GetResult();
}, execOption);

This change will increase the demand for ThreadPool threads, so you'd better increase the number of threads that the ThreadPool creates instantly on demand to a higher value than the default Environment.ProcessorCount:

ThreadPool.SetMinThreads(100, 100); // At the start of the program

I am proposing this solution not because it is optimal, but because it is easy to implement. My understanding is that wasting some RAM on ~30 threads that are going to be blocked most of the time, won't have any measurable negative effect on the type of application that you are working with.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104