
I have used APScheduler to schedule three jobs that run at intervals of 2 minutes, 3 minutes, and 10 minutes. All three jobs access the same SQLite database.

During job execution, it is possible that while job 1 holds a lock on the SQLite database, jobs 2 and 3 also try to access it, which raises the error sqlite3.OperationalError: database is locked. Because of this, some of the inserts into the database never complete, leaving the database with missing data.

In this scenario, I want to make sure that the job 2 and job 3 threads wait for job 1 to release the lock before they access the database.

Following are snippets of configurations:

jobs.search_events:
  - trigger: interval
  - every: 10
  - unit: minutes

jobs.check_search_status_events:
  - trigger: interval
  - every: 2
  - unit: minutes

jobs.delete_searches:
  - trigger: interval
  - every: 3
  - unit: minutes

1. search_events: makes a call to an external API and writes the response of the search request along with its status.
2. check_search_status_events: checks the status of the search requests; if a search is complete, it sends another request to fetch the data and writes the data to the database.
3. delete_searches: deletes all search records whose status is marked as complete.

These configurations are read and then sent to APScheduler's BackgroundScheduler.
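For reference, a minimal sketch of how one of these config entries is translated into `add_job` keyword arguments, mirroring the `every`/`unit` handling in the `add_jobs` method below (the helper name is hypothetical):

```python
def config_to_kwargs(config):
    """Translate one YAML job entry into APScheduler add_job kwargs."""
    kwargs = dict(config)  # copy so the original config stays intact
    if "every" in kwargs:
        # {'every': 10, 'unit': 'minutes'} becomes {'minutes': 10}
        kwargs[kwargs.pop("unit")] = kwargs.pop("every")
    return kwargs

print(config_to_kwargs({"trigger": "interval", "every": 10, "unit": "minutes"}))
# {'trigger': 'interval', 'minutes': 10}
```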

Following is the Schedule class I created using APScheduler.

class Schedule(object):
    """Schedule the process to be run in the background

    :param object: Inherits default python object
    :type object: object
    """
    scheduler: BackgroundScheduler

    def __init__(self) -> None:
        """Initialize the scheduler demon in the background
        """
        self.scheduler = BackgroundScheduler(daemon=True)

    def add_job_storage(self, name: str, job_store: str) -> None:
        """Add persistent job storage to the scheduler

        :param name: Name of the job store
        :type name: str
        :param job_store: URL / location of the job store
        :type job_store: str
        """
        self.scheduler.add_jobstore(name, url=job_store)

    def add_configurations(self, jobstores: KeyValue, executors: KeyValue, job_defaults: KeyValue) -> None:
        """Add configurations to include job stores, executors and job_defaults to the scheduled jobs

        :param jobstores: Job store to be used
        :type jobstores: dict
        :param executors: Executors to be used
        :type executors: dict
        :param job_defaults: Job default configurations to be used
        :type job_defaults: dict
        """
        self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)

    def add_jobs(self, jobs: KeyValue) -> None:
        """Add jobs to the scheduler from configurations

        :param jobs: Jobs to be added to the scheduler
        :type jobs: dict
        """
        for func_name, config in jobs.items():
            package_path, function = func_name.rsplit(JOB_SPLIT_CHAR, 1)
            module = importlib.import_module(package_path)
            func = getattr(module, function)

            kwargs = {}

            if EVERY in config:
                kwargs[config[UNIT]] = config[EVERY]
                del config[UNIT]
                del config[EVERY]

            if config[TRIGGER] == CRON:
                config.pop(UNIT, None)  # cron triggers don't take an interval unit

            for key, value in config.items():
                kwargs[key] = value

            self.scheduler.add_job(func, **kwargs)
    
    def add_job(self, function: Callable[[KeyValue], Any], schedule: KeyValue) -> None:
        """Add single job to the scheduler

        :param function: Function as object, to be called to schedule a single job
        :type function: object
        :param schedule: Schedule to be executed for the function
        :type schedule: dict
        """
        self.scheduler.add_job(function, **schedule)
    
    def start(self) -> None:
        """Start the configured executors and job stores and begin processing scheduled jobs.
        """
        self.scheduler.start()
    
    def shutdown(self) -> None:
        """Shuts down the scheduler, along with its executors and job processes.
        """
        self.scheduler.shutdown()
    
    def pause(self) -> None:
        """Pause job processing in the scheduler.

        This will prevent the scheduler from waking up to do job processing until resume() is called.
        It will not, however, stop any already-running jobs.
        """
        self.scheduler.pause()
    
    def resume(self) -> None:
        """Resume job processing in the scheduler.
        """
        self.scheduler.resume()

    def is_running(self) -> bool:
        """Check if scheduler is running or not

        :return: True if running, else False
        :rtype: bool
        """
        return bool(self.scheduler.running)
    
    def is_paused(self) -> bool:
        """Check if scheduler is paused

        :return: True if paused, else False
        :rtype: bool
        """
        return STATE_PAUSED == int(self.scheduler.state)

    def is_stopped(self) -> bool:
        """Check if the scheduler is stopped

        :return: True if stopped, else False
        :rtype: bool
        """
        return STATE_STOPPED == int(self.scheduler.state)
    
    def is_job_exists(self, job_id: str) -> bool:
        """Look for job if it exists in the jobstore.

        :param job_id: Joib id to be checked to exists in the job store
        :type job_id: str
        :return: True if job exists, else False
        :rtype: bool
        """
        job = self.scheduler.get_job(job_id)
        return job is not None
    
    def get_job(self, job_id: str) -> Any:
        """Get the job specified by job_id. 

        If a job exists, it will return job else it will return None

        :param job_id: Job identifier
        :type job_id: str
        :return: Job object / None
        :rtype: object/None
        """
        return self.scheduler.get_job(job_id)
    
    def reschedule(self, job_id: str, schedule: KeyValue) -> None:
        """Reschdule the given job with the new scheduled values provided

        :param job_id: Job to be rescheduled
        :type job_id: str
        :param schedule: New schedule for the job
        :type schedule: dict
        """
        self.scheduler.reschedule_job(job_id, **schedule)

    def remove_job(self, job_id: str) -> None:
        """Removes a job if it exists in the scheduler

        :param job_id: Job to be removed
        :type job_id: str
        """
        if self.is_job_exists(job_id):
            self.scheduler.remove_job(job_id)

Search Events function:

def search_events():
    # Sends a request to an external API and stores its response in the SQLite database.
    response = requests.get('https://some_url_with_query_params')
    if requests.codes.okay == response.status_code:
        # process the response and create the insert query
        query = function_generating_query_from_response(response)
        conn = None
        try:
            conn = sqlite3.connect("database_location")
            cursor = conn.cursor()
            cursor.execute(query)
            conn.commit()
        except Exception as e:
            logger.exception(e)
        finally:
            if conn is not None:
                conn.close()  # close so the connection does not keep the DB locked


def check_search_status_events():
    # Sends a request to an external API and stores its response in the SQLite database.
    response = requests.get('https://status_check_url_with_query_params')
    if requests.codes.okay == response.status_code:
        # process the response and create the insert query
        query = function_generating_query_from_response(response)
        conn = None
        try:
            conn = sqlite3.connect("database_location")
            cursor = conn.cursor()
            cursor.execute(query)
            conn.commit()
        except Exception as e:
            logger.exception(e)
        finally:
            if conn is not None:
                conn.close()  # close so the connection does not keep the DB locked

def delete_searches():
    # create the delete query
    query = function_generating_delete_query()
    conn = None
    try:
        conn = sqlite3.connect("database_location")
        cursor = conn.cursor()
        cursor.execute(query)
        conn.commit()
    except Exception as e:
        logger.exception(e)
    finally:
        if conn is not None:
            conn.close()  # close so the connection does not keep the DB locked

How can I make sure that the threads created by APScheduler wait for other threads to finish reading/writing to the database before executing their own reads/writes?
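For context, the kind of serialization being asked about can be sketched with a module-level `threading.Lock` shared by all jobs, assuming the jobs run as threads in a single process (which is the case with APScheduler's default thread pool executor). This is not from the original post; the table, path, and helper names here are hypothetical:

```python
import os
import sqlite3
import tempfile
import threading

# Hypothetical shared lock: because all jobs run as threads in one process,
# a module-level lock forces them to take turns at the database.
db_lock = threading.Lock()
DB_PATH = os.path.join(tempfile.gettempdir(), "searches_demo.db")  # hypothetical location

def run_write(query, params=()):
    """Execute a single write statement while holding the shared lock."""
    with db_lock:  # other jobs block here until the lock is released
        conn = sqlite3.connect(DB_PATH, timeout=30)
        try:
            conn.execute(query, params)
            conn.commit()
        finally:
            conn.close()  # always release the database, even on failure

if os.path.exists(DB_PATH):
    os.remove(DB_PATH)  # start from a clean file for this demo
run_write("CREATE TABLE IF NOT EXISTS searches (id INTEGER PRIMARY KEY, status TEXT)")
run_write("INSERT INTO searches (status) VALUES (?)", ("pending",))
```

Note that a lock like this only helps within one process; if the jobs ever move to a process pool executor, SQLite's own busy timeout (see the comments below) is the more general mechanism.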

Rahul Shelke
  • Have you tried using either [`PRAGMA busy_timeout`](https://www.sqlite.org/pragma.html#pragma_busy_timeout) or [`sqlite3_busy_timeout()`](https://www.sqlite.org/c3ref/busy_timeout.html) (with a value of at least 5,000) in each of the jobs? In most cases, this lets SQLite3 handle accessing the same DB from multiple processes. – TripeHound May 05 '21 at 10:09
  • No, I did not. I have added timeout as of now. I will test and if the problem persists, I will update. – Rahul Shelke May 05 '21 at 13:18
  • [This thread](https://stackoverflow.com/a/8618328/10265880) also has helpful advice about increasing the timeout. I'm not sure how else to check if the db is locked.. you could add a while loop that checks for the exception, waits 5 seconds, then tries again, for 5x or something. – sur.la.route Jun 04 '21 at 18:51
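The suggestion in the comments can be sketched as follows: `sqlite3.connect`'s `timeout` argument (in seconds) and `PRAGMA busy_timeout` (in milliseconds) both make a connection retry while the database is locked instead of failing immediately. The file name here is hypothetical:

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.gettempdir(), "busy_timeout_demo.db")  # hypothetical path

# timeout=30 makes this connection retry for up to 30 seconds when another
# connection holds the lock, instead of raising "database is locked" after
# the default 5 seconds.
conn = sqlite3.connect(db_path, timeout=30)

# The equivalent setting at the SQLite level, in milliseconds.
conn.execute("PRAGMA busy_timeout = 30000")
timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
print(timeout_ms)  # 30000
conn.close()
```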

0 Answers