
I wrote code that reads batches of data from Redis and writes those batches to SQL Server using async/await in C#.

I have written the following code in my TeamController class, using async/await so that reading from Redis runs in parallel with writing to SQL Server:

[Route("api/[controller]")]
[ApiController]
public class TeamController : ControllerBase
{
    private ICacheManager cacheManager;
    private IDBManager dbManager;
    private IDomainDataConverter _domainDataConverter;

    public TeamController(ICacheManager cacheManager,
                          IDBManager dbManager,
                          IDomainDataConverter domainDataConverter)
    {            
        this.cacheManager = cacheManager;
        this.dbManager = dbManager;
        this._domainDataConverter = domainDataConverter;
    } 
        
    [HttpPost, Route("SaveDataParallel")]
    public async Task<IActionResult> SaveDataParallel(int parallelDegree, int totalCount)
    {  
        int chunkSize = totalCount / parallelDegree;
        int remainder = totalCount - chunkSize * parallelDegree;

        System.Diagnostics.Stopwatch st = new System.Diagnostics.Stopwatch();
        st.Start();

        try
        {
            var tasks = new List<Task>();

            for (int i = 0; i < parallelDegree; i++)
            {
                tasks.Add(SaveChunkAsync(i, chunkSize, parallelDegree, remainder));
            }
            }

            await Task.WhenAll(tasks);

            st.Stop();
        }
        catch 
        {
        }

        return Ok(st.ElapsedMilliseconds);
    }

    [HttpPost, Route("SaveDataSimple")]
    public IActionResult SaveDataWithSimple(int parallelDegree,  int totalCount)
    {
        int chunkSize = totalCount / parallelDegree;
        int remainder = totalCount - chunkSize * parallelDegree;

        System.Diagnostics.Stopwatch st = new System.Diagnostics.Stopwatch();
        st.Start();

        try
        {
            for (int i = 0; i < parallelDegree; i++)
            {
                SaveChunk(i, chunkSize, parallelDegree, remainder);
            }
            }

            st.Stop();
        }
        catch
        {
        }

        return Ok(st.ElapsedMilliseconds);
    }

    private async Task SaveChunkAsync(int i, int pageSize, int parallelDegree, int remainder)
    {
        var data = cacheManager.ReadDataAsync<TeamDto>(i * pageSize, (i == parallelDegree - 1 ? remainder : 0) + pageSize);
        var arr = _domainDataConverter.Convert<Team, TeamDto>(data.Result);
        await dbManager.BulkInsertAsync(arr);
    }

    private void SaveChunk(int i, int pageSize, int parallelDegree, int remainder)
    {
        var data = cacheManager.ReadData<TeamDto>(i * pageSize, (i == parallelDegree - 1 ? remainder : 0) + pageSize);
        var arr = _domainDataConverter.Convert<Team, TeamDto>(data);
        dbManager.BulkInsert(arr);
    }
}

I'm comparing the performance of the two methods, SaveDataParallel and SaveDataWithSimple. Unfortunately I don't see a significant difference between the two calls in terms of performance.

If

n = total number of reads and writes,
td = time required to save chunk of data to SQL,
tr = time required to read chunk of data from Redis,
tparallel = total time for SaveDataParallel,
tsimple = total time for SaveDataWithSimple,

I expect tparallel to be as follows:

tparallel = (max(td, tr) * n) / 2 + tr

and tsimple as follows:

tsimple = (max(td, tr)) * n

but the actual results differ from this: the measured tsimple and tparallel values don't show a significant difference.

Does anybody have any idea why? Am I expecting the right thing? Or is there something wrong with the code?

I would appreciate any ideas or guidance.

  • What are the timings you get? I would put some logging before and after time-consuming operations with timestamp and `i` in the message to see the timeline. Also why are you calling `cacheManager.ReadDataAsync` without `await` and then get `data.Result`? – Mike Mozhaev Jan 14 '23 at 19:28
  • Don't use `.Result`. Await that call. Also, you may want to use the `%` operator to calculate the remainder. Now to your question: what is your expectation based on? – Fildor Jan 14 '23 at 19:29
  • All of your insert calls go to the same table. I'd really expect collisions there, making the concurrency nearly senseless. You would probably be better off inserting chunks sequentially but pipelining the data transformation. – Fildor Jan 14 '23 at 19:33
  • You are right, @Mike Mozhaev and @Fildor, I should change it to `await cacheManager.ReadDataAsync`. Thanks – Shahrzad Abedi Jan 15 '23 at 09:00
  • Sharing the cache and DB manager implementations would also help the community. – Guru Stron Jan 15 '23 at 10:51

1 Answer


The first part of an async method is executed synchronously. And an await can even complete synchronously when the awaited method returns an already-completed task (see the "Hot path" optimization section in https://devblogs.microsoft.com/premier-developer/dissecting-the-async-methods-in-c/).
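As a minimal sketch of that hot path (assuming nothing beyond the BCL): awaiting a task that is already completed does not yield, and the continuation runs synchronously on the same thread.

```csharp
using System;
using System.Threading.Tasks;

class HotPathDemo
{
    // Simulates a cache hit: the task is already completed when returned.
    static Task<int> GetCachedValueAsync() => Task.FromResult(42);

    static async Task Main()
    {
        int before = Environment.CurrentManagedThreadId;
        int value = await GetCachedValueAsync(); // completes synchronously, no yield
        int after = Environment.CurrentManagedThreadId;

        Console.WriteLine(value);           // 42
        Console.WriteLine(before == after); // True: the continuation ran on the same thread
    }
}
```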

So it might be, for example, that cacheManager.ReadDataAsync did its job quickly and returned a completed task instead of actually running asynchronously. Then _domainDataConverter.Convert will also run synchronously, and the `for (int i = 0; i < parallelDegree; i++)` loop will take all the time of ReadDataAsync + Convert. Yes, BulkInsertAsync will run in parallel, but you could do better by making the Convert run in parallel as well.

Something like:

private async Task SaveChunkAsync(int i, int pageSize,
    int parallelDegree, int remainder)
{
    var data = await cacheManager.ReadDataAsync<TeamDto>(i * pageSize,
        (i == parallelDegree - 1 ? remainder : 0) + pageSize);
    var arr = await _domainDataConverter.ConvertAsync<Team, TeamDto>(data);
    await dbManager.BulkInsertAsync(arr);
}

Also, you are dividing all the data at once into as many parts as there are parallel workers. I suggest dividing it into smaller parts of a fixed (maybe configurable) length and then processing them either with Parallel.ForEach, in which case I would use SaveChunk, or maybe with Parallel.ForEachAsync and SaveChunkAsync, although I haven't tried the latter. See https://www.hanselman.com/blog/parallelforeachasync-in-net-6 for more details.
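A sketch of that idea with Parallel.ForEachAsync (.NET 6+), reusing the cacheManager/dbManager members from the question; the chunk size of 1000 and degree of parallelism of 4 are arbitrary starting values to tune, not recommendations:

```csharp
int chunkSize = 1000; // fixed, configurable chunk length (assumed value)
int chunkCount = (totalCount + chunkSize - 1) / chunkSize;

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

await Parallel.ForEachAsync(Enumerable.Range(0, chunkCount), options,
    async (chunk, cancellationToken) =>
    {
        int offset = chunk * chunkSize;
        int count = Math.Min(chunkSize, totalCount - offset);

        // Read one chunk from Redis, convert it, and bulk-insert it;
        // ForEachAsync caps how many chunks are in flight at once.
        var data = await cacheManager.ReadDataAsync<TeamDto>(offset, count);
        var arr = _domainDataConverter.Convert<Team, TeamDto>(data);
        await dbManager.BulkInsertAsync(arr);
    });
```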

What to choose depends on whether you have CPU-bound work and want to use the CPU fully, or whether you are mostly waiting for I/O. In the former case a multithreading approach might give you the control needed to load all the CPU cores; in the latter case an asynchronous approach might be better.

Then you can play with different batch sizes and numbers of parallel workers to find the optimal combination. When I was doing parallel bulk inserts I used Parallel.ForEach and went with 4 threads; increasing this limit didn't give any noticeable benefit.

Testing different implementations

You can run the following program to see the possible timelines of the SaveChunk method depending on the implementation details. I prepared simulations of ReadData, Convert and BulkInsert so that they take about 40 ms, 200 ms and 40 ms respectively, to simulate the tr=24, td=4 situation, with parallelDegree=10. The first variant is synchronous and takes about 2800 ms, the second one is partially async and takes about 2440 ms, and the third one is fully async and takes about 560 ms.

using System.Diagnostics;

namespace SaveChunks
{
    internal class Program
    {
        static async Task Main(string[] args)
        {
            int parallelDegree = 10;

            Console.WriteLine("Simple:");
            var target = new DataSaver();
            target.SaveDataSimple(parallelDegree);

            Console.WriteLine("Partially Async:");
            await target.SaveDataPartiallyAsync(parallelDegree);

            Console.WriteLine("Async:");
            await target.SaveDataAsync(parallelDegree);
        }
    }

    internal class DataSaver
    {
        private Stopwatch _timer = new Stopwatch();

        public void SaveDataSimple(int parallelDegree)
        {
            _timer.Restart();

            for (int i = 0; i < parallelDegree; i++)
                SaveChunk(i);

            _timer.Stop();
        }

        public async Task SaveDataPartiallyAsync(int parallelDegree)
        {
            _timer.Restart();

            var tasks = new List<Task>();

            for (int i = 0; i < parallelDegree; i++)
                tasks.Add(SaveChunkPartiallyAsync(i));

            await Task.WhenAll(tasks);

            _timer.Stop();
        }

        public async Task SaveDataAsync(int parallelDegree)
        {
            _timer.Restart();

            var tasks = new List<Task>();

            for (int i = 0; i < parallelDegree; i++)
                tasks.Add(SaveChunkAsync(i));

            await Task.WhenAll(tasks);

            _timer.Stop();
        }

        private void SaveChunk(int i)
        {
            var data = ReadData(i);
            var converted = Convert(i, data);
            BulkInsert(i, converted);
        }

        private async Task SaveChunkPartiallyAsync(int i)
        {
            var data = await ReadDataPartiallyAsync(i);
            var converted = Convert(i, data);
            await BulkInsertAsync(i, converted);
        }

        private async Task SaveChunkAsync(int i)
        {
            var data = await ReadDataAsync(i);
            var converted = await ConvertAsync(i, data);
            await BulkInsertAsync(i, converted);
        }

        // Synchronous implementation

        private int ReadData(int i)
        {
            Log(i, "ReadData start");
            Pause(40);
            Log(i, "ReadData end");
            return 0;
        }

        private int Convert(int i, int data)
        {
            Log(i, "Convert start");
            Pause(200);
            Log(i, "Convert end");
            return 0;
        }

        private void BulkInsert(int i, int data)
        {
            Log(i, "BulkInsert start");
            Pause(40);
            Log(i, "BulkInsert end");
        }

        // Partially-asynchronous implementation

        private Task<int> ReadDataPartiallyAsync(int i)
        {
            return Task.FromResult(ReadData(i));
        }

        private Task<int> ConvertAsync(int i, int data)
        {
            return Task.Run(() => Convert(i, data));
        }

        private Task BulkInsertAsync(int i, int data)
        {
            return Task.Run(() => BulkInsert(i, data));
        }

        // Asynchronous implementation

        private Task<int> ReadDataAsync(int i)
        {
            return Task.Run(() => ReadData(i));
        }

        // Helper methods

        // Busy-waits instead of using Task.Delay so the pause occupies a CPU core,
        // simulating CPU-bound work.
        private void Pause(int ms)
        {
            var start = _timer.ElapsedMilliseconds;
            var sum = 0;
            while (_timer.ElapsedMilliseconds - start < ms)
                sum += sum * sum;
        }

        // Logs the elapsed time and the first letter of the phase name,
        // indented by the task index to form a per-task timeline.
        private void Log(int i, string message)
        {
            char c = message[0];
            Console.WriteLine($"{_timer.ElapsedMilliseconds,5} {new string(' ', i * 2)}{c}");
        }

        // Alternative, more detailed logger (not used above).
        private void Log2(int i, string message)
        {
            Console.WriteLine($"{_timer.ElapsedMilliseconds}\t{i}\t{message}");
        }
    }
}

The log shows the timelines of all three variants. As you can see, in the second variant no more than two tasks ever run in parallel, while the third variant achieves much higher parallelism.

Simple:
    1 R
   43 R
   43 C
  243 C
  243 B
  283 B
  283   R
  323   R
  323   C
  523   C
  523   B
  563   B
  563     R
  603     R
  603     C
  803     C
  803     B
  843     B
  843       R
  883       R
  883       C
 1083       C
 1083       B
 1123       B
 1123         R
 1163         R
 1163         C
 1363         C
 1363         B
 1403         B
 1403           R
 1443           R
 1443           C
 1643           C
 1643           B
 1683           B
 1683             R
 1723             R
 1723             C
 1923             C
 1923             B
 1963             B
 1963               R
 2003               R
 2003               C
 2203               C
 2203               B
 2243               B
 2243                 R
 2283                 R
 2283                 C
 2483                 C
 2483                 B
 2523                 B
 2523                   R
 2563                   R
 2563                   C
 2763                   C
 2763                   B
 2803                   B
Partially Async:
    0 R
   40 R
   40 C
  240 C
  244   R
  244 B
  284 B
  284   R
  284   C
  484   C
  484     R
  484   B
  524   B
  524     R
  524     C
  724     C
  724       R
  724     B
  764       R
  764       C
  764     B
  964       C
  964         R
  964       B
 1004         R
 1004         C
 1004       B
 1204         C
 1204           R
 1204         B
 1244           R
 1244           C
 1244         B
 1444           C
 1444             R
 1444           B
 1484             R
 1484             C
 1484           B
 1684             C
 1684             B
 1684               R
 1724               R
 1724               C
 1724             B
 1924               C
 1924                 R
 1924               B
 1964                 R
 1964                 C
 1964               B
 2164                 C
 2164                   R
 2164                 B
 2204                   R
 2204                   C
 2204                 B
 2404                   C
 2404                   B
 2444                   B
Async:
    1 R
    2   R
    2       R
    2         R
    2           R
    3     R
   27             R
   34                   R
   41 R
   41 C
   42       R
   42         R
   42       C
   42         C
   43     R
   43     C
   43           R
   43           C
   44   R
   45   C
   72             R
   72             C
   79                   R
   82                   C
  241 C
  241 B
  242         C
  242       C
  242         B
  242       B
  243     C
  243     B
  245   C
  245   B
  251           C
  251           B
  272             C
  272             B
  281 B
  281               R
  282                   C
  282       B
  282                   B
  282                 R
  283     B
  283         B
  285   B
  294           B
  312             B
  321               R
  321               C
  322                   B
  322                 R
  322                 C
  521               C
  521               B
  522                 C
  522                 B
  561               B
  562                 B
Mike Mozhaev
  • I'd really expect the db to be the bottleneck here. But that would need deeper inspection and monitoring to confirm. – Fildor Jan 14 '23 at 19:34
  • It might be as well. But here are obvious things to check in the user code. – Mike Mozhaev Jan 14 '23 at 19:36
  • Like the call of `.Result`. Yes, definitely. – Fildor Jan 14 '23 at 19:37
  • I assume that in the first iteration of the loop `BulkInsertAsync` waits for the result of `ReadDataAsync`, but once `BulkInsertAsync` starts executing, the next iteration of the loop begins, so the first `BulkInsertAsync` runs in parallel with the second `ReadDataAsync`, and so on, forming a pipeline. That's why I wrote `tparallel = (max(td, tr) * n) / 2 + tr` @Mike Mozhaev – Shahrzad Abedi Jan 15 '23 at 08:57
  • Regarding Parallel.ForEach, I have tried it before and the problem is it doesn't insert anything into the SQL Server DB and doesn't produce any exceptions, which seems strange to me. @MikeMozhaev – Shahrzad Abedi Jan 15 '23 at 11:00
  • Please add `Log.Debug($"{DateTime.Now}: {i} begin")`, `Log.Debug($"{DateTime.Now}: {i} after ReadData")`, etc. to `SaveChunk` and `SaveChunkAsync` and collect the logs. Then you'll be able to understand what goes on more clearly. – Mike Mozhaev Jan 15 '23 at 11:24
  • Finally, using logging, I found out why there isn't a significant difference between the two method calls: `tr` and `td` were significantly different (`td=4` and `tr=24`), but I had assumed `tr` and `td` would be close to each other, like `td=22` and `tr=24`. Therefore using `async/await` here has value, but not a very significant one. Thanks for your help @MikeMozhaev – Shahrzad Abedi Jan 15 '23 at 13:50
  • And by the way, I changed `_domainDataConverter.Convert` to use async/await and it didn't make any difference, since this operation is synchronous by nature and not time-consuming. @MikeMozhaev – Shahrzad Abedi Jan 15 '23 at 13:51