My requirement is to save MySQL tables to CSV files. To run this in parallel, I want to use a thread pool to execute the to_csv function below for several tables at the same time, so that many tables are dumped concurrently. Here is a minimal reproduction of the issue:
import concurrent.futures
import csv

from sqlalchemy import create_engine

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
engine = create_engine('mysql+pymysql://user:passwd@host:3306/')
conn = engine.connect()  # a single connection, shared by all threads

db = 'db_enterprise_0'
table1 = 't_enterprise_0'
table2 = 't_enterprise_1'
table3 = 't_enterprise_2'
filename1 = '/data/test_threading1.csv'
filename2 = '/data/test_threading2.csv'
filename3 = '/data/test_threading3.csv'

def to_csv(db, table, filename, limit=None, delimiter=','):
    sql = f'select * from {db}.{table}'
    with open(filename, 'w', newline='') as outfile:
        outcsv = csv.writer(outfile, delimiter=delimiter)
        # stream the result set instead of loading it all into memory
        proxy = conn.execution_options(stream_results=True) \
                    .execution_options(net_write_timeout=3600) \
                    .execution_options(max_allowed_packet=67108864) \
                    .execute(sql)
        outcsv.writerow(proxy.keys())
        while 'batch not empty':
            batch = proxy.fetchmany(10000)  # fetch 10,000 rows at a time
            if not batch:
                break
            for row in batch:
                outcsv.writerow(row)

executor.submit(to_csv, db, table1, filename1)
executor.submit(to_csv, db, table2, filename2)
executor.submit(to_csv, db, table3, filename3)
The problem:
The odd thing is that it failed to write any data to the CSV files. The files were created, but they were empty or contained only the header:
-rw-rw-r-- 1 user user 20 Sep 18 10:05 test_threading1.csv
-rw-rw-r-- 1 user user 0 Sep 18 10:05 test_threading2.csv
-rw-rw-r-- 1 user user 0 Sep 18 10:05 test_threading3.csv
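One thing I realize is that executor.submit returns a Future, and any exception raised inside a worker stays hidden until result() is called on it. So I suppose a check like the following could surface whatever is going wrong; this is a minimal sketch on top of the code above, and the list of (table, filename) pairs is just for illustration:

futures = [
    executor.submit(to_csv, db, t, f)
    for t, f in [(table1, filename1), (table2, filename2), (table3, filename3)]
]
for future in concurrent.futures.as_completed(futures):
    future.result()  # re-raises any exception from the worker so it becomes visible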
What's the problem with my code? I guess it might be something about conn.execute(sql), but what's the exact reason? Or is there another way to dump MySQL tables to CSV with SQLAlchemy in parallel?
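For reference, this is the kind of alternative I have in mind: each task checks out its own connection from the engine's pool instead of all of them sharing conn. I'm not sure this is the recommended pattern, so treat it as a sketch; to_csv_own_conn is just a hypothetical name, and text() is used so the raw SQL string also works on newer SQLAlchemy versions:

from sqlalchemy import text

def to_csv_own_conn(db, table, filename, delimiter=','):
    # each worker opens its own connection, since a single Connection
    # isn't meant to be shared across threads
    with engine.connect() as local_conn:
        result = local_conn.execution_options(stream_results=True) \
                           .execute(text(f'select * from {db}.{table}'))
        with open(filename, 'w', newline='') as outfile:
            writer = csv.writer(outfile, delimiter=delimiter)
            writer.writerow(result.keys())
            for row in result:
                writer.writerow(row)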
Thanks in advance. I'd appreciate any advice.