My requirement is to save MySQL tables to CSV files. To run this in parallel, I want to use a thread pool to execute multiple calls to the to_csv function at the same time, dumping many tables concurrently. I reproduced the issue with the code below:

import concurrent.futures
import csv

from sqlalchemy import create_engine

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

engine = create_engine('mysql+pymysql://user:passwd@host:3306/')
conn = engine.connect()  # a single connection shared by all threads

db = 'db_enterprise_0'
table1 = 't_enterprise_0'
table2 = 't_enterprise_1'
table3 = 't_enterprise_2'
filename1 = '/data/test_threading1.csv'
filename2 = '/data/test_threading2.csv'
filename3 = '/data/test_threading3.csv'


def to_csv(db, table, filename, delimiter=','):
    with open(filename, 'w', newline='') as file_:
        outcsv = csv.writer(file_, delimiter=delimiter)
        sql = f'select * from {db}.{table}'
        proxy = conn.execution_options(stream_results=True) \
            .execution_options(net_write_timeout=3600) \
            .execution_options(max_allowed_packet=67108864) \
            .execute(sql)
        outcsv.writerow(proxy.keys())
        while True:
            batch = proxy.fetchmany(10000)  # 10,000 rows at a time
            if not batch:
                break

            for row in batch:
                outcsv.writerow(row)

executor.submit(to_csv, db, table1, filename1)
executor.submit(to_csv, db, table2, filename2)
executor.submit(to_csv, db, table3, filename3)
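
Note: executor.submit() returns a Future, and an exception raised inside a worker is captured in that Future and only re-raised when .result() is called, so any failure inside to_csv is silent as written. While debugging, keeping the futures and calling .result() surfaces such errors; a minimal example:

futures = [executor.submit(to_csv, db, table1, filename1),
           executor.submit(to_csv, db, table2, filename2),
           executor.submit(to_csv, db, table3, filename3)]
for fut in concurrent.futures.as_completed(futures):
    fut.result()  # re-raises any exception from the worker thread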

The problem:

The odd thing was that it failed to write data into the CSV files; although the files were created, each was empty or contained only the header:

-rw-rw-r-- 1 user user 20 Sep 18 10:05 test_threading1.csv
-rw-rw-r-- 1 user user  0 Sep 18 10:05 test_threading2.csv
-rw-rw-r-- 1 user user  0 Sep 18 10:05 test_threading3.csv

What's the problem with my code? I guess there might be some problem at conn.execute(sql), but what's the exact reason for that? Or is there another way to dump MySQL tables to CSV with SQLAlchemy in parallel?

Thanks in advance. I'd appreciate any advice.

  • Connections are not thread safe. – Ilja Everilä Sep 18 '19 at 03:44
  • @IljaEverilä Hi Ilja, I just read tables, no other actions; shouldn't it be thread-safe? – DennisLi Sep 18 '19 at 03:56
  • Some driver implementations might have thread safe connections that support concurrent execution, but the norm is that they are not. Also the SQLAlchemy `Connection` object is not thread safe as is, but requires that you guard access to it: https://docs.sqlalchemy.org/en/13/core/connections.html#sqlalchemy.engine.Connection – Ilja Everilä Sep 18 '19 at 04:09
  • After reading your answer in https://stackoverflow.com/questions/51769299/is-connection-pool-in-sqlalchemy-thread-safe, I think I can use a connection pool with ThreadPoolExecutor to achieve this? – DennisLi Sep 18 '19 at 05:49
  • If using threads, you could and should just checkout a new connection in each thread – in your case in each call to `to_csv()`. The engine and its connection pool is thread-safe and will handle the situation correctly, i.e. serve each thread its own connection. – Ilja Everilä Sep 18 '19 at 10:10
  • Got it, thanks a lot for your advice. BTW, is there any snippet for that? I couldn't find one in the official documentation. thanks :) – DennisLi Sep 19 '19 at 01:26
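
A minimal sketch of the per-thread checkout pattern Ilja describes, assuming the engine, db, table, and filename variables from the question: the connection is checked out inside to_csv, so the thread-safe engine pool serves each worker thread its own connection.

def to_csv(db, table, filename, delimiter=','):
    with open(filename, 'w', newline='') as file_:
        outcsv = csv.writer(file_, delimiter=delimiter)
        # Check out a dedicated connection for this thread; it is
        # returned to the pool when the block exits.
        with engine.connect() as conn:
            proxy = conn.execution_options(stream_results=True) \
                .execute(f'select * from {db}.{table}')
            outcsv.writerow(proxy.keys())
            while True:
                batch = proxy.fetchmany(10000)
                if not batch:
                    break
                for row in batch:
                    outcsv.writerow(row)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(to_csv, db, table1, filename1),
               executor.submit(to_csv, db, table2, filename2),
               executor.submit(to_csv, db, table3, filename3)]
    for fut in concurrent.futures.as_completed(futures):
        fut.result()  # re-raise any worker exception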

1 Answer

Based on the guidance from @Ilja Everilä, I solved it with multiprocessing rather than multithreading.

Here is the code:

import csv
from multiprocessing import Process


def run_in_process(db, table, filename, delimiter=','):
    # Discard connections inherited from the parent process so that this
    # child checks out fresh ones (see the first reference below).
    engine.dispose()

    with open(filename, 'w', newline='') as file_:
        outcsv = csv.writer(file_, delimiter=delimiter, quotechar='"', quoting=csv.QUOTE_MINIMAL)

        with engine.connect() as conn:
            sql = f'select * from {db}.{table}'
            proxy = conn.execute(sql)

            outcsv.writerow(proxy.keys())
            while True:
                batch = proxy.fetchmany(10000)  # 10,000 rows at a time
                if not batch:
                    break

                for row in batch:
                    outcsv.writerow(row)


p1 = Process(target=run_in_process, args=(db, table1, filename1))
p2 = Process(target=run_in_process, args=(db, table2, filename2))
p1.start()
p2.start()
p1.join()
p2.join()
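
The engine.dispose() call at the start of run_in_process matters here: a forked child process inherits the parent's connection pool, and sharing those connections across processes corrupts the protocol stream, so each child discards the inherited pool and lets the engine check out fresh connections (see the first reference below).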

Reference:

https://docs.sqlalchemy.org/en/13/core/pooling.html#using-connection-pools-with-multiprocessing

Is connection pool in sqlalchemy thread-safe?
