30

I'm using AWS Athena to query raw data in S3. Since Athena writes the query output to an S3 output bucket, I used to do:

df = pd.read_csv(OutputLocation)

But this seems like an expensive approach. Recently I noticed boto3's get_query_results method, which returns the results as a complex dictionary.

client = boto3.client('athena')
response = client.get_query_results(
        QueryExecutionId=res['QueryExecutionId']
        )

I'm facing two main issues:

  1. How can I format the results of get_query_results into a pandas DataFrame?
  2. get_query_results returns at most 1000 rows per call. How can I use it to get two million rows?
Sheldore
Niv Cohen
  • I think it may help the people who will answer your question if you give a sample of the 'complex dictionary' returned. Any sensitive data can be redacted, as it's mainly the structure of the data that would be important. Also, pandas offers `DataFrame.from_dict()`, `DataFrame.from_records()`, `pandas.read_json()`. There are others too, but again it is difficult to say with certainty which to use without knowing the structure of the data. Also, it may benefit you to review the documentation for `get_query_results()`. Maybe it takes parameter(s), meaning default of 1000 rows can be increased. – chillin Aug 26 '18 at 14:56
  • Thanks @chillin I see your point. it's really long text to put here. lets use the basic structure from [link](https://boto3.readthedocs.io/en/latest/reference/services/athena.html#Athena.Client.get_query_results) as reference and keep but one of the fields in the Data list is a varchar looks like another dict. e.g. `{temperature=41.1}` – Niv Cohen Aug 26 '18 at 15:38
  • Try `response = client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)` and see if you get 2000 rows this time. Also, it might be reasonable to presume that there is an upper limit to the number of rows that can be returned via a single request (although I can't find any mention of it in the documentation). If there is an upper limit, all you would need to do is look for the `'NextToken'` key in the response and include it the next time you call `client.get_query_results()`; you would then effectively be getting the next 1000 (or whatever the limit is) rows. – chillin Aug 26 '18 at 15:57
  • Documentation states `get_query_results()` returns a Python dictionary, so try d = response['ResultSet']['Rows'], then df = pd.DataFrame.from_dict(d). However, you might not get expected DataFrame if `d` contains metadata (stuff that you don't want in the final DataFrame). If this is the case, you may need to extract from/mutate `d` (with a for loop or some other logic) so it contains what you want. This link may help: https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.from_dict.html – chillin Aug 26 '18 at 16:42
  • Thanks @chillin. As for max limit you can see in this [link](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) the limitation of 1000. But I guess you are right I'll have to find a way to use the Next Token. – Niv Cohen Aug 26 '18 at 19:47

8 Answers

35

get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?

If you try to raise the limit with MaxResults:

client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

you will get the following error:

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.

You can get millions of rows by reading the result file directly from your S3 bucket (in the following example, into a pandas DataFrame):

def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3', 
                          region_name = self.region_name, 
                          aws_access_key_id = self.aws_access_key_id,
                          aws_secret_access_key= self.aws_secret_access_key)

    response = self.resource \
    .Bucket(self.bucket) \
    .Object(key= self.folder + self.filename + '.csv') \
    .get()

    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   

The self.filename can be:

self.filename = response['QueryExecutionId']

because Athena names the result file after the QueryExecutionId (the '.csv' extension is appended when the object key is built above). Here is my complete code, which takes a query and returns a DataFrame with all the rows and columns.

import time
import boto3
import pandas as pd
import io

class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output =  's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        try:
            self.client = boto3.client('athena', 
                              region_name = self.region_name, 
                              aws_access_key_id = self.aws_access_key_id,
                              aws_secret_access_key= self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString = q,
                    QueryExecutionContext={
                    'Database': self.database
                    },
                    ResultConfiguration={
                    'OutputLocation': self.s3_output,
                    }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])

        except Exception as e:
            print(e)
            raise  # re-raise: without a response there is nothing to return
        return response

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:              
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                if query_status == 'QUEUED' or query_status == 'RUNNING':
                    time.sleep(10)  # wait only while the query is still in progress
            print('Query "{}" finished.'.format(self.query))

            df = self.obtain_data()
            return df

        except Exception as e:
            print(e)      

    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3', 
                                  region_name = self.region_name, 
                                  aws_access_key_id = self.aws_access_key_id,
                                  aws_secret_access_key= self.aws_secret_access_key)

            response = self.resource \
            .Bucket(self.bucket) \
            .Object(key= self.folder + self.filename + '.csv') \
            .get()

            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   
        except Exception as e:
            print(e)  


if __name__ == "__main__":       
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
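
As a complement to building the key from the folder and the QueryExecutionId, the result location can also be read back from the get_query_execution response, which reports it under ['QueryExecution']['ResultConfiguration']['OutputLocation']. The sketch below only parses that S3 URI into a bucket and a key; the helper names split_s3_uri and result_location and the sample response are illustrative assumptions, not part of the class above.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key.csv' into ('bucket', 'some/key.csv')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def result_location(query_execution):
    """Return (bucket, key) of the result file from a get_query_execution response."""
    uri = query_execution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
    return split_s3_uri(uri)


# Hypothetical response fragment, trimmed to the fields used here.
sample = {
    "QueryExecution": {
        "QueryExecutionId": "abc-123",
        "ResultConfiguration": {
            "OutputLocation": "s3://my_bucket/my_folder/abc-123.csv"
        },
    }
}
bucket, key = result_location(sample)
```

The returned bucket and key can be passed to .Bucket(...) and .Object(key=...) as in obtain_data, which avoids keeping the folder name in sync by hand.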
JD D
Eric Bellet
  • Hi @EricBellet, "You can obtain millions of rows if you obtain the file directly from your bucket s3 (in the next example into a Pandas Dataframe)" - This is exactly what I did eventually. I can also say that I compared performance between this approach and pagination, and loading the results from S3 is much faster when it comes to 2 million rows, as in my case. – Niv Cohen Oct 25 '18 at 12:25
  • Hi @NivCohen, did you get 2 million rows with pagination? Have you a code example to share? – Eric Bellet Oct 26 '18 at 08:02
  • 1
    Hi @EricBellet, I've updated my answer above with pagination example that worked for me in case of 2 Million rows. I had to restore it and fit it , so it's not so well formatted. I hope this will help... – Niv Cohen Oct 28 '18 at 08:34
  • 1
    this is definitely the superior answer, this worked great for me and handles any amount of data with much simpler code. Thanks for putting this into a class so it's easy to incorporate the solution. – JD D Apr 17 '19 at 15:32
  • @EricBellet: why we need this `self.s3_input = 's3://' + self.bucket + '/my_folder_input'` – noobie-php Apr 01 '20 at 18:32
  • 1
    @EricBellet: btw great answer – noobie-php Apr 01 '20 at 18:41
  • Not to be contrarian, but I find that CEM's awswrangler answer below (https://stackoverflow.com/a/67294231/3897315) is by far the best solution for the queries I have been making. However, for very large queries, you may need special permissions to apply the default ctas_approach=True flag. (Documentation linked in his solution.) – user3897315 Mar 02 '22 at 18:20
26

You can use the AWS SDK for pandas (awswrangler) to query Athena and get the result directly as a pandas DataFrame.

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")

You can find more information here

Cem
  • 2
    Excellent library, this is indeed the best answer in 2021 – Manuel Montoya Sep 15 '21 at 21:01
  • 1
    What is the MaxResults size for this? – N_C Nov 13 '21 at 14:52
  • If I could upvote this 200× I would do so. A quick test reveals that it will save me many hours, with a trivial substitution of code. – user3897315 Mar 02 '22 at 18:16
  • When I run this on my system I get this result: QueryFailed: Insufficient permissions to execute the query. Error creating table : ... AccessDeniedException: An error occurred (AccessDeniedException) when calling the DeleteTable operation: ... because no identity-based policy allows the glue:DeleteTable action... I feel like something wasn't setup correctly or permissioned correctly. Any idea how to resolve this issue? – ASH Aug 30 '22 at 19:32
  • 1
    @ASH your Glue should have the necessary permissions to do operations on Athena. Check this managed policy: AmazonAthenaFullAccess (https://docs.aws.amazon.com/athena/latest/ug/managed-policies.html). It probably has more permissions that you need. Instead, you can create your own role with specific policies attached - managed, custom or inline. Then give the role to Glue. – Cem Aug 31 '22 at 01:39
20

I have a solution for my first question, using the following function:

def results_to_df(results):

    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    listed_results = []
    # The first row holds the column names, so skip it.
    for res in results['ResultSet']['Rows'][1:]:
        values = []
        for field in res['Data']:
            # A NULL cell has no 'VarCharValue' key; .get() then yields None.
            values.append(field.get('VarCharValue'))

        listed_results.append(
            dict(zip(columns, values))
        )

    return listed_results

and then:

t = results_to_df(response)
pd.DataFrame(t)
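
One thing to keep in mind with this conversion is that every cell arrives as a string under 'VarCharValue', so numeric columns end up as text in the DataFrame. The sketch below uses the 'Type' field that each ColumnInfo entry carries to cast values while building the rows. The CASTERS mapping and the typed_rows name are assumptions for illustration, and the list of type names is not exhaustive; unknown types are left as strings.

```python
from decimal import Decimal

# Assumed mapping from Athena type names to Python converters; extend as needed.
CASTERS = {
    "tinyint": int, "smallint": int, "integer": int, "bigint": int,
    "float": float, "double": float, "real": float,
    "decimal": Decimal,
    "boolean": lambda s: s.lower() == "true",
}


def typed_rows(response, skip_header=True):
    """Convert one get_query_results page into a list of dicts with typed values."""
    info = response["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
    names = [col["Label"] for col in info]
    casters = [CASTERS.get(col["Type"], str) for col in info]
    rows = response["ResultSet"]["Rows"]
    if skip_header:
        rows = rows[1:]  # on the first page, row 0 repeats the column names
    out = []
    for row in rows:
        record = {}
        for name, cast, cell in zip(names, casters, row["Data"]):
            raw = cell.get("VarCharValue")  # a missing key is treated as NULL
            record[name] = None if raw is None else cast(raw)
        out.append(record)
    return out


# Hypothetical page, shaped like the documented response structure.
sample = {
    "ResultSet": {
        "ResultSetMetadata": {
            "ColumnInfo": [
                {"Label": "city", "Type": "varchar"},
                {"Label": "temperature", "Type": "double"},
            ]
        },
        "Rows": [
            {"Data": [{"VarCharValue": "city"}, {"VarCharValue": "temperature"}]},
            {"Data": [{"VarCharValue": "Oslo"}, {"VarCharValue": "41.1"}]},
            {"Data": [{"VarCharValue": "Lima"}, {}]},
        ],
    }
}
rows = typed_rows(sample)
```

The result can be passed to pd.DataFrame(rows) in the same way as the output of results_to_df; pass skip_header=False for pages after the first.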

As for my second question, and at the request of @EricBellet, I'm also adding my pagination approach, which I find less efficient and slower than loading the results from the Athena output in S3:

def run_query(query, database, s3_output):
    ''' 
    Function for executing Athena queries and return the query ID 
    '''
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response



def format_result(results):
    '''
    This function format the results toward append in the needed format.
    '''
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
 
    formatted_results = []
 
    for result in results['ResultSet']['Rows'][0:]:
        values = []
        for field in result['Data']:
            # A NULL cell has no 'VarCharValue' key; .get() then yields None.
            values.append(field.get('VarCharValue'))
 
        formatted_results.append(
            dict(zip(columns, values))
        )
    return formatted_results



res = run_query(query_2, database, s3_output) #query Athena



import time
import boto3
import pandas as pd

client = boto3.client('athena')
query_id = res['QueryExecutionId']
start_time = time.time()

# The paginator follows NextToken on its own; 1000 rows per page is the
# maximum that GetQueryResults allows.
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
    QueryExecutionId=query_id,
    PaginationConfig={'PageSize': 1000})

# Build one DataFrame per page and concatenate once at the end
# (DataFrame.append was removed in pandas 2.0).
frames = [pd.DataFrame(format_result(page)) for page in response_iterator]
formatted_results = pd.concat(frames, ignore_index=True)

# format_result keeps every row, and for SELECT queries the first row of
# the first page repeats the column names, so drop it.
formatted_results = formatted_results.iloc[1:].reset_index(drop=True)

print("My program took", time.time() - start_time, "to run")

The formatting isn't great, but I think it does the job.
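
Note that the pagination script reads results right after start_query_execution, so it relies on the query having finished by then. A minimal polling sketch, assuming the same get_query_execution status values used elsewhere in this thread (QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED), could look like this. The function name wait_for_query, the default intervals, and the injectable sleep parameter are my own choices for illustration.

```python
import time


def wait_for_query(client, query_id, poll_seconds=2.0, timeout_seconds=600.0,
                   sleep=time.sleep):
    """Poll get_query_execution until the query leaves QUEUED/RUNNING."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.get_query_execution(QueryExecutionId=query_id)
        status = response["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return state
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError(f"query {query_id} {state}: {reason}")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"query {query_id} still {state} after {timeout_seconds}s")
        sleep(poll_seconds)
```

Calling wait_for_query(client, res['QueryExecutionId']) before paginating makes the script fail loudly on a failed or cancelled query instead of reading an incomplete result.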

2021 Update

Today I use a custom wrapper around aws-data-wrangler (awswrangler), which I consider the best solution to the original question I asked several years ago.

import boto3
import awswrangler as wr

def run_athena_query(query, database, s3_output, boto3_session=None, categories=None, chunksize=None, ctas_approach=None, profile=None, workgroup='myTeamName', region_name='us-east-1', keep_files=False, max_cache_seconds=0):
    """
    An end-to-end Athena query method, based on the AWS Wrangler package.
    The method will execute a query and will return a pandas dataframe as an output.
    you can read more in https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.athena.read_sql_query.html

    Args:
        - query: SQL query.

        - database (str): AWS Glue/Athena database name. It is only the database from which the query is launched; you can still use and mix several databases by writing the full table name in the SQL (e.g. database.table).

        - ctas_approach (bool): Wraps the query using a CTAS, and read the resulted parquet data on S3. If false, read the regular CSV on S3.

        - categories (List[str], optional): List of columns names that should be returned as pandas.Categorical. Recommended for memory restricted environments.

        - chunksize (Union[int, bool], optional): If passed, the data is split into an Iterable of DataFrames (memory friendly). If True, Wrangler iterates over the data by files in the most efficient way, without guaranteeing a chunk size. If an integer is passed, Wrangler iterates over the data in chunks with a number of rows equal to that integer.

        - s3_output (str, optional): Amazon S3 path.

        - workgroup (str, optional): Athena workgroup. 

        - keep_files (bool): Should Wrangler delete or keep the staging files produced by Athena? default is False

        - profile (str, optional): AWS account profile. Ignored if boto3_session is provided.

        - boto3_session (boto3.Session(), optional): Boto3 session. The default boto3 session is used if boto3_session is None. If profile is provided, a session is created automatically.

        - max_cache_seconds (int): Wrangler can look up in Athena’s history if this query has been run before. If so, and its completion time is less than max_cache_seconds before now, wrangler skips query execution and just returns the same results as last time. If reading cached data fails for any reason, execution falls back to the usual query run path. by default is = 0

    Returns:
        - Pandas DataFrame

    """
    # Create a session from the profile when no session was passed in.
    if boto3_session is None and profile is not None:
        boto3_session = boto3.Session(profile_name=profile, region_name=region_name)

    print("Querying AWS Athena...")

    try:
        # Retrieving the data from Amazon Athena
        athena_results_df = wr.athena.read_sql_query(
            query,
            database=database,
            boto3_session=boto3_session,
            categories=categories,
            chunksize=chunksize,
            ctas_approach=ctas_approach,
            s3_output=s3_output,
            workgroup=workgroup,
            keep_files=keep_files,
            max_cache_seconds=max_cache_seconds
        )

        print("Query completed, data retrieved successfully!")
    except Exception as e:
        print(f"Something went wrong... the error is: {e}")
        raise  # re-raise the original exception with its type and traceback

    return athena_results_df

you can read more here

marc_s
Niv Cohen
11

A very simple solution is to use a list comprehension with the boto3 Athena paginator. The list comprehension can then be passed straight into pd.DataFrame() to create a DataFrame, like so:

pd.DataFrame([[data.get('VarCharValue') for data in row['Data']] for row in
              results['ResultSet']['Rows']])

Boto3 Athena to Pandas DataFrame

import pandas as pd
import boto3

result = get_query_results( . . . ) # your code here

def cleanQueryResult(result) :
    '''
    This will take the dictionary of the raw Boto3 Athena results and turn it into a 
    2D array for further processing

    Parameters
    ----------
    result dict
        The dictionary from the boto3 Athena client function get_query_results

    Returns
    -------
    list(list())
        2D list which is essentially the table result. The first row is the column name.
    '''
    return [[data.get('VarCharValue') for data in row['Data']]
            for row in result['ResultSet']['Rows']]

# note that row 1 is the header
df = pd.DataFrame(cleanQueryResult(result))

Millions of Results

This requires the paginator object, https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/athena.html#paginators

As a hint, here's how you can append after each page

df = pd.concat([df, pd.DataFrame(cleanQueryResult(next_page))], ignore_index=True)
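
To make the paginator hint concrete, here is a minimal sketch that walks every page with client.get_paginator('get_query_results') and collects the rows into one 2D list, taking the column names from the result metadata and skipping the header row on the first page only. The function name fetch_table is hypothetical, and the sketch assumes a SELECT query whose first row repeats the column names.

```python
def fetch_table(client, query_id, page_size=1000):
    """Return (column_names, rows) for a finished Athena query.

    Each row is a list of strings, with None where the cell is NULL.
    """
    paginator = client.get_paginator("get_query_results")
    pages = paginator.paginate(
        QueryExecutionId=query_id,
        PaginationConfig={"PageSize": page_size},  # 1000 is the API maximum
    )
    columns = None
    rows = []
    for page in pages:
        page_rows = page["ResultSet"]["Rows"]
        if columns is None:
            info = page["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
            columns = [col["Label"] for col in info]
            page_rows = page_rows[1:]  # header row appears on the first page only
        rows.extend(
            [cell.get("VarCharValue") for cell in row["Data"]]
            for row in page_rows
        )
    return columns or [], rows
```

With pandas available, pd.DataFrame(rows, columns=columns) then builds the DataFrame in a single step, which avoids growing it page by page.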
Hammad Usmani
4

Maybe you can try to use pandas read_sql and pyathena:

from pyathena import connect
import pandas as pd

conn = connect(s3_staging_dir='s3://bucket/folder',region_name='region')
df = pd.read_sql('select * from database.table', conn) #don't change the "database.table"
dasilvadaniel
  • Did this actually work for you? When I tried it, I got this error message: ModuleNotFoundError: No module named 'pyathena' – ASH Aug 26 '22 at 17:52
  • Hi @ASH, can you try 'pip install pyathena'? You probably need to install it. – dasilvadaniel Aug 29 '22 at 00:07
  • I think my environment was not provisioned correctly/fully or the rights are waaaayyy too restrictive. Nothing works, at all. I switched over to SageMaker, and everything works totally fine, just as I would expect it to. – ASH Aug 29 '22 at 02:46
1

I've used a while loop to solve this: as long as NextToken is present, I extend the list of result rows:

# Receive Query Results
# get_query_results() returns at most 1000 rows per call, so keep following NextToken.
def get_all_rows(athena_client, query_execution_id):  # wrapper name is illustrative
    query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id)
    results = query_results['ResultSet']['Rows']
    while 'NextToken' in query_results:
        query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id, NextToken=query_results['NextToken'])
        results.extend(query_results['ResultSet']['Rows'])
    return results  # return only after the last page has been appended
snoob dogg
  • 2
    Welcome, Julio. There are six existing answers to this question, including an accepted answer with 24 upvotes. Are you sure your approach hasn't already been suggested? If not, why might someone prefer your approach over the existing approaches proposed? Are you taking advantage of new capabilities? Are there scenarios where your approach is better suited? – Jeremy Caney Nov 23 '21 at 00:17
0

Try this approach to convert response['records'] into dataframe using columnMetadata:

def results_to_df(response):
    columns = [
        col['label']
        for col in response['columnMetadata']
    ]

    listed_results = [[list(col.values())[0] if list(col.values())[0] else '' for col in 
    record] for record in response['records']]
    df = pd.DataFrame(listed_results, columns=columns)
    return df
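
A caveat on the expression above: it replaces every falsy value with '', so a 0 or False in the data is lost, and a cell such as {'isNull': True} comes out as True. The lowercase 'columnMetadata' and 'records' keys look like the response shape of the RDS Data API's execute_statement rather than Athena's get_query_results; that reading is an assumption on my part. Under that assumption, a sketch that keeps falsy values and maps NULL cells explicitly might be (the name records_to_rows and the sample response are illustrative):

```python
def records_to_rows(response, null_value=None):
    """Turn a columnMetadata/records style response into a list of dicts."""
    columns = [col["label"] for col in response["columnMetadata"]]
    rows = []
    for record in response["records"]:
        values = []
        for cell in record:
            if cell.get("isNull"):
                values.append(null_value)
            else:
                # Each non-null cell is assumed to hold one typed key,
                # e.g. {'stringValue': 'a'} or {'longValue': 0}.
                values.append(next(iter(cell.values()), null_value))
        rows.append(dict(zip(columns, values)))
    return rows


# Hypothetical response, trimmed to the two keys used here.
sample = {
    "columnMetadata": [{"label": "id"}, {"label": "name"}, {"label": "active"}],
    "records": [
        [{"longValue": 0}, {"stringValue": "a"}, {"booleanValue": False}],
        [{"longValue": 1}, {"isNull": True}, {"booleanValue": True}],
    ],
}
rows = records_to_rows(sample)
```

The list of dicts can be handed to pd.DataFrame(rows), and the column order follows columnMetadata.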
0

While not the intent of the question, a CSV with a few million rows may be only several hundred MB. Downloading the query results as CSV from the AWS console and then loading them with pandas.read_csv() is probably faster and easier than implementing one of the above solutions. It does not scale as well, but the OP only asked for 2 million rows. I have used this approach successfully on files twice that size.

MikeF