Skip to content

Instantly share code, notes, and snippets.

@fmbenhassine
Created March 14, 2025 08:03
Show Gist options
  • Save fmbenhassine/04c59d119059bae0beb639a6d62ed92a to your computer and use it in GitHub Desktop.
Save fmbenhassine/04c59d119059bae0beb639a6d62ed92a to your computer and use it in GitHub Desktop.
#SpringBatch failed partition restart sample
package org.example;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.jdbc.support.JdbcTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Sample showing how a failed partition of a partitioned Spring Batch step is restarted.
 *
 * <p>The job is launched twice with the same identifying {@code uuid} parameter, so both
 * launches target the same job instance. The non-identifying {@code fail} parameter is
 * {@code true} on the first launch (partition 2 throws) and {@code false} on the second.
 */
public class Main {

    /**
     * Job definition (manager step, worker step, tasklet) plus the infrastructure beans
     * (data source and transaction manager) needed by {@code @EnableBatchProcessing}.
     */
    @Configuration
    @EnableBatchProcessing
    static class JobConfiguration {

        /**
         * Manager step: splits the work into {@code gridSize} partitions and runs the worker
         * step once per partition through a {@link SimpleAsyncTaskExecutor}.
         *
         * @param jobRepository repository used by the step builder
         * @param workerStep    step executed for every partition; qualified by name because
         *                      this configuration declares more than one {@link Step} bean
         * @return the partitioned manager step
         */
        @Bean
        public Step managerStep(JobRepository jobRepository, @Qualifier("workerStep") Step workerStep) {
            // Partition i is keyed "partition<i>" and carries the entry data -> "data<i>".
            Partitioner partitioner = gridSize -> {
                Map<String, ExecutionContext> partitionDefinitionMap = new HashMap<>(gridSize);
                for (int i = 0; i < gridSize; i++) {
                    String key = "partition" + i;
                    ExecutionContext value = new ExecutionContext();
                    value.put("data", "data" + i);
                    partitionDefinitionMap.put(key, value);
                }
                return partitionDefinitionMap;
            };
            return new StepBuilder("managerStep", jobRepository)
                    .partitioner(workerStep.getName(), partitioner)
                    .step(workerStep)
                    .gridSize(4)
                    .taskExecutor(new SimpleAsyncTaskExecutor())
                    .build();
        }

        /**
         * Worker step: a single-tasklet step run once per partition.
         *
         * @param jobRepository      repository used by the step builder
         * @param tasklet            step-scoped tasklet doing the per-partition work
         * @param transactionManager transaction manager passed to the tasklet step
         * @return the worker step
         */
        @Bean
        public Step workerStep(JobRepository jobRepository, Tasklet tasklet, PlatformTransactionManager transactionManager) {
            return new StepBuilder("workerStep", jobRepository)
                    .tasklet(tasklet, transactionManager)
                    .build();
        }

        /**
         * Step-scoped tasklet that prints the partition data it was given, or throws for
         * partition {@code data2} when the {@code fail} job parameter is {@code true}.
         *
         * @param partitionData value of the {@code data} entry of the step execution context
         * @param fail          value of the {@code fail} job parameter; a missing (null) value
         *                      is treated as {@code false}
         * @return the tasklet
         */
        @Bean
        @StepScope
        public Tasklet tasklet(@Value("#{stepExecutionContext['data']}") String partitionData, @Value("#{jobParameters['fail']}") Boolean fail) {
            // Null-safe on both inputs: an absent 'fail' parameter or 'data' entry must not
            // surface as a NullPointerException (unboxing / equals on null).
            boolean simulateFailure = Boolean.TRUE.equals(fail) && "data2".equals(partitionData);
            return (contribution, chunkContext) -> {
                if (simulateFailure) {
                    throw new Exception("Failed to process partition 2");
                }
                System.out.println(Thread.currentThread().getName() + " processing partition data = " + partitionData);
                return RepeatStatus.FINISHED;
            };
        }

        /**
         * The job: a single step, the partitioned manager step.
         *
         * @param jobRepository repository used by the job builder
         * @param managerStep   the partitioned manager step
         * @return the job
         */
        @Bean
        public Job job(JobRepository jobRepository, @Qualifier("managerStep") Step managerStep) {
            return new JobBuilder("job", jobRepository)
                    .start(managerStep)
                    .build();
        }

        // infrastructure beans

