
I'm trying to use a multi-threaded step within a Spring Batch job, but I'm getting a "Scope 'job' is not active for the current thread..." error. I've tried a few approaches, and at this point I'm using what I thought were out-of-the-box Spring constructs, but it still fails.

The error is:

2021-09-19 22:40:03,432 ERROR [https-jsse-nio-8448-exec-4]: org.springframework.batch.core.step.AbstractStep Encountered an error executing step writeToDatabaseStep in job softlayerUploadJob
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.softLayerDataItemQueue': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:365)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
    at com.sun.proxy.$Proxy126.read(Unknown Source)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No context holder available for job scope
    at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159)
    at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:353)
    ... 19 common frames omitted

Basic job structure, simplified:

- Job: softlayerUploadJob
  - Step: softlayerUploadFileStep (can't be multi-threaded)
    - Reads from an Excel file
    - Writes to the SoftLayerDataItemQueue bean, which ultimately writes to a java.util.Queue
  - Step: writeToDatabaseStep
    - Reads from the SoftLayerDataItemQueue bean
    - Writes to the database using a JpaWriter
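For context, a multi-threaded step along these lines is presumably wired something like the sketch below. This is a reconstruction, not the actual SoftLayerJobConfiguration; the chunk size and executor name are assumptions. The `.taskExecutor(...)` call is what moves chunk processing onto worker threads that have no job-scope context, triggering the error above:

```java
// Hypothetical reconstruction of the failing step's wiring; bean names other
// than the step/queue names from the question are illustrative assumptions.
@Bean
public Step writeToDatabaseStep(StepBuilderFactory stepBuilderFactory,
                                SoftLayerDataItemQueue softLayerDataItemQueue,
                                ItemWriter<SoftLayerData> jpaWriter) {
    return stepBuilderFactory.get("writeToDatabaseStep")
            .<SoftLayerData, SoftLayerData>chunk(100)
            .reader(softLayerDataItemQueue)
            .writer(jpaWriter)
            // Worker threads spawned here do not inherit the job-scope context,
            // so job-scoped beans like softLayerDataItemQueue fail to resolve.
            .taskExecutor(new SimpleAsyncTaskExecutor("writeToDatabase-"))
            .build();
}
```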

SoftLayerDataItemQueue.java

public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
    private static final Logger logger = LoggerFactory.getLogger(SoftLayerDataItemQueue.class);

    // Note: HashMap and LinkedList are not thread-safe; a multi-threaded step
    // would need ConcurrentHashMap/ConcurrentLinkedQueue here.
    private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();

    // Also tried: @Value("#{stepExecution.jobExecution.jobInstance.instanceId}")
    @Value("#{jobExecution.jobInstance.instanceId}")
    private int jobInstanceId;

    public Queue<SoftLayerData> getQueue() {
        logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
        // Lazily create one queue per job instance.
        Queue<SoftLayerData> result = queueMap.computeIfAbsent(jobInstanceId, id -> new LinkedList<>());
        logger.info("Returning queue with item count=" + result.size());
        return result;
    }

    @Override
    public void write(List<? extends SoftLayerData> items) throws Exception {
        logger.info("@@@ Attempting to add item to queue with bean hashCode=" + this.hashCode() + " job instanceid="
                + jobInstanceId + " ");
        logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
        if (logger.isDebugEnabled()) {
            for (SoftLayerData item : items) {
                logger.debug("SoftLayerDataItemQueue: Adding item " + item.toString());
            }
        }
        getQueue().addAll(items);
    }

    @Override
    public SoftLayerData read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        logger.info("@@@ Attempting to remove item from queue with bean hashCode=" + this.hashCode()
                + " job instanceid=" + jobInstanceId + " ");
        SoftLayerData result = null;
        if (!getQueue().isEmpty()) {
            result = getQueue().remove();
            logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
        } else {
            logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF");
        }
        
        return result;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: open()");
        /* Unused method */
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: update()");
        /* Unused method */
    }

    @Override
    public void close() throws ItemStreamException {
        logger.info("SoftLayerDataItemQueue: close()");
        /* Unused method */
    }

}


Note: I don't like using SoftLayerDataItemQueue, but I can't figure out any other way to write items processed in one step and read them in another, especially for large volumes and parallel processing. I wish Spring had a built-in way to pass data from one step to another, but I could not find one. Others on SO suggested writing to a file or to the job or step execution context.
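For small payloads, Spring Batch does ship a mechanism along the lines this note asks for: a step can stash data in its step ExecutionContext and have it promoted to the job ExecutionContext via ExecutionContextPromotionListener. A hedged sketch follows; the key "softLayerData" and all wiring around it are illustrative assumptions, not code from the question:

```java
// Sketch of passing a (small) result between steps via the job ExecutionContext.
// The key "softLayerData" and the beans below are illustrative assumptions.

// 1. A listener on the first step promotes the key to the job context.
@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] { "softLayerData" });
    return listener;
}

// 2. The first step stores data under that key in its step ExecutionContext,
//    e.g. from a writer or StepExecutionListener:
//    stepExecution.getExecutionContext().put("softLayerData", items);

// 3. A step-scoped reader in the second step pulls it back out.
@Bean
@StepScope
public ListItemReader<SoftLayerData> promotedReader(
        @Value("#{jobExecutionContext['softLayerData']}") List<SoftLayerData> items) {
    return new ListItemReader<>(items);
}
```

Because the execution context is serialized to the job repository, this only suits small payloads; for large volumes, the commenters' suggestion of a single chunk-oriented step (Excel reader straight into the JPA writer) is the more idiomatic fix.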

Woodsman
  • What I don't get is why you think you need multiple steps? Why first read in 1 step, then write in the next, that should be 1 step with a reader and writer (without a queue in between). Maybe a processor. Looks to me as if you made things more complex than necessary. – M. Deinum Sep 20 '21 at 05:36
  • I had the same question as M. Deinum, which I asked here: https://stackoverflow.com/questions/69149100/spring-boot-multiple-threads-for-itemwriter#comment122241172_69149100. I see no need for an intermediate queue. A single (multi-threaded or partitioned) chunk-oriented step is enough IMO. – Mahmoud Ben Hassine Sep 20 '21 at 07:10

1 Answer


In a sense this is only a partial answer, because it fixed my problem but I cannot explain why it was necessary. Following the advice in https://github.com/spring-projects/spring-batch/issues/1335, I created my own SimpleAsyncTaskExecutor subclass:

public class ParallelSimpleAsyncTaskExecutor extends SimpleAsyncTaskExecutor {

    private static final long serialVersionUID = 1L;

    public ParallelSimpleAsyncTaskExecutor(String prefix) {
        super(prefix);
    }

    @Override
    protected void doExecute(Runnable task) {
        // Capture the JobExecution on the submitting (job-scoped) thread...
        JobExecution jobExecution = JobSynchronizationManager.getContext().getJobExecution();
        super.doExecute(() -> {
            // ...and register it on the worker thread so job-scoped beans resolve there.
            JobSynchronizationManager.register(jobExecution);
            try {
                task.run();
            } finally {
                JobSynchronizationManager.release();
            }
        });
    }
}
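For completeness, this is roughly how the executor would be plugged into the failing step. The reader/writer beans and chunk size below are assumptions based on the question, not the actual configuration:

```java
// Illustrative wiring only; reader/writer beans and chunk size are assumed
// from the question, not taken from the real SoftLayerJobConfiguration.
Step writeToDatabaseStep = stepBuilderFactory.get("writeToDatabaseStep")
        .<SoftLayerData, SoftLayerData>chunk(100)
        .reader(softLayerDataItemQueue)
        .writer(jpaWriter)
        // Propagates the JobExecution to worker threads, so job-scoped beans
        // such as softLayerDataItemQueue resolve without the scope error.
        .taskExecutor(new ParallelSimpleAsyncTaskExecutor("writeToDatabase-"))
        .build();
```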
