The job of the SQLAlchemy `Mapper` is to:

> Define the correlation of class attributes to database table columns.

... and it is fundamental to the SQLAlchemy ORM. With the ORM, Python classes represent tables in a database, and there needs to be some mechanism that relates the attributes on a class to the columns in a table. If you aren't using the ORM, your tables aren't mapped to Python classes, and therefore there are no mappers in use. This is why you get the error from `get_mapper()`.
In your example:

```python
m = MetaData(bind=engine, schema='test')
m.reflect()
```

`MetaData` is:

> A collection of `Table` objects and their associated schema constructs.

and `MetaData.reflect`:

> Automatically creates `Table` entries in this `MetaData` for any table available in the database but not yet present in the `MetaData`.
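To make the reflection step concrete, here is a minimal, self-contained sketch. It uses a throwaway in-memory SQLite database as a stand-in for your server (an assumption for illustration), and omits the `schema='test'` argument since SQLite has no named schemas:

```python
from sqlalchemy import MetaData, create_engine, text

# In-memory SQLite stands in for your real database.
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE test (a TEXT)"))

m = MetaData()
m.reflect(bind=engine)  # populate m.tables from the live database

# m.tables is a dictionary keyed by table name
table_names = sorted(m.tables)
print(table_names)  # ['test']
```

With a schema configured on the `MetaData`, the keys are schema-qualified, which is why the original code looks the table up as `m.tables['test.test']`.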
So at this point you have a collection of `Table` objects, and you want to perform a bulk insert on one of them. Don't confuse `Table` objects with ORM mapped classes; they are not the same thing.

The docs on `bulk_insert_mappings` state:

> Perform a bulk insert of the given list of mapping dictionaries.

and

> The values within the dictionaries as given are typically passed without modification into Core `Insert()` constructs

You are trying to achieve a bulk insert of your data, so we can skip the ORM methods (anything involving the `Session`) and interact with the Core explicitly.
The expression `pd.DataFrame({'a':['a','b','c']}).to_dict(orient="records")` returns a list of `dict`s like `[{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]`, so I will use that example output from here on for the sake of simplicity.
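For reference, the conversion itself looks like this (a minimal sketch using the same one-column example DataFrame):

```python
import pandas as pd

df = pd.DataFrame({'a': ['a', 'b', 'c']})

# orient="records" produces one dict per row, keyed by column name
records = df.to_dict(orient="records")
print(records)  # [{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
```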
You already retrieve the table from your metadata object with `m.tables['test.test']`, and that `Table` object can generate its own insert statement:

```python
print(m.tables['test.test'].insert())
# INSERT INTO test.test (a) VALUES (%(a)s)
```

To execute that statement once per row, we can pass the list of dictionaries to `Connection.execute()`, as I illustrate below.
One of the benefits of the ORM `Session` is that it allows for explicit transaction management: you call `Session.rollback()` or `Session.commit()` wherever necessary. `Connection` objects can also operate within explicit transactions, similar to the `Session`, using `Engine.begin()`.
For example, using a context manager:

```python
with engine.begin() as conn:
    conn.execute(
        m.tables['test.test'].insert(),
        [{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]
    )
```

This will automatically commit the transaction if there are no errors within the context, and roll it back if there are. The engine logs show that this issues the following query:

```
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
```
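If you want to see the commit/rollback behaviour for yourself, here is a self-contained sketch against an in-memory SQLite database (a stand-in for your server, an assumption for illustration):

```python
from sqlalchemy import Column, MetaData, Table, Text, create_engine

engine = create_engine("sqlite://")  # in-memory stand-in for your database
m = MetaData()
t = Table('test', m, Column('a', Text))
m.create_all(engine)

# Clean exit from the context: the transaction is committed.
with engine.begin() as conn:
    conn.execute(t.insert(), [{'a': 'a'}, {'a': 'b'}, {'a': 'c'}])

# An exception inside the context: the transaction is rolled back.
try:
    with engine.begin() as conn:
        conn.execute(t.insert(), [{'a': 'd'}])
        raise RuntimeError("something went wrong")
except RuntimeError:
    pass

# Only the committed rows survive.
with engine.connect() as conn:
    rows = [row[0] for row in conn.execute(t.select())]
print(rows)  # ['a', 'b', 'c']
```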
The following contrived example reproduces your original query using `Session.bulk_insert_mappings()`. I've had to create an ORM model to represent the table and add an `id` column, as the ORM doesn't like working with tables that lack a primary key.

```python
from sqlalchemy import Column, Integer, MetaData, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

m = MetaData(bind=engine, schema='test')
Base = declarative_base(metadata=m)

class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    a = Column(Text)

Session = sessionmaker(bind=engine)
s = Session()
s.bulk_insert_mappings(
    get_mapper(m.tables['test.test']),
    pd.DataFrame({'a': ['a', 'b', 'c']}).to_dict(orient="records")
)
s.commit()
s.close()
```
And this is the executed query from the engine logs:

```
INSERT INTO test.test (a) VALUES (%(a)s)
({'a': 'a'}, {'a': 'b'}, {'a': 'c'})
```

You will note that it is exactly the same query as we achieved by using the Core directly.