        /**
         * Embedded H2 database initialized with the Spring Batch schema script.
         *
         * @return the data source
         */
        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.H2)
                    .addScript("/org/springframework/batch/core/schema-h2.sql")
                    .build();
        }

        /**
         * JDBC transaction manager bound to the embedded data source.
         *
         * @param dataSource the data source to manage transactions for
         * @return the transaction manager
         */
        @Bean
        public JdbcTransactionManager transactionManager(DataSource dataSource) {
            return new JdbcTransactionManager(dataSource);
        }
    }

    /**
     * Launches the job twice against the same job instance: first with {@code fail=true},
     * then with {@code fail=false}.
     *
     * @param args unused
     * @throws Exception if launching the job fails or the pause between runs is interrupted
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources: closing the context releases the beans it manages
        // instead of leaving them open until the JVM exits.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(JobConfiguration.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            String uuid = UUID.randomUUID().toString();

            // first run: partition 2 fails
            jobLauncher.run(job, jobParameters(uuid, true));

            System.out.println("#############");
            Thread.sleep(1000);
            System.out.println("#############");

            // second run: only partition 2 should run
            jobLauncher.run(job, jobParameters(uuid, false));
        }
    }

    /**
     * Builds the parameters for one launch: an identifying {@code uuid} (same value means
     * same job instance) and a non-identifying {@code fail} flag.
     */
    private static JobParameters jobParameters(String uuid, boolean fail) {
        return new JobParametersBuilder()
                .addString("uuid", uuid)
                .addJobParameter("fail", fail, Boolean.class, false)
                .toJobParameters();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the partition restart sample (org.example.Main). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>sb-partition</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Compile for Java 17 with UTF-8 sources. -->
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Batch core: provides the org.springframework.batch.* types used by Main. -->
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.2.1</version>
</dependency>
<!-- H2 driver for the embedded database (EmbeddedDatabaseType.H2) created in Main. -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.3.232</version>
</dependency>
<!-- SLF4J simple binding so framework log statements are printed to the console. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.17</version>
</dependency>
</dependencies>
</project>
@fmbenhassine
Copy link
Author

prints:

[main] INFO org.springframework.batch.core.configuration.annotation.BatchRegistrar - Finished Spring Batch infrastructure beans configuration in 1 ms.
[main] INFO org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseFactory - Starting embedded database: url='jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=false', username='sa'
[main] INFO org.springframework.batch.core.repository.support.JobRepositoryFactoryBean - No database type set, using meta data indicating: H2
[main] INFO org.springframework.batch.core.configuration.annotation.BatchObservabilityBeanPostProcessor - No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP
[main] INFO org.springframework.batch.core.configuration.annotation.BatchObservabilityBeanPostProcessor - No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP
[main] INFO org.springframework.batch.core.configuration.annotation.BatchObservabilityBeanPostProcessor - No Micrometer observation registry found, defaulting to ObservationRegistry.NOOP
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - No TaskExecutor has been set, defaulting to synchronous executor.
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - Job: [SimpleJob: [name=job]] launched with the following parameters: [{'fail':'{value=true, type=class java.lang.Boolean, identifying=false}','uuid':'{value=a44917e6-d27f-49a9-83ed-7bf30ef44997, type=class java.lang.String, identifying=true}'}]
[main] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [managerStep]
SimpleAsyncTaskExecutor-1 processing partition data = data3
SimpleAsyncTaskExecutor-3 processing partition data = data1
SimpleAsyncTaskExecutor-2 processing partition data = data0
[SimpleAsyncTaskExecutor-1] INFO org.springframework.batch.core.step.AbstractStep - Step: [workerStep:partition3] executed in 18ms
[SimpleAsyncTaskExecutor-4] ERROR org.springframework.batch.core.step.AbstractStep - Encountered an error executing step workerStep in job job
java.lang.Exception: Failed to process partition 2
	at org.example.Main$JobConfiguration.lambda$tasklet$1(Main.java:70)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:359)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:223)
	at jdk.proxy2/jdk.proxy2.$Proxy17.execute(Unknown Source)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:383)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:307)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:250)
	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:369)
	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:206)
	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:140)
	at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:235)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:230)
	at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler.lambda$createTask$0(TaskExecutorPartitionHandler.java:132)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:842)
[SimpleAsyncTaskExecutor-3] INFO org.springframework.batch.core.step.AbstractStep - Step: [workerStep:partition1] executed in 18ms
[SimpleAsyncTaskExecutor-4] INFO org.springframework.batch.core.step.AbstractStep - Step: [workerStep:partition2] executed in 18ms
[SimpleAsyncTaskExecutor-2] INFO org.springframework.batch.core.step.AbstractStep - Step: [workerStep:partition0] executed in 19ms
[main] ERROR org.springframework.batch.core.step.AbstractStep - Encountered an error executing step managerStep in job job
org.springframework.batch.core.JobExecutionException: Partition handler returned an unsuccessful step
	at org.springframework.batch.core.partition.support.PartitionStep.doExecute(PartitionStep.java:108)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:230)
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:153)
	at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:408)
	at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:127)
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:307)
	at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher$1.run(TaskExecutorJobLauncher.java:155)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher.run(TaskExecutorJobLauncher.java:146)
	at org.example.Main.main(Main.java:110)
[main] INFO org.springframework.batch.core.step.AbstractStep - Step: [managerStep] executed in 31ms
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - Job: [SimpleJob: [name=job]] completed with the following parameters: [{'fail':'{value=true, type=class java.lang.Boolean, identifying=false}','uuid':'{value=a44917e6-d27f-49a9-83ed-7bf30ef44997, type=class java.lang.String, identifying=true}'}] and the following status: [FAILED] in 46ms
#############
#############
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - Job: [SimpleJob: [name=job]] launched with the following parameters: [{'fail':'{value=false, type=class java.lang.Boolean, identifying=false}','uuid':'{value=a44917e6-d27f-49a9-83ed-7bf30ef44997, type=class java.lang.String, identifying=true}'}]
[main] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [managerStep]
[SimpleAsyncTaskExecutor-5] INFO org.springframework.batch.core.step.AbstractStep - Step: [workerStep:partition2] executed in 3ms
[main] INFO org.springframework.batch.core.step.AbstractStep - Step: [managerStep] executed in 11ms
[main] INFO org.springframework.batch.core.launch.support.TaskExecutorJobLauncher - Job: [SimpleJob: [name=job]] completed with the following parameters: [{'fail':'{value=false, type=class java.lang.Boolean, identifying=false}','uuid':'{value=a44917e6-d27f-49a9-83ed-7bf30ef44997, type=class java.lang.String, identifying=true}'}] and the following status: [COMPLETED] in 19ms
SimpleAsyncTaskExecutor-5 processing partition data = data2

On restart, only the failed partition is re-processed. Note how I used the same identifying job parameter to restart the same job instance, and a non-identifying job parameter as a failure flag for the demo.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment