Skip to content

Instantly share code, notes, and snippets.

@fmbenhassine
Created March 14, 2025 08:40
Show Gist options
  • Save fmbenhassine/a71d7eb5b010b7692828ce2f4f7e4f94 to your computer and use it in GitHub Desktop.
Save fmbenhassine/a71d7eb5b010b7692828ce2f4f7e4f94 to your computer and use it in GitHub Desktop.
#SpringBatch concurrent steps with blocking queue reader/writer
package org.example;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.queue.BlockingQueueItemReader;
import org.springframework.batch.item.queue.BlockingQueueItemWriter;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
 * Sample of two Spring Batch steps that run in parallel and exchange items
 * through a shared blocking queue (producer/consumer pattern).
 *
 * <p>{@code step1} pre-processes items and stages them in the queue;
 * {@code step2} takes the staged items from the same queue and writes them out.
 */
public class Main {

    /** Item flowing through both steps; carries only a name. */
    record Person(String name) { }

    /** Job, step and infrastructure bean definitions for the sample. */
    @Configuration
    @EnableBatchProcessing
    static class JobConfiguration {

        /**
         * Staging area shared by both steps: a bounded queue holding at most 20 items.
         *
         * @return the queue that step1 writes to and step2 reads from
         */
        @Bean
        public BlockingQueue<Person> queue() {
            return new ArrayBlockingQueue<>(20);
        }

        /**
         * Producer step: reads 100 generated persons ("foo1".."foo100"), upper-cases
         * each name, and stages the results in the shared queue in chunks of 10.
         *
         * @param jobRepository repository handed to the step builder
         * @param transactionManager transaction manager used for the chunks
         * @param queue staging queue receiving the pre-processed items
         * @return the configured producer step
         */
        @Bean
        public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, BlockingQueue<Person> queue) {
            // The stream's iterator is single-use and is consumed by the reader below.
            Stream<Person> items = IntStream.rangeClosed(1, 100).mapToObj(i -> new Person("foo" + i));
            return new StepBuilder("step1", jobRepository)
                    .<Person, Person>chunk(10, transactionManager)
                    .reader(new IteratorItemReader<>(items.iterator()))
                    .processor(item -> {
                        System.out.println(Thread.currentThread().getName() + " pre-processing item: " + item.name());
                        // Simulated per-item work.
                        Thread.sleep(50);
                        return new Person(item.name().toUpperCase());
                    })
                    // Anonymous subclass only adds a log line so the staging write can be
                    // told apart from step2's final write in the console output.
                    .writer(new BlockingQueueItemWriter<>(queue) {
                        @Override
                        public void write(Chunk<? extends Person> items) throws Exception {
                            System.out.println("staging items = " + items);
                            super.write(items);
                        }
                    })
                    .build();
        }

