I have a scheduler application built with Spring Boot and Quartz. I created a job with multiple cron triggers, and it runs perfectly fine. The job details and cron expressions are stored in the database.

Here is the problem: whenever I restart the application, the stored triggers do not fire. I know a workaround: store the job and trigger details in a user-defined table and load them up front whenever the application starts. But I am not sure whether that approach is right.

I have also read a lot of articles and GitHub projects to find out how to restart/resume/re-initiate jobs.

There may be a solution that I couldn't find, perhaps because I don't understand Quartz well enough. I am sure someone has run into this scenario and found a way around it. I would be grateful if someone could provide an answer.

Alexpandiyan Chokkan

2 Answers

On application startup, you can load the scheduled jobs from the database and call startAllSchedulers(), using either an ApplicationRunner or a method annotated with @EventListener(ApplicationReadyEvent.class).

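@Component // must be a Spring bean, otherwise the runner is never invoked
public class SchedulerStartUpHandler implements ApplicationRunner {

    // SLF4J logger (assumption: the original likely relied on Lombok's @Slf4j for `log`)
    private static final Logger log = LoggerFactory.getLogger(SchedulerStartUpHandler.class);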

    @Autowired
    private SchedulerService schedulerService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Schedule all new scheduler jobs at app startup - starting");
        try {
            schedulerService.startAllSchedulers();
            log.info("Schedule all new scheduler jobs at app startup - complete");
        } catch (Exception ex) {
            log.error("Schedule all new scheduler jobs at app startup - error", ex);
        }
    }
}

OR

@EventListener(ApplicationReadyEvent.class)
public void startTheScheduledJobFromDatabase() {
    startAllSchedulers();
}

startAllSchedulers()

public void startAllSchedulers() {
    List<SchedulerJobInfo> jobInfoList = schedulerRepository.findAll();
    if (jobInfoList != null) {
        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        jobInfoList.forEach(jobInfo -> {
            try {
                JobDetail jobDetail = JobBuilder.newJob(SampleCronJob.class)
                        .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup()).build();
                if (!scheduler.checkExists(jobDetail.getKey())) {
                    Trigger trigger;
                    jobDetail = scheduleCreator.createJob(SampleCronJob.class,
                            false, context, jobInfo.getJobName(), jobInfo.getJobGroup());

                    if (jobInfo.getCronJob() && CronExpression.isValidExpression(jobInfo.getCronExpression())) {
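                        // Cron triggers have their own misfire constants; this one has the same
                        // numeric value as SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW
                        trigger = scheduleCreator.createCronTrigger(jobInfo.getJobName(), new Date(),
                                jobInfo.getCronExpression(), CronTrigger.MISFIRE_INSTRUCTION_FIRE_ONCE_NOW);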
                    } else {
                        trigger = scheduleCreator.createSimpleTrigger(jobInfo.getJobName(), new Date(),
                                jobInfo.getRepeatTime(), SimpleTrigger.MISFIRE_INSTRUCTION_FIRE_NOW);
                    }

                    scheduler.scheduleJob(jobDetail, trigger);

                }
            } catch (SchedulerException e) {
                log.error(e.getMessage(), e);
            }
        });
    }
}
Romil Patel

I hope this information will be useful for you (this is a copy of my answer in this thread)

Below is an example of a multi-instance Spring Boot application that launches a cron job.
The Job must be running on only one of the instances.
The configuration of each instance must be the same.
If the job fails, it should be retried up to 3 times, with a delay of 5 minutes * the number of the retry attempt.
If the job still fails after 3 retries, the default cron trigger for the job should be restored.
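
To make the retry rule concrete, here is a minimal sketch in plain Java. `RetryPolicy` and `delayBeforeRetry` are hypothetical names used only for illustration and are not part of Quartz. The sketch models the requirement as stated, not necessarily the exact counting used by the listener further down.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, not part of Quartz: models the retry rule stated above.
final class RetryPolicy {
    static final int MAX_RETRIES = 3;                          // taken from the requirement
    static final Duration BASE_DELAY = Duration.ofMinutes(5);  // taken from the requirement

    // Returns the delay before the given retry (1-based), or empty once the
    // retries are used up and the default cron trigger should be restored.
    static Optional<Duration> delayBeforeRetry(int retryNumber) {
        if (retryNumber < 1) {
            throw new IllegalArgumentException("retryNumber must be >= 1");
        }
        if (retryNumber > MAX_RETRIES) {
            return Optional.empty();
        }
        return Optional.of(BASE_DELAY.multipliedBy(retryNumber));
    }

    public static void main(String[] args) {
        // Prints the delay for each retry, then an empty Optional for the 4th.
        for (int retry = 1; retry <= MAX_RETRIES + 1; retry++) {
            System.out.println("retry " + retry + " -> " + delayBeforeRetry(retry));
        }
    }
}
```

Under these assumptions the delays grow linearly (5, 10, 15 minutes), and an empty result signals that it is time to fall back to the default trigger.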

We will use Quartz in cluster mode:

Deps:

implementation("org.springframework.boot:spring-boot-starter-quartz")

First of all, it is a bad idea to use Thread.sleep(600000), as said in this answer.
Our job:

@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            if ((1..10).random() >= 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

Here is the Quartz configuration (more information here):

@Configuration
@Profile("quartz")
class JobConfig {
    //JobDetail for our job
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            //Re-run the job at the next launch if the application instance crashed
            //while the job was executing
            .requestRecovery(true)
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0/30 * * ? * * *"))
            .build()

    }

    //Triggers are rescheduled explicitly here; otherwise changing the cron of an existing trigger has no effect (the old cron value stays in the database)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
    

Add a listener to the scheduler:

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

And now the most important part, the listener logic that processes each execution of our job:

@Profile("quartz")
class JobListener(
    //can be obtained from the execution context, but it can also be injected
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        //the injected list contains no recovery triggers, only the ones we defined ourselves
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }


    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }


    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        //you can use context.mergedJobDataMap
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        //adding && !context.trigger.key.name.startsWith("recover_") to the condition below stops the scheduler from retrying recovery triggers that fail during execution
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} failed during execution. Restart attempts count: $count ")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                //in case such a trigger already exists
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    //create a simple trigger that should be fired in 5 minutes * restart attempts
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.MINUTES)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("The maximum number of restarts has been reached. Restart attempts: $count")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        //Recovery triggers should not be rescheduled
        if (!triggerCronMap.contains(defaultTriggerName)) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not a self-written trigger. It may be a recovery trigger or something else. This trigger must not be rescheduled.")
            return
        }
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}
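
The listener above relies on a naming convention: retry triggers get a "_retry" suffix, and the default trigger name is recovered by cutting the name at the first underscore. The following plain-Java sketch mirrors just that string logic so it can be checked in isolation. `TriggerNames` and its methods are hypothetical names, not Quartz API.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical helper that mirrors the trigger-naming logic of the listener.
final class TriggerNames {

    // Name for the next retry trigger: append "_retry", and append it once more
    // if a trigger with that name is already registered for the job.
    static String retryName(String currentName, Collection<String> existingNames) {
        String candidate = currentName + "_retry";
        return existingNames.contains(candidate) ? candidate + "_retry" : candidate;
    }

    // Name of the default trigger: everything before the first underscore.
    static String defaultName(String triggerName) {
        return triggerName.split("_")[0];
    }

    public static void main(String[] args) {
        String retry = retryName("SomeJobTrigger", List.of("SomeJobTrigger"));
        System.out.println(retry + " -> " + defaultName(retry));
        // Caveat: a default name that itself contains "_" is cut too early.
        System.out.println(defaultName("nightly_report_retry"));
    }
}
```

One consequence of this convention is that default trigger names should not contain an underscore. A name such as "nightly_report" would be reduced to "nightly", would not be found in triggerCronMap, and the trigger would be treated as one that must not be rescheduled.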

Last but not least: application.yaml

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #AUTO generates the instance ID from the hostname and a timestamp; it can be any string, but must be unique for each scheduler instance in the cluster (stored in the INSTANCE_NAME column of qrtz_scheduler_state)
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #Database-specific driver delegate (here PostgreSQL)
#            useProperties: true #Tells the JDBC JobStore that all values in JobDataMaps are strings, so they can be stored as name-value pairs rather than as serialized objects in a BLOB column. In the long run this is safer because it avoids the class-versioning issues of serializing non-String classes into a BLOB.
            tablePrefix: scam_quartz.QRTZ_  #Database Table Prefix
            misfireThreshold: 60000 #The number of milliseconds the scheduler will tolerate a trigger passing its next fire time before it is considered misfired. The default value (if the property is not set) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #How often (in milliseconds) this instance checks in with the other instances of the cluster. Affects how quickly failed instances are detected.
            isClustered: true #Turn on Clustering
          threadPool: #Thread pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true
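
The misfireThreshold setting matters for the restart scenario in the question: fire times that passed while the application was down are compared against this threshold when the scheduler comes back. The sketch below is a simplified model of that comparison in plain Java, not the actual Quartz implementation. `MisfireCheck` and `isMisfired` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified, hypothetical model of the misfire decision (not Quartz code).
final class MisfireCheck {

    // A trigger counts as misfired when its scheduled fire time lies further
    // in the past than the configured threshold.
    static boolean isMisfired(Instant scheduledFireTime, Instant now, Duration threshold) {
        return Duration.between(scheduledFireTime, now).compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        Duration threshold = Duration.ofMillis(60_000); // misfireThreshold from the YAML above
        Instant scheduled = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(isMisfired(scheduled, scheduled.plusSeconds(30), threshold));
        System.out.println(isMisfired(scheduled, scheduled.plusSeconds(90), threshold));
    }
}
```

In this model a trigger that is 30 seconds late still fires normally, while one that is 90 seconds late is handled according to the misfire instruction configured on the trigger.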

Here are the official scripts for the database (use Liquibase or Flyway)
More information:
About quartz
spring boot using quartz in cluster mode
One more article
Cluster effectively quartz

gearbase