I'm stuck trying to use SQLAlchemy's bulk_insert_mappings. I got to the point where I can create a session and connect to the db. I have initialized my engine, but I can't seem to get the mapper I need from the table.

import pandas as pd
from sqlalchemy import MetaData, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy_utils import get_mapper

engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER,PW)) # removed my config here
connection = engine.connect()
m = MetaData(bind=engine,schema='test')
m.reflect()

Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(get_mapper(m.tables['test.test']), pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records"))
s.commit()
s.close()

I found a bunch of related questions on SO, most recently this one:

SQLAlchemy get Mapper object from Table object (from Metadata or Session or otherwise)

but sqlalchemy_utils.get_mapper raises:

"ValueError: Could not get mapper for table 'test'."

sqlalchemy.orm.mapperlib._mapper_registry appears to be empty, maybe because I didn't bind it to my engine, but I'm not sure how to do that.

PS: test is a very simple one-column table of type TEXT

Here is the output of m.tables['test.test']:

Table('test', MetaData(bind=Engine(mysql+pymysql://USER:***@IP:PORT/)), Column('a', TEXT(), table=<test>), schema='test')

2 Answers


The job of the SQLAlchemy Mapper is to:

Define the correlation of class attributes to database table columns.

... and it is fundamental to the SQLAlchemy ORM. With the ORM, Python classes represent tables in a database and there needs to be some mechanism that relates the attributes on a class to columns in a table. If you aren't using the ORM, your tables aren't mapped to Python classes, and therefore there are no mappers in use. This is why you get the error from get_mapper().
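To see the distinction, here is a minimal sketch (the class and table names are hypothetical, purely for illustration): declaring a mapped class is what creates a Mapper, and sqlalchemy.inspect() will return it.

from sqlalchemy import Column, Integer, Text, inspect
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# Declaring a mapped class creates a Mapper; a bare reflected Table never gets one.
class User(Base):
    __tablename__ = 'user'  # hypothetical table, for illustration only
    id = Column(Integer, primary_key=True)
    name = Column(Text)

print(inspect(User))  # <Mapper at 0x...; User>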

In your example:

m = MetaData(bind=engine,schema='test')
m.reflect()

MetaData is:

A collection of Table objects and their associated schema constructs.

and MetaData.reflect:

Automatically creates Table entries in this MetaData for any table available in the database but not yet present in the MetaData.

So at this point, you have a collection of Table objects and you want to perform a bulk insert on one of them. Don't confuse Table objects with ORM mapped classes, they are not the same thing.
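To make that distinction concrete, here's a quick sketch (reusing the engine from your example) showing that reflection only produces Core Table objects:

from sqlalchemy import MetaData, Table

m = MetaData(bind=engine, schema='test')
m.reflect()

print(list(m.tables))                            # ['test.test']
print(isinstance(m.tables['test.test'], Table))  # True: a Core Table, not a mapped class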

The docs on bulk_insert_mappings state:

Perform a bulk insert of the given list of mapping dictionaries.

and

The values within the dictionaries as given are typically passed without modification into Core Insert() constructs

You are trying to achieve a bulk insert of your data, so we can skip the ORM methods (anything involving the Session) and interact with the Core directly.

The expression pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records") returns a list of dicts like: [{'a': 'a'}, {'a': 'b'}, {'a': 'c'}], so I will use that example output from here on for the sake of simplicity.

You have the table in your metadata object, which you already retrieve with m.tables['test.test'], and that Table object can be used to generate its own insert statement:

print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)

And to execute that one statement for many rows at once, we can pass the dictionaries to Connection.execute(), as I illustrate below.

One of the benefits of the ORM Session is that it allows for explicit transaction management, where you call Session.rollback() or Session.commit() wherever necessary. Connection objects can also operate within explicit transactions, much like the Session, through Engine.begin().

For example, using a context manager:

with engine.begin() as conn:
    conn.execute(
        m.tables['test.test'].insert(),
        *[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
    )

This will automatically commit the transaction if the block completes without errors, and roll back if an exception is raised.

The engine logs show that this issues the following query:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
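(If you want to see this output yourself, a small sketch: constructing the engine with echo=True makes it log every statement it emits along with the parameters.)

# echo=True turns on statement logging for everything this engine executes
engine = create_engine(
    'mysql+pymysql://{}:{}@IP:PORT/'.format(USER, PW),
    echo=True,
)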

The following contrived example shows your original query using Session.bulk_insert_mappings(). I've had to create an ORM model to represent the table and add an id primary key column, as the ORM won't map a table without a primary key.

import pandas as pd
from sqlalchemy import Column, Integer, MetaData, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy_utils import get_mapper

engine = create_engine('mysql+pymysql://{}:{}@IP:PORT/'.format(USER, PW))
m = MetaData(bind=engine, schema='test')
Base = declarative_base(metadata=m)

class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    a = Column(Text)


Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(
    get_mapper(m.tables['test.test']),
    pd.DataFrame({'a': ['a', 'b', 'c']}).to_dict(orient='records'),
)
s.commit()
s.close()

And this is the executed query from the engine logs:

INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})

You will note this is exactly the same query we achieved using the Core directly.
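As an aside, once the mapped class exists you don't need sqlalchemy_utils at all: per the docs, Session.bulk_insert_mappings() accepts a mapped class or the actual Mapper object, which sqlalchemy.inspect() can give you. A short sketch reusing the Test model and session from above:

from sqlalchemy import inspect

records = pd.DataFrame({'a': ['a', 'b', 'c']}).to_dict(orient='records')

s.bulk_insert_mappings(Test, records)           # a mapped class is accepted directly
s.bulk_insert_mappings(inspect(Test), records)  # or pass the Mapper itself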


I have been googling the exact same question. However, I found a workaround for this issue.

import sqlalchemy.orm

class Helper:
    pass

# Map the reflected Table onto the empty class on the fly (classical mapping)
new_mapper = sqlalchemy.orm.mapper(Helper, local_table=m.tables['test.test'])

session.bulk_insert_mappings(new_mapper, df.to_dict(orient='records'),
                             return_defaults=False)
session.commit()
session.close()

According to the following link, I thought that df.to_sql performs really poorly when inserting massive dataframes into SQL tables. However, it turned out that bulk_insert_mappings is much slower. I hope it helps.
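If you want to check that on your own data, a rough timing sketch (engine, mapper, and session assumed from above; the sample data is hypothetical and your numbers will vary):

import time

import pandas as pd

df = pd.DataFrame({'a': ['x'] * 10000})  # hypothetical sample data

start = time.perf_counter()
df.to_sql('test', engine, schema='test', if_exists='append', index=False)
print('to_sql:', time.perf_counter() - start)

start = time.perf_counter()
session.bulk_insert_mappings(new_mapper, df.to_dict(orient='records'))
session.commit()
print('bulk_insert_mappings:', time.perf_counter() - start)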