
Question: For a Connection object's execute function, is it possible to use a generator that yields dictionaries, rather than a list of dictionaries, to do an "executemany" insert?

Detail: I'm trying to learn SQLAlchemy by working through the core expressions. As a test, I have a rather large data set, accessed from a file via an iterator, that I'm trying to transfer into a PostgreSQL table, but inserting individual rows is quite slow (see Example 1 below). According to the documentation, the Connection object's execute() function will do the equivalent of an executemany() if a list of dictionaries is passed in rather than a single dictionary. Some quick tests confirmed that this approach is quite a bit faster for batches of insertions. Unfortunately, with my large data set, I can't build a complete list of dictionaries in memory, hence my question...

Example 1: the following (pseudo)code is very slow for a large amount of data

from sqlalchemy import MetaData, Table, Column, create_engine

metadata = MetaData()
data = Table('data', metadata, Column...)

engine = create_engine('postgresql://user:pass$@localhost/testdb')
metadata.create_all(engine)

conn = engine.connect()
ins = data.insert()
for datum in large_data_iterator:
    datum_dict = do_some_proc(datum)
    conn.execute(ins, datum_dict)  # one INSERT round trip per row -- slow
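
For reference, the list-of-dictionaries form described in the docs looks roughly like this (the dictionary keys here are made-up placeholders, not my real columns):

# Passing a list of dicts makes execute() behave like executemany(),
# which is much faster than one execute() call per row.
conn.execute(ins, [
    {'id': 1, 'value': 'a'},
    {'id': 2, 'value': 'b'},
])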

Because execute can take multiple values, it would be nice to replace the final for loop with the following generator version:

def datagen(iterator):
    for datum in iterator:
        datum_dict = do_some_proc(datum)
        yield datum_dict

conn = engine.connect()
ins = data.insert()
conn.execute(ins, datagen(large_data_iterator))

However, this raises the following exception: AttributeError: 'list' object has no attribute 'keys'.

Does anyone know if it is possible to get the generator version working? A better way to do this would also be welcome. Thanks!

Note: I tested a modified generator function that yields chunks as lists of dictionaries (below), and it is faster than the individual executes. However, I don't know how to choose the optimal chunk size, and I'm worried that the added complexity makes my generator code more error-prone. (But if it is the only way...)

def datagen(iterator):
    output = []
    N = 0
    for datum in iterator:
        datum_dict = do_some_proc(datum)
        output.append(datum_dict)
        N += 1
        if N == 100: # or whatever
            yield output
            N = 0
            output = []
    if output:  # don't forget the final partial chunk
        yield output
Ryan

1 Answer


There are execution_options for the Connection, which accept a stream_results parameter, but unfortunately the documentation notes at the bottom that "the flag is currently understood only by the psycopg2 dialect", even though there are other drivers with streaming support (e.g. oursql).
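
For reference, the option is set on the connection roughly as below (process() is just a stand-in for whatever you do per row); note that it only controls how result rows are fetched, so by itself it doesn't speed up inserts:

# Sketch only: stream_results asks the dialect to use a server-side
# cursor so SELECT results aren't buffered in memory all at once.
conn = engine.connect().execution_options(stream_results=True)
result = conn.execute(data.select())
for row in result:
    process(row)  # placeholder for per-row handling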

Until it is fully supported in SQLAlchemy, you can easily write a helper function that breaks any iterable into chunks, which avoids the error-proneness of modifying each of your generators.
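
For example, something along these lines should work with the simple one-row-at-a-time datagen() from the question (an untested sketch; the chunk size of 100 is arbitrary):

import itertools

def chunks(iterable, size=100):
    # Yield successive lists of at most `size` items from any iterable.
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk

# Keep the simple datagen() and do the batching here instead:
for chunk in chunks(datagen(large_data_iterator), 100):
    conn.execute(ins, chunk)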

Jakub Klinkovský