Created
March 14, 2025 08:40
-
-
Save fmbenhassine/a71d7eb5b010b7692828ce2f4f7e4f94 to your computer and use it in GitHub Desktop.
#SpringBatch concurrent steps with blocking queue reader/writer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example; | |
import org.springframework.batch.core.Job; | |
import org.springframework.batch.core.JobParameters; | |
import org.springframework.batch.core.Step; | |
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; | |
import org.springframework.batch.core.job.builder.FlowBuilder; | |
import org.springframework.batch.core.job.builder.JobBuilder; | |
import org.springframework.batch.core.job.flow.Flow; | |
import org.springframework.batch.core.launch.JobLauncher; | |
import org.springframework.batch.core.repository.JobRepository; | |
import org.springframework.batch.core.step.builder.StepBuilder; | |
import org.springframework.batch.item.Chunk; | |
import org.springframework.batch.item.queue.BlockingQueueItemReader; | |
import org.springframework.batch.item.queue.BlockingQueueItemWriter; | |
import org.springframework.batch.item.support.IteratorItemReader; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.core.task.SimpleAsyncTaskExecutor; | |
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; | |
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; | |
import org.springframework.jdbc.support.JdbcTransactionManager; | |
import org.springframework.transaction.PlatformTransactionManager; | |
import javax.sql.DataSource; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.stream.IntStream; | |
import java.util.stream.Stream; | |
/** | |
* Sample of two steps running in parallel, but processing items concurrently | |
* from the same blocking queue (producer/consumer pattern) | |
*/ | |
public class Main { | |
record Person(String name) { } | |
@Configuration | |
@EnableBatchProcessing | |
static class JobConfiguration { | |
// staging area | |
@Bean | |
public BlockingQueue<Person> queue() { | |
return new ArrayBlockingQueue<>(20); | |
} | |
@Bean | |
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager, BlockingQueue<Person> queue) { | |
Stream<Person> items = IntStream.rangeClosed(1, 100).mapToObj(i -> new Person("foo" + i)); | |
return new StepBuilder("step1", jobRepository) | |
.<Person, Person>chunk(10, transactionManager) | |
.reader(new IteratorItemReader<>(items.iterator())) | |
.processor(item -> { | |
System.out.println(Thread.currentThread().getName() + " pre-processing item: " + item.name()); | |
Thread.sleep(50); | |
return new Person(item.name().toUpperCase()); | |
}) | |
.writer(new BlockingQueueItemWriter<>(queue) { | |
@Override | |
public void write(Chunk<? extends Person> items) throws Exception { | |
System.out.println("staging items = " + items); | |
super.write(items); | |
}}) | |
.build(); | |
} | |
@Bean | |
public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager, BlockingQueue<Person> queue) { | |
return new StepBuilder("step2", jobRepository) | |
.<Person, Person>chunk(10, transactionManager) | |
.reader(new BlockingQueueItemReader<>(queue)) | |
.processor(item -> { | |
System.out.println(Thread.currentThread().getName() + " processing item: " + item.name()); | |
Thread.sleep(50); | |
return new Person("#"+ item.name() + "#"); | |
}) | |
.writer(items -> { | |
System.out.println("writing items = " + items); | |
}) | |
.build(); | |
} | |
@Bean | |
public Job Job(JobRepository jobRepository, @Qualifier("step1") Step step1, @Qualifier("step2") Step step2) { | |
return new JobBuilder("job", jobRepository) | |
.flow(step1) | |
.split(new SimpleAsyncTaskExecutor()) | |
.add(new FlowBuilder<Flow>("parallelFlow") | |
.start(step2) | |
.build()) | |
.end() | |
.build(); | |
} | |
// infrastructure beans | |
@Bean | |
public DataSource dataSource() { | |
return new EmbeddedDatabaseBuilder() | |
.setType(EmbeddedDatabaseType.H2) | |
.addScript("/org/springframework/batch/core/schema-h2.sql") | |
.build(); | |
} | |
@Bean | |
public JdbcTransactionManager transactionManager(DataSource dataSource) { | |
return new JdbcTransactionManager(dataSource); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
ApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class); | |
JobLauncher jobLauncher = context.getBean(JobLauncher.class); | |
Job job = context.getBean(Job.class); | |
jobLauncher.run(job, new JobParameters()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Spring Batch concurrent-steps sample. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sb-concurrent-steps</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Java 17 source and target level -->
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Batch core -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>5.2.1</version>
        </dependency>
        <!-- H2 database engine -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>2.3.232</version>
        </dependency>
        <!-- Simple SLF4J binding -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.17</version>
        </dependency>
    </dependencies>
</project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
prints:
Step 1 pre-processes items and stages them in the queue. Step 2 consumes the pre-processed items as soon as they are ready. Just for the demo, I've overridden `BlockingQueueItemWriter#write` to add a log message that distinguishes the staging operation from the actual write operation.