
I have a Flask application with a RESTful API. One of the API calls is a 'mass upsert' call with a JSON payload. I am struggling with performance.

The first thing I tried was to use merge_result on a Query object, because the documentation describes it as follows:

This is an optimized method which will merge all mapped instances, preserving the structure of the result rows and unmapped columns with less method overhead than that of calling Session.merge() explicitly for each value.

This was the initial code:

class AdminApiUpdateTasks(Resource):

    """Bulk task creation / update endpoint"""

    def put(self, slug):
        taskdata = json.loads(request.data)
        existing = db.session.query(Task).filter_by(challenge_slug=slug)
        existing.merge_result(
            [task_from_json(slug, **task) for task in taskdata])
        db.session.commit()
        return {}, 200

A request to that endpoint with ~5000 records, all of them already existing in the database, takes more than 11m to return:

real    11m36.459s
user    0m3.660s
sys 0m0.391s

As this would be a fairly typical use case, I started looking into alternatives to improve performance. Against my better judgement, I tried merging each individual record into the session instead:

class AdminApiUpdateTasks(Resource):

    """Bulk task creation / update endpoint"""

    def put(self, slug):
        # Get the posted data
        taskdata = json.loads(request.data)
        for task in taskdata:
            db.session.merge(task_from_json(slug, **task))
        db.session.commit()
        return {}, 200

To my surprise, this turned out to be more than twice as fast:

real    4m33.945s
user    0m3.608s
sys 0m0.258s

I have two questions:

  1. Why is the second strategy using merge faster than the supposedly optimized first one that uses merge_result?
  2. What other strategies should I pursue to optimize this more, if any?

2 Answers


This is an old question, but I hope this answer can still help people.

I used the same idea as this example set by SQLAlchemy, but I added benchmarking for UPSERT operations (update the record if it already exists, otherwise insert it). The results on a PostgreSQL 11 database are below:

Tests to run: test_customer_individual_orm_select, test_customer_batched_orm_select, test_customer_batched_orm_select_add_all, test_customer_batched_orm_merge_result
test_customer_individual_orm_select : UPSERT statements via individual checks on whether objects exist and add new objects individually (10000 iterations); total time 9.359603 sec
test_customer_batched_orm_select : UPSERT statements via batched checks on whether objects exist and add new objects individually (10000 iterations); total time 1.553555 sec
test_customer_batched_orm_select_add_all : UPSERT statements via batched checks on whether objects exist and add new objects in bulk (10000 iterations); total time 1.358680 sec
test_customer_batched_orm_merge_result : UPSERT statements using batched merge_results (10000 iterations); total time 7.191284 sec

As you can see, merge_result is far from the most efficient option. I'd suggest checking in batches whether the records already exist, then updating or inserting them accordingly. Hope this helps!

"""
This series of tests illustrates different ways to UPSERT
or INSERT ON CONFLICT UPDATE a large number of rows in bulk.
"""
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from profiler import Profiler


Base = declarative_base()
engine = None


class Customer(Base):
  __tablename__ = "customer"
  id = Column(Integer, primary_key=True)
  name = Column(String(255))
  description = Column(String(255))


Profiler.init("bulk_upserts", num=100000)


@Profiler.setup
def setup_database(dburl, echo, num):
  global engine
  engine = create_engine(dburl, echo=echo)
  Base.metadata.drop_all(engine)
  Base.metadata.create_all(engine)

  s = Session(engine)
  for chunk in range(0, num, 10000):
    # Insert half of the customers we want to merge
    s.bulk_insert_mappings(
      Customer,
      [
        {
          "id": i,
          "name": "customer name %d" % i,
          "description": "customer description %d" % i,
        }
        for i in range(chunk, chunk + 10000, 2)
      ],
    )
  s.commit()


@Profiler.profile
def test_customer_individual_orm_select(n):
  """
  UPSERT statements via individual checks on whether objects exist
  and add new objects individually
  """
  session = Session(bind=engine)
  for i in range(0, n):
    customer = session.query(Customer).get(i)
    if customer:
      customer.description += "updated"
    else:
      session.add(Customer(
          id=i,
          name=f"customer name {i}",
          description=f"customer description {i} new"
      ))
    session.flush()
  session.commit()

@Profiler.profile
def test_customer_batched_orm_select(n):
  """
  UPSERT statements via batched checks on whether objects exist
  and add new objects individually
  """
  session = Session(bind=engine)
  for chunk in range(0, n, 1000):
    customers = {
        c.id: c for c in
        session.query(Customer)\
            .filter(Customer.id.between(chunk, chunk + 1000))
    }
    for i in range(chunk, chunk + 1000):
      if i in customers:
        customers[i].description += "updated"
      else:
        session.add(Customer(
            id=i,
            name=f"customer name {i}",
            description=f"customer description {i} new"
        ))
    session.flush()
  session.commit()

@Profiler.profile
def test_customer_batched_orm_select_add_all(n):
  """
  UPSERT statements via batched checks on whether objects exist
  and add new objects in bulk
  """
  session = Session(bind=engine)
  for chunk in range(0, n, 1000):
    customers = {
        c.id: c for c in
        session.query(Customer)\
            .filter(Customer.id.between(chunk, chunk + 1000))
    }
    to_add = []
    for i in range(chunk, chunk + 1000):
      if i in customers:
        customers[i].description += "updated"
      else:
        to_add.append({
            "id": i,
            "name": "customer name %d" % i,
            "description": "customer description %d new" % i,
        })
    if to_add:
      session.bulk_insert_mappings(
        Customer,
        to_add
      )
      to_add = []
    session.flush()
  session.commit()

@Profiler.profile
def test_customer_batched_orm_merge_result(n):
  "UPSERT statements using batched merge_results"
  session = Session(bind=engine)
  for chunk in range(0, n, 1000):
    customers = session.query(Customer)\
        .filter(Customer.id.between(chunk, chunk + 1000))
    customers.merge_result(
      Customer(
          id=i,
          name=f"customer name {i}",
          description=f"customer description {i} new"
      ) for i in range(chunk, chunk + 1000)
    )
    session.flush()
  session.commit()
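
All of the variants above stay at the ORM level. If you are on PostgreSQL 9.5+ you can also push the whole upsert down to the database with INSERT ... ON CONFLICT DO UPDATE via the postgresql dialect's insert construct. The sketch below runs against the same Customer model, but it is not part of the benchmark above, so measure it yourself before relying on it:

from sqlalchemy.dialects.postgresql import insert as pg_insert


def upsert_customers_on_conflict(session, rows):
  # rows is a list of dicts with "id", "name" and "description" keys,
  # matching the Customer columns used in the benchmark above.
  stmt = pg_insert(Customer).values(rows)
  # On a primary-key conflict, overwrite name and description with the
  # incoming values (available through the special "excluded" table).
  stmt = stmt.on_conflict_do_update(
    index_elements=[Customer.id],
    set_={
      "name": stmt.excluded.name,
      "description": stmt.excluded.description,
    },
  )
  session.execute(stmt)
  session.commit()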

I think this was causing the slowness in your first query:

existing = db.session.query(Task).filter_by(challenge_slug=slug)

Also, you should probably change this:

    existing.merge_result(
        [task_from_json(slug, **task) for task in taskdata])

To:

    existing.merge_result(
        (task_from_json(slug, **task) for task in taskdata))

That should save you some memory and time, since the list won't be built in memory before it is passed to the merge_result method.
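
For completeness, here is a sketch of the endpoint from the question with that change applied (same names as in the question, otherwise untested):

class AdminApiUpdateTasks(Resource):

    """Bulk task creation / update endpoint"""

    def put(self, slug):
        taskdata = json.loads(request.data)
        existing = db.session.query(Task).filter_by(challenge_slug=slug)
        # A generator lets merge_result build each Task as it consumes it,
        # instead of materializing all ~5000 instances up front.
        existing.merge_result(
            task_from_json(slug, **task) for task in taskdata)
        db.session.commit()
        return {}, 200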
