0

I download a table from BQ into a PySpark RDD as below. How do I upload it again?

dGSConfig = {
    'project_id': "project_id",
    'bucket': "bucket_id"
}
dBQConfig = {
    'gs_config': dGSConfig,
    'project_id': "project_id",
    'dataset_id': "dataset_id",
    'table_id': "table_id"
}

oSc = instantiate_pyspark()
rddData, lsHeadings = get_table_cloud(oSc, dBQConfig)  #rddData has a list-of-lists type format




def instantiate_pyspark():
    """instantiate the pyspark RDD stuff"""
    import pyspark

    oSc = pyspark.SparkContext()
    oHadoopConf = oSc._jsc.hadoopConfiguration()
    oHadoopConf.get("fs.gs.system.bucket")

    return oSc


def get_table_cloud(oSc, dBQConfig):
    """get a table from bigquery via google cloud storage
    Config format:
        dGSConfig = {'project_id': '', 'bucket':  ''}
        dBQConfig = {'project_id: '', 'dataset_id': '', 'table_id': ''}
    """
    dGSConfig = dBQConfig['gs_config']

    dConf = {
        "mapred.bq.project.id": dGSConfig['project_id'],
        "mapred.bq.gcs.bucket": dGSConfig['bucket'],
        "mapred.bq.input.project.id": dBQConfig['project_id'],
        "mapred.bq.input.dataset.id":dBQConfig['dataset_id'],
        "mapred.bq.input.table.id": dBQConfig['table_id']
    }

    rddDatasetRaw = oSc.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=dConf
    )

    import json
    lsHeadings = json.loads(rddDatasetRaw.take(1)[0][1]).keys()

    rddDataset = (
        rddDatasetRaw
        .map(lambda t, json=json: json.loads(t[1]).values() )
    )

    return rddDataset, lsHeadings
Roman
  • 8,826
  • 10
  • 63
  • 103

2 Answers2

1

You could export to some intermediate files and then load those files into BigQuery.

This might help: how to export a table dataframe in pyspark to csv?

Community
  • 1
  • 1
Michael Sheldon
  • 2,027
  • 11
  • 7
0

3 methods that I used at some point:

1) create local csv, upload to google storage, separate process to get into BigQuery:

llData = rddData.collect()


with open(sCsvPath, 'w') as f:
    import csv
    oWriter = csv.writer(f)
    for lData in llData:
        oWriter.writerow(lData)

import subprocess
lsCommand = ['gsutil', 'cp', sCsvPath, sGooglePath]
subprocess.check_output(lsCommand)

2) Use Pandas to upload directly into BigQuery:

import pandas as pd
dfData = pd.DataFrame(llData, columns=lsHeadings)

sProjectID = dBQConfig['sProjectID']
sTargetDataset = dBQConfig['sTargetDataset']
sTargetTable = dBQConfig['sTargetTable']

sTablePath = "{}.{}".format(sTargetDataset, sTargetTable)
dfData.to_gbq(sTablePath, sProjectID, if_exists='replace')

3) Save distributed results straight to storage using pyspark:

#remove previous dir if exists
import subprocess
lsCommand = ['gsutil', 'rm', '-r', sGooglePath]
subprocess.check_output(lsCommand)

rddSave.saveAsTextFile(sGooglePath)

Though none of these are what I wanted originally, and that is a PySpark way of uploading the result straight into BQ.

Roman
  • 8,826
  • 10
  • 63
  • 103