For now, I think the best way to create an Athena view from a CloudFormation template is to use a custom resource backed by a Lambda function. The function has to supply handlers for view creation and deletion. For example, using the crhelper library, the Lambda could be defined as:
import logging
import os

import boto3
from crhelper import CfnResource

logger = logging.getLogger(__name__)
helper = CfnResource(json_logging=False, log_level='DEBUG', boto_level='CRITICAL', sleep_on_delete=120)

try:
    # Initialise here so that a configuration error is reported back to CloudFormation
    client = boto3.client('athena')
    ATHENA_WORKGROUP = os.environ['athena_workgroup']
    DATABASE = os.environ['database']
    QUERY_CREATE = os.environ['query_create']
    QUERY_DROP = os.environ['query_drop']
except Exception as e:
    helper.init_failure(e)


@helper.create
@helper.update
def create(event, context):
    logger.info("View creation started")
    try:
        executionResponse = client.start_query_execution(
            QueryString=QUERY_CREATE,
            QueryExecutionContext={'Database': DATABASE},
            WorkGroup=ATHENA_WORKGROUP  # taken from the environment instead of being hard-coded
        )
        logger.info(executionResponse)
        response = client.get_query_execution(QueryExecutionId=executionResponse['QueryExecutionId'])
        logger.info(response)
        if response['QueryExecution']['Status']['State'] == 'FAILED':
            logger.error("Query failed")
            raise ValueError("Query failed")
        # helper.Data is handed to poll_create as event['CrHelperData']
        helper.Data['success'] = True
        helper.Data['id'] = executionResponse['QueryExecutionId']
        helper.Data['message'] = 'query is running'
    except Exception as e:
        logger.error(f"An exception occurred: {e}")
    if not helper.Data.get("success"):
        raise ValueError("Creating custom resource failed.")


@helper.delete
def delete(event, context):
    logger.info("View deletion started")
    try:
        executionResponse = client.start_query_execution(
            QueryString=QUERY_DROP,
            QueryExecutionContext={'Database': DATABASE},
            WorkGroup=ATHENA_WORKGROUP  # taken from the environment instead of being hard-coded
        )
        logger.info(executionResponse)
    except Exception as e:
        # Logged but not re-raised, so a failed DROP does not block stack deletion
        logger.error(f"An exception occurred: {e}")


@helper.poll_create
def poll_create(event, context):
    logger.info("Poll creation")
    response = client.get_query_execution(QueryExecutionId=event['CrHelperData']['id'])
    logger.info(f"Poll response: {response}")
    state = response['QueryExecution']['Status']['State']
    # Athena reports QUEUED, RUNNING, SUCCEEDED, FAILED or CANCELLED:
    # FAILED or CANCELLED - we stop and fail creation
    # SUCCEEDED - we stop and succeed creation
    # QUEUED or RUNNING - we continue polling in 2 minutes
    if state in ('FAILED', 'CANCELLED'):
        logger.error(f"Query {state}")
        raise ValueError(f"Query {state}")
    if state == 'SUCCEEDED':
        logger.info("Query SUCCEEDED")
        # Return a resource id or True to indicate that creation is complete.
        # If True is returned, an id will be generated.
        return True
    logger.info(f"Query {state}")
    # Return False to indicate that creation is not complete and we need to poll again
    return False


def handler(event, context):
    helper(event, context)
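
The polling decision can also be pulled out into a small pure function, which makes it easy to check without calling Athena. This is only a sketch: `classify_state` and `poll_result` are hypothetical names, not part of crhelper or boto3, and treating any unrecognised state as "keep polling" is an assumption. The response shape (`QueryExecution.Status.State` and `StateChangeReason`) is the one returned by `get_query_execution`.

```python
from enum import Enum


class PollOutcome(Enum):
    DONE = "done"
    RETRY = "retry"
    FAIL = "fail"


# States that end the query without producing the view
_TERMINAL_FAILURES = {"FAILED", "CANCELLED"}


def classify_state(state: str) -> PollOutcome:
    """Map an Athena query state to what the poller should do next."""
    if state == "SUCCEEDED":
        return PollOutcome.DONE
    if state in _TERMINAL_FAILURES:
        return PollOutcome.FAIL
    # QUEUED, RUNNING, or anything unrecognised (assumption): poll again
    return PollOutcome.RETRY


def poll_result(response: dict) -> bool:
    """Return True when done, False to poll again; raise on failure."""
    status = response["QueryExecution"]["Status"]
    outcome = classify_state(status["State"])
    if outcome is PollOutcome.FAIL:
        reason = status.get("StateChangeReason", "no reason given")
        raise ValueError(f"Athena query {status['State']}: {reason}")
    return outcome is PollOutcome.DONE
```

Inside `poll_create`, the body would then reduce to fetching the response and returning `poll_result(response)`.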
The Athena queries for creating, updating and deleting the view are passed to the Lambda as environment variables.
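
As a rough sketch of how those variables could be read in one place, the snippet below collects them into a small settings object and reports every missing key at once. `ViewSettings`, `load_settings` and `REQUIRED_KEYS` are hypothetical names introduced for illustration; only the four variable names come from the code above.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Variable names used by the Lambda; everything else here is illustrative
REQUIRED_KEYS = ("athena_workgroup", "database", "query_create", "query_drop")


@dataclass(frozen=True)
class ViewSettings:
    workgroup: str
    database: str
    query_create: str
    query_drop: str


def load_settings(environ: Mapping[str, str] = os.environ) -> ViewSettings:
    """Read the view settings, failing with one error that lists all missing keys."""
    missing = [key for key in REQUIRED_KEYS if not environ.get(key)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return ViewSettings(
        workgroup=environ["athena_workgroup"],
        database=environ["database"],
        query_create=environ["query_create"],
        query_drop=environ["query_drop"],
    )
```

Note that Lambda limits the total size of environment variables to 4 KB, so a very long view definition may not fit this way.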
In the CloudFormation template, we have to define the Lambda function that runs the Python code above and creates, updates or deletes the Athena view, together with the custom resource that invokes it. For example:
AthenaCommonViewLambda:
  Type: 'AWS::Lambda::Function'
  DependsOn: [CreateAthenaViewLayer, CreateAthenaViewLambdaRole]
  Properties:
    Environment:
      Variables:
        athena_workgroup: !Ref AudienceAthenaWorkgroup
        database:
          Ref: DatabaseName
        query_create: !Sub >-
          CREATE OR REPLACE VIEW ${TableName}_view AS
          SELECT field1, field2, ...
          FROM ${DatabaseName}.${TableName}
        query_drop: !Sub DROP VIEW IF EXISTS ${TableName}_view
    Code:
      S3Bucket: !Ref SourceS3Bucket
      S3Key: createview.zip
    FunctionName: !Sub '${AWS::StackName}_create_common_view'
    Handler: createview.handler
    MemorySize: 128
    Role: !GetAtt CreateAthenaViewLambdaRole.Arn
    Runtime: python3.8
    Timeout: 60
    Layers:
      - !Ref CreateAthenaViewLayer
AthenaCommonView:
  Type: 'Custom::AthenaCommonView'
  Properties:
    ServiceToken: !GetAtt AthenaCommonViewLambda.Arn
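
The create and drop queries have to name the same view, otherwise deleting the stack leaves the view behind. One way to keep them in sync is to build both from a single view name. The sketch below is illustrative only: `view_queries` is a hypothetical helper, the `_view` suffix is an assumption, and the identifier check is a deliberately strict simplification rather than Athena's full naming rules.

```python
import re
from typing import Sequence, Tuple

# Conservative identifier check (assumption: letters, digits and underscores only)
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def view_queries(database: str, table: str, columns: Sequence[str],
                 suffix: str = "_view") -> Tuple[str, str]:
    """Build matching CREATE and DROP statements for one view."""
    for name in (database, table, *columns):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"Unsupported identifier: {name!r}")
    view = f"{table}{suffix}"
    create = (
        f"CREATE OR REPLACE VIEW {view} AS "
        f"SELECT {', '.join(columns)} FROM {database}.{table}"
    )
    drop = f"DROP VIEW IF EXISTS {view}"
    return create, drop
```

The two returned strings could then be supplied as the `query_create` and `query_drop` values, so both always refer to the same view.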