I use Quartz in a Spring project. The idea is to create a separate job for each piece of incoming data that was not delivered successfully to the target service.

  • To deliver data with an exponentially increasing delay, I also create a new job for each subsequent unsuccessful attempt. (This spams the DB. It seems to work, but it could be better.)
  • My other solution is to create a repeating cron-based job that scans the DB and loads only the data that is due to be handled now (in this case I have to manage a lot on the Java side).

Both approaches can provide the correct flow, but I would prefer an out-of-the-box solution.

I tried to use the JobExecutionContext of the triggered job to reuse the same JobDetail when registering it with the Quartz scheduler. The idea was to update the existing job with a different trigger. The issue is that Quartz tries to create a new job and persist it in the DB:

org.quartz.ObjectAlreadyExistsException: Unable to store Job : 'digex-caas-securepay.b333e5bf-583f-4643-9ad7-ef4b913001f7', because one already exists with this identification.
    at org.quartz.impl.jdbcjobstore.JobStoreSupport.storeJob(JobStoreSupport.java:1113) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.jdbcjobstore.JobStoreSupport$2.executeVoid(JobStoreSupport.java:1067) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.jdbcjobstore.JobStoreSupport$VoidTransactionCallback.execute(JobStoreSupport.java:3765) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.jdbcjobstore.JobStoreSupport$VoidTransactionCallback.execute(JobStoreSupport.java:3763) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.jdbcjobstore.JobStoreCMT.executeInLock(JobStoreCMT.java:245) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.jdbcjobstore.JobStoreSupport.storeJobAndTrigger(JobStoreSupport.java:1063) ~[quartz-2.3.0.jar:na]
    at org.quartz.core.QuartzScheduler.scheduleJob(QuartzScheduler.java:855) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.StdScheduler.scheduleJob(StdScheduler.java:249) ~[quartz-2.3.0.jar:na]
    at com.incomm.ecomm.services.quartz.OrderQuartzJobScheduler.registerSecurePayPostServiceJob(OrderQuartzJobScheduler.java:59) ~[classes/:na]

My questions are (please answer any):

  • How can I manage trigger updates via Quartz (if the job was not handled successfully, update the trigger with a different fire time)?
  • How can I manage job updates via Quartz (if the job was not handled successfully, update the job with a new trigger)?
  • How can I register an exponentially increasing delay strategy for a trigger?
Sergii

3 Answers

  • You can follow this approach (a retry mechanism with a maximum number of retries allowed); ref: Quartz retry when failure
  • For each failure you can update the counter and send an email, so you will know whether the job eventually succeeded within the maximum number of tries.
  • If it did not, you can later have a dashboard that shows the failed jobs (including the number of attempts) and a mechanism to run them manually.
  • Instead of e.setRefireImmediately you can set a delay, e.g. schedule the job to run 120 minutes later, based on the JobDataMap mechanism.
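
The counter idea from the list above can be sketched without Quartz at all. In the minimal sketch below a plain Map stands in for the JobDataMap, and the names RetryCounter, RETRY_KEY and MAX_RETRIES are hypothetical (none of them come from Quartz or from the linked answer). The 120-minute delay is the one mentioned above; the limit of 3 retries is an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: decides whether a failed job gets another delayed attempt. */
final class RetryCounter {
    static final String RETRY_KEY = "retryCount";           // hypothetical key name
    static final int MAX_RETRIES = 3;                        // assumed retry limit
    static final Duration DELAY = Duration.ofMinutes(120);   // delay mentioned in the answer

    private RetryCounter() {
    }

    /**
     * Increments the counter kept in the job data and returns the time of the
     * next attempt, or an empty Optional once the retry limit has been reached.
     */
    static Optional<Instant> onFailure(Map<String, Object> jobData, Instant now) {
        int retries = ((Number) jobData.getOrDefault(RETRY_KEY, 0)).intValue();
        if (retries >= MAX_RETRIES) {
            // Give up here: send the email, show the job on the dashboard, etc.
            return Optional.empty();
        }
        jobData.put(RETRY_KEY, retries + 1);
        return Optional.of(now.plus(DELAY));
    }
}
```

In a real job the returned Instant would presumably become the start time of the next trigger, and the map would be the trigger's or job's JobDataMap so that the counter survives between executions.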
Senthil

Simple answer is:

scheduler.rescheduleJob(trigger.getKey(), trigger);

Detailed answer is:

  • How to manage via quartz job trigger updates

scheduler.rescheduleJob(trigger.getKey(), trigger);

  • How to manage via quartz job updates

This no longer matters once the trigger itself is updated.

  • How to register exponentially increasing delay time strategy for trigger?

A single trigger can be rescheduled with any different time. The time of the next execution can be calculated using any implementation of IntervalCalculationStrategy.
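
IntervalCalculationStrategy does not appear to be a Quartz type, so the sketch below assumes it is an application-level interface with a single calculateNextTryDate(int) method returning a java.util.Date (which is what Trigger start times use). ExponentialBackoffStrategy, the base delay and the cap are hypothetical names and values, shown only to illustrate one possible exponential strategy.

```java
import java.time.Clock;
import java.time.Duration;
import java.util.Date;

/** Assumed shape of the application-level interface used in this answer. */
interface IntervalCalculationStrategy {
    Date calculateNextTryDate(int attemptNumber);
}

/** Hypothetical implementation: delay = baseDelay * 2^(attempt - 1), capped at maxDelay. */
final class ExponentialBackoffStrategy implements IntervalCalculationStrategy {
    private final Clock clock;
    private final Duration baseDelay;
    private final Duration maxDelay;

    ExponentialBackoffStrategy(Clock clock, Duration baseDelay, Duration maxDelay) {
        this.clock = clock;
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    @Override
    public Date calculateNextTryDate(int attemptNumber) {
        // Clamp the exponent to 0..30 so the shift stays in a safe range;
        // this assumes baseDelay is far below ~99 days, so the product cannot overflow.
        int exponent = Math.max(0, Math.min(attemptNumber - 1, 30));
        long delayMillis = baseDelay.toMillis() * (1L << exponent);
        // Never wait longer than the configured cap.
        delayMillis = Math.min(delayMillis, maxDelay.toMillis());
        return new Date(clock.millis() + delayMillis);
    }
}
```

With a base delay of one minute, attempts 1, 2 and 3 would be scheduled 1, 2 and 4 minutes from now. Passing a Clock instead of calling System.currentTimeMillis() is just a design choice that makes the calculation easy to test.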

Example of rescheduling the job:

The job and job details can be taken from the JobExecutionContext, but that is not necessary. A trigger can be connected to only one job, so it is enough for Quartz to specify the triggerKey to be updated:

@Autowired
private Scheduler scheduler;
@Autowired
private IntervalCalculationStrategy intervalCalculation;

public void registerSecurePayPostServiceJob(
    JobExecutionContext firedJobExecutionContext) {
  Optional<SimpleTriggerImpl> mutableTrigger =
      ofNullable(firedJobExecutionContext)
          .map(JobExecutionContext::getTrigger)
          .filter(SimpleTriggerImpl.class::isInstance)
          .map(SimpleTriggerImpl.class::cast);
  try {
    if (mutableTrigger.isPresent()) {
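      SimpleTriggerImpl trigger = mutableTrigger.get();
      // The number of times the trigger has already fired serves as the attempt number.
      int nextAttemptNumber = trigger.getTimesTriggered();
      log.trace("trigger: {} fired [{}] times", trigger.getFullName(),
          trigger.getTimesTriggered());
      // Move the start time to the next calculated attempt time.
      trigger.setStartTime(intervalCalculation.calculateNextTryDate(nextAttemptNumber));
      // Replace the stored trigger under the same key; the job itself is not stored again.
      this.scheduler.rescheduleJob(trigger.getKey(), trigger);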
    }
  } catch (SchedulerException e) {
    log.error("job was not rescheduled <{}>", firedJobExecutionContext.getJobDetail(), e);
  }
}
Sergii

Let me expand on Sergii's answer a little.

I hope this information will be useful for you (this is a copy of my answer in this thread)

Below is an example of a multi-instance Spring Boot application that launches a cron job.
The Job must be running on only one of the instances.
The configuration of each instance must be the same.
If a job crashes, it should try to restart 3 times with a delay of 5 minutes * number of restart attempts.
If the job still crashes after 3 restarts, the default cron for our job trigger should be set.
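
The retry rule described above (a delay of 5 minutes multiplied by the restart attempt, then a fallback to the default cron) can be written down as a tiny standalone policy. This is only a sketch of the arithmetic, not the code used further below, and the names LinearRestartPolicy and delayBefore are hypothetical. Restart attempts are assumed to be numbered from 1.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical policy: the delay grows linearly with the restart attempt number. */
final class LinearRestartPolicy {
    private final Duration step;
    private final int maxRestarts;

    LinearRestartPolicy(Duration step, int maxRestarts) {
        this.step = step;
        this.maxRestarts = maxRestarts;
    }

    /**
     * Returns the delay before the given 1-based restart attempt, or an empty
     * Optional when the limit is exceeded and the default cron trigger should
     * be restored instead.
     */
    Optional<Duration> delayBefore(int restartAttempt) {
        if (restartAttempt < 1) {
            throw new IllegalArgumentException("restartAttempt must be >= 1");
        }
        if (restartAttempt > maxRestarts) {
            return Optional.empty();
        }
        return Optional.of(step.multipliedBy(restartAttempt));
    }
}
```

For example, new LinearRestartPolicy(Duration.ofMinutes(5), 3) would give delays of 5, 10 and 15 minutes for restarts 1 to 3, and an empty result for a fourth restart.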

We will use Quartz in cluster mode:

Deps:

implementation("org.springframework.boot:spring-boot-starter-quartz")

First of all, it is a bad idea to use Thread.sleep(600000), as suggested in this answer.
Our job:

@Component
@Profile("quartz")
class SomeJob(
    private val someService: SomeService
) : QuartzJobBean() {
    private val log: Logger = LoggerFactory.getLogger(SomeJob::class.java)
    
    override fun executeInternal(jobExecutionContext: JobExecutionContext) {
        try {
            log.info("Doing awesome work...")
            someService.work()
            if ((1..10).random() >= 5) throw RuntimeException("Something went wrong...")
        } catch (e: Exception) {
            throw JobExecutionException(e)
        }
    }
}

Here is the Quartz configuration (more information here):

@Configuration
@Profile("quartz")
class JobConfig {
    //JobDetail for our job
    @Bean
    fun someJobDetail(): JobDetail {
        return JobBuilder
            .newJob(SomeJob::class.java).withIdentity("SomeJob")
            .withDescription("Some job")
            //If we want the job to be launched after the application instance crashes at the 
            //next launch
            .requestRecovery(true)
            .storeDurably().build()
    }

    //Trigger
    @Bean
    fun someJobTrigger(someJobDetail: JobDetail): Trigger {
        return TriggerBuilder.newTrigger().forJob(someJobDetail)
            .withIdentity("SomeJobTrigger")
            .withSchedule(CronScheduleBuilder.cronSchedule("0 0 4 L-1 * ? *"))
            .build()

    }

    //Otherwise, changing cron for an existing trigger will not work. (the old cron value will be stored in the database)
    @Bean
    fun scheduler(triggers: List<Trigger>, jobDetails: List<JobDetail>, factory: SchedulerFactoryBean): Scheduler {
        factory.setWaitForJobsToCompleteOnShutdown(true)
        val scheduler = factory.scheduler
        factory.setOverwriteExistingJobs(true)
        //https://stackoverflow.com/questions/39673572/spring-quartz-scheduler-race-condition
        factory.setTransactionManager(JdbcTransactionManager())
        rescheduleTriggers(triggers, scheduler)
        scheduler.start()
        return scheduler
    }

    private fun rescheduleTriggers(triggers: List<Trigger>, scheduler: Scheduler) {
        triggers.forEach {
            if (!scheduler.checkExists(it.key)) {
                scheduler.scheduleJob(it)
            } else {
                scheduler.rescheduleJob(it.key, it)
            }
        }
    }
}
    

Add a listener to the scheduler:

@Component
@Profile("quartz")
class JobListenerConfig(
    private val schedulerFactory: SchedulerFactoryBean,
    private val jobListener: JobListener
) {
    @PostConstruct
    fun addListener() {
        schedulerFactory.scheduler.listenerManager.addJobListener(jobListener, KeyMatcher.keyEquals(jobKey("SomeJob")))
    }
}

And now the most important part: the logic for handling the execution of our job in the listener:

//registered as a bean so that it can be injected into JobListenerConfig
@Component
@Profile("quartz")
class JobListener(
    //can be obtained from the execution context, but it can also be injected
    private val scheduler: Scheduler,
    private val triggers: List<Trigger>
): JobListenerSupport() {

    private lateinit var triggerCronMap: Map<String, String>

    @PostConstruct
    fun post(){
        //there will be no recovery triggers , only our self-written ones
        triggerCronMap = triggers.associate {
            it.key.name to (it as CronTrigger).cronExpression
        }
    }

    override fun getName(): String {
        return "myJobListener"
    }


    override fun jobToBeExecuted(context: JobExecutionContext) {
        log.info("Job: ${context.jobDetail.key.name} ready to start by trigger: ${context.trigger.key.name}")
    }


    override fun jobWasExecuted(context: JobExecutionContext, jobException: JobExecutionException?) {
        //you can use context.mergedJobDataMap
        val dataMap = context.trigger.jobDataMap
        val count = if (dataMap["count"] != null) dataMap.getIntValue("count") else {
            dataMap.putAsString("count", 1)
            1
        }
        //in the if block, you can add the condition && !context.trigger.key.name.startsWith("recover_") - in this case, the scheduler will not restart recover triggers if they fall during execution
        if (jobException != null ){
            if (count < 3) {
                log.warn("Job: ${context.jobDetail.key.name} failed during execution. Restart attempts count: $count ")
                val oldTrigger = context.trigger
                var newTriggerName = context.trigger.key.name + "_retry"
                //in case such a trigger already exists
                context.scheduler.getTriggersOfJob(context.jobDetail.key)
                    .map { it.key.name }
                    .takeIf { it.contains(newTriggerName) }
                    ?.apply { newTriggerName += "_retry" }
                val newTrigger = TriggerBuilder.newTrigger()
                    .forJob(context.jobDetail)
                    .withIdentity(newTriggerName, context.trigger.key.group)
                    //create a simple trigger that should be fired in 5 minutes * restart attempts
                    .startAt(Date.from(Instant.now().plus((5 * count).toLong(), ChronoUnit.MINUTES)))
                    .usingJobData("count", count + 1 )
                    .build()
                val date = scheduler.rescheduleJob(oldTrigger.key, newTrigger)
                log.warn("Rescheduling trigger: ${oldTrigger.key} to trigger: ${newTrigger.key}")
            } else {
                log.warn("The maximum number of restarts has been reached. Restart attempts: $count")
                recheduleWithDefaultTrigger(context)
            }
        } else if (count > 1) {
            recheduleWithDefaultTrigger(context)
        }
        else {
            log.info("Job: ${context.jobDetail.key.name} completed successfully")
        }
        context.scheduler.getTriggersOfJob(context.trigger.jobKey).forEach {
            log.info("Trigger with key: ${it.key} for job: ${context.trigger.jobKey.name} will start at ${it.nextFireTime ?: it.startTime}")
        }
    }

    private fun recheduleWithDefaultTrigger(context: JobExecutionContext) {
        val clone = context.jobDetail.clone() as JobDetail
        val defaultTriggerName = context.trigger.key.name.split("_")[0]
        //Recovery triggers should not be rescheduled
        if (!triggerCronMap.contains(defaultTriggerName)) {
            log.warn("This trigger: ${context.trigger.key.name} for job: ${context.trigger.jobKey.name} is not a self-written trigger. It can be a recovery trigger or something else. This trigger must not be rescheduled.")
            return
        }
        log.warn("Remove all triggers for job: ${context.trigger.jobKey.name} and schedule default trigger for it: $defaultTriggerName")
        scheduler.deleteJob(clone.key)
        scheduler.addJob(clone, true)
        scheduler.scheduleJob(
            TriggerBuilder.newTrigger()
                .forJob(clone)
                .withIdentity(defaultTriggerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(triggerCronMap[defaultTriggerName]))
                .usingJobData("count", 1)
                .startAt(Date.from(Instant.now().plusSeconds(5)))
                .build()
        )
    }
}

Last but not least: application.yaml

spring:
  quartz:
    job-store-type: jdbc #Database Mode
    jdbc:
      initialize-schema: never #Do not initialize table structure
    properties:
      org:
        quartz:
          scheduler:
            instanceId: AUTO #With AUTO the instance ID is generated from the hostname and a timestamp. It can be any string, but it must be unique among all schedulers in the cluster (it is stored in the INSTANCE_NAME column of qrtz_scheduler_state)
            #instanceName: clusteredScheduler #quartzScheduler
          jobStore:
#            a few problems with the two properties below: https://github.com/spring-projects/spring-boot/issues/28758#issuecomment-974628989 & https://github.com/quartz-scheduler/quartz/issues/284
#            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore #Persistence Configuration
            driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate #Driver delegate that understands the SQL dialect of the specific database
#            useProperties: true #Tells the JDBC JobStore that all values in JobDataMaps are strings, so they can be stored as name-value pairs rather than as serialized objects in a BLOB column. In the long run this is safer, because you avoid the class-versioning issues of serializing non-String classes into a BLOB.
            tablePrefix: scam_quartz.QRTZ_  #Database Table Prefix
            misfireThreshold: 60000 #The number of milliseconds the scheduler will "tolerate" a trigger passing its next fire time before it is considered "misfired". The default value (if you do not set this property in the configuration) is 60000 (60 seconds).
            clusterCheckinInterval: 5000 #The frequency (in milliseconds) at which this instance "checks in" with the other instances of the cluster. Affects how quickly failed instances are detected.
            isClustered: true #Turn on Clustering
          threadPool: #Thread pool
            class: org.quartz.simpl.SimpleThreadPool
            threadCount: 3
            threadPriority: 1
            threadsInheritContextClassLoaderOfInitializingThread: true

Here are the official scripts for the database (use Liquibase or Flyway).
More information:
About quartz
spring boot using quartz in cluster mode
One more article
Cluster effectively quartz

gearbase