Questions on Lambda to delete partitions.

  1. The existing query, which runs in parallel, fails because it exceeds the allowed number of concurrent queries. We want to replace it with sequential queries and an increased timeout for the Lambda.
  2. Can we change the Lambda to run in parallel with a limited number of threads?

Database -> AWS Athena: we get the list of clients from Athena and loop through it.

Right now it works fine with sequential calls because the number of clients is small, but that will become a problem as the number grows. The only issue with limiting the parallel threads is that we need some code to manage the thread count as well.

Then someone suggested that I use this: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/

https://gist.github.com/0xced/94f6c50d620e582e19913742dbd76eb6

using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Amazon.Athena;
using Amazon.Lambda.Core;

public class AthenaClient {
    private readonly IAmazonAthena _client;
    private readonly string _databaseName;
    private readonly string _outputLocation;
    private readonly string _tableName;
    private const int MaxQueryLength = 262144;
    private readonly int _maxclientsToBeProcessed;

    public AthenaClient(string databaseName, string tableName, string outputLocation, int maxclientsToBeProcessed) {
        _databaseName = databaseName;
        _tableName = tableName;
        _outputLocation = outputLocation;
        // Treat 0 as "one client per query" so the loop below always advances.
        _maxclientsToBeProcessed = maxclientsToBeProcessed == 0 ? 1 : maxclientsToBeProcessed;
        _client = new AmazonAthenaClient();
    }

    // GetClients, StartQueryExecution and CheckQueryExecutionStatus are not shown in the question.
    public async Task<bool> DeletePartitions() {
        var clients = await GetClients();
        // Build one ALTER TABLE statement per group of _maxclientsToBeProcessed clients.
        for (int i = 0; i < clients.Count; i += _maxclientsToBeProcessed) {
            var clientItems = clients.Skip(i).Take(_maxclientsToBeProcessed);
            var queryBuilder = new StringBuilder();
            queryBuilder.AppendLine($"ALTER TABLE {_databaseName}.{_tableName} DROP IF EXISTS");
            foreach (var client in clientItems) {
                queryBuilder.AppendLine($" PARTITION (client_id = '{client}'), ");
            }
            // Drop the trailing comma left by the last PARTITION clause.
            var query = queryBuilder.ToString().Trim().TrimEnd(',') + ";";
            LambdaLogger.Log(query);
            if (query.Length >= MaxQueryLength) {
                throw new Exception("Delete partition query length exceeded.");
            }
            // Await instead of blocking on .Result inside an async method.
            var queryExecutionId = await StartQueryExecution(query);
            await CheckQueryExecutionStatus(queryExecutionId);
        }
        return true;
    }
}
Sherlock
  • What is the *actual* question? Lambdas or `ForEachAsync` don't seem to have anything to do with the code. Table partitioning in SQL Server doesn't affect how many queries can be performed concurrently. Query parallelization, i.e. using parallel processing while running the query, is available in Enterprise editions and is something the query optimizer decides based on statistics and indexes, not the existence of partitions. – Panagiotis Kanavos Jun 10 '19 at 08:06
  • On the other hand, DDL, storage manipulation and administrative tasks in general need to be performed in a controlled manner. If you want to create some tables, create a SQL script with all the commands you need and send that to the server in a *single*, preferably asynchronous call. The script itself should take care of error handling. – Panagiotis Kanavos Jun 10 '19 at 08:11
  • I am getting the clients from Athena; the GetClients() method does it. Then I loop over those clients in a for loop. But my process takes quite a long time to execute the delete-partition queries. So if I use ForEachAsync, multi-threaded execution would be handled. – Sherlock Jun 10 '19 at 08:12
  • `Athena` is a *very* common project name, even inside Microsoft. The first project I remember that used that name was OData. What are you referring to? In any case, what you ask for is asynchronous execution, *not* multi-threading. You don't need `ForEachAsync`; you can use an `await` inside a `foreach` loop. – Panagiotis Kanavos Jun 10 '19 at 08:16
  • You can implement Parallel.ForEach as well like https://stackoverflow.com/questions/12251874/how-can-i-convert-this-foreach-code-to-parallel-foreach – Mansinh Jun 10 '19 at 08:16
  • Correct! But it's taking too long to execute the queries. – Sherlock Jun 10 '19 at 08:17
  • If your actual question is `How can I perform X jobs using N workers concurrently?` the answer would be to use [TPL Dataflow's ActionBlock](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/how-to-perform-action-when-a-dataflow-block-receives-data) to create a block with an input queue and 1 or more worker tasks to process them – Panagiotis Kanavos Jun 10 '19 at 08:18
  • @Mansinh the question is about asynchronous execution, not parallelism. The linked example is *not* a good example for `Parallel.ForEach` anyway - PLINQ/Parallel are meant for data parallelism and there's no data to process in that example. – Panagiotis Kanavos Jun 10 '19 at 08:18
  • @Mansinh `Parallel.ForEach` is not suited to the async and await pattern anyway – TheGeneral Jun 10 '19 at 08:19
  • OK, so they can achieve something like Task.Factory.StartNew( () => ForEach(items, item => DoSomething(item))); – Mansinh Jun 10 '19 at 08:23
  • @Sherlyn you should explain which database/service you target in the question and tags. Without it, people can only give a generic answer. What you asked is a bad case of an [XY Problem](https://meta.stackexchange.com/questions/66377/what-is-the-xy-problem). You have a scaling issue with X (AWS Athena perhaps?), think Y is the solution (manual partition modifications) and ask about **Z**, how to execute commands concurrently. An ActionBlock is a good solution for Z, but probably not the answer to X – Panagiotis Kanavos Jun 10 '19 at 08:25
  • @Mansinh data parallelism isn't concurrency, which in turn isn't asynchronous processing. What do you think the last snippet you posted would do? Execute everything in a single background thread. There's a reason .NET has PLINQ *and* Tasks *and* Dataflow *and* Rx. – Panagiotis Kanavos Jun 10 '19 at 08:27
  • @PanagiotisKanavos Please read my question once again. – Sherlock Jun 10 '19 at 08:27
  • @Sherlyn I've already guessed it was AWS Athena but the question is still unclear - you don't need `ForEachAsync` at all. Perhaps there's another way to improve scalability in AWS Athena (one already described by AWS docs), or you're looking for a way to perform several actions concurrently. *Async* looping won't take any *less* time than sync looping. Concurrently processing each client *will* run faster – Panagiotis Kanavos Jun 10 '19 at 08:32
  • @PanagiotisKanavos: Thanks for the correction and valuable information – Mansinh Jun 10 '19 at 08:32
  • @PanagiotisKanavos So you mean to say I don't have to change anything in my code for faster execution of queries? The reason I chose *foreachasync* is faster execution – Sherlock Jun 10 '19 at 08:36

1 Answer

It seems that the actual question should be:

How can I change the database partitions for lots of clients in AWS Athena without executing them sequentially?

The answer isn't ForEachAsync or the upcoming await foreach in C# 8. An asynchronous loop would still send calls to the service one at a time; it "just" wouldn't block the calling thread while waiting for each answer, so the total running time stays about the same.
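
To make the difference concrete, here is a minimal sketch that times the two approaches. `FakeQueryAsync` is a hypothetical stand-in for one service call (it only waits 100 ms) and `LoopTimingDemo` is an illustrative name; nothing here talks to Athena.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class LoopTimingDemo
{
    // Hypothetical stand-in for one remote call (e.g. one Athena query).
    private static Task FakeQueryAsync(int clientId) => Task.Delay(100);

    // Awaiting inside the loop: the next call starts only after the previous one has finished.
    public static async Task<TimeSpan> RunSequentiallyAsync(IEnumerable<int> clientIds)
    {
        var watch = Stopwatch.StartNew();
        foreach (var id in clientIds)
        {
            await FakeQueryAsync(id);
        }
        return watch.Elapsed;
    }

    // Starting every call first and awaiting them together: the calls overlap in time.
    public static async Task<TimeSpan> RunConcurrentlyAsync(IEnumerable<int> clientIds)
    {
        var watch = Stopwatch.StartNew();
        var pending = clientIds.Select(id => FakeQueryAsync(id)).ToList();
        await Task.WhenAll(pending);
        return watch.Elapsed;
    }
}
```

With a handful of ids the sequential version takes roughly the sum of the individual waits, while the concurrent one takes roughly the longest single wait. Note that the concurrent version starts everything at once, which is exactly what runs into a service-side limit on concurrent queries, so a cap on the number of workers is still needed.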

Concurrent workers

This is a concurrent worker problem that can be handled using e.g. the TPL Dataflow library's ActionBlock class or the new System.Threading.Channels classes.

The Dataflow library is meant to create event/message processing pipelines similar to a shell script pipeline, by moving data between independent blocks. Each block runs on its own task/thread which means you can get concurrent execution simply by breaking up processing into blocks.

It's also possible to increase the number of processing tasks per block, by specifying the MaxDegreeOfParallelism option when creating the block. This allows us to quickly create "workers" that can work on lots of messages concurrently.

Example

In this case, the "message" is the Client, whatever that is. A single ActionBlock could create the DDL statement and execute it. Each block has an input queue, which means we can just post messages to a block and wait for it to execute everything using the degree of parallelism (DOP) we specified.

We can also specify a limit to the queue so it won't get flooded if the worker tasks can't run fast enough:

var options = new ExecutionDataflowBlockOptions
     {
        MaxDegreeOfParallelism = _maxclientsToBeProcessed,
        BoundedCapacity = _maxclientsToBeProcessed * 3, //Just a guess
     };
//Pass the options to the block, otherwise it runs with a DOP of 1 and an unbounded queue
var block = new ActionBlock<Client>(client => CreateAndRunDDL(client), options);

//Post the client requests
foreach(var client in clients)
{
    await block.SendAsync(client);
}

//Tell the block we're done
block.Complete();
//Await for all queued messages to finish processing
await block.Completion;

The CreateAndRunDDL(Client) method should do what the code inside the question's loop does. A good idea would be to refactor it though, and create separate functions to create and execute the query, e.g.:

async Task CreateAndRunDDL(Client client)
{
    var query = QueryForClient(...);
    LambdaLogger.Log(query);
    if (query.Length >= MaxQueryLength) {
        throw new Exception("Delete partition query length exceeded.");
    }
    var queryExecutionId = await StartQueryExecution(query);
    await CheckQueryExecutionStatus(queryExecutionId);
}

Blocks can be linked too. If we wanted to batch multiple clients together for processing, we can use a BatchBlock and feed its results to our action block, e.g.:

var batchClients = new BatchBlock<Client>(20);
var linkOptions = new DataflowLinkOptions
                  { 
                      PropagateCompletion = true
                  };

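//A BatchBlock<Client> emits Client[] arrays, so the action block must accept Client[]
var block = new ActionBlock<Client[]>(clients => CreateAndRunDDL(clients));
batchClients.LinkTo(block, linkOptions);

This time the CreateAndRunDDL method accepts a Client[] array holding up to the number of clients/messages we specified in the batch size. The final batch may be smaller if the total isn't a multiple of the batch size.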

async Task CreateAndRunDDL(Client[] clients)
{
    var query = QueryForClients(clients);
    ...
}
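
The answer leaves QueryForClients elided. As a rough sketch of what such a helper might look like, the version below rebuilds the question's ALTER TABLE statement for one batch. It is an assumption, not the author's code: the `Client` type is never defined, so this sketch takes plain string ids, and the class name, parameter names and line layout are illustrative only. The ids are interpolated without escaping, so it also assumes they come from a trusted source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PartitionQueries
{
    // Same limit the question's code checks against.
    private const int MaxQueryLength = 262144;

    // Builds one DROP PARTITION statement covering every client id in the batch.
    public static string QueryForClients(string databaseName, string tableName, IEnumerable<string> clientIds)
    {
        var partitions = clientIds.Select(id => $"PARTITION (client_id = '{id}')");

        // Joining with a separator avoids the trailing-comma cleanup of the original loop.
        var query = $"ALTER TABLE {databaseName}.{tableName} DROP IF EXISTS\n"
                    + string.Join(",\n", partitions) + ";";

        if (query.Length >= MaxQueryLength)
        {
            throw new InvalidOperationException("Delete partition query length exceeded.");
        }
        return query;
    }
}
```

Keeping the string building separate from execution like this also makes the length check easy to exercise without calling the service.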

Messages should be posted to the batchClients block now. Once that completes, we need to wait for the last block in the pipeline to complete:

foreach(var client in clients)
{
    await batchClients.SendAsync(client);
}

//Tell the *batch block* we're done
batchClients.Complete();
//Await for all queued messages to finish processing
await block.Completion;
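
For comparison, the System.Threading.Channels alternative mentioned earlier could look roughly like the sketch below: a bounded channel plays the role of the block's input queue and a fixed number of reader tasks play the role of the workers. `ChannelWorkers`, `ProcessAsync` and the `workerCount * 3` capacity are illustrative assumptions, and the sketch has no error handling: if a handler throws, that worker stops and the exception only surfaces when the workers are awaited. It needs C# 8 for `await foreach`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelWorkers
{
    // Runs handler over all items using at most workerCount concurrent workers.
    public static async Task ProcessAsync<T>(IEnumerable<T> items, int workerCount, Func<T, Task> handler)
    {
        // Bounded queue: WriteAsync waits when the workers fall behind.
        var channel = Channel.CreateBounded<T>(workerCount * 3);

        // Each worker drains the shared channel until the writer is completed.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var item in channel.Reader.ReadAllAsync())
                {
                    await handler(item);
                }
            }))
            .ToArray();

        foreach (var item in items)
        {
            await channel.Writer.WriteAsync(item);
        }

        // Signal that no more items will arrive, then wait for the workers to finish.
        channel.Writer.Complete();
        await Task.WhenAll(workers);
    }
}
```

A call such as `await ChannelWorkers.ProcessAsync(clients, _maxclientsToBeProcessed, client => CreateAndRunDDL(client));` would then mirror the ActionBlock example.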
Panagiotis Kanavos