I'm trying to use a multi-threaded step within a Spring Batch job but I'm getting a "Scope 'job' is not active for the current thread...". I've tried a few approaches in Spring but at this point I'm using what I thought was OOTB Spring constructs but it still fails.
The error is:
2021-09-19 22:40:03,432 ERROR [https-jsse-nio-8448-exec-4]: org.springframework.batch.core.step.AbstractStep Encountered an error executing step writeToDatabaseStep in job softlayerUploadJob
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.softLayerDataItemQueue': Scope 'job' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton; nested exception is java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:365)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
at com.sun.proxy.$Proxy126.read(Unknown Source)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:161)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:113)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No context holder available for job scope
at org.springframework.batch.core.scope.JobScope.getContext(JobScope.java:159)
at org.springframework.batch.core.scope.JobScope.get(JobScope.java:92)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:353)
... 19 common frames omitted
Basic Job Structure simplified: Job SoftLayerUploadJob Step: softlayerUploadFileStep (Can't be multi-threaded) Reads from Excel file Writes to SoftLayerDataItemQueue) bean that ultimately writes to java.util.Queue Step: writeToDatabaseStep Reads from SoftLayerDataItemQueue bean Writes database using JpaWriter
SoftLayerJobConfiguration.java
public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);
private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();
// private Queue<SoftLayerData> queue = new LinkedList<>();
// @Value("#{stepExecution.jobExecution.jobInstance.instanceId}")
@Value("#{jobExecution.jobInstance.instanceId}")
private int jobInstanceId;
public Queue<SoftLayerData> getQueue() {
Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
if (result == null) {
result = new LinkedList<>();
queueMap.put(jobInstanceId, result);
}
logger.info("Returning queue with item count=" + result.size());
return result;
}
@Override
public void write(List<? extends SoftLayerData> items) throws Exception {
logger.info("@@@ Attempting to add item to queue with bean hashCode=" + this.hashCode() + " job instanceid="
+ jobInstanceId + " ");
logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
if (logger.isDebugEnabled()) {
for (SoftLayerData item : items) {
logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
}
}
getQueue().addAll(items);
}
@Override
public SoftLayerData read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
logger.info("@@@ Attempting to remove item from queue with bean hashCode=" + this.hashCode()
+ " job instanceid=" + jobInstanceId + " ");
SoftLayerData result = null;
if (getQueue() != null && getQueue().size() > 0) {
result = getQueue().remove();
logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
} else {
logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
}
return result;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: open()");
/* Unused method */
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: update()");
/* Unused method */
}
@Override
public void close() throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: close()");
/* Unused method */
}
}
SoftLayerDataItemQueue.java
public class SoftLayerDataItemQueue implements ItemReaderWriterQueue<SoftLayerData> {
private static final Logger logger = LoggerFactory.getLogger(SoftLayerController.class);
private Map<Integer, Queue<SoftLayerData>> queueMap = new HashMap<>();
@Value("#{jobExecution.jobInstance.instanceId}")
private int jobInstanceId;
public Queue<SoftLayerData> getQueue() {
Queue<SoftLayerData> result = queueMap.get(jobInstanceId);
logger.info("@@@SoftLayerDataItemQueue jobInstanceId=" + jobInstanceId);
if (result == null) {
result = new LinkedList<>();
queueMap.put(jobInstanceId, result);
}
logger.info("Returning queue with item count=" + result.size());
return result;
}
@Override
public void write(List<? extends SoftLayerData> items) throws Exception {
logger.info("SoftLayerDataItemQueue: writing items: count=" + items.size());
if (logger.isDebugEnabled()) {
for (SoftLayerData item : items) {
logger.info("SoftLayerDataItemQueue: Adding items " + item.toString());
}
}
getQueue().addAll(items);
}
@Override
public SoftLayerData read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
+ " job instanceid=" + jobInstanceId + " ");
SoftLayerData result = null;
if (getQueue() != null && getQueue().size() > 0) {
result = getQueue().remove();
logger.info("SoftLayerDataItemQueue: Removing item " + result.toString());
} else {
logger.info("SoftLayerDataItemQueue: Empty queue. Returning null to signal EOF ");
}
return result;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: open()");
/* Unused method */
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: update()");
/* Unused method */
}
@Override
public void close() throws ItemStreamException {
logger.info("SoftLayerDataItemQueue: close()");
/* Unused method */
}
}
Note: I don't like using SoftLayerDataItemQueue, but I can't figure out any other way to write items processed in one step and read them in another, especially for large volumes and parallel processing. I wish Spring had some sort of way to write data from one step and into another but I could not find it. Other people on SO suggested writing to a file or in a job or step context.