
TL;DR

I used this example to build a simple application that uses Spring Batch (remote partitioning) and Spring Cloud Data Flow to deploy worker pods on Kubernetes.

Looking at the logs of the "partitionedJob" pod on Kubernetes, I can see that the worker steps (pods) are launched sequentially. Launching a single worker pod takes roughly 10-15 seconds (and sometimes as long as 2 minutes, as shown below). As a result, the worker pods come up one by one, 10-15 seconds apart.


Logs:

[info 2021/06/26 14:30:29.089 UTC <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{maxWorkers=40, chunkSize=5000, run.id=13, batch.worker-app=docker://docker-myhost.artifactrepository.net/my-project/myjob:0.1, grideSize=40}]

[info 2021/06/26 14:30:29.155 UTC <main> tid=0x1] The job execution id 26 was run within the task execution 235

[info 2021/06/26 14:30:29.184 UTC <main> tid=0x1] Executing step: [masterStep]

2021-06-26 14:30:29 INFO  AuditRecordPartitioner:51 - Creating partitions. [gridSize=40]

[info 2021/06/26 14:32:41.128 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker entry point style: exec

It takes roughly 7-8 minutes for all 40 pods to be created on Kubernetes (sometimes as long as 20 minutes). Ideally, all the partitioned steps (worker pods) would be launched asynchronously in one go.

Question: How can Spring Cloud Data Flow / Spring Batch be configured to launch the worker pods (partitioned steps) asynchronously/in parallel instead of sequentially? If SCDF is indeed creating 40 partitions in one go, why does the master job in reality create these partitions one by one at a very slow rate (as seen in the logs)? I don't believe it is an infrastructure issue, because I am able to launch tasks rapidly using the Task DSL.

Relevant code:

@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}


/**
 * Main job controller.
 */
@Profile("master")
@Configuration
public class MasterConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MasterConfiguration.class);

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory) {
        LOGGER.info("Creating job...");
        SimpleJobBuilder jobBuilder = jobBuilderFactory.get("job").start(masterStep(null, null, null));

        jobBuilder.incrementer(new RunIdIncrementer());

        return jobBuilder.build();
    }

    @Bean
    public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner,
            PartitionHandler partitionHandler) {
        LOGGER.info("Creating masterStep");
        return stepBuilderFactory.get("masterStep").partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler).build();
    }

    @Bean
    public DeployerPartitionHandler partitionHandler(@Value("${spring.profiles.active}") String activeProfile,
            @Value("${batch.worker-app}") String resourceLocation,
            @Value("${spring.application.name}") String applicationName, ApplicationContext context,
            TaskLauncher taskLauncher, JobExplorer jobExplorer, ResourceLoaderResolver resolver) {
        ResourceLoader resourceLoader = resolver.get(resourceLocation);
        Resource resource = resourceLoader.getResource(resourceLocation);
        DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep");

        List<String> commandLineArgs = new ArrayList<>();
        commandLineArgs.add("--spring.profiles.active=" + activeProfile.replace("master", "worker"));
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");

        commandLineArgs.addAll(Arrays.stream(applicationArguments.getSourceArgs()).filter(
                x -> !x.startsWith("--spring.profiles.active=") && !x.startsWith("--spring.cloud.task.executionid="))
                .collect(Collectors.toList()));
        commandLineArgs.addAll(applicationArguments.getNonOptionArgs());

        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());

        List<String> nonOptionArgs = applicationArguments.getNonOptionArgs();

        partitionHandler.setMaxWorkers(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 3)));
        partitionHandler.setGridSize(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 4)));
        partitionHandler.setApplicationName(applicationName);

        return partitionHandler;
    }

    @Bean("auditRecordPartitioner")
    public Partitioner auditRecordPartitioner() {
        
        return new AuditRecordPartitioner<>();
    }
    
    private String getNonOptionArgValue(List<String> nonOptionArgs, int index)  {
        return nonOptionArgs.get(index).split("=")[1];
    }
}


@Profile("worker")
@Configuration
public class WorkerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public DeployerStepExecutionHandler stepExecutionHandler(ApplicationContext context, JobExplorer jobExplorer,
            JobRepository jobRepository) {
        LOGGER.info("stepExecutionHandler...");
        return new DeployerStepExecutionHandler(context, jobExplorer, jobRepository);
    }

    @Bean
    public Step workerStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("workerStep").tasklet(workerTasklet(null)).build();
    }

    @Bean
    @StepScope
    public WorkerTasklet workerTasklet(@Value("#{stepExecutionContext['key']}") String key) {
        return new WorkerTasklet(key);
    }

    
}

Note that I am passing gridSize and maxWorkers as input arguments to the master step (from the SCDF UI while launching the task).

Ping

2 Answers


The sample, for demonstration purposes, sets the maximum number of workers to 2. So for your 40 partitions, only two workers are launched in parallel, which makes it look like the partitions are being processed sequentially.

You need to update the sample (or make it configurable) and increase the number of concurrent workers as needed.
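A minimal sketch of what that could look like, assuming you bind the worker count to a property (the `batch.max-workers` property name is an assumption for illustration, not part of the sample):

```java
// Hypothetical: expose the worker concurrency as a configurable property
// instead of the hard-coded value in the sample. Constructor signature
// matches the DeployerPartitionHandler used in the question's code.
@Bean
public DeployerPartitionHandler partitionHandler(
        @Value("${batch.max-workers:2}") int maxWorkers, // assumed property name
        TaskLauncher taskLauncher, JobExplorer jobExplorer, Resource workerResource) {
    DeployerPartitionHandler handler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, workerResource, "workerStep");
    handler.setMaxWorkers(maxWorkers); // allow up to this many workers to run concurrently
    return handler;
}
```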

Mahmoud Ben Hassine
  • Thanks for the answer. I am using max-workers as 40 and not 2. The reason why it takes 7-8 minutes for all of them to get created is because the master job is creating child partitions sequentially after a gap of 10-15 seconds. By the time the 40th pod request is generated, 7-8 minutes have elapsed. This is 7-8 minutes lost on doing actual processing. I want to launch 100 worker pods so this could easily take 15 minutes till the time the 100th pod is created. What would be ideal is to be able to send pod creation requests in one go (That is asynchronously and not sequentially). – Ping Jun 25 '21 at 17:11
  • `because the master job is creating child partitions sequentially after a gap of 10-15 seconds`: That should not be the case. Are you using the same code as the sample? The master creates `GRID_SIZE` partitions at once. I don't see where it could create them sequentially as you mention. Please share your [minimal example](https://stackoverflow.com/help/minimal-reproducible-example) to be able to help you efficiently, because I think we are not talking about the same thing. – Mahmoud Ben Hassine Jun 25 '21 at 19:50
  • I have spent a lot of energy adding more details to my question. (Code as well as actual logs from k8). Surprisingly, I can see that it is taking 2 minutes between each pod creation request as opposed to my earlier observations of 10-15 seconds. If SCDF is indeed creating 40 partitions in one go, why is that in reality, the master job is creating these partitions one by one at a very slow rate? (As seen in the logs). I don't believe it is an infra issue because I am able to launch tasks at a rapid speed using the [Task DSL](https://dataflow.spring.io/docs/feature-guides/batch/java-dsl/) – Ping Jun 26 '21 at 14:58
  • Thank you for the updates. It seems that workers are launched sequentially here: https://github.com/spring-cloud/spring-cloud-task/blob/7f20c5a06ed7b1edb83b8bd187fcdaccfe8ed20e/spring-cloud-task-batch/src/main/java/org/springframework/cloud/task/batch/partition/DeployerPartitionHandler.java#L309. This should be pretty quick if you provide enough resources to your Kubernetes cluster to launch 40 workers at once, otherwise you will be limited by k8s itself (ie without enough resources to launch all workers at once, k8s will wait for workers to complete before launching new tasks). – Mahmoud Ben Hassine Jun 28 '21 at 12:20
  • Thanks for the response. The Kubernetes cluster is an enterprise cluster, so it may be months before corporate agrees to give more resources. I have tried an alternate approach using the [Task DSL](https://dataflow.spring.io/docs/feature-guides/batch/java-dsl) where I launch 40 tasks in one go, and I can see as many as 20 pods under creation within seconds on k8s. This proves that the k8s cluster does have the capacity to create at least 20 pods in one shot. I am unable to leverage this parallelism when using remote partitioning. Is there a way I can do this as of today? – Ping Jun 28 '21 at 16:52
  • I agree with Mahmoud's response. I've created the following issue: https://github.com/spring-cloud/spring-cloud-task/issues/785 – Glenn Renfro Jun 29 '21 at 15:59
  • @GlennRenfro Thanks for acknowledging the suggestion. Hoping that there is a way to deploy partitions asynchronously in the future. On a related topic and as mentioned in my question as well, I am using the Java Task DSL to overcome this limitation; however, the Task DSL introduces a few other challenges as shown in these questions 1) https://stackoverflow.com/questions/67674952/spring-cloud-data-flow-java-dsl-only-returns-the-last-20-task-execution-instan and 2) https://stackoverflow.com/questions/68118229/spring-cloud-data-flow-increase-timeout-while-launching-tasks-using-task-dsl – Ping Jul 03 '21 at 15:49

As mentioned by Mahmoud Ben Hassine in the comments, the workers are launched sequentially:

private void launchWorkers(Set<StepExecution> candidates, Set<StepExecution> executed) {
    for (StepExecution execution : candidates) {
        if (this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) {
            launchWorker(execution);
            this.currentWorkers++;

            executed.add(execution);
        }
    }
}

As Glenn Renfro mentioned in the comments, an issue has been created for the same. This answer will be updated if a solution is available for launching workers asynchronously.
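Until that is addressed upstream, the idea would be to fan the launches out instead of looping over them. Below is a plain-Java sketch of the pattern only, not actual spring-cloud-task code: `launchWorker` here is a stand-in that simulates the blocking pod-creation call, and all launches are submitted to an executor before waiting on the results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLaunchSketch {

    // Stand-in for DeployerPartitionHandler.launchWorker(execution): the real
    // call blocks while the TaskLauncher asks Kubernetes to create the pod.
    static String launchWorker(int partition) throws InterruptedException {
        Thread.sleep(100); // simulate the slow, blocking launch
        return "worker-" + partition;
    }

    public static void main(String[] args) throws Exception {
        int gridSize = 8;
        ExecutorService pool = Executors.newFixedThreadPool(gridSize);

        // Submit every launch up front instead of launching inside the loop...
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < gridSize; i++) {
            final int partition = i;
            futures.add(pool.submit(() -> launchWorker(partition)));
        }

        // ...then wait for all of them; total wall time is roughly one launch,
        // not gridSize sequential launches.
        for (Future<String> f : futures) {
            System.out.println(f.get());
        }
        pool.shutdown();
    }
}
```

With the sequential loop shown in the answer, 40 launches at ~10 seconds each cost ~7 minutes; submitted concurrently, the wall time would be bounded by the slowest single launch (plus whatever the cluster can actually schedule at once).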

Ping