
How can I invoke a job dynamically and cancel it easily? Can I trigger a delayed task that runs at a specific moment, and cancel it if that moment has not yet passed, just like an alarm clock?

Jimmy Jiang
  • Currently, I am working on a feature that lets clients set the execution time of tasks at will. Clients can also cancel these tasks if the time has not yet arrived. I am not sure how to implement this. – Jimmy Jiang Dec 30 '20 at 08:32
  • Does this answer your question? [How to stop a scheduled task that was started using @Scheduled annotation?](https://stackoverflow.com/questions/44644141/how-to-stop-a-scheduled-task-that-was-started-using-scheduled-annotation) – Mega Mbo Dec 30 '20 at 08:35
  • @MegaMbo That annotation can only create fixed-rate or cron-type tasks. In my scenario, the times submitted by clients are not regular: a task may be due a minute later, 15 minutes later, an hour later or even days later. And as far as I know, I cannot cancel submitted tasks if I use the annotation. – Jimmy Jiang Dec 30 '20 at 08:39
  • I would persist the cron in a table and poll for the value so you can set up that cron or cancel it. – burm87 Dec 30 '20 at 09:57
  • @burm87 I think it is a little costly to build a cron expression for tasks that run only once. – Jimmy Jiang Dec 30 '20 at 12:03

2 Answers


Quartz is a good scheduling library with many capabilities: it can run many jobs with simple triggers and cron triggers simultaneously, on a single machine or in a cluster. Its job store can be kept in memory or persisted in a database. For more details, see Scheduling in Spring with Quartz.

I have created a basic setup that focuses on the scheduling concept. There are three methods to create, list and kill jobs. Thread.sleep is used to simulate a long-running job.

Scenario

Create a new job

POST http://localhost:8080/start/foo

Trigger is created. Job name is 'foo-1609322783667'

List triggers by job

GET http://localhost:8080/list/foo

[
    "foo-1609322783667"
]

Kill the running job

DELETE http://localhost:8080/kill/foo

Job is interrupted

Console output:

2020-12-30 13:06:23.671  INFO 920 --- [nio-8080-exec-3] com.example.demo.HomeController          : Job is created. It will be triggered at Wed Dec 30 13:06:28 EET 2020
2020-12-30 13:06:28.681  INFO 920 --- [eduler_Worker-1] com.example.demo.job.FooJob              : Job started DEFAULT.foo-1609322783667
2020-12-30 13:06:51.109  INFO 920 --- [eduler_Worker-1] com.example.demo.job.FooJob              : Job is interrupted DEFAULT.foo-1609322783667
2020-12-30 13:06:51.109  INFO 920 --- [eduler_Worker-1] com.example.demo.job.FooJob              : Job completed DEFAULT.foo-1609322783667

Source Code

pom.xml (if you are using Gradle, add the equivalent definitions to build.gradle)

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

application.properties

spring.quartz.job-store-type=memory

JobConfig

@Configuration
public class JobConfig {
    @Bean
    public JobDetailFactoryBean fooJobDetail() {
        JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
        jobDetailFactory.setJobClass(FooJob.class);
        jobDetailFactory.setDurability(true);
        return jobDetailFactory;
    }

    @Bean
    public JobDetailFactoryBean barJobDetail() {
        JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
        jobDetailFactory.setJobClass(BarJob.class);
        jobDetailFactory.setDurability(true);
        return jobDetailFactory;
    }
}

BarJob

@Slf4j
@Service
public class BarJob implements InterruptableJob {
    private Thread thread;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("Job started {}", jobExecutionContext.getTrigger().getKey());
        thread = Thread.currentThread();
        try {
            Thread.sleep(50_000); // wait 50 seconds
        } catch (InterruptedException ex) {
            log.info("Job is interrupted {}", jobExecutionContext.getTrigger().getKey());
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
        log.info("Job completed {}", jobExecutionContext.getTrigger().getKey());
    }

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        thread.interrupt(); // interrupt the thread that is executing the job, as FooJob does
    }
}

FooJob

@Slf4j
@Service
public class FooJob implements InterruptableJob {
    private Thread thread;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("Job started {}", jobExecutionContext.getTrigger().getKey());
        thread = Thread.currentThread();
        try {
            Thread.sleep(100_000); // wait 100 seconds
        } catch (InterruptedException ex) {
            log.info("Job is interrupted {}", jobExecutionContext.getTrigger().getKey());
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
        log.info("Job completed {}", jobExecutionContext.getTrigger().getKey());
    }

    @Override
    public void interrupt() throws UnableToInterruptJobException {
        thread.interrupt();
    }
}
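
The interrupt mechanism that FooJob relies on can be tried without Quartz. The following is a minimal JDK-only sketch, assuming a hypothetical SleepingJob class that stands in for the job: it records the executing thread, sleeps, and is woken up when another thread calls interrupt() on it. It is an illustration of the idea, not the Quartz API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical JDK-only stand-in for an interruptible job; it does not use Quartz.
class SleepingJob implements Runnable {
    private final CountDownLatch started = new CountDownLatch(1);
    private final AtomicBoolean interrupted = new AtomicBoolean(false);
    private volatile Thread worker;

    @Override
    public void run() {
        worker = Thread.currentThread();   // remember which thread executes the job
        started.countDown();
        try {
            Thread.sleep(100_000);         // simulate long-running work
        } catch (InterruptedException ex) {
            interrupted.set(true);         // sleep() was cut short by interrupt()
        }
    }

    // Plays the role of InterruptableJob.interrupt(): wake the thread that runs the job.
    void interrupt() throws InterruptedException {
        started.await();                   // wait until run() has recorded its thread
        worker.interrupt();
    }

    boolean wasInterrupted() {
        return interrupted.get();
    }

    // Starts the job on a new thread, interrupts it, and reports whether the sleep was cut short.
    static boolean demo() {
        SleepingJob job = new SleepingJob();
        Thread t = new Thread(job);
        t.start();
        try {
            job.interrupt();
            t.join(5_000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive() && job.wasInterrupted();
    }
}
```

Calling SleepingJob.demo() should return true well before the 100-second sleep would have finished, because the interrupt ends the sleep early.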

HomeController

@RestController
@Slf4j
public class HomeController {
    @Autowired
    private Scheduler scheduler;

    @Autowired
    @Qualifier("fooJobDetail")
    private JobDetail fooJobDetail;

    @Autowired
    @Qualifier("barJobDetail")
    private JobDetail barJobDetail;

    @PostMapping("/start/{jobName}")
    public ResponseEntity<String> startJob(@PathVariable("jobName") String jobName) throws SchedulerException {
        Optional<JobDetail> jobDetail = parseJob(jobName);
        if (!jobDetail.isPresent()) {
            return ResponseEntity.badRequest().body("Invalid job name");
        }

        Trigger trigger = TriggerBuilder.newTrigger()
                .forJob(jobDetail.get())
                .withIdentity(jobName + "-" + new Date().getTime()) // unique name
                .startAt(Date.from(Instant.now().plusSeconds(5)))   // starts 5 seconds later
                .build();

        Date date = scheduler.scheduleJob(trigger);
        log.info("Job is created. It will be triggered at {}", date);

        return ResponseEntity.ok("Trigger is created. Job name is '" + trigger.getKey().getName() + "'");
    }

    /**
     * Find the job by job name
     */
    private Optional<JobDetail> parseJob(String jobName) {
        if ("foo".equals(jobName)) {
            return Optional.of(fooJobDetail);
        } else if ("bar".equals(jobName)) {
            return Optional.of(barJobDetail);
        }
        return Optional.empty();
    }

    @GetMapping("/list/{jobName}")
    public ResponseEntity<List<String>> listTriggers(@PathVariable("jobName") String jobName) throws SchedulerException {
        Optional<JobDetail> jobDetail = parseJob(jobName);
        if (!jobDetail.isPresent()) {
            return ResponseEntity.badRequest().build();
        }

        List<String> triggers = scheduler.getTriggersOfJob(jobDetail.get().getKey()).stream()
                .map(t -> t.getKey().getName())
                .collect(Collectors.toList());
        return ResponseEntity.ok(triggers);
    }

    @DeleteMapping("/kill/{jobName}")
    public ResponseEntity<String> killTrigger(@PathVariable("jobName") String jobName) throws SchedulerException {
        Optional<JobDetail> jobDetail = parseJob(jobName);
        if (!jobDetail.isPresent()) {
            return ResponseEntity.badRequest().build();
        }
        scheduler.interrupt(jobDetail.get().getKey());
        return ResponseEntity.ok("Job is interrupted");
    }
}
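
Note that the kill endpoint above interrupts a job that is already running. To cancel a one-time task before it fires (the alarm-clock case in the question), the pending trigger has to be removed instead; in Quartz this is normally done with Scheduler.unscheduleJob(TriggerKey). The idea can be sketched with the JDK alone. The OneShotScheduler class and its method names below are hypothetical and only mirror the start/list/kill endpoints; this is a sketch under those assumptions, not a replacement for the Quartz setup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only sketch of "schedule once, cancel before it fires".
class OneShotScheduler {
    // Daemon thread so that a forgotten shutdown() does not keep the JVM alive.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "one-shot-scheduler");
                t.setDaemon(true);
                return t;
            });
    private final Map<String, ScheduledFuture<?>> pending = new HashMap<>();
    private long sequence;

    // Schedules the task to run once after the delay and returns its unique name.
    synchronized String start(String jobName, long delayMillis, Runnable task) {
        String name = jobName + "-" + (++sequence);
        ScheduledFuture<?> future = executor.schedule(() -> {
            if (claim(name)) {             // skip the work if cancel() got there first
                task.run();
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        pending.put(name, future);
        return name;
    }

    // Names of tasks that have been scheduled but have not started yet.
    synchronized List<String> list() {
        return new ArrayList<>(pending.keySet());
    }

    // Cancels a task that has not started; returns false if it already ran or is unknown.
    synchronized boolean cancel(String name) {
        ScheduledFuture<?> future = pending.remove(name);
        if (future == null) {
            return false;
        }
        future.cancel(false);              // drop it from the executor's queue
        return true;
    }

    void shutdown() {
        executor.shutdownNow();
    }

    // Removes the entry when the task is about to run; false means it was cancelled.
    private synchronized boolean claim(String name) {
        return pending.remove(name) != null;
    }
}
```

A caller would keep the name returned by start(...) and pass it to cancel(...) later. Whichever of cancel(...) and the firing task removes the entry first wins, so a successfully cancelled task never runs.
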
Ismail Durmaz
  • Thank you very much for the kind help. I have solved my problem using another scheduling framework called [PowerJob](https://github.com/PowerJob/PowerJob); the answer can be seen below. The framework has a friendlier web console and supports many kinds of tasks. – Jimmy Jiang Jan 18 '21 at 01:15

I have found another enterprise job-scheduling framework for this task, called PowerJob. With the OpenAPI it provides, delayed tasks can be created and canceled easily. The source code is available here.
First, initialize the project by following its guide.

Then, create our own project. We will need both the PowerJob worker and the PowerJob client.
The dependencies in the pom file look like this:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-client</artifactId>
            <version>3.4.3</version>
        </dependency>

        <dependency>
            <groupId>com.github.kfcfans</groupId>
            <artifactId>powerjob-worker-spring-boot-starter</artifactId>
            <version>3.4.3</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
        <dependency>
            <groupId>org.hibernate.validator</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>7.0.0.Final</version>
        </dependency>

Entities used:

Response class:

import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Response {
    private Integer code;
    private String message;
    private JSONObject info;
    
    public static Response success(JSONObject data) {
        return Response.builder().code(200).message("success").info(data).build();
    }
    
    public static Response error(JSONObject data) {
        return Response.builder().code(500).message("fail").info(data).build();
    }
}

The AlarmClock class:

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.time.LocalDateTime;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class AlarmClock {
    private Long id;
    
    @NotBlank(message = "username should not be blank.")
    private String username;
    
    private String clockName;
    
    @NotNull(message = "Delay should not be null.")
    private Long delayMillis;
    
    private Long instanceId;
    
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private LocalDateTime createTime;
    
    @Override
    public String toString() {
        return JSON.toJSONStringWithDateFormat(this, JSON.DEFFAULT_DATE_FORMAT);
    }
}
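
The delayMillis field is a relative delay, whereas the question asks for tasks that run at a specific moment. If clients submit an absolute time, it could be converted into a delay before the job is submitted. The following is a minimal sketch, assuming a hypothetical AlarmDelay helper that is not part of the original project; the Clock parameter is there only so the calculation can be tested with a fixed time.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: turns an absolute fire time into a relative delay in milliseconds.
final class AlarmDelay {
    private AlarmDelay() {
    }

    // Returns the milliseconds between "now" (taken from the given clock) and fireAt.
    // Throws IllegalArgumentException if fireAt is not strictly in the future.
    static long millisUntil(Instant fireAt, Clock clock) {
        Duration delay = Duration.between(clock.instant(), fireAt);
        if (delay.isNegative() || delay.isZero()) {
            throw new IllegalArgumentException("fire time must be in the future: " + fireAt);
        }
        return delay.toMillis();
    }
}
```

In the service, the result of AlarmDelay.millisUntil(fireAt, Clock.systemUTC()) could then be stored in delayMillis; rejecting past times up front avoids submitting a job that can no longer be canceled in time.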

The task looks like this:

import com.alibaba.fastjson.JSON;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.powerjobdemo.entity.AlarmClock;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
public class AlarmClockTask implements BasicProcessor {
    
    public static final DateTimeFormatter STANDARD_DATE_TIME = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
    @Override
    public ProcessResult process(TaskContext taskContext) throws Exception {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        String instanceParams = taskContext.getInstanceParams();
        omsLogger.info("instance params:{}", instanceParams);
        AlarmClock alarmClock = JSON.parseObject(instanceParams, AlarmClock.class);
        assert alarmClock != null;
        String username = alarmClock.getUsername();
        omsLogger.info("Current time is:{}", STANDARD_DATE_TIME.format(LocalDateTime.now()));
        omsLogger.info("Clock info: id:{}, name:{}, creator:{}", alarmClock.getId(), alarmClock.getClockName(), username);
        return new ProcessResult(true, String.format("User: %s running an alarm clock.", username));
    }
}

The service interface looks like this:

import com.alibaba.fastjson.JSONObject;
import com.github.powerjobdemo.entity.AlarmClock;

public interface ClockService {
    /**
     * Add alarm clock.
     *
     * @param alarmClock alarm clock
     * @return json
     */
    JSONObject addAlarmClock(AlarmClock alarmClock);
    
    /**
     * Cancel alarm clock.
     *
     * @param alarmClock alarm clock
     */
    JSONObject cancelAlarmClock(AlarmClock alarmClock);
    
    /**
     * Query all alarm clocks.
     *
     * @param username username
     * @return list
     */
    JSONObject queryAll(String username);
}

The service implementation class looks like this:

import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.powerjobdemo.entity.AlarmClock;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Service
public class ClockServiceImpl implements ClockService {
    
    public static final DateTimeFormatter CLOCK_NAME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");
    
    private static final List<AlarmClock> alarmClockList = new CopyOnWriteArrayList<>();
    
    private final AtomicLong clockCount = new AtomicLong();
    
    @Resource
    private OhMyClient ohMyClient;
    
    @Value("${powerjob.task.id}")
    private Long taskId;
    
    @Override
    public JSONObject addAlarmClock(AlarmClock alarmClock) {
        String formattedName = CLOCK_NAME_FORMATTER.format(LocalDateTime.now());
        alarmClock.setClockName("Clock-" + formattedName);
        long id = clockCount.incrementAndGet();
        alarmClock.setId(id); // store the id so that the task and later queries can see it
        alarmClock.setCreateTime(LocalDateTime.now());
        Long delayMillis = alarmClock.getDelayMillis();
        ResultDTO<Long> longResultDTO = ohMyClient.runJob(taskId, alarmClock.toString(), delayMillis);
        alarmClock.setInstanceId(longResultDTO.getData());
        alarmClockList.add(alarmClock);
        JSONObject data = new JSONObject();
        data.put("id", id);
        return data;
    }
    
    @Override
    public JSONObject cancelAlarmClock(AlarmClock alarmClock) {
        Long instanceId = alarmClock.getInstanceId();
        assert instanceId != null;
        ohMyClient.cancelInstance(instanceId);
        alarmClockList.removeIf(clock -> Objects.equals(instanceId, clock.getInstanceId()));
        JSONObject data = new JSONObject();
        data.put("instanceId", instanceId);
        return data;
    }
    
    @Override
    public JSONObject queryAll(String username) {
        List<AlarmClock> clockList = alarmClockList.stream()
                .filter(alarmClock -> StringUtils.equals(alarmClock.getUsername(), username))
                .collect(Collectors.toList());
        JSONObject data = new JSONObject();
        data.put("data", clockList);
        data.put("count", clockList.size());
        return data;
    }
}
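
The service above stores and removes alarm clocks without looking at the result of the remote calls. One possible refinement is to update the local list only when the remote scheduler reports success. The following is a minimal sketch of that idea; RemoteScheduler and AlarmRegistry are hypothetical names introduced for illustration and are not the PowerJob client API.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical abstraction over the remote scheduler; it is NOT the PowerJob client API.
interface RemoteScheduler {
    OptionalLong submit(String params, long delayMillis); // empty when the submit fails

    boolean cancel(long instanceId);                      // true when the instance was cancelled
}

// Keeps the local bookkeeping in step with what the remote scheduler accepted.
class AlarmRegistry {
    private final RemoteScheduler remote;
    private final Map<Long, String> ownerByInstance = new ConcurrentHashMap<>();

    AlarmRegistry(RemoteScheduler remote) {
        this.remote = remote;
    }

    // Records the alarm only if the remote side returned an instance id.
    OptionalLong add(String username, long delayMillis) {
        OptionalLong instanceId = remote.submit(username, delayMillis);
        instanceId.ifPresent(id -> ownerByInstance.put(id, username));
        return instanceId;
    }

    // Removes the local record only after the remote cancel succeeded.
    boolean cancel(long instanceId) {
        if (!ownerByInstance.containsKey(instanceId)) {
            return false;                  // unknown or already cancelled
        }
        if (!remote.cancel(instanceId)) {
            return false;                  // e.g. the instance has already run
        }
        ownerByInstance.remove(instanceId);
        return true;
    }

    // Number of alarms currently recorded for the given user.
    long countFor(String username) {
        return ownerByInstance.values().stream().filter(username::equals).count();
    }
}
```

An adapter that implements RemoteScheduler on top of the real client would be the only place that needs to interpret the client's result objects.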

And finally, the controller looks like this:

import com.github.powerjobdemo.entity.AlarmClock;
import com.github.powerjobdemo.entity.Response;
import com.github.powerjobdemo.service.ClockService;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

@RestController
@CrossOrigin
public class AlarmClockController {
    
    @Resource
    private ClockService clockService;
    
    @PostMapping(value = "/api/v1/alarm/clock/add")
    public Response addAlarmClock(@RequestBody AlarmClock alarmClock) {
        return Response.success(clockService.addAlarmClock(alarmClock));
    }
    
    @PostMapping(value = "/api/v1/alarm/clock/cancel")
    public Response cancelAlarmClock(@RequestBody AlarmClock alarmClock) {
        return Response.success(clockService.cancelAlarmClock(alarmClock));
    }
    
    @GetMapping(value = "/api/v1/alarm/clock/query")
    public Response queryAlarmClock(@RequestParam String username) {
        return Response.success(clockService.queryAll(username));
    }
}

So we can POST to /api/v1/alarm/clock/add to add a new alarm clock. For example, create an alarm clock that runs 600 seconds later:

curl --location --request POST 'http://localhost:8080/api/v1/alarm/clock/add' \
--header 'Content-Type: application/json' \
--data-raw '{
  "username": "Jimmy",
  "delayMillis": 600000
}'

The response looks like this:

{
    "code": 200,
    "message": "success",
    "info": {
        "id": 1
    }
}

And then query.

curl --location --request GET 'http://localhost:8080/api/v1/alarm/clock/query?username=Jimmy'

The response looks like this:

{
    "code": 200,
    "message": "success",
    "info": {
        "data": [
            {
                "id": 1,
                "username": "Jimmy",
                "clockName": "Clock-20210118-002804",
                "delayMillis": 600000,
                "instanceId": 231210525113975104,
                "createTime": "2021-01-18 00:28:04"
            }
        ],
        "count": 1
    }
}

To cancel the clock, POST to the cancel API:

curl --location --request POST 'http://localhost:8080/api/v1/alarm/clock/cancel' \
--header 'Content-Type: application/json' \
--data-raw '{"instanceId": "231210525113975104"}'

The response:

{
    "code": 200,
    "message": "success",
    "info": {
        "instanceId": 231210525113975104
    }
}

We can also see all the instances on the PowerJob web page, which is helpful. (Screenshot: task instance list)

Jimmy Jiang