18

I'm deploying an application to consume some .csv data. I want to copy it into a MySQL table. With some help from Stack Overflow users I wrote the code below:

import csv
import MySQLdb

db = MySQLdb.connect(   host = "dbname.description.host.com",
                        user = "user",
                        passwd = "key",
                        db = "dbname")
cursor = db.cursor()

query = "INSERT INTO table_name(column, column_1, column_2, column_3) VALUES (%s, %s, %s, %s)"

csv_data = csv.reader(open('file_name'))

for row in csv_data:
    cursor.execute(query, row)
    db.commit()

cursor.close()

The problem is, currently, the process is so slow and I need to speed the things up.

aioobe
Pedro Quadros

7 Answers

31

You can use executemany to batch the job, as follows:

import csv
import MySQLdb

db = MySQLdb.connect(   host = "dbname.description.host.com",
                        user = "user",
                        passwd = "key",
                        db = "dbname")
cursor = db.cursor()

query = "INSERT INTO table_name(column, column_1, column_2, column_3) VALUES (%s, %s, %s, %s)"

csv_data = csv.reader(open('file_name'))

# collect all rows first, then send them to the server in one batched call
my_data = []
for row in csv_data:
    my_data.append(tuple(row))

cursor.executemany(query, my_data)
db.commit()
cursor.close()
Mike Tung
    Is executemany a batch operation, or does it go through the entire list one by one and insert them? Basically, is executemany = a loop of execute()? – shreesh katti Apr 12 '19 at 06:29
    @shreeshkatti Most of the time, it's a loop, [except for inserts](https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlcursor-executemany.html) for which it is optimized. – Sergey Kalinichenko Jan 02 '20 at 15:32
14

The code you are using is very inefficient because you are committing one row at a time, which is what you want for a transactional process, but not for a one-off dump.

There are a number of ways to speed this up, ranging from great to not so great. Here are four approaches, including the naive implementation (the one above):

#!/usr/bin/env python
import pandas as pd
import numpy as np
import odo
import profilehooks
import sqlalchemy
import csv
import os


def create_test_data():
    n = 100000
    df = pd.DataFrame(dict(
        id=np.random.randint(0, 1000000, n),
        col1=np.random.choice(['hello', 'world', 'python', 'large string for testing ' * 10], n),
        col2=np.random.randint(-1000000, 1000000, n),
        col3=np.random.randint(-9000000, 9000000, n),
        col4=(np.random.random(n) - 0.5) * 99999
    ), columns=['id', 'col1', 'col2', 'col3', 'col4'])
    df.to_csv('tmp.csv', index=False)


@profilehooks.timecall
def using_pandas(table_name, uri):
    df = pd.read_csv('tmp.csv')
    df.to_sql(table_name, con=uri, if_exists='append', index=False)


@profilehooks.timecall
def using_odo(table_name, uri):
    odo.odo('tmp.csv', '%s::%s' % (uri, table_name))


@profilehooks.timecall
def using_cursor(table_name, uri):
    engine = sqlalchemy.create_engine(uri)
    query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
    query = query.format(table_name)
    con = engine.raw_connection()
    with con.cursor() as cursor:
        with open('tmp.csv') as fh:
            reader = csv.reader(fh)
            next(reader)  # Skip first line (headers)
            for row in reader:
                cursor.execute(query, row)
                con.commit()
    con.close()


@profilehooks.timecall
def using_cursor_correct(table_name, uri):
    engine = sqlalchemy.create_engine(uri)
    query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
    query = query.format(table_name)
    with open('tmp.csv') as fh:
        reader = csv.reader(fh)
        next(reader)  # Skip first line (headers)
        data = list(reader)
    engine.execute(query, data)


def main():
    uri = 'mysql+pymysql://root:%s@localhost/test' % os.environ['pass']

    engine = sqlalchemy.create_engine(uri)
    for i in (1, 2, 3, 4):
        engine.execute("DROP TABLE IF EXISTS table%s" % i)
        engine.execute("""
            CREATE TABLE table%s(
                id INT,
                col1 VARCHAR(255),
                col2 INT,
                col3 INT,
                col4 DOUBLE
            );
        """ % i)
    create_test_data()

    using_odo('table1', uri)
    using_pandas('table4', uri)
    using_cursor_correct('table3', uri)
    using_cursor('table2', uri)

    for i in (1, 2, 3, 4):
        count = pd.read_sql('SELECT COUNT(*) as c FROM table%s' % i, con=uri)['c'][0]
        print("Count for table%s - %s" % (i, count))


if __name__ == '__main__':
    main()

• The odo method is the fastest (it uses MySQL's LOAD DATA INFILE under the hood).
• Next is pandas (its critical code paths are optimized).
• Next is using a raw cursor, but inserting the rows in bulk.
• Last is the naive method, committing one row at a time.

Here are some example timings against a local MySQL server.

using_odo (./test.py:29): 0.516 seconds

using_pandas (./test.py:23): 3.039 seconds

using_cursor_correct (./test.py:50): 12.847 seconds

using_cursor (./test.py:34): 43.470 seconds

Count for table1 - 100000

Count for table2 - 100000

Count for table3 - 100000

Count for table4 - 100000

As you can see, the naive implementation is roughly 100 times slower than odo, and roughly 10 times slower than pandas.
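
Since odo's speed comes from LOAD DATA INFILE, you can also issue that statement yourself and skip the extra dependency. A minimal sketch with MySQLdb (the connection details, file name and table name are placeholders; the client must pass local_infile=1 and the server must have local_infile enabled):

import MySQLdb

# local_infile=1 allows the client to send the file to the server
db = MySQLdb.connect(host="localhost", user="user", passwd="key",
                     db="test", local_infile=1)
cursor = db.cursor()

# IGNORE 1 LINES skips the CSV header row
cursor.execute("""
    LOAD DATA LOCAL INFILE 'tmp.csv'
    INTO TABLE table1
    FIELDS TERMINATED BY ','
    OPTIONALLY ENCLOSED BY '"'
    LINES TERMINATED BY '\\n'
    IGNORE 1 LINES
    (id, col1, col2, col3, col4)
""")
db.commit()
cursor.close()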

Roger Thomas
  • Can you please specify the odo and Python versions you are using for this, since I'm getting an error when I import odo: Traceback (most recent call last): File "main.py", line 1, in from odo import odo File "/home/env/lib/python3.7/site-packages/odo/__init__.py", line 39, in from .backends import sql File "/home/env/lib/python3.7/site-packages/odo/backends/sql.py", line 396, in @discover.register(sa.engine.RowProxy) AttributeError: module 'sqlalchemy.engine' has no attribute 'RowProxy' – farhan pirzada Jan 19 '22 at 09:01
  • Wouldn't it be better to use pandas df.to_sql(table_name, con=uri, if_exists='append', index=False, method='multi') so that multiple rows are inserted at a time, something like a bulk insert? – Sneha Tunga Oct 11 '22 at 07:18
2

The solution is to use MySQL's batch (multi-row) INSERT.

So you need to take all the values you want to insert and turn them into a single string used as the parameter of the execute() method.

In the end your SQL should look like this:

INSERT INTO table_name (`column`, `column_1`, `column_2`, `column_3`) VALUES('1','2','3','4'),('4','5','6','7'),('7','8','9','10');

Here is an example:

# function to turn one CSV row into a "(...)" values group
def stringify(v):
    return "('%s', '%s', %s, %s)" % (v[0], v[1], v[2], v[3])

# transform every row to a string (csv_data is the csv.reader from the question)
v = map(stringify, csv_data)

# glue them together
batchData = ", ".join(v)

# complete the SQL
sql = "INSERT INTO `table_name`(`column`, `column_1`, `column_2`, `column_3`) \
VALUES %s" % batchData

# execute it
cursor.execute(sql)
db.commit()
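
Note that building the VALUES string by hand means you have to handle quoting and escaping yourself. A minimal sketch of the same multi-row INSERT, but with %s placeholders so the driver does the escaping (here csv_data is again the csv.reader from the question):

rows = [tuple(r) for r in csv_data]

# one (%s, %s, %s, %s) group per row, all in a single statement
placeholders = ", ".join(["(%s, %s, %s, %s)"] * len(rows))
sql = ("INSERT INTO `table_name`(`column`, `column_1`, `column_2`, `column_3`) "
       "VALUES " + placeholders)

# flatten the rows so the parameters line up with the placeholders
params = [value for row in rows for value in row]

cursor.execute(sql, params)
db.commit()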
Alex
2

Here are some stats to support the answer from @Mike Tung: executemany outperforms execute. It was hard to reach even 315 inserts per second with execute, while with executemany I achieved 25,000 inserts per second.

Base machine configuration -

2.7 GHz Dual-Core Intel Core i5
16 GB 1867 MHz DDR3
Flash Storage

Results:

cursor.execute: 250 to at most 315 inserts per second
cursor.executemany: 25,000 inserts per second
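
For reference, a minimal sketch of how such a comparison can be timed (the query, the cursor and the rows list of tuples are assumptions based on the question's setup):

import time

query = "INSERT INTO table_name(column, column_1, column_2, column_3) VALUES (%s, %s, %s, %s)"

start = time.time()
for row in rows:                   # one round trip per row
    cursor.execute(query, row)
db.commit()
print("execute loop: %.2f s" % (time.time() - start))

start = time.time()
cursor.executemany(query, rows)    # one batched call
db.commit()
print("executemany:  %.2f s" % (time.time() - start))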
user 923227
1

Take the commit out of the for loop:

for row in csv_data:
    cursor.execute(query, row)
db.commit()

It will do less work and will be faster.
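
If a single big transaction is a concern, a middle ground is to commit every N rows instead of every row. A minimal sketch, with an arbitrary batch size of 1000:

BATCH_SIZE = 1000   # arbitrary; tune for your data

for i, row in enumerate(csv_data, start=1):
    cursor.execute(query, row)
    if i % BATCH_SIZE == 0:
        db.commit()     # flush a full batch
db.commit()             # commit the remaining rows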

nacho
0

I solved this by building an array of tuples and passing it to a single execute call. Processing 1 million rows took only 8 minutes. Avoid calling conn.execute row by row as much as possible.

import pandas as pd

def process_csv_file4(csv_file, conn):
    df = pd.read_csv(csv_file, sep=';', names=['column'])

    query = """
            INSERT INTO table
                (column)
            VALUES
                (%s)
            ON DUPLICATE KEY UPDATE
                column = VALUES(column);
            """

    # pass all rows in one call as a list of tuples
    conn.execute(query, [tuple(row) for row in df.values])
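
A minimal usage sketch, assuming a SQLAlchemy 1.x-style engine (the connection string and file name are placeholders):

from sqlalchemy import create_engine

engine = create_engine('mysql+mysqldb://user:key@localhost/dbname')
with engine.connect() as conn:
    process_csv_file4('data.csv', conn)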
wfolkerts
0

I'm using the SQLAlchemy library to speed up bulk inserts from a CSV file into a MySQL database through a Python script. The data will be inserted as text, so connect to the database with MySQL Workbench afterwards and change the column data types, and the data is ready to use.

Step 1: Run 'pip install sqlalchemy' and 'pip install mysqlclient' in the command terminal, then import the libraries:

import pandas as pd

import MySQLdb

import sqlalchemy

from sqlalchemy import create_engine

Step 2:
Then create the engine through SQLAlchemy using a connection string.

###### Create Engine ####

syntax:

engine = create_engine("mysql+mysqldb://username:password@hostaddress:3306/databasename")

example:

engine = create_engine("mysql+mysqldb://abc9:abc$123456@127.10.23.1:2207/abc9")
conn = engine.connect()

print(engine)

########### Define your python code ##############

def function_name():
    data = pd.read_csv('filepath/file.csv')
    data.to_sql('table_name', engine, method='multi', index=False,
                if_exists='replace')

############ Close Connection ###############

conn.close()

Run the code, and 2 million rows can be inserted in under 4 minutes!
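
If the file is very large, to_sql also accepts a chunksize argument, so the rows are sent in batches instead of one enormous multi-row statement. A minimal sketch (file name, table name and chunk size are assumptions):

data = pd.read_csv('filepath/file.csv')
data.to_sql('table_name', engine, method='multi', index=False,
            if_exists='replace', chunksize=10000)  # 10,000 rows per batch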

Use this reference link for different database drivers:

https://overiq.com/sqlalchemy-101/installing-sqlalchemy-and-connecting-to-database/