5

I'm kinda new to the SQL world, but I was following a tutorial called *Optimizing pandas.read_sql for Postgres*. The thing is, I'm working with a big dataset, similar to the example in the tutorial, and I need a faster way to execute my query and turn it into a DataFrame. There, they use this function:

import tempfile

import pandas


def read_sql_tmpfile(query, db_engine):
    with tempfile.TemporaryFile() as tmpfile:
        copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
           query=query, head="HEADER"
        )
        conn = db_engine.raw_connection()
        cur = conn.cursor()
        cur.copy_expert(copy_sql, tmpfile)  # I want to replicate this
        tmpfile.seek(0)
        df = pandas.read_csv(tmpfile)
        return df

And I tried to replicate it, like this:

def read_sql_tmpfile(query, connection):
    with tempfile.TemporaryFile() as tmpfile:
        copy_sql = "COPY ({query}) TO STDOUT WITH CSV {head}".format(
           query=query, head="HEADER"
        )

        cur = connection.cursor()
        cur.copy_expert(copy_sql, tmpfile)
        tmpfile.seek(0)
        df = pandas.read_csv(tmpfile)
        return df

The thing is, `cursor.copy_expert` comes from the `psycopg2` library for PostgreSQL, and I can't find a way to do the same thing with `pymysql`. Is there any way to do this? What should I do? Thanks

Juan C
  • 3
    They already mentioned it's a Postgres-specific feature; look at [this](https://stackoverflow.com/questions/18107953/how-to-create-a-large-pandas-dataframe-from-an-sql-query-without-running-out-of) or [this](https://stackoverflow.com/questions/31702621/loading-5-million-rows-into-pandas-from-mysql) – waynetech Jul 24 '19 at 07:51
  • 1
    Thank you very much @waynetech, especially the second link was just what I needed to read – Juan C Jul 24 '19 at 15:08
  • 1
    @NickODell As mentioned in the second link from **waynetech**'s comment, the corresponding command in MySQL is `SELECT ... INTO OUTFILE 'file_name'`, but "file_name cannot be an existing file", so there is no way to use a `tempfile.TemporaryFile()`. – aaron Jan 18 '23 at 10:07
  • 1
    @aaron I'm aware. That solution creates a file on the MySQL server. In my environment, the SQL server and the SQL client run on different hosts, so that's quite unhelpful. Theoretically you could create a solution which creates the file, then moves it over SFTP, but that seems very inefficient. (Turning 1 disk read into 2 disk reads and 1 write.) – Nick ODell Jan 18 '23 at 15:46
  • 1
    @NickODell - Is this a summary of what _you_ want? "How can I create a CSV file on the client from a MySQL table?" – Rick James Jan 20 '23 at 00:22
  • @RickJames That's right. Ideally you'd also be able to do it for any query, not just a `SELECT * FROM table` style query. But even something limited to turning a table into a CSV file would be useful. – Nick ODell Jan 20 '23 at 00:24

4 Answers

3

I'm aware that the question is basically answered by waynetech's comment. But I was interested, and the details and implications are not always obvious, so here is a tested, copy-pastable solution.

Since the output file ends up on the DB server, the solution involves handling a temp directory on the server and transferring the file to the client. For the sake of simplicity I used SSH & SFTP for this. It assumes that the SSH keys of both machines have been exchanged beforehand. The remote file transfer and handling may be easier with a Samba share or something like that.

@Nick ODell: Please give this solution a chance and run a benchmark! I'm pretty sure the copy overhead isn't significant for larger amounts of data.

import os
import subprocess

import pandas


# `username` (the SSH login) and `db_server` (the DB host) are expected to be
# defined at module level before this function is called.
def read_sql_tmpfile(query, connection):
    df = None

    # Create unique temp directory on server side
    cmd = "mktemp -d"
    (out_mktemp, err) = subprocess.Popen(f'ssh {username}@{db_server} "{cmd}"', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    if err or not out_mktemp:
        return

    # remove additional white spaces around the output
    tmp_dir = out_mktemp.decode().strip()

    # The following command should be made superfluous by tweaking the group memberships
    # to grant the `mysql` user full access to the directory created by the user that executes the `mktemp` command
    cmd = f"chmod 777 -R {tmp_dir}"
    res = os.system(f'ssh {username}@{db_server} "{cmd}"')
    if res:
        return

    try:
        remote_tmp_file = f'{tmp_dir}/sql_tmpfile'

        # remember: the DB connection's user needs the `FILE` privilege
        # think about SQL injection; pass MySQL parameters in the query and a corresponding parameters list to this function if appropriate
        copy_sql = f"{query} INTO OUTFILE '{remote_tmp_file}'"

        cur = connection.cursor()
        cur.execute(copy_sql)

        local_tmp_file = os.path.basename(remote_tmp_file)
        cmd = f"sftp {username}@{db_server}:{remote_tmp_file} {local_tmp_file}"
        res = os.system(cmd)
        if not res and os.path.isfile(local_tmp_file):
            try:
                df = pandas.read_csv(local_tmp_file)
            finally:
                # cleanup local temp file
                os.remove(local_tmp_file)
    finally:
        # cleanup remote temp dir
        cmd = f"rm -R {tmp_dir}"
        os.system(f'ssh {username}@{db_server} "{cmd}"')

    return df
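
For illustration, a minimal usage sketch (not part of the answer above; the `username`/`db_server` values and the pymysql connection details are placeholders):

import pymysql

username = 'ssh_user'           # SSH login on the DB host (placeholder)
db_server = 'db.example.com'    # DB host, reachable with SSH key auth (placeholder)

connection = pymysql.connect(host=db_server, user='db_user',
                             password='...', database='mydb')
df = read_sql_tmpfile("SELECT col1, col2 FROM mytable", connection)
connection.close()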
Mike Feustel
  • 1
    Hmm, that is 20% faster than the read_sql() solution. Not what I would have expected, but the proof is in the pudding. PS: I found an optimization by combining two of your SSH calls. I also figured out a way to avoid chmod 777. See code listing and timings here: https://gist.github.com/nickodell/5b99512c000ee5f8ce75404c047b017a – Nick ODell Jan 21 '23 at 23:53
  • Thanks for trying it out, including the enhancements and bug fixes ;-). The result is not bad for the first shot. This approach still has optimization potential. I'm thinking, for example, of SFTP/SSH compression or the variant via an SMB share, where Pandas could stream the result immediately. I would also be interested in how the implementation with @rick-james's suggestion to involve the cmd-client (of course with some Python around it, directly at the `copy_sql` line) would perform. Anyway, this is almost a full-fledged research project, a kind of "Optimizing pandas.read_sql for MySQL". – Mike Feustel Jan 22 '23 at 11:52
  • I tried to figure out some way to start streaming the result to Pandas before it's done. I tried using `tail -f` to stream the file, but that creates a race condition. The file is not created as soon as the MySQL statement is sent - it could be delayed. – Nick ODell Jan 22 '23 at 17:01
  • It seems that `Duration export` + `Duration read` is the baseline and reading in can only start after export. However, when reading in via the share, `Duration copy` and `Duration read` could merge and decrease overall. – Mike Feustel Jan 23 '23 at 06:13
  • How does local client-csv output strategy perform? Provided here: https://gist.github.com/Mike-F362/1e90bcd59bf541d9fb21505202e8ea97 – Mike Feustel Jan 23 '23 at 08:25
  • This is going way over my head and my current work isn't Python nor SQL oriented. @NickODell would you say this is the accepted answer? – Juan C Jan 23 '23 at 12:52
2

Assuming that Nick's question is

How can I create a CSV file on the client from a MySQL table?

At a commandline prompt do

mysql -u ... -p -h ... dbname -e '...' >localfile.csv

where the executable statement is something like

SELECT  col1, col2, col3, col4
    FROM mytable

Notes:

  • Windows: cmd; *nix: some 'terminal' app.
  • This is run on the client.
  • dbname has the effect of "use dbname;".
  • The user, pwd, and hostname (of the server) are suitably filled in.
  • This assumes "tab" is a suitable delimiter for the CSV output.
  • Be careful about the nesting of quotes (escape if needed).
  • Whatever columns/expressions you desire are listed.
  • A WHERE (etc) can be included as needed.
  • No FTP needed.
  • No Python needed.
  • SHOW ... acts very much like SELECT.
  • On *nix, "tab" could be turned into another delimiter.
  • The header line can be skipped with an option to mysql.

Example (with -u, -p, and -h omitted):

# mysql  -e "show variables like 'max%size'" | tr '\t' ','
Variable_name,Value
max_binlog_cache_size,18446744073709547520
max_binlog_size,104857600
max_binlog_stmt_cache_size,18446744073709547520
max_heap_table_size,16777216
max_join_size,18446744073709551615
max_relay_log_size,0
Rick James
0

As mentioned in the comments, and in this answer, you are looking for `SELECT ... INTO OUTFILE`.

Here is a small (untested) example, based on your question:

import os
import tempfile

import pandas


def read_sql_tmpfile(query, connection):
    # Create a temp file name without creating the file
    tmp_dir = tempfile.mkdtemp()
    tmp_file_name = os.path.join(tmp_dir, next(tempfile._get_candidate_names()))

    # Copy data into the temporary file (the file name must be quoted and must not already exist)
    copy_sql = "{query} INTO OUTFILE '{outfile}'".format(
           query=query, outfile=tmp_file_name
    )
    cur = connection.cursor()
    cur.execute(copy_sql)
    
    # Read data from file
    df = pandas.read_csv(tmp_file_name)
    # Cleanup
    os.remove(tmp_file_name)
    return df
tituszban
  • 1
    Did you try this? MySQL doc says "file_name cannot be an existing file". – aaron Jan 18 '23 at 12:21
  • 1
    @aaron very good point. `with tempfile` would create the file. Updated my answer to just get a temp file name, but don't allocate it. – tituszban Jan 18 '23 at 14:46
0

To figure out which of these answers was fastest, I benchmarked each of them on a synthetic dataset. This dataset consisted of 100MB of time-series data, and 500MB of text data. (Note: this is measured using Pandas, which heavily penalizes small objects versus data which can be represented in NumPy.)

I benchmarked 5 methods:

  • naive: The baseline of read_sql().
  • sftp: SELECT ... INTO OUTFILE, followed by an sftp call and read_csv.
  • tofile: Invoke the mysql command with -B to generate a CSV, and write that into a file.
  • pipe: Invoke the mysql command with -B to generate a CSV, and read from that pipe using read_csv. Also use fcntl() to raise the pipe size.
  • pipe_no_fcntl: Same as before, but without fcntl.

Timings

All methods were tried seven times, in random order. In the following tables, a lower score is better.

Time series benchmark:

| Method | Time (s) | Standard Error (s) |
|---|---|---|
| pipe | 6.719870 | 0.064610 |
| pipe_no_fcntl | 7.243937 | 0.104802 |
| tofile | 7.636196 | 0.125963 |
| sftp | 9.926580 | 0.171262 |
| naive | 11.125657 | 0.470146 |

Text benchmark:

| Method | Time (s) | Standard Error (s) |
|---|---|---|
| pipe | 8.452694 | 0.217661 |
| tofile | 9.502743 | 0.265003 |
| pipe_no_fcntl | 9.620349 | 0.420255 |
| sftp | 12.189046 | 0.294148 |
| naive | 13.769322 | 0.695961 |
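
For reference, here is a minimal sketch of how per-method means and standard errors like those above could be collected. This is not the author's actual harness; each benchmarked method is assumed to be wrapped in a zero-argument callable.

import random
import statistics
import time


def benchmark(methods, trials=7):
    # methods: dict mapping a method name to a zero-argument callable (assumption)
    results = {name: [] for name in methods}
    order = [name for name in methods for _ in range(trials)]
    random.shuffle(order)  # run all trials in random order
    for name in order:
        start = time.perf_counter()
        methods[name]()
        results[name].append(time.perf_counter() - start)
    for name, times in sorted(results.items(), key=lambda kv: statistics.mean(kv[1])):
        sem = statistics.stdev(times) / len(times) ** 0.5  # standard error of the mean
        print(f'{name}\t{statistics.mean(times):.6f}\t{sem:.6f}')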

Winning solution

This is the pipe method, which was fastest.

import os
import pandas as pd
import subprocess
import tempfile
import fcntl


db_server = '...'
F_SETPIPE_SZ = 1031


def read_sql_pipe(query, database):
    args = ['mysql', f'--login-path={db_server}', database, '-B', '-e', query]
    try:
        # Run mysql and capture output
        proc = subprocess.Popen(args, stdout=subprocess.PIPE)
    except FileNotFoundError:
        # MySQL is not installed. Raise a better error message.
        raise Exception("The mysql command is not installed. Use brew or apt to install it.") from None

    # Raise amount of CSV data buffered up to 1MB.
    # This is a Linux-only syscall.
    fcntl.fcntl(proc.stdout.fileno(), F_SETPIPE_SZ, 1 << 20)

    df = pd.read_csv(proc.stdout, delimiter='\t')

    retcode = proc.wait()
    if retcode != 0:
        raise subprocess.CalledProcessError(
            retcode, proc.args, output=proc.stdout, stderr=proc.stderr
        )

    return df

The basic idea is to use the subprocess module to invoke mysql, with the stdout of MySQL being fed to a pipe. A pipe is a file-like object, which can be directly passed to pd.read_csv(). The MySQL process creates the CSV concurrently with Pandas reading the CSV, so this leads to an advantage over the method which writes the entire file before Pandas starts reading it.

A note about fcntl: fcntl is useful here because the amount of data which can be buffered in the pipe is limited to 64kB by default. I found that raising this to 1MB led to a ~10% speedup. If this is unavailable, a solution which writes the CSV to a file may outperform the pipe method.
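
If portability is a concern, the call can be guarded so the function still works on platforms where the fcntl command is rejected. A minimal sketch, assuming the same proc object and F_SETPIPE_SZ constant as in the function above (the try/except guard is an addition, not part of the answer):

try:
    # Linux-only fcntl command: enlarge the pipe buffer to 1 MB (F_SETPIPE_SZ = 1031)
    fcntl.fcntl(proc.stdout.fileno(), F_SETPIPE_SZ, 1 << 20)
except OSError:
    pass  # call rejected on this platform; keep the default (typically 64 kB) pipe buffer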

Dataset

The dataset was generated with the following script.

import pandas as pd
import numpy as np
from english_words import get_english_words_set
np.random.seed(42)

import util


def gen_benchmark_df(data_function, limit):
    i = 0
    df = data_function(i)
    i += 1
    while df.memory_usage(deep=True).sum() < limit:
        df = pd.concat([df, data_function(i)], ignore_index=True)
        i += 1
    # Trim excess rows
    row_count = len(df.index)
    data_size_bytes = df.memory_usage(deep=True).sum()
    row_count_needed = int(row_count * (limit / data_size_bytes))
    df = df.head(row_count_needed)
    return df


def gen_ts_chunk(i):
    rows = 100_000
    return pd.DataFrame({
        'run_id': np.random.randint(1, 1_000_000),
        'feature_id': np.random.randint(1, 1_000_000),
        'timestep': np.arange(0, rows),
        'val': np.cumsum(np.random.uniform(-1, 1, rows))
    })


def gen_text_chunk(i):
    rows = 10_000
    words = list(get_english_words_set(['web2'], lower=True))
    text_strings = np.apply_along_axis(lambda x: ' '.join(x), axis=1, arr=np.random.choice(words, size=(rows, 3)))
    return pd.DataFrame({
        'id': np.arange(i * rows, (i + 1) * rows),
        'data': text_strings
    })



dataset_size = 1e8


con = util.open_engine()
timeseries_df = gen_benchmark_df(gen_ts_chunk, dataset_size)
timeseries_df.to_sql('timeseries', con=con, if_exists='replace', index=False, chunksize=10_000)


dataset_size = 5e8

text_df = gen_benchmark_df(gen_text_chunk, dataset_size)
text_df.to_sql('text', con=con, if_exists='replace', index=False, chunksize=10_000)
Nick ODell