1

I am working on a threaded application where one thread will feed a Queue with objects to be modified and a number of other threads will then read from the queue, do the modifications and save the changes.

The application won't need a lot of concurrency, so I would like to stick to an SQLite database. Here is a small example illustrating the application:

import queue
import threading
import peewee as pw

db = pw.SqliteDatabase('test.db', threadlocals=True)

class Container(pw.Model):
    contents = pw.CharField(default="spam")

    class Meta:
        database = db


class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        containers = Container.select()

        for container in containers:
            self.q.put(container)


class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context() as ctx:
                # Get a new connection to the container object:
                container = Container.get(id=item.id)
                container.contents = "eggs"
                container.save()

            self.q.task_done()


if __name__ == "__main__":

    db.connect()
    try:
        db.create_tables([Container,])
    except pw.OperationalError:
        pass
    else:
        [Container.create() for c in range(42)]
    db.close()

    q = queue.Queue(maxsize=10)


    feeder = FeederThread(q)
    feeder.setDaemon(True)
    feeder.start()

    for i in range(10):
        reader = ReaderThread(q)
        reader.setDaemon(True)
        reader.start()

    q.join()

Based on the peewee docs multi-threading should be supported for SQLite. However, I keep getting the infamous peewee.OperationalError: database is locked error with the error output pointing to the container.save() line.

How do I get around this?

digitaldingo
  • 519
  • 6
  • 11

2 Answers2

5

I was kind of surprised to see this failing as well, so I copied your code and played around with some different ideas. What I think the problem is, is that ExecutionContext() by default will cause the wrapped block to run in a transaction. To avoid this, I passed in False in the reader threads.

I also edited the feeder to consume the SELECT statement before putting stuff into the queue (list(Container.select())).

The following works for me locally:

class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super(FeederThread, self).__init__()

        self.q = input_queue

    def run(self):
        containers = list(Container.select())

        for container in containers:
            self.q.put(container.id)  # I don't like passing model instances around like this, personal preference though

class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super(ReaderThread, self).__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context(False):
                # Get a new connection to the container object:
                container = Container.get(id=item)
                container.contents = "nuggets"
                with db.atomic():
                    container.save()

            self.q.task_done()

if __name__ == "__main__":

    with db.execution_context():
        try:
            db.create_tables([Container,])
        except OperationalError:
            pass
        else:
            [Container.create() for c in range(42)]

    # ... same ...

I'm not wholly satisfied with this, but hopefully it gives you some ideas.

Here's a blog post I wrote a while back that has some tips for getting higher concurrency with SQLite: http://charlesleifer.com/blog/sqlite-small-fast-reliable-choose-any-three-/

coleifer
  • 24,887
  • 6
  • 60
  • 75
  • Thanks for your answer! Threading is still a bit voodoo to me, so I am glad it wasn't an obvious mistake on my part. Interestingly, consuming the SELECT statement seems to be the key here -- I don't see any difference using `db.execution_context(False)` or `with db.atomic()`. In fact, by consuming the SELECT statement I don't even seem to need the `ExecutionContext()`. So I suppose the SELECT statement was actually locking the database? – digitaldingo Jul 23 '15 at 10:18
  • http://docs.peewee-orm.com/en/latest/peewee/database.html?highlight=thread#thread-safety-and-multiple-databases – Sion C Sep 02 '21 at 08:23
0

Have you tried WAL mode?

Improve INSERT-per-second performance of SQLite?

You have to be quite careful if you have concurrent access to SQLite, as the whole database is locked when writes are done, and although multiple readers are possible, writes will be locked out. This has been improved somewhat with the addition of a WAL in newer SQLite versions.

and

If you are using multiple threads, you can try using the shared page cache, which will allow loaded pages to be shared between threads, which can avoid expensive I/O calls.

Community
  • 1
  • 1
Paras
  • 642
  • 6
  • 16