3

I have around 300 GB of data on S3. Let's say the data is laid out like this:

S3://Bucket/Country/Month/Day/1.csv

S3://Countries/Germany/06/01/1.csv 
S3://Countries/Germany/06/01/2.csv 
S3://Countries/Germany/06/01/3.csv 

S3://Countries/Germany/06/02/1.csv 
S3://Countries/Germany/06/02/2.csv 

We are doing some complex aggregation on the data, and because some countries' data is large and some countries' data is small, AWS EMR doesn't make sense to use: once the small countries are finished, their resources are wasted while the big countries keep running for a long time. Therefore, we decided to use AWS Batch (Docker containers) with Athena. One job works on one day of data per country.

Now there are roughly 1000 jobs that start together, and when they query Athena to read the data, the containers fail because they hit Athena's query limits.

Therefore, I would like to know what other possible ways there are to tackle this problem. Should I use a Redshift cluster, load all the data there, and have all the containers query the Redshift cluster, since it doesn't have query limitations? But it is expensive and takes a lot of time to ramp up.

The other option would be to read the data on EMR and use Hive or Presto on top of it to query it, but again it will run into query limits.

It would be great if someone could suggest better options to tackle this problem.

Waqar Ahmed
  • 5,005
  • 2
  • 23
  • 45
  • 1
    I'm sure you already have thought of this, but if for some reason you haven't, make sure you partition the table on country and date. It will speed up the queries and decrease costs by a huge factor for this use case. – Theo Aug 01 '19 at 11:19
  • @Theo Actually yes, and I have already asked a question about this. Here it is: https://stackoverflow.com/questions/57287621/convert-folders-structure-to-partitions-on-s3-using-spark But I didn't get any response. The problem is that we are getting this data from another source, and we can't control how they write the data. So I need to find another way to read this data and turn it into a partitioned table. – Waqar Ahmed Aug 01 '19 at 11:22
  • @Theo It would be great, if you can give some insights on that as well. – Waqar Ahmed Aug 01 '19 at 11:23
  • I will try to answer that question, thanks for the link. I didn't see it because it wasn't tagged `amazon-athena`, I'll add that tag too. – Theo Aug 01 '19 at 11:52
  • @Theo Thank you. Much appreciated! – Waqar Ahmed Aug 01 '19 at 12:19
  • What is taking time for the large countries? Is it the number of files to be scanned? How are the files being dumped to S3? Is it possible for you to dump them in ORC format? A 1 GB CSV file becomes around 100 MB in ORC. With Athena or EMR you can also take advantage of projection pushdown, which will further speed up the queries. – chendu Jun 17 '21 at 08:22

4 Answers

3

As I understand it, you simply send a query to the AWS Athena service and, after all aggregation steps finish, retrieve the resulting CSV file from the S3 bucket where Athena saves results, so you end up with 1000 files (one for each job). But the problem is the number of concurrent Athena queries, not the total execution time.

Have you considered using Apache Airflow for orchestrating and scheduling your queries? I see Airflow as an alternative to a combination of Lambda and Step Functions, but it is totally free. It is easy to set up on both local and remote machines, has a rich CLI and GUI for task monitoring, and abstracts away all the scheduling and retry logic. Airflow even has hooks to interact with AWS services. Hell, it even has a dedicated operator for sending queries to Athena, so sending queries is as easy as:

from airflow.models import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from datetime import datetime

# A DAG with no schedule interval; trigger it manually or from another workflow.
with DAG(dag_id='simple_athena_query',
         schedule_interval=None,
         start_date=datetime(2019, 5, 21)) as dag:

    # Submits the query to Athena and waits for it to complete; the results
    # are written to the S3 output location below.
    run_query = AWSAthenaOperator(
        task_id='run_query',
        query='SELECT * FROM UNNEST(SEQUENCE(0, 100))',
        output_location='s3://my-bucket/my-path/',
        database='my_database'
    )

I use it for similar types of daily/weekly tasks (processing data with CTAS statements) that exceed the limit on the number of concurrent queries.
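To give an idea of what such a CTAS statement looks like (the table, bucket and column names here are made up for the example, not taken from the question):

# An illustrative CTAS query; it could be passed as the query= argument of
# the AWSAthenaOperator above.
ctas_query = """
CREATE TABLE my_database.daily_aggregate
WITH (
    format = 'PARQUET',
    external_location = 's3://my-bucket/aggregates/2019-05-21/'
) AS
SELECT country, day, count(*) AS events
FROM my_database.raw_events
GROUP BY country, day
"""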

There are plenty of blog posts and documentation pages that can help you get started. For example:

  • Medium post: Automate executing AWS Athena queries and moving the results around S3 with Airflow.
  • Complete guide to installation of Airflow, link 1 and link 2

You can even set up an integration with Slack for sending notifications when your queries terminate, either in a success or a failure state.

However, the main drawback I am facing is that only 4-5 queries actually get executed at the same time, whereas all the others just sit idle.

Ilya Kisil
  • 2,490
  • 2
  • 17
  • 31
2

One solution would be to not launch all jobs at the same time, but pace them to stay within the concurrency limits. I don't know if this is easy or hard with the tools you're using, but it's never going to work out well if you throw all the queries at Athena at the same time. Edit: it looks like you should be able to throttle jobs in Batch, see AWS batch - how to limit number of concurrent jobs (by default Athena allows 25 concurrent queries, so try 20 concurrent jobs to have a safety margin – but also add retry logic to the code that launches the job).
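For the retry logic mentioned above, something along these lines could work (a minimal sketch using boto3; the query, database and output location are placeholders, and the backoff numbers are just an example):

import time

import boto3
from botocore.exceptions import ClientError

athena = boto3.client("athena")

def start_query_with_retry(query, database, output_location, max_attempts=8):
    """Start an Athena query, backing off when the concurrency limit is hit."""
    for attempt in range(max_attempts):
        try:
            response = athena.start_query_execution(
                QueryString=query,
                QueryExecutionContext={"Database": database},
                ResultConfiguration={"OutputLocation": output_location},
            )
            return response["QueryExecutionId"]
        except ClientError as error:
            # Athena signals the concurrency limit with TooManyRequestsException.
            if error.response["Error"]["Code"] == "TooManyRequestsException":
                time.sleep(2 ** attempt)  # wait longer after each failed attempt
            else:
                raise
    raise RuntimeError("Athena kept throttling the query after several retries")

In practice you'd probably add jitter to the sleep, but the point is that the job should wait and retry instead of failing when Athena throttles it.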

Another option would be to not run them as separate queries, but to try to bake everything together into fewer queries, or even a single one – either by grouping on country and date, or by generating all the queries and gluing them together with UNION ALL (see the sketch below). Whether this is possible is hard to say without knowing more about the data and the query, though. You'll likely have to post-process the result anyway, and if you just sort by something meaningful it wouldn't be very hard to split the result into the necessary pieces after the query has run.
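To make the UNION ALL idea more concrete, the per-country-per-day queries could be generated and glued together like this (the table, columns and aggregation are invented for the example, since the actual query isn't shown in the question):

# One (country, month, day) tuple per job you would otherwise have run.
countries_and_days = [("Germany", "06", "01"), ("Germany", "06", "02")]

per_day_queries = [
    f"""SELECT country, month, day, count(*) AS row_count
        FROM my_database.events
        WHERE country = '{country}' AND month = '{month}' AND day = '{day}'
        GROUP BY country, month, day"""
    for country, month, day in countries_and_days
]

# A single Athena query instead of one query per job; sort or filter the
# combined result afterwards to split it back into per-country pieces.
combined_query = "\nUNION ALL\n".join(per_day_queries)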

Using Redshift is probably not the solution, since it sounds like you're doing this only once per day and you wouldn't use the cluster very much. Athena is a much better choice, you just have to handle the limits better.

With my limited understanding of your use case I think using Lambda and Step Functions would be a better way to go than Batch. With Step Functions you'd have one function that starts N queries (where N is equal to your concurrency limit, 25 if you haven't asked for it to be raised), and then a poll loop (check the examples for how to do this) that checks which queries have completed and starts new ones to keep the number of running queries at the max. When all queries have run, a final function can trigger whatever workflow you need to run after everything is done (or you can run that after each query).

The benefit of Lambda and Step Functions is that you don't pay for idle resources. With Batch, you will pay for resources that do nothing but wait for Athena to complete. Since Athena, in contrast to Redshift for example, has an asynchronous API you can run a Lambda function for 100ms to start queries, then 100ms every few seconds (or minutes) to check if any have completed, and then another 100ms or so to finish up. It's almost guaranteed to be less than the Lambda free tier.
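A minimal sketch of the poll-loop Lambda described above (assuming the state machine passes the list of running query execution IDs in the event; the event shape and function name are illustrative assumptions, not a drop-in implementation):

import boto3

athena = boto3.client("athena")

def poll_queries_handler(event, context):
    """Check which of the running Athena queries have finished."""
    still_running, succeeded, failed = [], [], []
    for query_id in event["running_query_ids"]:
        state = athena.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"]["Status"]["State"]
        if state in ("QUEUED", "RUNNING"):
            still_running.append(query_id)
        elif state == "SUCCEEDED":
            succeeded.append(query_id)
        else:  # FAILED or CANCELLED
            failed.append(query_id)
    # The state machine uses this output to decide whether to start more
    # queries (keeping the running count at the concurrency limit) or to
    # move on to the final step.
    return {
        "running_query_ids": still_running,
        "succeeded_query_ids": succeeded,
        "failed_query_ids": failed,
    }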

Theo
  • 131,503
  • 21
  • 160
  • 205
  • Thank you for the answer. It makes sense to limit the concurrent jobs in Batch. We are not doing this processing every day, but once a week for all 7 days. Regarding grouping on country, it will not help much, as the data size can change every week for each country, so it would be manual work to find the small countries and come up with a strategy to merge them. – Waqar Ahmed Aug 01 '19 at 11:14
  • Regarding Lambda and Step Functions, I haven't looked into them, but I will try them as well. Thank you for pointing out that option. – Waqar Ahmed Aug 01 '19 at 11:14
  • Using Step Functions will be a little more work than Batch, since you have to split up the process into pieces, and describe the workflow, and so on, but if you get it working you'll have a much more powerful solution than Batch, and it will be free to run (except for the Athena queries, but they are the same in either solution). – Theo Aug 01 '19 at 11:18
  • Yeah, and that solution will take time to build. If I use a Redshift cluster or an EMR cluster with Hive, would that be better for overcoming the Athena limit? Limiting the concurrent jobs will slow down the process a lot. – Waqar Ahmed Aug 01 '19 at 11:20
  • Athena's limit is 20 concurrent queries https://docs.aws.amazon.com/athena/latest/ug/service-limits.html – rey don sancho Dec 20 '20 at 11:52
2

As far as I know, Redshift Spectrum and Athena cost the same. You should not compare Redshift to Athena; they have different purposes. But first of all, I would think about addressing your data skew issue. Since you mentioned AWS EMR, I assume you use Spark. To deal with large and small partitions, you need to repartition your dataset by month, or some other equally distributed value, or you can use month and country for grouping (see the sketch below). You get the idea.
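As a small PySpark sketch of that repartitioning idea (assuming country, month and day are available as columns; if they only exist in the S3 path, they would have to be derived from the file names first, and the bucket names here are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-by-month-country").getOrCreate()

# Read the raw CSVs laid out as Country/Month/Day/N.csv.
df = spark.read.csv("s3://Countries/*/*/*/*.csv", header=True)

# Spread the skewed per-country data more evenly before the heavy
# aggregation, then write it back partitioned so queries can prune by
# country, month and day.
(df.repartition("month", "country")
   .write
   .partitionBy("country", "month", "day")
   .mode("overwrite")
   .parquet("s3://my-bucket/repartitioned/"))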

gorros
  • 1,411
  • 1
  • 18
  • 29
  • Also consider the cost of running the Redshift cluster when comparing costs between Redshift Spectrum and Athena. They may cost the same per queried byte, but Redshift requires a cluster, and the amount of compute resources the Redshift Spectrum query gets is proportional to the size (and cost) of the cluster. – Theo Aug 01 '19 at 12:16
  • Yes, but usually I keep the cluster smaller, just for the data that is needed for repeated queries. But anyway, the main issue is the unequal partitioning. – gorros Aug 01 '19 at 12:18
  • I agree that there is probably a more efficient way to solve the OP's problem, running lots of small queries is rarely efficient. Given the information at hand it's hard to make good suggestions, though. It seems like they are doing most of the processing after the Athena queries, which means that just repartitioning to remove country from the partitioning would probably mean having to rewrite everything else (but that might be what would be required to get better performance). – Theo Aug 01 '19 at 12:22
  • I need to do the processing per country, so I can't merge multiple countries' data together. – Waqar Ahmed Aug 01 '19 at 12:25
  • It is hard to tell without a code sample. Anyway, if you group by country this will result in large partitions. Do you have joins? – gorros Aug 01 '19 at 12:50
  • Yes, I do have joins in my Spark App but before starting the Batch processing, I write the data back on S3 partitioned by country. And then docker containers query that data per country per day via Athena. – Waqar Ahmed Aug 01 '19 at 12:59
  • Then you can use `broadcast` for the small dataframes so you do not need to shuffle data by country. – gorros Aug 01 '19 at 13:07
0

You can use Redshift Spectrum for this purpose. Yes, it is a bit costly, but it is scalable and very good for performing complex aggregations.

Parul Singh
  • 363
  • 3
  • 11
  • The complex aggregations are not part of the queries; we are doing that programmatically. We are using Athena only to read the data, and we are looking for other ways to read data from S3 because of Athena's query limits. – Waqar Ahmed Aug 01 '19 at 10:43
  • If most of the complexity is not in the Athena queries, some of what I suggested in my answer might be incorrect. I assumed that the queries were complex and that the jobs running in Batch were pretty basic. If you are using Athena mostly to read the data, it might make more sense to not use Athena at all, but to download the CSVs directly in the code running in Batch. Also look at S3 Select to see if that could help. – Theo Aug 01 '19 at 12:18