To figure out which of these answers was fastest, I benchmarked each of them on a synthetic dataset: 100 MB of time-series data and 500 MB of text data. (Note: these sizes are measured as Pandas memory usage, which heavily penalizes small Python objects relative to data that can be represented as NumPy arrays.)
I benchmarked five methods:
- naive: The baseline of read_sql().
- sftp: SELECT ... INTO OUTFILE on the server, followed by an sftp call to fetch the file and read_csv.
- tofile: Invoke the mysql command with -B to generate a CSV, write that into a file, and read the file with read_csv (sketched below).
- pipe: Invoke the mysql command with -B to generate a CSV, and read from that pipe using read_csv. Also use fcntl() to raise the pipe size.
- pipe_no_fcntl: Same as before, but without the fcntl() call.
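For concreteness, here is a rough sketch of the tofile variant; the query, login path, and temporary-file handling are illustrative rather than the exact benchmark code:

import subprocess
import tempfile
import pandas as pd

def read_sql_tofile(query, database, login_path='...'):
    # mysql -B produces tab-separated output; capture it in a temporary file,
    # then read the whole file back with Pandas once mysql has finished.
    with tempfile.NamedTemporaryFile(suffix='.tsv') as tmp:
        subprocess.run(
            ['mysql', f'--login-path={login_path}', database, '-B', '-e', query],
            stdout=tmp, check=True,
        )
        return pd.read_csv(tmp.name, delimiter='\t')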
Timings
All methods were tried seven times, in random order. In the following tables, a lower score is better.
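The harness itself is not part of the answers above; a minimal sketch of how numbers like these can be produced (runs interleaved in random order, then mean and standard error per method) looks roughly like this, with the function and argument names being illustrative:

import random
import time
import numpy as np

def benchmark(methods, query, database, repeats=7):
    # methods: dict mapping name -> callable(query, database) returning a DataFrame
    timings = {name: [] for name in methods}
    runs = [name for name in methods for _ in range(repeats)]
    random.shuffle(runs)  # interleave the methods in random order
    for name in runs:
        start = time.perf_counter()
        methods[name](query, database)
        timings[name].append(time.perf_counter() - start)
    # Report the mean and standard error (sample std / sqrt(n)) for each method
    for name, ts in timings.items():
        ts = np.array(ts)
        print(f'{name}: {ts.mean():.6f} s +/- {ts.std(ddof=1) / np.sqrt(len(ts)):.6f} s')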
Time series benchmark:

| Method | Time (s) | Standard Error (s) |
| --- | --- | --- |
| pipe | 6.719870 | 0.064610 |
| pipe_no_fcntl | 7.243937 | 0.104802 |
| tofile | 7.636196 | 0.125963 |
| sftp | 9.926580 | 0.171262 |
| naive | 11.125657 | 0.470146 |
Text benchmark:

| Method | Time (s) | Standard Error (s) |
| --- | --- | --- |
| pipe | 8.452694 | 0.217661 |
| tofile | 9.502743 | 0.265003 |
| pipe_no_fcntl | 9.620349 | 0.420255 |
| sftp | 12.189046 | 0.294148 |
| naive | 13.769322 | 0.695961 |
Winning solution
The winner is the pipe method, which was fastest in both benchmarks:
import os
import pandas as pd
import subprocess
import tempfile
import time
import fcntl

db_server = '...'
F_SETPIPE_SZ = 1031

def read_sql_pipe(query, database):
    args = ['mysql', f'--login-path={db_server}', database, '-B', '-e', query]
    try:
        # Run mysql and capture output
        proc = subprocess.Popen(args, stdout=subprocess.PIPE)
    except FileNotFoundError:
        # MySQL is not installed. Raise a better error message.
        raise Exception("The mysql command is not installed. Use brew or apt to install it.") from None

    # Raise amount of CSV data buffered up to 1MB.
    # This is a Linux-only syscall.
    fcntl.fcntl(proc.stdout.fileno(), F_SETPIPE_SZ, 1 << 20)

    df = pd.read_csv(proc.stdout, delimiter='\t')

    retcode = proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(
            retcode, proc.args, output=proc.stdout, stderr=proc.stderr
        )
    return df
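For example (the table and database names here are placeholders):

df = read_sql_pipe('SELECT * FROM timeseries', 'benchmark_db')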
The basic idea is to use the subprocess module to invoke mysql, with the stdout of MySQL fed into a pipe. A pipe is a file-like object, so it can be passed directly to pd.read_csv(). The MySQL process generates the CSV concurrently with Pandas reading it, which gives this approach an advantage over the methods that write the entire file before Pandas starts reading.

A note about fcntl: fcntl is useful here because the amount of data that can be buffered in the pipe is limited to 64 kB by default. I found that raising this to 1 MB led to a ~10% speedup. If fcntl is unavailable, a solution which writes the CSV to a file may outperform the pipe method.
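If the code also needs to run on platforms without F_SETPIPE_SZ (macOS, Windows), one option is to make the resize best-effort. This is a sketch under that assumption, not part of the benchmarked code; read_sql_pipe() would call try_raise_pipe_size(proc.stdout.fileno()) in place of the raw fcntl call:

import sys

def try_raise_pipe_size(fd, size=1 << 20):
    # F_SETPIPE_SZ (1031) is Linux-only; skip the resize elsewhere.
    if not sys.platform.startswith('linux'):
        return
    import fcntl
    F_SETPIPE_SZ = 1031
    try:
        fcntl.fcntl(fd, F_SETPIPE_SZ, size)
    except OSError:
        # The kernel may refuse sizes above /proc/sys/fs/pipe-max-size
        # for unprivileged processes; keep the default 64 kB buffer.
        pass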
Dataset
The dataset was generated with the following script.
import pandas as pd
import numpy as np
from english_words import get_english_words_set

np.random.seed(42)

import util

def gen_benchmark_df(data_function, limit):
    i = 0
    df = data_function(i)
    i += 1
    while df.memory_usage(deep=True).sum() < limit:
        df = pd.concat([df, data_function(i)], ignore_index=True)
        i += 1
    # Trim excess rows
    row_count = len(df.index)
    data_size_bytes = df.memory_usage(deep=True).sum()
    row_count_needed = int(row_count * (limit / data_size_bytes))
    df = df.head(row_count_needed)
    return df

def gen_ts_chunk(i):
    rows = 100_000
    return pd.DataFrame({
        'run_id': np.random.randint(1, 1_000_000),
        'feature_id': np.random.randint(1, 1_000_000),
        'timestep': np.arange(0, rows),
        'val': np.cumsum(np.random.uniform(-1, 1, rows))
    })

def gen_text_chunk(i):
    rows = 10_000
    words = list(get_english_words_set(['web2'], lower=True))
    text_strings = np.apply_along_axis(lambda x: ' '.join(x), axis=1, arr=np.random.choice(words, size=(rows, 3)))
    return pd.DataFrame({
        'id': np.arange(i * rows, (i + 1) * rows),
        'data': text_strings
    })

dataset_size = 1e8
con = util.open_engine()
timeseries_df = gen_benchmark_df(gen_ts_chunk, dataset_size)
timeseries_df.to_sql('timeseries', con=con, if_exists='replace', index=False, chunksize=10_000)

dataset_size = 5e8
text_df = gen_benchmark_df(gen_text_chunk, dataset_size)
text_df.to_sql('text', con=con, if_exists='replace', index=False, chunksize=10_000)