1

I'm currently creating some new tables from information stored in Amazon S3. It's my first time using AWS, and today I learned that Amazon Athena can't create more than 100 partitions from a CTAS query.

I'm doing the transformations using SQL, and it works perfectly, but I need a way to store more than 100 partitions at once to make the process more reliable.

I'm partitioning on date, so in 4 months my process is going to fail if I need to recreate the table to load a large amount of data via SQL (where I have the transformations).

Any idea how I can achieve this?

John Rotenstein
Alejandro

3 Answers

1

Let's say you want to process 4 months of data with CTAS queries, but you need to partition it by day. If you do it in a single CTAS query, you will end up with roughly 4 x 30 = 120 partitions, and thus the query will fail, as you mention, due to the AWS limitation.

Instead, you can process your data one month at a time, so you are guaranteed to have at most 31 partitions per query. However, the result of each CTAS query must have a unique external location on S3, i.e. if you want to store the results of multiple CTAS queries under s3://bukcet-name/data-root, you need to extend this path for each query in external_location under the WITH clause. The obvious choice for your case would be the full date, for example (a sketch of submitting such monthly queries follows the layout below):

s3://bukcet-name/data-root
├──2019-01            <-- external_location='s3://bukcet-name/data-root/2019-01'
|   └── month=01
|       ├── day=01
|       |   ...
|       └── day=31
├──2019-02            <-- external_location='s3://bukcet-name/data-root/2019-02'
|   └── month=02
|       ├── day=01
|       |   ...
|       └── day=28
...
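To make this concrete, here is a minimal sketch of submitting one such CTAS query per month. It assumes Python with boto3; the database, table, column, and bucket names are hypothetical placeholders:

import boto3

athena = boto3.client("athena", region_name="us-east-1")  # assumed region

# Hypothetical list of months to backfill; extend as needed.
MONTHS = ["2019-01", "2019-02", "2019-03", "2019-04"]

CTAS_TEMPLATE = """
CREATE TABLE transformed_{suffix}
WITH (
    format = 'PARQUET',
    external_location = 's3://bukcet-name/data-root/{month}',
    partitioned_by = ARRAY['month', 'day']
) AS
SELECT
    *,                            -- your transformations go here
    month(some_date) AS month,    -- partition columns must come last
    day(some_date)   AS day
FROM source_table
WHERE date_format(some_date, '%Y-%m') = '{month}'
"""

for month in MONTHS:
    suffix = month.replace("-", "_")
    response = athena.start_query_execution(
        QueryString=CTAS_TEMPLATE.format(suffix=suffix, month=month),
        QueryExecutionContext={"Database": "my_database"},  # assumed database
        ResultConfiguration={"OutputLocation": "s3://bukcet-name/athena-results/"},
    )
    print(month, response["QueryExecutionId"])

Each iteration creates at most 31 partitions, so every query stays well below the 100-partition limit.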

However, you now end up with 4 different tables. So you either need to query different tables, or do some post-processing. Essentially, you have two options:

  1. Move all new files into a common place with AWS CLI high-level commands, followed by MSCK REPAIR TABLE, since the output "directory" structure adheres to the Hive partitioning naming convention. For example, from

    s3://bukcet-name/data-staging-area
    ├──2019-01        <-- external_location='s3://bukcet-name/data-staging-area/2019-01'
    |   └── month=01
    |       ├── day=01
    |       |   ...
    

    you would copy into

    s3://bukcet-name/data-root
    ├── month=01
    |  ├── day=01
    |  |   ...
    |  └── day=31
    ├── month=02
    |   ├── day=01
    |   |   ...
    |   └── day=28
    
  2. Manipulate the AWS Glue Data Catalog. This is a little bit trickier, but the main idea is that you define a root table with its location pointing to s3://bukcet-name/data-root. Then, after executing each CTAS query, you copy the meta-information about partitions from the created "staging" table into the root table. This step would be based on the AWS Glue API, for example via the boto3 library for Python. In particular, you would use the get_partitions() and batch_create_partition() methods (see the sketch right after this list).
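As an illustration of the second option, here is a sketch (again assuming boto3; the database and table names are hypothetical) of copying partition metadata from a "staging" table created by CTAS into the root table:

import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed region

DATABASE = "my_database"               # assumed database name
STAGING_TABLE = "transformed_2019_01"  # table created by a monthly CTAS query
ROOT_TABLE = "data_root"               # table located at s3://bukcet-name/data-root

# Fetch all partitions registered for the staging table.
paginator = glue.get_paginator("get_partitions")
partitions = []
for page in paginator.paginate(DatabaseName=DATABASE, TableName=STAGING_TABLE):
    partitions.extend(page["Partitions"])

# Re-register them under the root table; the storage descriptors already point
# to the CTAS external_location on S3, so no data is moved.
partition_inputs = [
    {"Values": p["Values"], "StorageDescriptor": p["StorageDescriptor"]}
    for p in partitions
]

# batch_create_partition accepts at most 100 partitions per call.
for i in range(0, len(partition_inputs), 100):
    glue.batch_create_partition(
        DatabaseName=DATABASE,
        TableName=ROOT_TABLE,
        PartitionInputList=partition_inputs[i:i + 100],
    )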

Regardless of which approach you choose, you will need some sort of job scheduling software, especially since your data is not just historical. I would suggest using Apache Airflow for that (a minimal DAG sketch is shown after the links below). It can be seen as an alternative to a combination of Lambda and Step Functions, and it is totally free. There are plenty of blog posts and documentation that can help you get started. For example:

  • Medium post: Automate executing AWS Athena queries and moving the results around S3 with Airflow.
  • Complete guide to installation of Airflow, link 1 and link 2
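To give a flavour of the scheduling part, here is a sketch of a daily DAG. It assumes Airflow 1.10.x with the contrib AWSAthenaOperator that is mentioned in the comments below; the DAG id, query, database, and S3 locations are hypothetical:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator

# Hypothetical DAG that creates one small "staging" table per day.
with DAG(
    dag_id="athena_daily_ctas",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
    catchup=True,
) as dag:

    run_ctas = AWSAthenaOperator(
        task_id="daily_ctas",
        # {{ ds }} / {{ ds_nodash }} are Airflow's templated execution-date macros.
        query="""
            CREATE TABLE staging_{{ ds_nodash }}
            WITH (
                format = 'PARQUET',
                external_location = 's3://bukcet-name/data-staging-area/{{ ds }}'
            ) AS
            SELECT * FROM source_table
            WHERE dt = '{{ ds }}'
        """,
        database="my_database",                              # assumed database
        output_location="s3://bukcet-name/athena-results/",  # assumed results location
        aws_conn_id="aws_default",
    )

A second task (moving files or copying partition metadata, as described above) would then be chained after run_ctas.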

You can even set up an integration with Slack to send a notification when your queries terminate, either in a success or a failed state.

Things to keep in mind:

In general, you don't have explicit control over how many files will be created as a result of a CTAS query, since Athena is a distributed system. On the other hand, you don't want to have a lot of small files. So you can try "this workaround", which uses the bucketed_by and bucket_count fields within the WITH clause:

CREATE TABLE new_table
WITH (
    ...
    bucketed_by=ARRAY['some_column_from_select'],
    bucket_count=1
) AS (
    -- Here goes your normal query 
    SELECT 
        *
    FROM 
        old_table
);

Alternatively, reduce the number of partitions, i.e. stop at the month level.

Ilya Kisil
  • Sounds interesting. I'm trying to avoid using more external tools because I'm adding more fault points. Right now I have NiFi for extraction and do the transformations in Athena, and it gets trickier to monitor the pipeline for errors. Thanks! – Alejandro Oct 25 '19 at 11:50
  • I understand :) Airflow provides a nice UI for monitoring and resetting failed tasks, which is a huge benefit. It even has a [dedicated operator](https://airflow.apache.org/_api/airflow/contrib/operators/aws_athena_operator/index.html) for sending queries to Athena, as well as operators for working with S3. I started to use it when my ETL jobs were starting to become hard to monitor. – Ilya Kisil Oct 25 '19 at 11:55
1

The best option would be to write a Glue ETL (Spark) job for this task and use Spark SQL to perform the required transformations. That way you still get to use your existing SQL queries.

Then you can write the processed output back to some S3 path. Spark allows you to create as many partitions as you want. It also lets you append the newly processed data to the already processed data, thereby allowing you to load and transform only the new data.

Once the ETL is done, create an external table pointing to the S3 path used above, with the required partitions. This will be a one-time step (creating the external table). You will only need to update the partition information in this external table after every Glue job.

In summary, you need to do the following :

  • Create a Spark script, to be executed as a Glue ETL job, which reads the daily source data, applies the required transformations, and writes the processed data to S3 in a new partition. This script can easily be templatized to accept the date as input, and writing it is a one-time activity (a minimal sketch is shown after this list).

  • Create an external table pointing to the processed data on S3. This will also be a one-time activity.

  • Execute the MSCK REPAIR TABLE command on the above external table after every Glue ETL job to register the new partition.
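A minimal sketch of such a Glue/PySpark script, assuming a Glue job running Python 3; the paths, table names, column names, and the process_date job argument are hypothetical:

import sys

from awsglue.utils import getResolvedOptions  # available inside Glue ETL jobs
from pyspark.sql import SparkSession

# Hypothetical job argument, passed as --process_date 2019-01-15
args = getResolvedOptions(sys.argv, ["process_date"])
process_date = args["process_date"]

spark = SparkSession.builder.appName("daily-transform").getOrCreate()

# Read only the new day's raw data (the path layout is an assumption).
raw = spark.read.json("s3://bukcet-name/raw/" + process_date + "/")
raw.createOrReplaceTempView("source")

# Reuse your existing SQL transformations through Spark SQL.
transformed = spark.sql("""
    SELECT
        *,                            -- your transformations go here
        year(some_date)  AS year,
        month(some_date) AS month,
        day(some_date)   AS day
    FROM source
""")

# Append the new day's partition next to the already processed data.
(transformed
    .write
    .mode("append")
    .partitionBy("year", "month", "day")
    .parquet("s3://bukcet-name/processed/"))

After the job finishes, running MSCK REPAIR TABLE (or ALTER TABLE ... ADD PARTITION) on the external table makes the new partition visible in Athena.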

References :

AWS Glue ETL documentation

AWS Athena - Create external table

AWS Athena - Update partition

Harsh Bafna
  • Thank you @harsh-bafna, and sorry for the delay in accepting the answer. I will use Glue, you made a good point there. I'm having a little problem, just one question: can I access an array/struct like I do in Athena, e.g. client.id or element[1].name? I'm getting an error right now and it's taking me a hard time to understand the pages and pages of Spark messages. Will make another thread if I can't find the solution. – Alejandro Nov 02 '19 at 23:54
  • @Alejandro: Maybe this thread can help: https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes – Harsh Bafna Nov 03 '19 at 02:53
0

Amazon Athena has a separate guide dedicated to this topic.

The main steps:

  1. Use CREATE EXTERNAL TABLE to prepare a table partitioned as expected
  2. Use CTAS with a low enough number of partitions
  3. Iteratively use INSERT INTO to fill in the missing partitions (see the sketch below).
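For illustration, here is a sketch of that iterative step, assuming boto3 and reusing the hypothetical table and bucket names from above; each INSERT INTO adds at most one month's worth of partitions, staying below the 100-partition limit:

import boto3

athena = boto3.client("athena", region_name="us-east-1")  # assumed region

# Hypothetical list of months not covered by the initial CTAS query.
MONTHS = ["2019-02", "2019-03", "2019-04"]

for month in MONTHS:
    athena.start_query_execution(
        QueryString="""
            INSERT INTO data_root
            SELECT *                 -- your transformations go here
            FROM source_table
            WHERE date_format(some_date, '%Y-%m') = '{month}'
        """.format(month=month),
        QueryExecutionContext={"Database": "my_database"},  # assumed database
        ResultConfiguration={"OutputLocation": "s3://bukcet-name/athena-results/"},
    )
    # In practice, wait for each query to succeed (get_query_execution)
    # before submitting the next one.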
Vajk Hermecz