I hope this information is useful to you (this is a copy of my answer in this thread).
Below is an example of a multi-instance Spring Boot application that runs a cron job.
The job must run on only one of the instances at a time.
Every instance must have the same configuration.
If the job fails, it should be restarted up to 3 times, with a delay of 5 minutes × the number of the restart attempt (5, 10, 15 minutes).
If the job still fails after 3 restarts, its trigger should be reset to the default cron schedule.
We will use Quartz in cluster mode:
Dependencies:
// Gradle Kotlin DSL: Spring Boot starter for the Quartz scheduler (goes in the `dependencies { }` block).
// NOTE(review): the JDBC job store configured below also needs a DataSource and a database driver
// (e.g. a JDBC/JPA starter and the PostgreSQL driver) — not shown here, confirm they are present.
implementation("org.springframework.boot:spring-boot-starter-quartz")
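First of all, it is a bad idea to use Thread.sleep(600000), as mentioned in this answer: sleeping blocks a worker thread for the whole 10 minutes instead of letting the scheduler fire the job again later.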
Our job:
/**
 * Demo Quartz job: calls [SomeService.work] and then fails at random (6 out of 10 possible rolls)
 * to exercise the restart logic. Every exception is rethrown wrapped in a [JobExecutionException].
 */
@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {

    private val logger: Logger = LoggerFactory.getLogger(SomeJob::class.java)

    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            performWork()
        } catch (failure: Exception) {
            throw JobExecutionException(failure)
        }
    }

    /** The actual unit of work; throws on purpose to simulate a crash. */
    private fun performWork() {
        logger.info("Doing awesome work...")
        someService.work()
        val roll = (1..10).random()
        if (roll >= 5) {
            throw RuntimeException("Something went wrong...")
        }
    }
}
Here is the Quartz configuration (more information here):
/**
 * Quartz job, trigger and scheduler setup for [SomeJob].
 */
@Configuration
@Profile("quartz")
class JobConfig {

    /**
     * Durable [JobDetail] for [SomeJob] with identity "SomeJob".
     *
     * `requestRecovery(true)` asks Quartz to re-execute the job if the instance running it crashes
     * mid-execution (a "recovery"/"fail-over" situation). `storeDurably()` keeps the job stored even
     * while no trigger points to it.
     */
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            .requestRecovery(true)
            .storeDurably().build()
    }

    /** Default trigger for [someJobDetail]: Quartz cron "0/30 * * ? * * *", i.e. every 30 seconds. */
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0/30 * * ? * * *"))
            .build()
    }

    /**
     * Exposes the Quartz [Scheduler] and re-applies every trigger bean, so that a changed cron takes effect.
     * Otherwise the old cron value stored in the database would be kept.
     *
     * The injected [factory] is already initialized: it has created the scheduler and registered the job
     * and trigger beans. Setters that are only read during that initialization, such as
     * `setOverwriteExistingJobs` and `setTransactionManager`, would be silent no-ops here.
     * Configure them before initialization instead:
     * - `spring.quartz.overwrite-existing-jobs`, or a `SchedulerFactoryBeanCustomizer`.
     * - Spring Boot's JDBC job-store auto-configuration sets the application's transaction manager. That
     *   matters for the race described in https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition.
     * `waitForJobsToCompleteOnShutdown` is read at shutdown, so setting it here still works.
     *
     * @param triggers all trigger beans; each is scheduled, or rescheduled if it already exists.
     * @param jobDetails unused; kept for signature compatibility.
     * @param factory the Spring Boot managed scheduler factory.
     */
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    /** Stores each trigger, replacing an already-stored trigger with the same key. */
    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach { trigger ->
            if (scheduler.checkExists(trigger.key)) {
                scheduler.rescheduleJob(trigger.key, trigger)
            } else {
                try {
                    scheduler.scheduleJob(trigger)
                } catch (e: org.quartz.ObjectAlreadyExistsException) {
                    // Another cluster node stored the same trigger between checkExists and scheduleJob.
                    scheduler.rescheduleJob(trigger.key, trigger)
                }
            }
        }
    }
}
Add a listener to the scheduler:
/**
 * Registers [jobListener] with the Quartz scheduler, restricted to the job whose key is "SomeJob".
 */
@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        // Only executions of "SomeJob" are reported to the listener.
        val someJobOnly = KeyMatcher.keyEquals(jobKey("SomeJob"))
        val listenerManager = schedulerFactory.scheduler.listenerManager
        listenerManager.addJobListener(jobListener, someJobOnly)
    }
}
And now the most important part: the job listener, which handles the result of each execution (scheduling restarts and restoring the default schedule):
/**
 * Job listener that implements the restart policy for failed executions of the job it is registered for.
 *
 * The execution number `count` travels in the firing trigger's JobDataMap: 1 is the regular cron run,
 * 2 is the first restart, and so on.
 * - On failure, while fewer than [MAX_RESTARTS] restarts have been made, the firing trigger is replaced by
 *   a one-shot retry trigger. It fires `RETRY_DELAY_MINUTES * count` minutes later.
 * - When a restart succeeds, or the job still fails after [MAX_RESTARTS] restarts, all triggers of the job
 *   are removed and its default cron trigger is scheduled again.
 *
 * It has to be a Spring bean because `JobListenerConfig` injects it.
 */
@org.springframework.stereotype.Component
@Profile("quartz")
class JobListener(
    // can be obtained from the execution context, but it can also be injected
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
) : JobListenerSupport() {

    /** Trigger name -> cron expression of every cron trigger bean; used to restore the default schedule. */
    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post() {
        // `triggers` holds only our own trigger beans (no recovery triggers).
        // Non-cron triggers are skipped instead of failing startup with a ClassCastException.
        triggerCronMap = triggers
            .filterIsInstance<CronTrigger>()
            .associate { it.key.name to it.cronExpression }
    }

    override fun getName(): String = "myJobListener"

    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }

    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        // You could also use context.mergedJobDataMap.
        // Triggers without a "count" entry (the initial cron trigger) count as execution #1.
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else 1
        // In the failure branches you can add `&& !context.trigger.key.name.startsWith("recover_")`.
        // Then the scheduler will not restart recovery triggers that fail during execution.
        when {
            // count - 1 restarts have been made so far; schedule another while below the limit.
            jobException != null && count <= MAX_RESTARTS -> scheduleRetry(context, count)
            jobException != null -> {
                log.warn("The maximum number of restarts has been reached. Restart attempts: ${count - 1}")
                rescheduleWithDefaultTrigger(context)
            }
            // A restart succeeded: go back to the regular cron schedule.
            count > 1 -> rescheduleWithDefaultTrigger(context)
            else -> log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    /** Replaces the trigger that just fired with a one-shot retry trigger carrying `count + 1`. */
    private fun scheduleRetry(context: JobExecutionContext, count: Int) {
        log.warn("Job: ${context.jobDetail.key.name} failed during execution. Restart attempts count: $count ")
        val oldTrigger = context.trigger
        val existingNames = context.scheduler.getTriggersOfJob(context.jobDetail.key)
            .map { it.key.name }
            .toSet()
        // Keep appending "_retry" until the name is free, in case such a trigger already exists.
        var newTriggerName = oldTrigger.key.name + "_retry"
        while (newTriggerName in existingNames) {
            newTriggerName += "_retry"
        }
        val newTrigger = TriggerBuilder.newTrigger()
            .forJob(context.jobDetail)
            .withIdentity(newTriggerName, oldTrigger.key.group)
            // No schedule given, so this is a simple trigger firing once, RETRY_DELAY_MINUTES * count minutes from now.
            .startAt(Date.from(Instant.now().plus(RETRY_DELAY_MINUTES * count, ChronoUnit.MINUTES)))
            .usingJobData("count", count + 1)
            .build()
        scheduler.rescheduleJob(oldTrigger.key, newTrigger)
        log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
    }

    /**
     * Removes every trigger of the job by deleting and re-adding it, then schedules the job's default cron
     * trigger again. The new trigger has `count` reset to 1 and starts 5 seconds from now.
     * Triggers that do not derive from one of our trigger beans (e.g. Quartz recovery triggers) are left alone.
     */
    private fun rescheduleWithDefaultTrigger(context: JobExecutionContext) {
        // Retry triggers are named "<default>_retry[_retry...]"; strip the suffix to get the default name.
        // substringBefore keeps default trigger names that themselves contain '_' intact.
        val defaultTriggerName = context.trigger.key.name.substringBefore("_retry")
        // Recovery triggers should not be rescheduled.
        val cron = triggerCronMap[defaultTriggerName]
        if (cron == null) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not self-written trigger. It can be recovery trigger or whatever. This trigger must not be rescheduled.")
            return
        }
        val clone = context.jobDetail.clone() as JobDetail
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }

    private companion object {
        /** Restarts allowed after the initial failed run before falling back to the default cron. */
        const val MAX_RESTARTS = 3

        /** The n-th restart fires n * RETRY_DELAY_MINUTES minutes after the failure. */
        const val RETRY_DELAY_MINUTES = 5L
    }
}
Last but not least: application.yaml
spring:
  quartz:
    job-store-type: jdbc # Database (JDBC) job store
    jdbc:
      initialize-schema: never # Do not create the Quartz tables; apply the official scripts yourself
    properties:
      org:
        quartz:
          scheduler:
            # AUTO generates the instance ID (by default from hostname and timestamp). Any string works, but it must
            # be unique among all schedulers in the cluster (it ends up in the INSTANCE_NAME column of qrtz_scheduler_state).
            instanceId: AUTO
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
            # a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
            # class: org.springframework.scheduling.quartz.LocalDataSourceJobStore # Persistence configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate # Database-specific delegate (PostgreSQL here)
            # useProperties: true # Store all JobDataMap values as strings instead of serializing them into BLOB columns. Safer in the long run (no class-version problems with serialized non-String values), but then every JobDataMap value must be a String.
            # NOTE: the job listener stores Int values via usingJobData("count", ...); those would have to become strings if useProperties is enabled.
            tablePrefix: scam_quartz.QRTZ_ # Table prefix (presumably schema-qualified: scam_quartz schema, QRTZ_ tables)
            misfireThreshold: 60000 # Milliseconds a trigger may be late past its next fire time before it counts as misfired. Default: 60000 (60 seconds).
            clusterCheckinInterval: 5000 # How often (in ms) this instance checks in with the other cluster instances; affects how quickly failed instances are detected.
            isClustered: true # Turn on clustering
          threadPool: # Worker thread pool that executes jobs (not a DB connection pool)
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true
Here are the official Quartz database scripts. Because `initialize-schema` is set to `never`, apply them yourself (e.g. with Liquibase or Flyway).
More information:
About quartz
spring boot using quartz in cluster mode
One more article
Cluster effectively quartz