Questions on Lambda to delete partitions.
- The existing query, which runs in parallel, is failing because it exceeds the allowed number of parallel queries. We want to replace it with sequential queries and an increased timeout for the Lambda.
- Can we change the lambda to parallel with limited threads?
Database -> AWS Athena = Getting the list of clients from Athena and looping through it.
Right now it works fine with sequential calls as well, but only because the number of clients is currently small; it would pose a problem in the future. The only issue with limited parallel threads is that we need some code to manage the thread count as well.
Then someone suggested that I use this: https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/
https://gist.github.com/0xced/94f6c50d620e582e19913742dbd76eb6
/// <summary>
/// Drops per-client partitions from an Athena table, issuing one
/// <c>ALTER TABLE ... DROP IF EXISTS</c> statement per batch of clients and
/// waiting for each statement to finish before starting the next.
/// </summary>
public class AthenaClient {
    // Queries whose length reaches this value are rejected before being sent.
    // NOTE(review): presumably mirrors Athena's maximum query string length — confirm against the service quotas.
    private const int MaxQueryLength = 262144;

    // Not referenced by the members visible here; presumably used by the query helpers
    // (GetClients / StartQueryExecution / CheckQueryExecutionStatus) defined elsewhere.
    private readonly IAmazonAthena _client;
    private readonly string _databaseName;
    private readonly string _outputLocation;
    private readonly string _tableName;

    // Number of clients whose partitions are dropped by a single query; always >= 1.
    private readonly int _maxclientsToBeProcessed;

    /// <summary>
    /// Creates a client bound to one database/table pair.
    /// </summary>
    /// <param name="databaseName">Database that qualifies the table in the generated DDL.</param>
    /// <param name="tableName">Table whose partitions are dropped.</param>
    /// <param name="outputLocation">Stored for later use; not read by the members visible here.</param>
    /// <param name="maxclientsToBeProcessed">
    /// Batch size, i.e. how many client partitions go into one query.
    /// Zero or negative values fall back to 1.
    /// </param>
    public AthenaClient(string databaseName, string tableName, string outputLocation, int maxclientsToBeProcessed) {
        _databaseName = databaseName;
        _tableName = tableName;
        _outputLocation = outputLocation;
        // A non-positive batch size would make the batching loop in DeletePartitions
        // never advance (and Take() return nothing), so clamp it to at least 1.
        _maxclientsToBeProcessed = maxclientsToBeProcessed <= 0 ? 1 : maxclientsToBeProcessed;
        _client = new AmazonAthenaClient();
    }

    /// <summary>
    /// Fetches the client list and drops the <c>client_id</c> partition of every client,
    /// one batch at a time, strictly sequentially.
    /// </summary>
    /// <returns><c>true</c> once every batch has been processed (also when there are no clients).</returns>
    /// <exception cref="InvalidOperationException">
    /// A generated query reaches <see cref="MaxQueryLength"/> characters.
    /// </exception>
    public async Task<bool> DeletePartitions() {
        var clients = await GetClients();

        for (int offset = 0; offset < clients.Count; offset += _maxclientsToBeProcessed) {
            var batch = clients.Skip(offset).Take(_maxclientsToBeProcessed);

            var queryBuilder = new StringBuilder();
            queryBuilder.AppendLine($"ALTER TABLE {_databaseName}.{_tableName} DROP IF EXISTS");
            foreach (var client in batch) {
                // NOTE(review): the client value is interpolated unescaped; this assumes the
                // values returned by GetClients are trusted and contain no quote characters.
                queryBuilder.AppendLine($" PARTITION (client_id = '{client}'), ");
            }

            // Every partition line ends with ", " + newline; strip the trailing separator
            // of the last one before terminating the statement.
            var query = queryBuilder.ToString().Trim().TrimEnd(',') + ";";
            LambdaLogger.Log(query);

            if (query.Length >= MaxQueryLength) {
                throw new InvalidOperationException("Delete partition query length exceeded.");
            }

            // Await instead of blocking on .Result: keeps the method asynchronous end to end
            // and surfaces the original exception rather than an AggregateException.
            var queryExecutionId = await StartQueryExecution(query);
            await CheckQueryExecutionStatus(queryExecutionId);
        }

        return true;
    }
}