
We have a table in BigQuery that we need to export into a local newline-delimited JSON file. Using BigQuery's export-to-GCS functionality is problematic because it converts integer types to strings (see export bigquery data to cloud storage, the integer field changes to string format but float format stays as numeric format and How to preserve integer data type when exporting to JSON?); I've tried it myself and the ints are indeed lost. We've come up with the following solution, which maintains integer types but is very slow:

Currently working code

import json

from google.cloud import bigquery

bq = bigquery.Client()
our_query = "select * from our_project.our_dataset.our_bq_table"

results_row_iter = bq.query(our_query)  # QueryJob; iterating it yields Row objects
with open('/tmp/output_file.json', 'w') as f:
    for row in results_row_iter:
        f.write(json.dumps(dict(row), default=str) + '\n')  # dumps as ndjson

our_bq_table is 5 GB in BigQuery, with 3.4M rows and ~100 fields, and the above for loop takes 90 minutes on our table. our_bq_table is partitioned on the integer column confId, and there are ~100 unique confIds in the table, with values 1 - 100. We would like to leverage the partition key + parallelization to speed up this process... somehow.

pseudo-code of what we're going for

bq = bigquery.Client()
base_query = "select * from our_project.our_dataset.our_bq_table"

all_conf_ids = range(1, 101)  # confId values 1 - 100

def dump_conf_id(id):
    iter_query = f"{base_query} where confId = {id}"
    results_row_iter = bq.query(iter_query)
    with open(f'output_file-{id}.json', 'w') as f:
        for row in results_row_iter:
            f.write(json.dumps(dict(row), default=str) + '\n')  # dumps as ndjson

in parallel:
    for id in all_conf_ids:
        dump_conf_id(id)

# last step, perhaps concat the separate files into 1 somehow, assuming there are multiple output files...

This approach leverages the confId field so that each individual BigQuery query stays small. I'm not quite sure how to implement this beyond the pseudo-code, and am overwhelmed figuring out multithreading vs. multiprocessing vs. other ways to parallelize in Python. Our final output needs to be a single file; the pseudo-code dumps into separate files, but if we can dump into a single file in parallel that would be great too.

Edit: a key question we're trying to resolve before implementing a solution is: should we use multiprocessing or multithreading for this, given that we're dumping to a local .json file in parallel?

Canovice
    Perhaps you could do the export to GCS by using the [TO_JSON_STRING](https://stackoverflow.com/a/50937832/530160) function, which preserves types. – Nick ODell Aug 09 '21 at 23:10
  • @NickODell this looks incredibly promising, I've tested in the BigQuery console and it does seem to work. However, after running `bq.query(our_query)` in python, I am struggling to save this into a JSON file. We are trying to avoid at all costs converting these results into a pandas dataframe. – Canovice Aug 10 '21 at 13:49
  • We've tried `output = json.dumps(results_row_iter)` but receive error `TypeError: Object of type QueryJob is not JSON serializable` – Canovice Aug 10 '21 at 13:51
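
A minimal sketch of the TO_JSON_STRING approach discussed in the comments above (untested, and assuming the table and output path from the question); each row comes back as a ready-made JSON string, so there is no need to go through pandas or json.dumps:

from google.cloud import bigquery

bq = bigquery.Client()
# TO_JSON_STRING(t) serializes each row server-side and preserves integer types
our_query = """
select TO_JSON_STRING(t) as json_row
from our_project.our_dataset.our_bq_table t
"""

with open('/tmp/output_file.json', 'w') as f:
    for row in bq.query(our_query):
        f.write(row['json_row'] + '\n')  # each value is already a JSON string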

1 Answer


Two rules of thumb:

  • In Python, you can use multithreading in cases where your program is largely IO-bound, but you must use multiprocessing if it is CPU-bound. This is because of Python's Global Interpreter Lock.
  • You shouldn't mix multithreading and multiprocessing in the same program.

In this case, I would guess that your problem (serializing each result row to JSON) is CPU-bound, so I would recommend multiprocessing. Here's a code example:

from multiprocessing import Pool

n_jobs = 4
with Pool(n_jobs) as p:
    # call dump_conf_id on each member of all_conf_ids
    print(p.map(dump_conf_id, all_conf_ids))
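
If dump_conf_id is the one from the question's pseudo-code, a couple of caveats are worth noting (this is a sketch using the names from the question, not tested code): the function has to live at module level so the pool can pickle it, it's safest to create the BigQuery client inside the worker rather than sharing one across processes, and on platforms that spawn new interpreters the pool should be created under an if __name__ == '__main__': guard:

import json
from multiprocessing import Pool

from google.cloud import bigquery

base_query = "select * from our_project.our_dataset.our_bq_table"
all_conf_ids = range(1, 101)

def dump_conf_id(id):
    bq = bigquery.Client()  # one client per worker process
    results_row_iter = bq.query(f"{base_query} where confId = {id}")
    with open(f'output_file-{id}.json', 'w') as f:
        for row in results_row_iter:
            f.write(json.dumps(dict(row), default=str) + '\n')

if __name__ == '__main__':
    with Pool(4) as p:
        p.map(dump_conf_id, all_conf_ids)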

[...] but if we can dump into a single file in parallel that would be great too.

I wouldn't bother with trying to write them to a single file. Reading the files back in and concatenating them is likely to be the fastest part of what you're doing. A quick benchmark shows this runs at about 780 MB/s:

import subprocess

with open("output.json", "wb") as f:
    filenames = [f'output_file-{id}.json' for id in all_conf_ids]
    subprocess.check_call(["cat", *filenames], stdout=f)
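
If cat isn't available (e.g. on Windows), a roughly equivalent pure-Python concatenation, assuming the same per-confId file names as above:

import shutil

with open("output.json", "wb") as out:
    for id in all_conf_ids:
        with open(f'output_file-{id}.json', 'rb') as part:
            shutil.copyfileobj(part, out)  # stream each partial file into the combined output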
Nick ODell