Created
March 14, 2025 08:03
-
-
Save fmbenhassine/04c59d119059bae0beb639a6d62ed92a to your computer and use it in GitHub Desktop.
#SpringBatch failed partition restart sample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.example; | |
import org.springframework.batch.core.*; | |
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; | |
import org.springframework.batch.core.configuration.annotation.StepScope; | |
import org.springframework.batch.core.job.builder.JobBuilder; | |
import org.springframework.batch.core.launch.JobLauncher; | |
import org.springframework.batch.core.partition.support.Partitioner; | |
import org.springframework.batch.core.repository.JobRepository; | |
import org.springframework.batch.core.step.builder.StepBuilder; | |
import org.springframework.batch.core.step.tasklet.Tasklet; | |
import org.springframework.batch.item.ExecutionContext; | |
import org.springframework.batch.repeat.RepeatStatus; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.core.task.SimpleAsyncTaskExecutor; | |
import org.springframework.core.task.TaskExecutor; | |
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; | |
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; | |
import org.springframework.jdbc.support.JdbcTransactionManager; | |
import org.springframework.transaction.PlatformTransactionManager; | |
import javax.sql.DataSource; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.UUID; | |
public class Main { | |
@Configuration | |
@EnableBatchProcessing | |
static class JobConfiguration { | |
@Bean | |
public Step managerStep(JobRepository jobRepository, Step workerStep) { | |
Partitioner partitioner = gridSize -> { | |
Map<String, ExecutionContext> partitionDefinitionMap = new HashMap<>(gridSize); | |
for (int i = 0; i < gridSize; i++) { | |
String key = "partition" + i; | |
ExecutionContext value = new ExecutionContext(); | |
value.put("data", "data" + i); | |
partitionDefinitionMap.put(key, value); | |
} | |
return partitionDefinitionMap; | |
}; | |
return new StepBuilder("managerStep", jobRepository) | |
.partitioner(workerStep.getName(), partitioner) | |
.step(workerStep) | |
.gridSize(4) | |
.taskExecutor(new SimpleAsyncTaskExecutor()) | |
.build(); | |
} | |
@Bean | |
public Step workerStep(JobRepository jobRepository, Tasklet tasklet, PlatformTransactionManager transactionManager) { | |
return new StepBuilder("workerStep", jobRepository) | |
.tasklet(tasklet, transactionManager) | |
.build(); | |
} | |
@Bean | |
@StepScope | |
public Tasklet tasklet(@Value("#{stepExecutionContext['data']}") String partitionData, @Value("#{jobParameters['fail']}") Boolean fail) { | |
return (contribution, chunkContext) -> { | |
if (partitionData.equals("data2") && fail) { | |
throw new Exception("Failed to process partition 2"); | |
} | |
System.out.println(Thread.currentThread().getName() + " processing partition data = " + partitionData); | |
return RepeatStatus.FINISHED; | |
}; | |
} | |
@Bean | |
public Job job(JobRepository jobRepository, @Qualifier("managerStep") Step managerStep) { | |
return new JobBuilder("job", jobRepository) | |
.start(managerStep) | |
.build(); | |
} | |
// infrastructure beans | |
@Bean | |
public DataSource dataSource() { | |
return new EmbeddedDatabaseBuilder() | |
.setType(EmbeddedDatabaseType.H2) | |
.addScript("/org/springframework/batch/core/schema-h2.sql") | |
.build(); | |
} | |
@Bean | |
public JdbcTransactionManager transactionManager(DataSource dataSource) { | |
return new JdbcTransactionManager(dataSource); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
ApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class); | |
JobLauncher jobLauncher = context.getBean(JobLauncher.class); | |
Job job = context.getBean(Job.class); | |
String uuid = UUID.randomUUID().toString(); | |
JobParameters jobParameters = new JobParametersBuilder() | |
.addString("uuid", uuid) | |
.addJobParameter("fail", true, Boolean.class, false) | |
.toJobParameters(); | |
// first run: partition 2 fails | |
JobExecution jobExecution1 = jobLauncher.run(job, jobParameters); | |
System.out.println("#############"); | |
Thread.sleep(1000); | |
System.out.println("#############"); | |
// second run: only partition 2 should run | |
jobParameters = new JobParametersBuilder() | |
.addString("uuid", uuid) | |
.addJobParameter("fail", false, Boolean.class, false) | |
.toJobParameters(); | |
JobExecution jobExecution2 = jobLauncher.run(job, jobParameters); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the partition-restart sample. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sb-partition</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile for Java 17 with UTF-8 sources. -->
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Batch core: the org.springframework.batch.* types used by Main. -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>5.2.1</version>
        </dependency>
        <!-- H2: the embedded database selected via EmbeddedDatabaseType.H2. -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>2.3.232</version>
        </dependency>
        <!-- Simple SLF4J provider (presumably so framework log output is printed - confirm). -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.17</version>
        </dependency>
    </dependencies>
</project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
prints:
On restart, only the failed partition is re-processed. Note how I used the same identifying job parameter (`uuid`) to restart the same job instance, and a non-identifying job parameter (`fail`) as a failure flag for the demo.