
I am using SQLAlchemy version 1.2.0b1.

The table I have looks like this:

class Company(Base):
    __tablename__ = 'company'
    id = Column(Integer, primary_key=True, autoincrement=True)
    cik = Column(String(10), nullable=False, index=True, unique=True)
    name = Column(String(71), nullable=False)

When I insert new values into the table and DO specify the id,

company=Company()
company.id =counter
company.cik = ...
company.name = ...

the program runs very fast: SQLAlchemy sends the rows to the server as a bulk insert.

If I skip the id and rely on the database to generate a unique id,

company=Company()
company.cik = ...
company.name = ...

the code becomes as slow as proton decay, and the echo shows that SQLAlchemy issues a separate INSERT statement for every single Company object. No bulk insert.

Is there a way to avoid this behavior and still rely on the database to generate the ids?

  • A must read in relation to questions like these: http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow. An ORM has to know the identity of an object, so SQLAlchemy inserts your companies one by one in order to read the generated ids. When given beforehand SQLA can group the inserts to one. – Ilja Everilä Jul 24 '17 at 08:41
  • Also a good read: the docs on [`return_defaults`](http://docs.sqlalchemy.org/en/latest/orm/session_api.html#sqlalchemy.orm.session.Session.bulk_insert_mappings.params.return_defaults) parameter of [`Session.bulk_insert_mappings()`](http://docs.sqlalchemy.org/en/latest/orm/session_api.html#sqlalchemy.orm.session.Session.bulk_insert_mappings). Without a bit more context it is hard to say what you could do to speed up your code. In general if you don't need the Company objects after the bulk insert right away, esp. for modification, you can bulk insert them with auto generated ids. – Ilja Everilä Jul 24 '17 at 08:49
  • Thank you Ilja. I, sort of, solved this problem before you posted your comments. I didn't look in the direction of bulk_insert_mappings. It looks promising. – Andrey Dolgikh Aug 02 '17 at 18:34
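The `bulk_insert_mappings` approach mentioned in the comments can be sketched as follows. This is a minimal sketch against an in-memory SQLite database (the question uses MySQL, but the API is the same); with `return_defaults` left at its default of `False`, SQLAlchemy can batch the INSERTs and let the database assign the ids, at the cost of the returned objects not being populated with them:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session

Base = declarative_base()

class Company(Base):
    __tablename__ = 'company'
    id = Column(Integer, primary_key=True, autoincrement=True)
    cik = Column(String(10), nullable=False, index=True, unique=True)
    name = Column(String(71), nullable=False)

engine = create_engine('sqlite://')  # in-memory DB for illustration
Base.metadata.create_all(engine)

session = Session(engine)
# return_defaults defaults to False, so SQLAlchemy does not need to read
# back each generated id and can batch the inserts.
session.bulk_insert_mappings(Company, [
    {'cik': '0000320193', 'name': 'Apple Inc.'},
    {'cik': '0000789019', 'name': 'Microsoft Corp.'},
])
session.commit()
print(session.query(Company).count())  # -> 2
```

The ids are generated by the database as usual; they are simply not copied back into the dictionaries you passed in.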

1 Answer


What I ended up doing is a staged data upload. First, I create a structural copy of the table I am planning to put the data into, following this recommendation: sqlalchemy construct new declarative class from existing

def mutate_declarative(source):
    columns = []
    omit_columns = ['created_at', 'updated_at']
    for c in source.__table__.c:
        if c.name not in omit_columns:
            columns.append(((c.name, c.type),
                            {'primary_key': c.primary_key,
                             'nullable': c.nullable,
                             'doc': c.doc,
                             'default': c.default,
                             'unique': c.unique,
                             'autoincrement': c.autoincrement}))

    class Stage(get_base()):
        original = source
        __tablename__ = source.__tablename__ + '_staging'
        __table__ = Table(source.__tablename__ + '_staging',
                          get_base().metadata,
                          *[Column(*args, **kwargs) for args, kwargs in columns])

    return Stage

def create_staging_table(source):
    new_class = mutate_declarative(source)
    engine = get_base().metadata.bind
    new_class.__table__.drop(engine, checkfirst=True)
    new_class.__table__.create(engine)
    return new_class


def drop_staging_table(source):
    engine = get_base().metadata.bind
    source.__table__.drop(engine, checkfirst=True)

The code above allows me to quickly create an empty staging table and use it as temporary storage while uploading my data with ids generated in the code. As I showed in the original question, this mode is relatively fast. After that, the data from the staging table needs to be moved into the main table. The problem here is that the existing data needs to be reconciled with the staged data. This can be done with the "ON DUPLICATE KEY UPDATE" clause supported by MySQL. Unfortunately SQLAlchemy doesn't support that out of the box. To solve this problem I followed the recommendation from here: SQLAlchemy ON DUPLICATE KEY UPDATE

def move_data_from_staging_to_main(session, staging, attempt_update=False):
    # attempt_update controls whether new data should overwrite existing data:
    # if attempt_update is True, existing rows are overwritten with the new data;
    # otherwise the presence of conflicting existing rows results in an error.
    main_table = staging.original.__table__
    staged_table = staging.__table__
    column_list = [c for c in staged_table.columns if not c.primary_key]

    staged_data = staged_table.select()
    staged_data_1 = staged_data.with_only_columns(column_list).alias("subquery1")
    if attempt_update:
        # Here we need to inject our own SQL into the query because SQLAlchemy
        # doesn't support ON DUPLICATE KEY UPDATE; see
        # "ON DUPLICATE KEY UPDATE in the SQL statement" and "SQLAlchemy ON DUPLICATE KEY UPDATE"
        from sqlalchemy.ext.compiler import compiles
        from sqlalchemy.sql.expression import Insert

        # We do that with a compiler extension that simply appends the string
        # we provide as a parameter to the end of the rendered INSERT.
        @compiles(Insert, "mysql")
        def append_string(insert, compiler, **kw):
            s = compiler.visit_insert(insert, **kw)
            # The "mysql_appendstring" key stays present in kwargs once it has
            # been registered, so we must also check that its value is not None.
            if insert.kwargs.get('mysql_appendstring'):
                return s + " " + insert.kwargs['mysql_appendstring']
            return s

        # Register the custom dialect-specific argument so SQLAlchemy doesn't
        # complain about an unknown keyword.
        Insert.argument_for("mysql", "appendstring", None)

        # Build the "ON DUPLICATE KEY UPDATE a=values(a), b=values(b), ..." string
        # that is appended to the end of the query, turning a plain insert into
        # an insert-or-update-if-exists query.
        value_string = ' ON DUPLICATE KEY UPDATE '
        value_string += ', '.join(
            c.name + "=values(" + c.name + ")" for c in staged_data_1.columns)

        insert = main_table.insert(mysql_appendstring=value_string).from_select(
            [c.name for c in staged_data_1.columns],
            staged_data_1.select()
        )
    else:
        insert = main_table.insert().from_select(
            [c.name for c in staged_data_1.columns],
            staged_data_1.select()
        )
    session.execute(insert)
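For reference, the ON DUPLICATE KEY UPDATE string built above boils down to plain string formatting. A standalone sketch, with hypothetical column names matching the Company table:

```python
def on_duplicate_key_update_clause(column_names):
    # Build the MySQL "ON DUPLICATE KEY UPDATE a=values(a), b=values(b)" suffix
    # that turns an INSERT into an upsert on the unique/primary key.
    assignments = ", ".join("{0}=values({0})".format(name) for name in column_names)
    return "ON DUPLICATE KEY UPDATE " + assignments

print(on_duplicate_key_update_clause(["cik", "name"]))
# -> ON DUPLICATE KEY UPDATE cik=values(cik), name=values(name)
```

Note that on modern SQLAlchemy (1.2+) the MySQL dialect provides this natively via `sqlalchemy.dialects.mysql.insert(...).on_duplicate_key_update(...)`, which avoids the compiler-extension workaround entirely.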