What I ended up doing is a staged data upload.
First, I create a structural copy of the table I am planning to put the data into. I did it by following this recommendation: "sqlalchemy construct new declarative class from existing".
def mutate_declarative(source):
    """Build a declarative class mapped to a staging copy of ``source``'s table.

    The staging table is named ``<source.__tablename__>_staging`` and is
    attached to ``get_base().metadata``. Every column of ``source.__table__``
    is mirrored except ``created_at`` and ``updated_at``. Only the column
    name, type, ``primary_key``, ``nullable``, ``doc``, ``default``,
    ``unique`` and ``autoincrement`` settings are carried over; nothing else
    from the source columns is copied.

    The returned class keeps a reference to ``source`` in its ``original``
    attribute.
    """
    # NOTE(review): calling this twice for the same source within one process
    # presumably fails because the staging table name is already registered
    # in the metadata -- confirm against the SQLAlchemy version in use.
    skipped_names = ('created_at', 'updated_at')
    staging_name = source.__tablename__ + '_staging'

    # One (positional args, keyword options) pair per column to recreate.
    column_specs = [
        (
            (col.name, col.type),
            dict(
                primary_key=col.primary_key,
                nullable=col.nullable,
                doc=col.doc,
                default=col.default,
                unique=col.unique,
                autoincrement=col.autoincrement,
            ),
        )
        for col in source.__table__.c
        if col.name not in skipped_names
    ]

    class Stage(get_base()):
        original = source
        __tablename__ = staging_name
        __table__ = Table(
            staging_name,
            get_base().metadata,
            *(Column(*positional, **options)
              for positional, options in column_specs)
        )

    return Stage
def create_staging_table(source):
    """Create an empty staging table for ``source`` and return its class.

    The staging class is produced by ``mutate_declarative(source)``. Its
    table is dropped first if it already exists and is then created, using
    the engine bound to ``get_base().metadata``.
    """
    stage_cls = mutate_declarative(source)
    bound_engine = get_base().metadata.bind
    staging_table = stage_cls.__table__
    # Drop any leftover table first so the create below starts from scratch.
    staging_table.drop(bound_engine, checkfirst=True)
    staging_table.create(bound_engine)
    return stage_cls
def drop_staging_table(source):
    """Drop the table mapped by ``source`` if it exists.

    Uses the engine bound to ``get_base().metadata``. Despite the parameter
    name, callers presumably pass the staging class returned by
    ``create_staging_table`` -- whatever class is passed, it is *its*
    ``__table__`` that gets dropped.
    """
    bound_engine = get_base().metadata.bind
    table_to_drop = source.__table__
    table_to_drop.drop(bound_engine, checkfirst=True)
enter code here
The code above allows me to quickly create an empty staging table and use it as temporary storage for uploading my data with the keys generated in the code. As I showed in the original question text, this mode is relatively fast.
After that, the data from the staging table needs to be moved into the main table. The problem here is that we need to align the existing data with the staged data. This can be done with the "ON DUPLICATE KEY UPDATE" clause supported by MySQL. Unfortunately, SQLAlchemy doesn't support that. To solve this problem, I followed the recommendation from here: "SQLAlchemy ON DUPLICATE KEY UPDATE".
def move_data_from_staging_to_main(session, staging, attempt_update=False):
    """Copy the staged rows into the main table with one INSERT ... SELECT.

    Only the non-primary-key columns of the staging table are transferred.

    Args:
        session: Object whose ``execute()`` is called with the statement.
        staging: Staging class exposing ``__table__`` (the staging table) and
            ``original`` (the class mapped to the main table).
        attempt_update: Controls whether new data may overwrite existing
            data. If True, an ``ON DUPLICATE KEY UPDATE`` clause is appended
            so existing rows are overwritten with the staged values (MySQL
            dialect only); otherwise the presence of conflicting existing
            data results in an error.
    """
    main_table = staging.original.__table__
    staged_table = staging.__table__

    column_list = [c for c in staged_table.columns if not c.primary_key]
    staged_data = (
        staged_table.select().with_only_columns(column_list).alias("subquery1")
    )
    column_names = [c.name for c in staged_data.columns]

    insert_kwargs = {}
    if attempt_update:
        # The clause is added by hand through a compiler hook, following the
        # "SQLAlchemy ON DUPLICATE KEY UPDATE" recipe.
        # NOTE(review): newer SQLAlchemy releases ship
        # sqlalchemy.dialects.mysql.insert(...).on_duplicate_key_update();
        # consider switching if the project's version provides it.
        from sqlalchemy.ext.compiler import compiles
        from sqlalchemy.sql.expression import Insert

        # This hook applies to every Insert compiled for the MySQL dialect
        # (and is re-registered on each call of this function); it appends
        # the string passed as ``mysql_appendstring`` to the end of the query.
        @compiles(Insert, "mysql")
        def append_string(insert, compiler, **kw):
            s = compiler.visit_insert(insert, **kw)
            # The key can be present with a None value, so test the value
            # as well as the presence of the key.
            if ('mysql_appendstring' in insert.kwargs) and insert.kwargs['mysql_appendstring']:
                return s + " " + insert.kwargs['mysql_appendstring']
            return s

        # Register ``mysql_appendstring`` as an accepted dialect-specific
        # keyword for Insert (default None) so insert() does not complain.
        Insert.argument_for("mysql", "appendstring", None)

        def quote_identifier(name):
            # Backtick-quote so reserved words and unusual names stay valid
            # in the hand-written clause; embedded backticks are doubled.
            return "`" + name.replace("`", "``") + "`"

        # ON DUPLICATE KEY UPDATE `a`=values(`a`), `b`=values(`b`), ...
        # turns the insert into an insert-or-update-if-exists query.
        assignments = ", ".join(
            "{0}=values({0})".format(quote_identifier(name))
            for name in column_names
        )
        insert_kwargs['mysql_appendstring'] = (
            ' ON DUPLICATE KEY UPDATE ' + assignments
        )

    insert = main_table.insert(**insert_kwargs).from_select(
        column_names,
        staged_data.select()
    )
    session.execute(insert)