
I want to use the Pandas library to read BigQuery data. How do I allow large results?
For non-Pandas BigQuery interactions this can be done with the plain API by setting allowLargeResults on the query job (a sketch follows the code below).

Current code with Pandas:

sProjectID = "project-id"
sQuery = '''
    SELECT 
        column1, column2
    FROM [dataset_name.tablename]
'''
from pandas.io import gbq
df = gbq.read_gbq(sQuery, sProjectID)
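
(For reference, the non-Pandas route mentioned above is a query job whose configuration sets allowLargeResults together with an explicit destination table; a minimal sketch of that configuration, reusing the placeholders from the code above — the dataset and table names are placeholders too:)

# Sketch of the non-Pandas job configuration (legacy SQL):
# allowLargeResults only works together with an explicit destination table.
dJobConfig = {
    'configuration': {
        'query': {
            'query': sQuery,
            'allowLargeResults': True,
            'destinationTable': {
                'projectId': sProjectID,
                'datasetId': 'some_dataset',  # placeholder
                'tableId': 'some_table',      # placeholder
            },
        }
    }
}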

3 Answers


EDIT: I've posted the proper way to do this in my other answer, by dropping the data off in Google Storage first. That way you'll never have data that is too large.


OK, I didn't find a direct way to do it with Pandas, so I had to write a little extra using the plain BigQuery API. Here is my fix (it also covers most of the work needed to do this natively, without Pandas):

sProjectID = "project-id"
sQuery = '''
    SELECT 
        column1, column2
    FROM [dataset_name.tablename]
'''

df = create_dataframe(sQuery, sProjectID, bLargeResults=True)


#*******Functions to make above work*********



def create_dataframe(sQuery, sProjectID, bLargeResults=False):
    "takes a BigQuery sql query and returns a Pandas dataframe"

    if bLargeResults:
        oService = create_service()
        dDestinationTable = run_query(sQuery, oService, sProjectID)
        df = pandas_get_table(dDestinationTable)
    else:
        df = pandas_query(sQuery, sProjectID)

    return df



def pandas_query(sQuery, sProjectID):
    "go into bigquery and get the table with sql query and return dataframe"
    from pandas.io import gbq
    df = gbq.read_gbq(sQuery, sProjectID)

    return df 



def pandas_get_table(dTable):
    "fetch a table and return dataframe"
    from pandas.io import gbq

    sProjectID = dTable['projectId']
    sDatasetID = dTable['datasetId']
    sTableID = dTable['tableId']
    sQuery = "SELECT * FROM [{}.{}]".format(sDatasetID, sTableID)

    df = gbq.read_gbq(sQuery, sProjectID)

    return df 




def create_service():
    "create google service"
    from oauth2client.client import GoogleCredentials
    from apiclient.discovery import build
    credentials = GoogleCredentials.get_application_default()
    oService = build('bigquery', 'v2', credentials=credentials)
    return oService



def run_query(sQuery, oService, sProjectID):
    "runs the BigQuery query as an allowLargeResults job and returns its destination table"
    import time

    dQuery = {
        'configuration': {
            'query': {
                # allowLargeResults (legacy SQL) requires an explicit destination table
                'writeDisposition': 'WRITE_TRUNCATE',
                'useQueryCache': False,
                'allowLargeResults': True,
                'query': sQuery,
                'destinationTable': {
                    'projectId': sProjectID,
                    'datasetId': 'sandbox',  # this dataset must already exist
                    'tableId': 'api_large_result_dropoff',
                },
            }
        }
    }

    job = oService.jobs().insert(projectId=sProjectID, body=dQuery).execute()

    # wait for the query job to finish so the destination table actually exists
    # before we try to read it back with pandas
    sJobID = job['jobReference']['jobId']
    while job['status']['state'] != 'DONE':
        time.sleep(1)
        job = oService.jobs().get(projectId=sProjectID, jobId=sJobID).execute()

    return job['configuration']['query']['destinationTable']
– Roman
  • This looks promising. However, I get the error Reason: notFound, Message: Not found: Table my_project_id:sandbox.api_large_result_dropoff. Do I have to do a preliminary step prior to running this code? – marcopah Nov 17 '16 at 10:43
  • You need a BigQuery project (it seems you called it `my_project_id`) and a dataset in that project (called `sandbox` above); see the sketch after these comments. – Roman Nov 17 '16 at 11:42
  • This doesn't work if the result of the original query that you saved in an intermediate table is still large. – Yonas Kassa Nov 21 '16 at 15:43
  • Strange. Worked for me when I did it back in the day. Though I've changed my methodology to use the API to do all BQ queries, save them within BQ, then use API to download them to local (and taking care of pagination!), then converting to dataframes. – Roman Nov 22 '16 at 08:46
  • @Tensor please go look at my other answer. I've done.. everything you need. No caps on data size – Roman May 07 '17 at 14:28
  • GoogleCredentials.get_application_default() fails and throws an ApplicationDefaultCredentialsError. There is some different handling of this in https://github.com/pydata/pandas-gbq/blob/master/pandas_gbq/gbq.py but I am not quite sure which part I need. The read_gbq function works with my credentials by giving a popup in my browser. – Keith Jun 02 '17 at 21:36
  • @Keith make sure that gcloud is installed on your computer, and that it is authenticated to the correct project. `GoogleCredentials.get_application_default()` grabs credentials from your pc. Look at this, I think this should solve your problem https://stackoverflow.com/questions/35159967/setting-google-application-credentials-for-bigquery-python-cli – Roman Jun 04 '17 at 06:30
  • I installed and it asked for my credentials through the browser just like when running read_gbq. However when running the above code I still get the same result. The link you gave seems to only be for Linux but I am on windows 10. I would think that going through the browser like read_gbq would be the best option. I have looked at the code but can't sort out the trick. – Keith Jun 05 '17 at 16:58
  • @Keith Go to google cloud console. Go to IAM, then "Service accounts". Create a service account that has BigQuery permissions. Then save it as a json file to your local machine. Then set an environment variable that points to the path of that json file. Google how to. Look at that link I posted again, and figure out how to adapt that to windows. It's just a couple of changes. – Roman Jun 06 '17 at 05:38
  • Issue solved https://stackoverflow.com/questions/44505462/making-a-google-bigquery-from-python-on-windows/ – Keith Jun 13 '17 at 18:26
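
As the comments above point out, two prerequisites are easy to miss: create_service() relies on application-default credentials being available, and run_query() assumes a dataset named sandbox already exists in the project. A minimal sketch covering both, using the same v2 REST service (the key-file path is a placeholder; skip the dataset insert if the dataset already exists):

import os

# Point the Google client libraries at a service-account key if
# application-default credentials are not configured (placeholder path).
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "/path/to/service-account.json")

oService = create_service()

# Create the destination dataset once, so the allowLargeResults job has a
# place to write. (datasets().insert raises an error if it already exists.)
oService.datasets().insert(
    projectId=sProjectID,
    body={'datasetReference': {'projectId': sProjectID, 'datasetId': 'sandbox'}},
).execute()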

I decided to post the proper way to do this via the Python 3 google.cloud API. Looking at my previous answer, I see that it would fail the way yosemite_k said.

Large results really need to follow the BigQuery -> Storage -> local -> dataframe pattern.

Installation:

pip install pandas
pip install google-cloud-storage
pip install google-cloud-bigquery
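
Note that the implementation below targets the pre-0.28 google-cloud-bigquery API (dataset() / extract_table_to_storage() / begin()). With current releases the export step is spelled differently, roughly like this (a sketch only; every name is a placeholder):

# Sketch: the same table-to-Storage export with google-cloud-bigquery >= 1.x.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
table_ref = bigquery.DatasetReference("my-project", "bq_dataset").table("bq_table")

job_config = bigquery.ExtractJobConfig()
job_config.destination_format = "CSV"

extract_job = client.extract_table(
    table_ref,
    "gs://my-bucket/path/for/dropoff/*",  # the wildcard lets BigQuery shard large exports
    job_config=job_config,
)
extract_job.result()  # blocks until the export job finishes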

Full implementation (bigquery_to_dataframe.py):

"""
We require python 3 for the google cloud python API
    mkvirtualenv --python `which python3` env3
And our dependencies:
    pip install pandas
    pip install google-cloud-bigquery
    pip install google-cloud-storage
"""
import os
import time
import uuid

from google.cloud import bigquery
from google.cloud import storage
import pandas as pd


def bq_to_df(project_id, dataset_id, table_id, storage_uri, local_data_path):
    """Pipeline to get data from BigQuery into a local pandas dataframe.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset id.
    :type dataset_id: str
    :param table_id: BigQuery table id.
    :type table_id: str
    :param storage_uri: Google Storage uri where data gets dropped off.
    :type storage_uri: str
    :param local_data_path: Path where data should end up.
    :type local_data_path: str
    :return: Pandas dataframe from BigQuery table.
    :rtype: pd.DataFrame
    """
    bq_to_storage(project_id, dataset_id, table_id, storage_uri)

    storage_to_local(project_id, storage_uri, local_data_path)

    # blobs get saved under <local_data_path>/<path inside the bucket>, so point
    # the loader at that folder rather than a hard-coded one
    blob_prefix = os.path.dirname(storage_uri.split("gs://", 1)[1].split("/", 1)[1])
    data_dir = os.path.join(local_data_path, blob_prefix)
    df = local_to_df(data_dir)

    return df


def bq_to_storage(project_id, dataset_id, table_id, target_uri):
    """Export a BigQuery table to Google Storage.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param dataset_id: BigQuery dataset name where source data resides.
    :type dataset_id: str
    :param table_id: BigQuery table name where source data resides.
    :type table_id: str
    :param target_uri: Google Storage location where table gets saved.
    :type target_uri: str
    :return: The random ID generated to identify the job.
    :rtype: str
    """
    client = bigquery.Client(project=project_id)

    dataset = client.dataset(dataset_name=dataset_id)
    table = dataset.table(name=table_id)

    job = client.extract_table_to_storage(
        str(uuid.uuid4()),  # id we assign to be the job name
        table,
        target_uri
    )
    job.destination_format = 'CSV'
    job.write_disposition = 'WRITE_TRUNCATE'

    job.begin()  # async execution

    if job.errors:
        print(job.errors)

    while job.state != 'DONE':
        time.sleep(5)
        print("exporting '{}.{}' to '{}':  {}".format(
            dataset_id, table_id, target_uri, job.state
        ))
        job.reload()

    print(job.state)

    return job.name


def storage_to_local(project_id, source_uri, target_dir):
    """Save a file or folder from google storage to a local directory.

    :param project_id: Google project ID we are working in.
    :type project_id: str
    :param source_uri: Google Storage location where file comes form.
    :type source_uri: str
    :param target_dir: Local file location where files are to be stored.
    :type target_dir: str
    :return: None
    :rtype: None
    """
    client = storage.Client(project=project_id)

    bucket_name = source_uri.split("gs://")[1].split("/")[0]
    file_path = "/".join(source_uri.split("gs://")[1].split("/")[1::])
    bucket = client.lookup_bucket(bucket_name)

    folder_name = "/".join(file_path.split("/")[0:-1]) + "/"
    blobs = [o for o in bucket.list_blobs() if o.name.startswith(folder_name)]

    # get files if we wanted just files
    blob_name = file_path.split("/")[-1]
    if blob_name != "*":
        print("Getting just the file '{}'".format(file_path))
        our_blobs = [o for o in blobs if o.name.endswith(blob_name)]
    else:
        print("Getting all files in '{}'".format(folder_name))
        our_blobs = blobs

    print([o.name for o in our_blobs])

    for blob in our_blobs:
        filename = os.path.join(target_dir, blob.name)

        # create a complex folder structure if necessary
        if not os.path.isdir(os.path.dirname(filename)):
            os.makedirs(os.path.dirname(filename))

        with open(filename, 'wb') as f:
            blob.download_to_file(f)


def local_to_df(data_path):
    """Import local data files into a single pandas dataframe.

    :param data_path: File or folder path where csv data are located.
    :type data_path: str
    :return: Pandas dataframe containing data from data_path.
    :rtype: pd.DataFrame
    """
    # if data_dir is a file, then just load it into pandas
    if os.path.isfile(data_path):
        print("Loading '{}' into a dataframe".format(data_path))
        df = pd.read_csv(data_path)  # the BigQuery CSV export's first line is the header
    elif os.path.isdir(data_path):
        files = [os.path.join(data_path, fi) for fi in os.listdir(data_path)]
        print("Loading {} into a single dataframe".format(files))
        df = pd.concat((pd.read_csv(s) for s in files))
    else:
        raise ValueError(
            "Please enter a valid path.  {} does not exist.".format(data_path)
        )

    return df


if __name__ == '__main__':
    PROJECT_ID = "my-project"
    DATASET_ID = "bq_dataset"
    TABLE_ID = "bq_table"
    STORAGE_URI = "gs://my-bucket/path/for/dropoff/*"
    LOCAL_DATA_PATH = "/path/to/save/"

    bq_to_df(PROJECT_ID, DATASET_ID, TABLE_ID, STORAGE_URI, LOCAL_DATA_PATH)
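
If you are starting from a SQL query (as in the question) rather than an existing table, one way to bridge the two answers is to materialize the query with the first answer's run_query() helper and then hand its destination table to bq_to_df(). A rough sketch, assuming the helpers from both answers are importable (sQuery is the question's query; bucket and local paths are placeholders):

# Glue between the two answers: create_service()/run_query() come from the
# first answer, bq_to_df() from bigquery_to_dataframe.py above.
oService = create_service()
dDest = run_query(sQuery, oService, PROJECT_ID)  # writes sandbox.api_large_result_dropoff

df = bq_to_df(
    dDest['projectId'],
    dDest['datasetId'],
    dDest['tableId'],
    "gs://my-bucket/path/for/dropoff/*",  # placeholder bucket/path
    "/path/to/save/",                     # placeholder local path
)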
– Roman

You can do it by changing the default dialect from legacy to standard in the pd.read_gbq function.

pd.read_gbq(query, 'my-super-project', dialect='standard')

Indeed, you can read in the BigQuery documentation, for the allowLargeResults parameter:

AllowLargeResults: For standard SQL queries, this flag is ignored and large results are always allowed.
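
Note that with dialect='standard' the query itself must be standard SQL, so the legacy-style [dataset_name.tablename] reference from the question has to be rewritten with backtick quoting. A minimal sketch using the question's placeholder names:

import pandas as pd

sQuery = """
    SELECT column1, column2
    FROM `project-id.dataset_name.tablename`
"""
df = pd.read_gbq(sQuery, project_id="project-id", dialect="standard")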

– nlassaux