
I'm trying to split a large CSV file in Google Cloud Storage into smaller CSV files so they're easier to transfer and load into a BigQuery table. However, I'm running into issues trying to accomplish this.

I'm using Cloud Composer / Airflow to call an API and retrieve the data; from there I want to split the gzipped CSV file into smaller files. I've tried the following Python code on my local machine and it seems to work:

import uuid

def split_csv_file(source_file, dest_file):
  chunk_size = 2000000  # lines per output file

  def write_chunk(part, lines):
    # write one chunk, repeating the header so each file can be loaded on its own
    with open(dest_file + str(part) + '_' + str(uuid.uuid4()) + '.csv', 'w') as f_out:
      f_out.write(header)
      f_out.writelines(lines)

  with open(source_file, "r") as f:
    count = 0
    header = f.readline()
    lines = []
    for line in f:
      count += 1
      lines.append(line)
      if count % chunk_size == 0:
        write_chunk(count // chunk_size, lines)
        lines = []
    # write remainder
    if len(lines) > 0:
      write_chunk((count // chunk_size) + 1, lines)

However, when I run this through a PythonOperator it only uploads the final file and nothing else. I've also tried creating my own operator to call the Python script, but it does the same thing: it uploads only the last file instead of uploading however many files the split produces. For reference, I'm working with files of roughly 16 - 20 GB unzipped (~2 GB zipped). I've tried various approaches and refactored the code quite a bit, but so far with no luck.

The end goal is: get the zipped file, unzip it, split it into smaller CSV files, and re-upload them unzipped into another folder within GCS. Has anyone faced this issue or know a solution for this? I've been working on it for the past couple of days and haven't been able to make any progress.
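Put differently, the flow I'm after looks roughly like this (a simplified sketch only, not my actual DAG; bucket names and paths are placeholders, and in Composer this would run inside a PythonOperator):

import glob
import gzip
import shutil

from google.cloud import storage

def download_split_upload():
  # Sketch only: bucket and object names below are placeholders.
  client = storage.Client()

  # download the gzipped export from GCS
  src_bucket = client.bucket('my-source-bucket')
  src_bucket.blob('exports/data.csv.gz').download_to_filename('/tmp/data.csv.gz')

  # unzip it locally
  with gzip.open('/tmp/data.csv.gz', 'rb') as f_in, open('/tmp/data.csv', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)

  # split into ~2M-line chunks using the function above
  split_csv_file('/tmp/data.csv', '/tmp/data_part_')

  # upload every chunk (not just the last one) into another folder
  dest_bucket = client.bucket('my-destination-bucket')
  for path in glob.glob('/tmp/data_part_*.csv'):
    dest_bucket.blob('split/' + path.split('/')[-1]).upload_from_filename(path)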

If anything needs clarifying or more detail, please let me know and I'll try to respond as quickly as possible. Any help would be greatly appreciated, as I'm at a loss for what to do or try next.

Maykid
  • Did you try adding some print/log statements in the method and testing it in `Cloud Composer`? – Hussein Awala Sep 01 '22 at 20:14
  • BigQuery is able to load up to [4GB of gzip csv data](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv#limitations). How long does it take to load your gzip data as is? – Ricco D Sep 01 '22 at 23:56
  • @RiccoD The process takes a little while, as it's going through a dataflow process and other teams are uploading into the same data lake, which can slow down other uploads. I'm trying to see if it's possible to split the gzip file into smaller gzips to keep the file size low and keep the dataflow moving without hindering it for other teams. – Maykid Sep 02 '22 at 14:32
  • @HusseinAwala I did a test this morning and it looks like it is iterating through correctly, but for some reason it isn't uploading the correct files to the destination. – Maykid Sep 02 '22 at 16:58
  • In that case, you can add your DAG code to the question so we can help you solve the problem. – Hussein Awala Sep 03 '22 at 11:10

1 Answer


Have you considered storing the file in Parquet format, so it can probably be loaded directly into BigQuery? It's very simple to do; have a look at

https://stackoverflow.com/a/61427524/1448460
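
For example, something along these lines (a rough sketch that assumes pandas with pyarrow installed; file names and chunk size are placeholders):

import pandas as pd

# read the large CSV in chunks and write each chunk as its own Parquet file
for i, chunk in enumerate(pd.read_csv('/tmp/data.csv', chunksize=2_000_000)):
  chunk.to_parquet('/tmp/part_{:05d}.parquet'.format(i), index=False)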

Then you can have another BashOperator to load the data into BigQuery using

bq load \
--source_format=PARQUET \
dataset.table \
"gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

as described in https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet.
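
In the DAG that could be a BashOperator along these lines (the dataset, table, and bucket names are placeholders):

from airflow.operators.bash import BashOperator

load_to_bq = BashOperator(
  task_id='load_parquet_to_bq',
  bash_command=(
    'bq load --source_format=PARQUET '
    'mydataset.mytable "gs://mybucket/parquet/*.parquet"'
  ),
  dag=dag,  # the DAG object defined elsewhere
)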

mikrohelen