        /**
         * Consumer step: reads staged persons from the shared queue, wraps each name
         * in '#' characters, and prints every chunk of 10.
         *
         * @param jobRepository repository handed to the step builder
         * @param transactionManager transaction manager used for the chunks
         * @param queue staging queue the items are read from
         * @return the configured consumer step
         */
        @Bean
        public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager, BlockingQueue<Person> queue) {
            return new StepBuilder("step2", jobRepository)
                    .<Person, Person>chunk(10, transactionManager)
                    // NOTE(review): the reader is used with its default settings; it presumably
                    // ends the step once no item arrives within its poll timeout. Confirm the
                    // default timeout is long enough if the producer gets slower.
                    .reader(new BlockingQueueItemReader<>(queue))
                    .processor(item -> {
                        System.out.println(Thread.currentThread().getName() + " processing item: " + item.name());
                        // Simulated per-item work.
                        Thread.sleep(50);
                        return new Person("#"+ item.name() + "#");
                    })
                    .writer(items -> {
                        System.out.println("writing items = " + items);
                    })
                    .build();
        }

        /**
         * Job that runs {@code step1} and {@code step2} as two parallel flows on a
         * {@link SimpleAsyncTaskExecutor}.
         *
         * <p>The method name (and therefore the bean name) is kept as {@code Job}.
         *
         * @param jobRepository repository handed to the job builder
         * @param step1 the producer step
         * @param step2 the consumer step
         * @return the configured job
         */
        @Bean
        public Job Job(JobRepository jobRepository, @Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
            return new JobBuilder("job", jobRepository)
                    .flow(step1)
                    .split(new SimpleAsyncTaskExecutor())
                    .add(new FlowBuilder<Flow>("parallelFlow")
                            .start(step2)
                            .build())
                    .end()
                    .build();
        }

        // infrastructure beans

        /**
         * Embedded H2 database initialised with the Spring Batch schema script.
         *
         * @return the data source backing the batch meta-data tables
         */
        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.H2)
                    .addScript("/org/springframework/batch/core/schema-h2.sql")
                    .build();
        }

        /**
         * Transaction manager bound to the embedded data source.
         *
         * @param dataSource the data source to manage transactions for
         * @return the JDBC transaction manager
         */
        @Bean
        public JdbcTransactionManager transactionManager(DataSource dataSource) {
            return new JdbcTransactionManager(dataSource);
        }
    }

    /**
     * Boots the application context, launches the job once with empty parameters,
     * and closes the context afterwards.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the context cannot be created or the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context (and the beans it manages) even when
        // launching the job throws; previously the context was never closed.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            jobLauncher.run(job, new JobParameters());
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the concurrent-steps Spring Batch sample. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates -->
<groupId>org.example</groupId>
<artifactId>sb-concurrent-steps</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Compile for Java 17; the sample code uses a record type. -->
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Batch core: job/step builders and the blocking queue item reader/writer. -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.2.1</version>
</dependency>
<!-- H2 driver for the embedded database that stores the batch meta-data. -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.3.232</version>
</dependency>
<!-- Simple SLF4J binding so framework log messages are printed to the console. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.17</version>
</dependency>
</dependencies>
</project>
@fmbenhassine
Copy link
Author

fmbenhassine commented Mar 14, 2025

prints:

[main] INFO org.springframework.batch.core.configuration.annotation.BatchRegistrar - Finished Spring Batch infrastructure beans configuration in 2 ms.
[main] INFO org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseFactory - Starting embedded database: url='jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=false', username='sa'
[main] INFO org.springframework.batch.core.repository.support.JobRepositoryFactoryBean - No database type set, using meta data indicating: H2
[main] INFO org.springframework.batch.core.configuration.annotation.BatchObservabilityBeanPostProcessor - No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP
[main] INFO org.springframework.batch.core.configuration.annotation.BatchObservabilityBeanPostProcessor - No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP
[main] INFO org.springframework.batch.core.configuration.annotation.BatchObservabilityBeanPostProcessor - No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - No TaskExecutor has been set, defaulting to synchronous executor.
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - Job: [FlowJob: [name=job]] launched with the following parameters: [{}]
[SimpleAsyncTaskExecutor-2] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [step1]
[SimpleAsyncTaskExecutor-1] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [step2]
SimpleAsyncTaskExecutor-2 pre-processing item: foo1
SimpleAsyncTaskExecutor-2 pre-processing item: foo2
SimpleAsyncTaskExecutor-2 pre-processing item: foo3
SimpleAsyncTaskExecutor-2 pre-processing item: foo4
SimpleAsyncTaskExecutor-2 pre-processing item: foo5
SimpleAsyncTaskExecutor-2 pre-processing item: foo6
SimpleAsyncTaskExecutor-2 pre-processing item: foo7
SimpleAsyncTaskExecutor-2 pre-processing item: foo8
SimpleAsyncTaskExecutor-2 pre-processing item: foo9
SimpleAsyncTaskExecutor-2 pre-processing item: foo10
staging items = [items=[Person[name=FOO1], Person[name=FOO2], Person[name=FOO3], Person[name=FOO4], Person[name=FOO5], Person[name=FOO6], Person[name=FOO7], Person[name=FOO8], Person[name=FOO9], Person[name=FOO10]], skips=[]]
SimpleAsyncTaskExecutor-1 processing item: FOO1
SimpleAsyncTaskExecutor-2 pre-processing item: foo11
SimpleAsyncTaskExecutor-1 processing item: FOO2
SimpleAsyncTaskExecutor-2 pre-processing item: foo12
SimpleAsyncTaskExecutor-1 processing item: FOO3
SimpleAsyncTaskExecutor-2 pre-processing item: foo13
SimpleAsyncTaskExecutor-1 processing item: FOO4
SimpleAsyncTaskExecutor-2 pre-processing item: foo14
SimpleAsyncTaskExecutor-1 processing item: FOO5
SimpleAsyncTaskExecutor-2 pre-processing item: foo15
SimpleAsyncTaskExecutor-1 processing item: FOO6
SimpleAsyncTaskExecutor-2 pre-processing item: foo16
SimpleAsyncTaskExecutor-1 processing item: FOO7
SimpleAsyncTaskExecutor-2 pre-processing item: foo17
SimpleAsyncTaskExecutor-1 processing item: FOO8
SimpleAsyncTaskExecutor-2 pre-processing item: foo18
SimpleAsyncTaskExecutor-1 processing item: FOO9
SimpleAsyncTaskExecutor-2 pre-processing item: foo19
SimpleAsyncTaskExecutor-1 processing item: FOO10
SimpleAsyncTaskExecutor-2 pre-processing item: foo20
writing items = [items=[Person[name=#FOO1#], Person[name=#FOO2#], Person[name=#FOO3#], Person[name=#FOO4#], Person[name=#FOO5#], Person[name=#FOO6#], Person[name=#FOO7#], Person[name=#FOO8#], Person[name=#FOO9#], Person[name=#FOO10#]], skips=[]]
staging items = [items=[Person[name=FOO11], Person[name=FOO12], Person[name=FOO13], Person[name=FOO14], Person[name=FOO15], Person[name=FOO16], Person[name=FOO17], Person[name=FOO18], Person[name=FOO19], Person[name=FOO20]], skips=[]]
SimpleAsyncTaskExecutor-1 processing item: FOO11
SimpleAsyncTaskExecutor-2 pre-processing item: foo21
SimpleAsyncTaskExecutor-1 processing item: FOO12
SimpleAsyncTaskExecutor-2 pre-processing item: foo22
SimpleAsyncTaskExecutor-1 processing item: FOO13
SimpleAsyncTaskExecutor-2 pre-processing item: foo23
SimpleAsyncTaskExecutor-1 processing item: FOO14
SimpleAsyncTaskExecutor-2 pre-processing item: foo24
SimpleAsyncTaskExecutor-1 processing item: FOO15
SimpleAsyncTaskExecutor-2 pre-processing item: foo25
SimpleAsyncTaskExecutor-1 processing item: FOO16
SimpleAsyncTaskExecutor-2 pre-processing item: foo26
SimpleAsyncTaskExecutor-1 processing item: FOO17
SimpleAsyncTaskExecutor-2 pre-processing item: foo27
SimpleAsyncTaskExecutor-1 processing item: FOO18
SimpleAsyncTaskExecutor-2 pre-processing item: foo28
SimpleAsyncTaskExecutor-1 processing item: FOO19
SimpleAsyncTaskExecutor-2 pre-processing item: foo29
SimpleAsyncTaskExecutor-1 processing item: FOO20
SimpleAsyncTaskExecutor-2 pre-processing item: foo30
writing items = [items=[Person[name=#FOO11#], Person[name=#FOO12#], Person[name=#FOO13#], Person[name=#FOO14#], Person[name=#FOO15#], Person[name=#FOO16#], Person[name=#FOO17#], Person[name=#FOO18#], Person[name=#FOO19#], Person[name=#FOO20#]], skips=[]]
staging items = [items=[Person[name=FOO21], Person[name=FOO22], Person[name=FOO23], Person[name=FOO24], Person[name=FOO25], Person[name=FOO26], Person[name=FOO27], Person[name=FOO28], Person[name=FOO29], Person[name=FOO30]], skips=[]]
SimpleAsyncTaskExecutor-1 processing item: FOO21
SimpleAsyncTaskExecutor-2 pre-processing item: foo31
SimpleAsyncTaskExecutor-1 processing item: FOO22
SimpleAsyncTaskExecutor-2 pre-processing item: foo32
SimpleAsyncTaskExecutor-1 processing item: FOO23
SimpleAsyncTaskExecutor-2 pre-processing item: foo33
SimpleAsyncTaskExecutor-1 processing item: FOO24
SimpleAsyncTaskExecutor-2 pre-processing item: foo34
SimpleAsyncTaskExecutor-1 processing item: FOO25
SimpleAsyncTaskExecutor-2 pre-processing item: foo35
SimpleAsyncTaskExecutor-1 processing item: FOO26
SimpleAsyncTaskExecutor-2 pre-processing item: foo36
SimpleAsyncTaskExecutor-1 processing item: FOO27
SimpleAsyncTaskExecutor-2 pre-processing item: foo37
SimpleAsyncTaskExecutor-1 processing item: FOO28
SimpleAsyncTaskExecutor-2 pre-processing item: foo38
SimpleAsyncTaskExecutor-1 processing item: FOO29
SimpleAsyncTaskExecutor-2 pre-processing item: foo39
SimpleAsyncTaskExecutor-2 pre-processing item: foo40
SimpleAsyncTaskExecutor-1 processing item: FOO30
writing items = [items=[Person[name=#FOO21#], Person[name=#FOO22#], Person[name=#FOO23#], Person[name=#FOO24#], Person[name=#FOO25#], Person[name=#FOO26#], Person[name=#FOO27#], Person[name=#FOO28#], Person[name=#FOO29#], Person[name=#FOO30#]], skips=[]]
staging items = [items=[Person[name=FOO31], Person[name=FOO32], Person[name=FOO33], Person[name=FOO34], Person[name=FOO35], Person[name=FOO36], Person[name=FOO37], Person[name=FOO38], Person[name=FOO39], Person[name=FOO40]], skips=[]]
SimpleAsyncTaskExecutor-2 pre-processing item: foo41
SimpleAsyncTaskExecutor-1 processing item: FOO31
SimpleAsyncTaskExecutor-2 pre-processing item: foo42
SimpleAsyncTaskExecutor-1 processing item: FOO32
SimpleAsyncTaskExecutor-2 pre-processing item: foo43
SimpleAsyncTaskExecutor-1 processing item: FOO33
SimpleAsyncTaskExecutor-2 pre-processing item: foo44
SimpleAsyncTaskExecutor-1 processing item: FOO34
SimpleAsyncTaskExecutor-1 processing item: FOO35
SimpleAsyncTaskExecutor-2 pre-processing item: foo45
SimpleAsyncTaskExecutor-1 processing item: FOO36
SimpleAsyncTaskExecutor-2 pre-processing item: foo46
SimpleAsyncTaskExecutor-2 pre-processing item: foo47
SimpleAsyncTaskExecutor-1 processing item: FOO37
SimpleAsyncTaskExecutor-1 processing item: FOO38
SimpleAsyncTaskExecutor-2 pre-processing item: foo48
SimpleAsyncTaskExecutor-1 processing item: FOO39
SimpleAsyncTaskExecutor-2 pre-processing item: foo49
SimpleAsyncTaskExecutor-1 processing item: FOO40
SimpleAsyncTaskExecutor-2 pre-processing item: foo50
writing items = [items=[Person[name=#FOO31#], Person[name=#FOO32#], Person[name=#FOO33#], Person[name=#FOO34#], Person[name=#FOO35#], Person[name=#FOO36#], Person[name=#FOO37#], Person[name=#FOO38#], Person[name=#FOO39#], Person[name=#FOO40#]], skips=[]]
staging items = [items=[Person[name=FOO41], Person[name=FOO42], Person[name=FOO43], Person[name=FOO44], Person[name=FOO45], Person[name=FOO46], Person[name=FOO47], Person[name=FOO48], Person[name=FOO49], Person[name=FOO50]], skips=[]]
SimpleAsyncTaskExecutor-1 processing item: FOO41
SimpleAsyncTaskExecutor-2 pre-processing item: foo51
SimpleAsyncTaskExecutor-1 processing item: FOO42
SimpleAsyncTaskExecutor-2 pre-processing item: foo52
SimpleAsyncTaskExecutor-2 pre-processing item: foo53
SimpleAsyncTaskExecutor-1 processing item: FOO43
SimpleAsyncTaskExecutor-1 processing item: FOO44
SimpleAsyncTaskExecutor-2 pre-processing item: foo54
SimpleAsyncTaskExecutor-2 pre-processing item: foo55
SimpleAsyncTaskExecutor-1 processing item: FOO45
SimpleAsyncTaskExecutor-2 pre-processing item: foo56
SimpleAsyncTaskExecutor-1 processing item: FOO46
SimpleAsyncTaskExecutor-2 pre-processing item: foo57
SimpleAsyncTaskExecutor-1 processing item: FOO47
SimpleAsyncTaskExecutor-1 processing item: FOO48
SimpleAsyncTaskExecutor-2 pre-processing item: foo58
SimpleAsyncTaskExecutor-1 processing item: FOO49
SimpleAsyncTaskExecutor-2 pre-processing item: foo59
SimpleAsyncTaskExecutor-2 pre-processing item: foo60
SimpleAsyncTaskExecutor-1 processing item: FOO50
staging items = [items=[Person[name=FOO51], Person[name=FOO52], Person[name=FOO53], Person[name=FOO54], Person[name=FOO55], Person[name=FOO56], Person[name=FOO57], Person[name=FOO58], Person[name=FOO59], Person[name=FOO60]], skips=[]]
writing items = [items=[Person[name=#FOO41#], Person[name=#FOO42#], Person[name=#FOO43#], Person[name=#FOO44#], Person[name=#FOO45#], Person[name=#FOO46#], Person[name=#FOO47#], Person[name=#FOO48#], Person[name=#FOO49#], Person[name=#FOO50#]], skips=[]]
SimpleAsyncTaskExecutor-2 pre-processing item: foo61
SimpleAsyncTaskExecutor-1 processing item: FOO51
SimpleAsyncTaskExecutor-2 pre-processing item: foo62
SimpleAsyncTaskExecutor-1 processing item: FOO52
SimpleAsyncTaskExecutor-2 pre-processing item: foo63
SimpleAsyncTaskExecutor-1 processing item: FOO53
SimpleAsyncTaskExecutor-2 pre-processing item: foo64
SimpleAsyncTaskExecutor-1 processing item: FOO54
SimpleAsyncTaskExecutor-1 processing item: FOO55
SimpleAsyncTaskExecutor-2 pre-processing item: foo65
SimpleAsyncTaskExecutor-1 processing item: FOO56
SimpleAsyncTaskExecutor-2 pre-processing item: foo66
SimpleAsyncTaskExecutor-2 pre-processing item: foo67
SimpleAsyncTaskExecutor-1 processing item: FOO57
SimpleAsyncTaskExecutor-2 pre-processing item: foo68
SimpleAsyncTaskExecutor-1 processing item: FOO58
SimpleAsyncTaskExecutor-2 pre-processing item: foo69
SimpleAsyncTaskExecutor-1 processing item: FOO59
SimpleAsyncTaskExecutor-2 pre-processing item: foo70
SimpleAsyncTaskExecutor-1 processing item: FOO60
staging items = [items=[Person[name=FOO61], Person[name=FOO62], Person[name=FOO63], Person[name=FOO64], Person[name=FOO65], Person[name=FOO66], Person[name=FOO67], Person[name=FOO68], Person[name=FOO69], Person[name=FOO70]], skips=[]]
SimpleAsyncTaskExecutor-2 pre-processing item: foo71
writing items = [items=[Person[name=#FOO51#], Person[name=#FOO52#], Person[name=#FOO53#], Person[name=#FOO54#], Person[name=#FOO55#], Person[name=#FOO56#], Person[name=#FOO57#], Person[name=#FOO58#], Person[name=#FOO59#], Person[name=#FOO60#]], skips=[]]
SimpleAsyncTaskExecutor-1 processing item: FOO61
SimpleAsyncTaskExecutor-2 pre-processing item: foo72
SimpleAsyncTaskExecutor-1 processing item: FOO62
SimpleAsyncTaskExecutor-2 pre-processing item: foo73
SimpleAsyncTaskExecutor-1 processing item: FOO63
SimpleAsyncTaskExecutor-2 pre-processing item: foo74
SimpleAsyncTaskExecutor-1 processing item: FOO64
SimpleAsyncTaskExecutor-2 pre-processing item: foo75
SimpleAsyncTaskExecutor-1 processing item: FOO65
SimpleAsyncTaskExecutor-2 pre-processing item: foo76
SimpleAsyncTaskExecutor-1 processing item: FOO66
SimpleAsyncTaskExecutor-2 pre-processing item: foo77
SimpleAsyncTaskExecutor-1 processing item: FOO67
SimpleAsyncTaskExecutor-2 pre-processing item: foo78
SimpleAsyncTaskExecutor-1 processing item: FOO68
SimpleAsyncTaskExecutor-2 pre-processing item: foo79
SimpleAsyncTaskExecutor-1 processing item: FOO69
SimpleAsyncTaskExecutor-2 pre-processing item: foo80
SimpleAsyncTaskExecutor-1 processing item: FOO70
staging items = [items=[Person[name=FOO71], Person[name=FOO72], Person[name=FOO73], Person[name=FOO74], Person[name=FOO75], Person[name=FOO76], Person[name=FOO77], Person[name=FOO78], Person[name=FOO79], Person[name=FOO80]], skips=[]]
SimpleAsyncTaskExecutor-2 pre-processing item: foo81
writing items = [items=[Person[name=#FOO61#], Person[name=#FOO62#], Person[name=#FOO63#], Person[name=#FOO64#], Person[name=#FOO65#], Person[name=#FOO66#], Person[name=#FOO67#], Person[name=#FOO68#], Person[name=#FOO69#], Person[name=#FOO70#]], skips=[]]
SimpleAsyncTaskExecutor-1 processing item: FOO71
SimpleAsyncTaskExecutor-2 pre-processing item: foo82
SimpleAsyncTaskExecutor-1 processing item: FOO72
SimpleAsyncTaskExecutor-2 pre-processing item: foo83
SimpleAsyncTaskExecutor-1 processing item: FOO73
SimpleAsyncTaskExecutor-2 pre-processing item: foo84
SimpleAsyncTaskExecutor-1 processing item: FOO74
SimpleAsyncTaskExecutor-2 pre-processing item: foo85
SimpleAsyncTaskExecutor-1 processing item: FOO75
SimpleAsyncTaskExecutor-1 processing item: FOO76
SimpleAsyncTaskExecutor-2 pre-processing item: foo86
SimpleAsyncTaskExecutor-2 pre-processing item: foo87
SimpleAsyncTaskExecutor-1 processing item: FOO77
SimpleAsyncTaskExecutor-2 pre-processing item: foo88
SimpleAsyncTaskExecutor-1 processing item: FOO78
SimpleAsyncTaskExecutor-2 pre-processing item: foo89
SimpleAsyncTaskExecutor-1 processing item: FOO79
SimpleAsyncTaskExecutor-2 pre-processing item: foo90
SimpleAsyncTaskExecutor-1 processing item: FOO80
staging items = [items=[Person[name=FOO81], Person[name=FOO82], Person[name=FOO83], Person[name=FOO84], Person[name=FOO85], Person[name=FOO86], Person[name=FOO87], Person[name=FOO88], Person[name=FOO89], Person[name=FOO90]], skips=[]]
writing items = [items=[Person[name=#FOO71#], Person[name=#FOO72#], Person[name=#FOO73#], Person[name=#FOO74#], Person[name=#FOO75#], Person[name=#FOO76#], Person[name=#FOO77#], Person[name=#FOO78#], Person[name=#FOO79#], Person[name=#FOO80#]], skips=[]]
SimpleAsyncTaskExecutor-2 pre-processing item: foo91
SimpleAsyncTaskExecutor-1 processing item: FOO81
SimpleAsyncTaskExecutor-1 processing item: FOO82
SimpleAsyncTaskExecutor-2 pre-processing item: foo92
SimpleAsyncTaskExecutor-2 pre-processing item: foo93
SimpleAsyncTaskExecutor-1 processing item: FOO83
SimpleAsyncTaskExecutor-2 pre-processing item: foo94
SimpleAsyncTaskExecutor-1 processing item: FOO84
SimpleAsyncTaskExecutor-2 pre-processing item: foo95
SimpleAsyncTaskExecutor-1 processing item: FOO85
SimpleAsyncTaskExecutor-2 pre-processing item: foo96
SimpleAsyncTaskExecutor-1 processing item: FOO86
SimpleAsyncTaskExecutor-2 pre-processing item: foo97
SimpleAsyncTaskExecutor-1 processing item: FOO87
SimpleAsyncTaskExecutor-1 processing item: FOO88
SimpleAsyncTaskExecutor-2 pre-processing item: foo98
SimpleAsyncTaskExecutor-1 processing item: FOO89
SimpleAsyncTaskExecutor-2 pre-processing item: foo99
SimpleAsyncTaskExecutor-1 processing item: FOO90
SimpleAsyncTaskExecutor-2 pre-processing item: foo100
writing items = [items=[Person[name=#FOO81#], Person[name=#FOO82#], Person[name=#FOO83#], Person[name=#FOO84#], Person[name=#FOO85#], Person[name=#FOO86#], Person[name=#FOO87#], Person[name=#FOO88#], Person[name=#FOO89#], Person[name=#FOO90#]], skips=[]]
staging items = [items=[Person[name=FOO91], Person[name=FOO92], Person[name=FOO93], Person[name=FOO94], Person[name=FOO95], Person[name=FOO96], Person[name=FOO97], Person[name=FOO98], Person[name=FOO99], Person[name=FOO100]], skips=[]]
SimpleAsyncTaskExecutor-1 processing item: FOO91
[SimpleAsyncTaskExecutor-2] INFO org.springframework.batch.core.step.AbstractStep - Step: [step1] executed in 5s287ms
SimpleAsyncTaskExecutor-1 processing item: FOO92
SimpleAsyncTaskExecutor-1 processing item: FOO93
SimpleAsyncTaskExecutor-1 processing item: FOO94
SimpleAsyncTaskExecutor-1 processing item: FOO95
SimpleAsyncTaskExecutor-1 processing item: FOO96
SimpleAsyncTaskExecutor-1 processing item: FOO97
SimpleAsyncTaskExecutor-1 processing item: FOO98
SimpleAsyncTaskExecutor-1 processing item: FOO99
SimpleAsyncTaskExecutor-1 processing item: FOO100
writing items = [items=[Person[name=#FOO91#], Person[name=#FOO92#], Person[name=#FOO93#], Person[name=#FOO94#], Person[name=#FOO95#], Person[name=#FOO96#], Person[name=#FOO97#], Person[name=#FOO98#], Person[name=#FOO99#], Person[name=#FOO100#]], skips=[]]
[SimpleAsyncTaskExecutor-1] INFO org.springframework.batch.core.step.AbstractStep - Step: [step2] executed in 6s818ms
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - Job: [FlowJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 6s831ms

Step 1 pre-processes items and stages them to the queue. Step 2 consumes pre-processed items as soon as they are ready. Just for the demo, I've overridden BlockingQueueItemWriter#write to add a log message in order to distinguish the staging operation from the actual write operation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment