57

I have some rather large pandas DataFrames and I'd like to use the new bulk SQL mappings to upload them to a Microsoft SQL Server via SQL Alchemy. The pandas.to_sql() method, while nice, is slow.

I'm having trouble writing the code...

I'd like to be able to pass this function a pandas DataFrame which I'm calling `table`, a schema name I'm calling `schema`, and a table name I'm calling `name`. Ideally, the function will 1) delete the table if it already exists, 2) create a new table, 3) create a mapper, and 4) bulk insert using the mapper and the pandas data. I'm stuck on part 3.

Here's my (admittedly rough) code. I'm struggling with how to get the mapper function to work with my primary keys. I don't really need primary keys but the mapper function requires it.

Thanks for the insights.

from sqlalchemy import create_engine, Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e, reflect=True, schema=schema)
    Base = declarative_base(bind=e, metadata=m)
    # 1) drop the table if it already exists
    t = Table(name, m)
    m.remove(t)
    t.drop(checkfirst=True)
    # 2) create a new table from the DataFrame's schema
    sqld = SQLDatabase(e, schema=schema, meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e, tables=[sqlt])
    # 3) create a mapper (this is the part I'm stuck on)
    class MyClass(Base):
        pass
    mapper(MyClass, sqlt)
    # 4) bulk insert using the mapper and the pandas data
    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return
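
For reference, a minimal sketch of one way through part 3: sqlalchemy.orm.mapper() accepts a primary_key argument, so a column can be designated as the key for mapping purposes even if the table has no real primary key (the 'index' column name below is hypothetical):

# Sketch only: designate an existing column as the mapper's primary key.
# 'index' is a hypothetical column name; classical mapping uses a plain class.
class MyMappedClass(object):
    pass

mapper(MyMappedClass, sqlt, primary_key=[sqlt.c['index']])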
  • It seems that you are recreating the `to_sql` function yourself, and I doubt that this will be faster. The bottleneck writing data to SQL lies mainly in the python drivers (`pyodbc` in your case), and this is something you don't avoid with the above implementation. Furthermore, `to_sql` does not use the ORM, which is considered to be slower than CORE sqlalchemy even when using bulk insert (http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow) – joris Aug 13 '15 at 23:09
  • Further, if `to_sql` is too slow, and you cannot improve it (by eg tweaking the connection parameters, the used driver (eg pymssql), internet speed, by removing constraints on the table, etc), a faster alternative is writing the data to csv, and loading this into the SQL table. – joris Aug 13 '15 at 23:57
  • @joris Thanks. It seems that the "bulk operations" listed here are a bit of a misnomer then. http://docs.sqlalchemy.org/en/rel_1_0/orm/persistence_techniques.html#bulk-operations What I really need to do is output the pandas datafile to a textfile and write the BULK INSERT operation like this... http://stackoverflow.com/questions/29638136/how-to-speed-up-with-bulk-insert-to-ms-server-from-python-with-pyodbc-from-csv – none Aug 13 '15 at 23:59
  • yes, but that is to improve the speed of sqlalchemy ORM, which has a lot more functionality than only core sqlalchemy. But pandas `to_sql` does not use ORM at all, as I said before, and is in fact already doing a bulk insert. – joris Aug 14 '15 at 00:01
  • @joris Well, the reason why I went down this road was I can run a 'BULK INSERT dbo.MyTable FROM \\fileserver\folder\doc.txt' on the SQL Server and the performance is great. What I'm thinking is that when the BULK INSERT statement uses "VALUES" instead of "FROM", that's where the real performance loss is. In other words, the connection from the sql server to file server is better than the connection from my virtual machine to the SQL Server. Thanks. – none Aug 14 '15 at 00:14
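
For reference, a rough sketch of the CSV + BULK INSERT route discussed in the comments above (the UNC path is the one from the comment; the connection string, table name, and column layout are placeholders, and the SQL Server service account must be able to read the share):

import pandas as pd
import pyodbc

# df is an existing DataFrame; write it to a share the SQL Server can reach
csv_path = r'\\fileserver\folder\doc.txt'
df.to_csv(csv_path, sep='\t', index=False, header=False)

# placeholder connection string
conn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};SERVER=MYSERVER;DATABASE=MYDB;Trusted_Connection=yes'
)
conn.autocommit = True
# the server reads the file itself, so the data never travels over the Python connection
conn.execute(r"""
    BULK INSERT dbo.MyTable
    FROM '\\fileserver\folder\doc.txt'
    WITH (FIELDTERMINATOR = '\t', ROWTERMINATOR = '\n')
""")
conn.close()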

11 Answers

41

I ran into a similar issue with pd.to_sql taking hours to upload data. The below code bulk inserted the same data in a few seconds.

from sqlalchemy import create_engine
import psycopg2 as pg
# in-memory text buffer used to stream the dataframe into Postgres
# (cStringIO is Python 2; on Python 3 use io.StringIO instead)
import cStringIO

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace=True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone
);'''
cursor.execute(command)
connection.commit()

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#skip the dataframe's default integer index; "Index", "Event" and "Day" are written as columns
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")
connection.commit()
cur.close()
  • Looks interesting. Tried it with an Oracle DB, it says cx_Oracle.Cursor object has no attribute 'copy_from'. The copy_from method seems to be a postgres thing. Any ideas on an DB-agnostic method? – Ziggy Eunicien Nov 23 '16 at 20:19
  • this is a nice in-mem solution. just one point is the `contents` variable didn't get used. might as well drop it since it does a read through the whole string buffer which can get big depending on db size. tested without and it's working fine. – marko Sep 05 '17 at 17:21
  • cStringIO was deprecated for python3. If using python3, you can use: import io; output = io.StringIO() – ansonw Dec 27 '17 at 19:23
  • note - I don't think this will work for redshift. better/fastest to push data to s3 and then copy into redshift from there. – ansonw Jun 28 '18 at 21:58
  • This also doesn't work for SQL Server. In fact, the only platform this solution works for is Postgres (due to `copy_from`), which may help people besides the asker, but the asker very explicitly asked about SQL Server. – bsplosion May 08 '19 at 16:07
  • It saved me. Thanks. – shariful Jun 13 '19 at 21:44
  • Awesome solution. Took just a couple of mins to load 2 million rows. – infiniteloop Apr 28 '20 at 16:00
  • I know this is old, but does this work for multiple execute statements before commit. Something like connection.execute(sql1) ; connection.execute(sql2)... connection.commit() so if any of my queries fails it will roll back ALL changes from sql1...sqlN? – brian_ds Oct 07 '20 at 19:30
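
Per the io.StringIO note in the comments, a Python 3 adaptation of the snippet above might look like this (a sketch; it assumes the same localytics_app2 table and an existing df):

import io
from sqlalchemy import create_engine

engine = create_engine('postgresql://<username>:<pswd>@<host>:<port>/<database>')

output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)

connection = engine.raw_connection()
try:
    with connection.cursor() as cursor:
        cursor.copy_from(output, 'localytics_app2', null="")
    connection.commit()
finally:
    connection.close()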
26

This might have been answered by now, but I found the solution by collating different answers on this site and aligning with SQLAlchemy's documentation.

  1. The table needs to already exist in db1, with an index set up as auto_increment (a sketch of such a table follows below).
  2. The class Current needs to align with the dataframe imported from the CSV and with the table in db1.

Hope this helps whoever comes here and wants to mix pandas and SQLAlchemy in a quick way.
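
For point 1, a rough sketch of what the pre-existing table could look like (the MySQL DDL and the DECIMAL precision are assumptions; the column names mirror the Current class below):

from sqlalchemy import create_engine

# placeholder credentials; plain-string execute works on SQLAlchemy 1.x, which this snippet targets
engine = create_engine('mysql://root:password@localhost/db1')
with engine.connect() as conn:
    conn.execute("""
        CREATE TABLE IF NOT EXISTS tableName (
            id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
            Date VARCHAR(500),
            Type VARCHAR(500),
            Value DECIMAL(20, 6)
        )
    """)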

from urllib import quote_plus as urlquote  # Python 2; on Python 3 use: from urllib.parse import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd


# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()

#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc.
class Current(Base):
    __tablename__ = 'tableName'

    id = Column(Integer, primary_key=True)
    Date = Column(String(500))
    Type = Column(String(500))
    Value = Column(Numeric())

    def __repr__(self):
        return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)

# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'

# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# orient='records' is the key here: it produces the list-of-dicts format the docs describe for bulk inserts.
listToWrite = df_to_be_written.to_dict(orient='records')

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

# Open the session
Session = sessionmaker(bind=engine)
session = Session()

# Insert the dataframe into the database in one bulk operation
conn.execute(table.insert(), listToWrite)

# Commit the changes
session.commit()

# Close the session
session.close()
    I found this SQLAlchemy article very useful for improving the speed of inserts: http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow – Andrei Sura Mar 03 '17 at 03:08
  • Thanks for this, you made my life way easier. Can you explain the point of the `def __repr__(self)`? – Walt Reed Oct 03 '17 at 20:28
  • Where exactly is the definition of `class Current` used? Doesn't actually seem to be dereferenced anywhere here. – ijoseph Jul 10 '18 at 23:35
  • @ijoseph I think the class `Current` is subclassing the `Base` which records all of its ORM definitions in the `MetaData` object which is used to create table schema. [docs](http://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/basic_use.html#accessing-the-metadata) – Alex Zamai Aug 17 '18 at 08:15
  • @WaltReed, That tells the class how to reproduce itself as a string. – RagingRoosevelt Nov 14 '22 at 16:12
20

Based on @ansonw's answer:

import cStringIO  # Python 2; on Python 3 use io.StringIO instead

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
    # Create table (the empty slice writes only the schema)
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare data
    output = cStringIO.StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    cursor.copy_from(output, table, sep=sep, null='')
    connection.commit()
    cursor.close()

This inserts 200,000 rows in 5 seconds instead of 4 minutes.

  • I didn't downvote, but this doesn't really look like a solution that utilizes pandas as desired: multiple process + pandas + sqlalchemy. Usually during ingestion, especially with larger data sets, there will be a temporary location to store the data in the database and then massage that data (delete/back-populate) before an insert/update. – Brian Bruggeman Jun 26 '17 at 23:34
  • I think this could be refactored to support chunking of the data (which the Pandas DataFrame.to_sql suggests it does, but it doesn't work). This would mean the entire CSV form would not need to be rendered in one go, but I think this is otherwise the best solution in this thread! – Bklyn Aug 09 '18 at 16:22
  • Unfortunately since then, the method copy_from() is deprecated. – Sedat Kestepe Jan 05 '21 at 16:15
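
Following up on the chunking suggestion above, a rough sketch of the same idea that streams the dataframe in chunks and uses copy_expert instead of copy_from (Postgres/psycopg2 assumed; the chunk size is arbitrary):

import io

def to_sql_chunked(engine, df, table, if_exists='fail', chunk_rows=100000):
    # create the (empty) table from the dataframe's schema
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # text-format COPY: tab-delimited by default, empty string treated as NULL
    copy_cmd = "COPY %s FROM STDIN WITH (FORMAT text, NULL '')" % table

    connection = engine.raw_connection()
    cursor = connection.cursor()
    try:
        # stream the dataframe chunk by chunk so the whole CSV never sits in memory at once
        for start in range(0, len(df), chunk_rows):
            buf = io.StringIO()
            df.iloc[start:start + chunk_rows].to_csv(buf, sep='\t', header=False)
            buf.seek(0)
            cursor.copy_expert(copy_cmd, buf)
        connection.commit()
    finally:
        cursor.close()
        connection.close()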
15

Pandas 0.25.1 has a parameter to do multi-inserts, so it's no longer necessary to work around this issue with SQLAlchemy.

Set method='multi' when calling pandas.DataFrame.to_sql.

In this example, it would be df.to_sql(table, schema=schema, con=e, index=False, if_exists='replace', method='multi')

Answer sourced from docs here

Worth noting that I've only tested this with Redshift. Please let me know how it goes on other databases so I can update this answer.

  • When I use 'multi' with MSSQL+Pyodbc driver, SQLAlchemy throws a generic DBAPIError, which unfortunately makes this extremely difficult to debug. – Jonny Waffles Nov 14 '19 at 16:40
  • I realized MSSQL only supports 2100 maximum parameters, and thus large multi inserts don't work. I resolved the issue by using the chunksize argument to break the dataframe values in to a smaller param list. – Jonny Waffles Nov 14 '19 at 16:57
  • `method="multi"` didn't improve the speed of bulk-load to SQL Server for me, only [create_engine(..., fast_executemany=True)](https://stackoverflow.com/a/63178240/1026) did. – Nickolay Dec 20 '21 at 12:37
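
For SQL Server specifically, a minimal sketch of the fast_executemany route from the last comment (placeholder connection string and table name; needs SQLAlchemy 1.3+ with pyodbc):

from sqlalchemy import create_engine

# fast_executemany makes pyodbc send parameter batches instead of row-by-row INSERTs
engine = create_engine(
    "mssql+pyodbc://user:password@MYDSN",  # placeholder DSN-style connection string
    fast_executemany=True,
)

# df is an existing DataFrame; plain to_sql (no method='multi'), so the 2100-parameter limit is not hit
df.to_sql("my_table", engine, index=False, if_exists="append", chunksize=1000)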
8

As this is an I/O-heavy workload, you can also use the Python threading module via multiprocessing.dummy. This sped things up for me:

import math
from multiprocessing.dummy import Pool as ThreadPool

...

def insert_df(df, *args, **kwargs):
    nworkers = 4

    chunksize = math.floor(df.shape[0] / nworkers)
    chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)]
    chunks.append((chunksize * nworkers, df.shape[0]))
    pool = ThreadPool(nworkers)

    def worker(chunk):
        i, j = chunk
        df.iloc[i:j, :].to_sql(*args, **kwargs)

    pool.map(worker, chunks)
    pool.close()
    pool.join()


....

insert_df(df, "foo_bar", engine, if_exists='append')
6

Here is a simple method

Download Drivers for SQL database connectivity

For Linux and Mac OS:

https://learn.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-2017

For Windows:

https://www.microsoft.com/en-us/download/details.aspx?id=56567

Creating Connection

from sqlalchemy import create_engine 
import urllib
import pandas as pd

server = '*****'
database = '********'
username = '**********'
password = '*********'

params = urllib.parse.quote_plus(
'DRIVER={ODBC Driver 17 for SQL Server};'+ 
'SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password) 

engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params) 

#Checking Connection (note: _is_sqlalchemy_connectable is a private pandas helper)
connected = pd.io.sql._is_sqlalchemy_connectable(engine)

print(connected)   #Output is True if connection established successfully

Data insertion

df.to_sql('Table_Name', con=engine, if_exists='append', index=False)


"""
if_exists: {'fail', 'replace', 'append'}, default 'fail'
     fail: If table exists, do nothing.
     replace: If table exists, drop it, recreate it, and insert data.
     append: If table exists, insert data. Create if does not exist.
"""

If there are many records

# limit based on sp_prepexec parameter count (SQL Server allows at most 2100 parameters)
tsql_chunksize = 2097 // len(df.columns)
# cap at 1000 (limit for number of rows inserted by table-value constructor)
tsql_chunksize = 1000 if tsql_chunksize > 1000 else tsql_chunksize
print(tsql_chunksize)


df.to_sql('table_name', con = engine, if_exists = 'append', index= False, chunksize=tsql_chunksize)

PS: You can change the parameters as per your requirement.

5

My postgres specific solution below auto-creates the database table using your pandas dataframe, and performs a fast bulk insert using the postgres COPY my_table FROM ...

import io

import pandas as pd
from sqlalchemy import create_engine

def write_to_table(df, db_engine, schema, table_name, if_exists='fail'):
    string_data_io = io.StringIO()
    df.to_csv(string_data_io, sep='|', index=False)
    pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
    table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                               index=False, if_exists=if_exists, schema=schema)
    table.create()
    string_data_io.seek(0)
    # no readline() here: the HEADER option below already tells COPY to skip the header row
    with db_engine.connect() as connection:
        with connection.connection.cursor() as cursor:
            copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER '|' CSV" % (schema, table_name)
            cursor.copy_expert(copy_cmd, string_data_io)
        connection.connection.commit()
5

For people like me who are trying to implement the aforementioned solutions:

Pandas 0.24.0 now has to_sql with a chunksize and method='multi' option that inserts in bulk...
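
For example (a sketch; the connection URL, table name, and chunksize are placeholders):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@host:5432/dbname")  # placeholder URL
df = pd.read_csv("data.csv")

# method='multi' packs many rows into each INSERT; chunksize bounds the batch size
df.to_sql("my_table", engine, index=False, if_exists="append",
          method="multi", chunksize=1000)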

2

This worked for me to connect to an Oracle Database using cx_Oracle and SQLAlchemy

import time
import sqlalchemy
import cx_Oracle
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String
from sqlalchemy.orm import sessionmaker
import pandas as pd

# credentials
username = "username"
password = "password"
connectStr = "connection:/string"
tableName = "tablename"

t0 = time.time()

# connection
dsn = cx_Oracle.makedsn('host','port',service_name='servicename')

Base = declarative_base()

class LANDMANMINERAL(Base):
    __tablename__ = 'tablename'

    DOCUMENTNUM = Column(String(500), primary_key=True)
    DOCUMENTTYPE = Column(String(500))
    FILENUM = Column(String(500))
    LEASEPAYOR = Column(String(500))
    LEASESTATUS = Column(String(500))
    PROSPECT = Column(String(500))
    SPLIT = Column(String(500))
    SPLITSTATUS = Column(String(500))

engine = create_engine('oracle+cx_oracle://%s:%s@%s' % (username, password, dsn))
conn = engine.connect()  

Base.metadata.bind = engine

# Creating the session

DBSession = sessionmaker(bind=engine)

session = DBSession()

# Bulk insertion
data = pd.read_csv('data.csv')
lists = data.to_dict(orient='records')


table = sqlalchemy.Table('landmanmineral', Base.metadata, autoload=True)
conn.execute(table.insert(), lists)

session.commit()

session.close() 

print("time taken %8.8f seconds" % (time.time() - t0) )
0

The code below might help you; I was facing the same issue while loading 695,000K records.

Method: truncate the table before the load

with engine.begin() as conn:
     conn.execute(sa.text("TRUNCATE TABLE <schema.table>"))

Note: engine is my connection to the destination server, and sa comes from `import sqlalchemy as sa`.

table_name = "<destination_table>"
df.to_sql(table_name, engine, schema = 'schema', if_exists = 'replace', index=False)

Use append or replace depending on your requirement.

-4

For anyone facing this problem and having the destination DB as Redshift, note that Redshift does not implement the full set of Postgres commands, and so some of the answers using either Postgres' COPY FROM or copy_from() will not work (see psycopg2.ProgrammingError: syntax error at or near "stdin" error when trying to copy_from redshift).

The solution for speeding up INSERTs to Redshift is to use a file ingest (COPY from a file in S3) or Odo.

Reference:
  • About Odo: http://odo.pydata.org/en/latest/perf.html
  • Odo with Redshift: https://github.com/blaze/odo/blob/master/docs/source/aws.rst
  • Redshift COPY (from S3 file): https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html
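
A rough sketch of the S3 file-ingest route (the bucket, IAM role, table name, and connection URL are placeholders; it assumes the CSV has already been uploaded to S3):

import sqlalchemy as sa

# plain psycopg2 connection to the Redshift endpoint
engine = sa.create_engine("postgresql+psycopg2://user:password@my-cluster.redshift.amazonaws.com:5439/dbname")

copy_sql = """
    COPY my_table
    FROM 's3://my-bucket/path/data.csv'
    IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
    FORMAT AS CSV
    IGNOREHEADER 1;
"""

with engine.begin() as conn:
    conn.execute(sa.text(copy_sql))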

  • Welcome to Stack Overflow! While links are great way of sharing knowledge, they won't really answer the question if they get broken in the future. Add to your answer the essential content of the link which answers the question. In case the content is too complex or too big to fit here, describe the general idea of the proposed solution. Remember to always keep a link reference to the original solution's website. See: [How do I write a good answer?](https://stackoverflow.com/help/how-to-answer) – sɐunıɔןɐqɐp Jul 05 '18 at 07:08