I have a table with 10 million-plus records (rows) in it. I am trying to do a one-time load into S3 by running a `SELECT *` on the table and writing the result to a gzip file on my local file system. Currently, my script collects 800,000 records into the gzip file, but then I receive an error and the remaining records are obviously not written.
There is no simple continuation in SQL either (for example, if you run ten `LIMIT 800000` queries, the rows won't come back in a consistent order).
So, is there a way to write a Python/Airflow function that can load the 10 million+ row table in batches? Perhaps there's a way in Python to run a `SELECT *` statement and continue it after every x records into separate gzip files?
Here is my Python/Airflow script so far; when run, it only writes 800,000 records to the file at the `path` variable:
    import csv
    import gzip

    def gzip_postgres_table(table_name, **kwargs):
        path = '/usr/local/airflow/{}.gz'.format(table_name)
        server_postgres = create_tunnel_postgres()
        server_postgres.start()
        etl_conn = conn_postgres_internal(server_postgres)
        record = get_etl_record(kwargs['master_table'],
                                kwargs['table_name'])
        cur = etl_conn.cursor()
        unload_sql = '''SELECT *
                        FROM schema1.database1.{0} '''.format(record['table_name'])
        cur.execute(unload_sql)
        result = cur.fetchall()  # pulls the entire result set into memory at once
        column_names = [i[0] for i in cur.description]
        fp = gzip.open(path, 'wt')
        myFile = csv.writer(fp, delimiter=',')
        myFile.writerow(column_names)
        myFile.writerows(result)
        fp.close()
        etl_conn.close()
        server_postgres.stop()
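What I'm imagining is something like the sketch below: a cursor streams rows in batches (psycopg2's server-side "named" cursors can do this without loading everything into memory), and a helper chunks whatever the cursor yields into numbered gzip parts. `write_rows_in_batches` and the part-file naming are hypothetical names of my own, not anything from my current script:

```python
import csv
import gzip
import itertools

def write_rows_in_batches(rows, column_names, path_template, batch_size=100000):
    """Chunk an iterable of rows into numbered gzip CSV part files.

    `rows` can be any iterable, e.g. a psycopg2 server-side cursor
    created with etl_conn.cursor(name='unload_cursor'), which streams
    rows from Postgres instead of materializing the full result set.
    """
    rows = iter(rows)
    paths = []
    for part in itertools.count():
        # Pull at most batch_size rows for this part file.
        batch = list(itertools.islice(rows, batch_size))
        if not batch:
            break
        path = path_template.format(part)
        with gzip.open(path, 'wt', newline='') as fp:
            writer = csv.writer(fp, delimiter=',')
            writer.writerow(column_names)  # header repeated in every part
            writer.writerows(batch)
        paths.append(path)
    return paths

# In the DAG task this would be fed by the cursor, roughly:
#     cur = etl_conn.cursor(name='unload_cursor')  # server-side cursor
#     cur.execute(unload_sql)
#     write_rows_in_batches(cur, column_names,
#                           '/usr/local/airflow/{}_part_{{}}.gz'.format(table_name))
```

Is this the right direction, or is there a more standard way to do this in Airflow?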