I am writing an application that periodically pulls new rows (row.status == 'NEW') from a database table, processes each row as a JPA entity, and then saves the row back to the database with status == 'PROCESSED'.

DB table:

ID | Status
1  | PROCESSED
2  | NEW
3  | NEW

Java code (using the Spring Boot framework):

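import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class Processor {

   // a JPA repository for selecting Items
   @Autowired
   ItemRepository itemRepository;

   // a single-thread executor for submitting processing tasks
   ExecutorService executor = Executors.newSingleThreadExecutor();

   // runs every second, whether or not earlier tasks have finished
   @Scheduled(fixedRate = 1000)
   void process() {
        List<Item> newItems = itemRepository.findByStatus("NEW");
        for (Item item : newItems) {
            // process each item asynchronously
            executor.submit(() -> {
                // do some processing on this item and update its status.
                // This is a time-consuming process; it may take 4 or 5 seconds
                item.setStatus("PROCESSED");
                itemRepository.save(item);
            });
        }
   }

}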

The problem is that while an item, say item1, is still being processed in the executor and has not yet been updated to PROCESSED, the next round of processing still selects it through itemRepository.findByStatus("NEW") and submits it for processing again.

How can I prevent this (apart from changing fixedRate to fixedDelay)? Is there some kind of locking mechanism, such as synchronized (item) { ... }, so that a database row that is still being processed is not selected again in the next run of the process() method?

modeller

4 Answers

I don't think this can be done easily with the Spring scheduler. Even if you find a single-instance solution that synchronizes within one JVM, it will fail when multiple instances run in a cluster on different JVMs. You could move to Quartz, which can use a (JDBC) database to allow only one instance of a job to execute at a time. Implement an org.springframework.scheduling.quartz.QuartzJobBean and add it to the Spring set-up.

Search for "Spring Boot 2 Quartz" to see how to set this up. Explaining it would take too much space here, but it is not that difficult. The Spring documentation might be a good place to start.

k_o_

Have you considered adding a third state to your status, e.g. PROCESSING? This could be a simple way to make sure you don't have two threads attempting to process the same item, with each thread picking up only NEW work.

I have done something similar, except that the status was simply a string field. Reserving work then becomes UPDATE TOP (1) table SET status = status + :randomString WHERE status = 'NEW'; you then select that row back by its now-unique status value and start processing.
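
A related way to reserve work is a conditional update that moves a single row from NEW to a dedicated PROCESSING status and then checks how many rows were changed. The following is only a minimal sketch using plain JDBC: the class name ItemClaimer, the table name item, and the columns id and status are assumptions made for illustration, not taken from the question.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class ItemClaimer {

    // Assumed schema: table "item" with columns "id" and "status".
    private static final String CLAIM_SQL =
        "UPDATE item SET status = 'PROCESSING' WHERE id = ? AND status = 'NEW'";

    private ItemClaimer() {
    }

    /**
     * Tries to move one row from NEW to PROCESSING.
     * Returns true only if this call changed the row, i.e. the row was
     * still NEW when the UPDATE ran.
     */
    public static boolean tryClaim(Connection connection, long itemId) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(CLAIM_SQL)) {
            statement.setLong(1, itemId);
            // The WHERE clause makes the check and the status change one statement.
            return statement.executeUpdate() == 1;
        }
    }
}
```

A scheduled method could call tryClaim for each selected item and submit it to the executor only when the call returns true. Rows left in PROCESSING after a crash would need some separate recovery step, which this sketch does not cover.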

Mark Brown

You need a bookkeeping data structure to track the tasks that have been submitted to the executor. You could introduce a new state in the Item entity to track this, but given the scheduling frequency and the number of items, that approach adds a lot of database round trips, which may hurt performance.

Use a ConcurrentHashMap to track the Items that have been submitted to the executor by putting each Item's id into the map. After saving the Item, remove its id from the map. The map lets you quickly decide whether to submit an Item to the executor or not.

If the number of items returned by findByStatus is huge, you can consider using Redis or Memcached to track the items that have already been submitted.
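
The bookkeeping described above could look roughly like this. It is a sketch only: the class name InFlightTracker and its method names are invented for illustration, and it assumes the Item id is a Long.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks the ids of items that are queued or running in the executor. */
final class InFlightTracker {

    // Concurrent set view backed by a ConcurrentHashMap.
    private final Set<Long> inFlight = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that claims this id. */
    public boolean tryClaim(Long id) {
        // Set.add is atomic here and returns false if the id is already present.
        return inFlight.add(id);
    }

    /** Call after the item has been saved, or after processing has failed. */
    public void release(Long id) {
        inFlight.remove(id);
    }
}
```

In process(), an item would be submitted only if tryClaim(item.getId()) returns true (getId() is assumed to exist on Item), and the task would call release in a finally block after itemRepository.save(item). Because the list returned by findByStatus can be stale by the time the loop reaches an item, a task may still want to re-check the status before doing the work. This only guards a single JVM.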

samzz

In my view, this problem could be solved by using a @Transactional isolation level of READ_UNCOMMITTED. See this question: Spring @Transactional - isolation, propagation

In addition, add another status, ON_PROCESS, as a flag on items that are currently being processed by other threads, and save it before you start processing. If an exception is thrown, the change is rolled back automatically; on success, the status is saved as PROCESSED. The key is that as long as the status is not NEW, the item won't be picked up by your scheduled task, provided you are reading the uncommitted statuses.

c0d3s1x