In this part of the series, we'll walk through how to set up and run a distributed Spring Batch job using the open-source database-backed coordination framework we introduced earlier.
✅ By the end of this guide, you’ll have a working multi-node Spring Batch cluster that can process large datasets in parallel — with no messaging queues or external coordination middleware required.
📦 Step 1: Add the Framework Dependency
First, include the published artifact from Maven Central.
<dependency> <groupId>io.github.jchejarla</groupId> <artifactId> spring-batch-db-cluster-core</artifactId> <version>2.0.0</version> </dependency> <!-- XML Writing Support --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> </dependency> <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.4.21</version> </dependency>
Make sure you're using Spring Batch 5+ and Spring Boot 3+.
🧠 Step 2: Configure Your Cluster
Each node in the cluster is a regular Spring Boot application that shares access to the same relational database (PostgreSQL, Oracle, or MySQL).
In your application.yml
, configure node identity and coordination settings:
spring: batch: enabled: true node-id: worker-${random.uuid} grid-size: 4 # This is heartbeat-interval: 5000 task-polling-interval: 2000
📝 Note: While using
worker-${random.uuid}
helps during local testing, it's recommended to assign static node IDs (e.g.,worker-1
,worker-2
, etc.) in production environments. This improves observability, debugging, and partition tracking across runs.
Make sure all nodes point to the same database and include the required tables.
⚠️ Important: This framework extends Spring Batch — so make sure to initialize the core Spring Batch schema before applying the coordination schema.
You can find the official Spring Batch schema files here.
Then, execute the coordination framework's schema available here.
📝 Note: We currently support Oracle, MySQL, PostgreSQL, and H2 (for testing purposes).
If you need support for another database, please open an issue on the Feature Request page.
⚙️ Step-by-Step: CSV to XML Job
⚙️ Step 1: Sample CSV File
We’ll use a sample file with 10,000 customer records. You can download the CSV from the GitHub example repo:
customer_id,first_name,last_name,email,signup_date C00001,Michael,Miller,michael.miller1@test.net,2023-01-03 C00002,David,Brown,david.brown2@test.net,2023-10-01 ...
⚙️ Step 2: Define the Customer POJO
@Setter @Getter @ToString public class Customer { private String customerId; private String firstName; private String lastName; private String email; private String signupDate; }
⚙️ Step 3: ItemReader Setup
@Bean("customerReader") @StepScope public FlatFileItemReader<Customer> customerReader( @Value("#{stepExecutionContext['startRow']}") long startIndex, @Value("#{stepExecutionContext['endRow']}") long endIndex, @Value("#{jobParameters['inputFile']}") String inputFile, LineMapper<Customer> lineMapper) { FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>() { private int currentLine = 0; @Override public Customer read() throws Exception { Customer customer; while ((customer = super.read()) != null) { currentLine++; if (currentLine < startIndex) { continue; } if (currentLine > endIndex) { return null; } return customer; } return null; } }; reader.setResource(new FileSystemResource(inputFile)); reader.setLinesToSkip(1); // Skip header reader.setLineMapper(lineMapper); return reader; } @Bean public LineMapper<Customer> lineMapper() { DefaultLineMapper<Customer> mapper = new DefaultLineMapper<Customer>(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("customer_id", "first_name", "last_name", "email", "signup_date"); BeanWrapperFieldSetMapper<Customer> fieldSetMapper = new BeanWrapperFieldSetMapper<Customer>(); fieldSetMapper.setTargetType(Customer.class); mapper.setLineTokenizer(tokenizer); mapper.setFieldSetMapper(fieldSetMapper); return mapper; }
⚙️ Step 4: Optional Processor
@Bean public ItemProcessor<Customer, Customer> customerProcessor() { return item -> item; // No-op for now }
⚙️ Step 5: XML Writer
@StepScope @Bean public StaxEventItemWriter<Customer> customerXmlWriter( @Value("#{jobParameters['outputDir']}") String outputDir, @Value("#{stepExecutionContext['partitionId']}") String partitionId ) { StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>(); writer.setRootTagName("customers"); writer.setResource(new FileSystemResource(outputDir + "/customers-part-" + partitionId + ".xml")); XStreamMarshaller marshaller = new XStreamMarshaller(); marshaller.setAliases(Map.of("customer", Customer.class)); writer.setMarshaller(marshaller); return writer; }
⚙️ Step 6: Job Config with Partitioning Strategy
Use round-robin or fixed-node allocation. Each ExecutionContext
contains:
context.putInt("startRow", i * range + 1); // Skip header context.putInt("endRow", (i + 1) * range); context.putInt("partitionId", i);
Complete Job Config:
import io.github.jchejarla.springbatch.clustering.api.ClusterAwarePartitioner; import io.github.jchejarla.springbatch.clustering.api.PartitionStrategy; import io.github.jchejarla.springbatch.clustering.autoconfigure.conditions.ConditionalOnClusterEnabled; import io.github.jchejarla.springbatch.clustering.partition.ClusterAwarePartitionHandler; import io.github.jchejarla.springbatch.clustering.partition.PartitionTransferableProp; import io.github.jchejarla.springbatch.clustering.partition.PartitioningMode; import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.partition.support.Partitioner; import org.springframework.batch.core.partition.support.StepExecutionAggregator; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.xml.StaxEventItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import java.util.ArrayList; import java.util.List; @Slf4j @Configuration @EnableBatchProcessing @ConditionalOnClusterEnabled public class ETLJobConfig { @Autowired private FlatFileItemReader<Customer> customerItemReader; @Autowired private CustomerProcessor customerProcessor; @Autowired private StaxEventItemWriter<Customer> customerXmlWriter; @Autowired @Qualifier("etlJobPartitioner") private Partitioner partitioner; @Autowired private ClusterAwarePartitionHandler partitionHandler; @Bean("etlClusteredJob") public Job clusteredJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager, @Qualifier("multiStepAggregator") StepExecutionAggregator clusterAwareAggregator) { return new JobBuilder("etl-clustered-job", jobRepository) .incrementer(new RunIdIncrementer()) .preventRestart() .start(multiNodeExecutionStep(jobRepository, platformTransactionManager, clusterAwareAggregator)) .build(); } public Step multiNodeExecutionStep(JobRepository jobRepository, PlatformTransactionManager txnManager, StepExecutionAggregator clusterAwareAggregator) { return new StepBuilder("etlStep.manager", jobRepository) .partitioner("multiNodeExecStep", partitioner) .partitionHandler(partitionHandler) .step(etlReaderWriterStep(jobRepository, txnManager)) .aggregator(clusterAwareAggregator) .build(); } @Bean public Step etlReaderWriterStep(JobRepository jobRepository, PlatformTransactionManager txnManager) { return new StepBuilder("etlReaderWriterStep", jobRepository).<Customer, Customer>chunk(100, txnManager) .reader(customerItemReader) .processor(customerProcessor) .writer(customerXmlWriter) .build(); } @Bean("etlJobPartitioner") @StepScope public Partitioner etlJobPartitioner(@Value("#{jobParameters['rows']}") Integer rows) { return new ClusterAwarePartitioner() { @Override public List<ExecutionContext> createDistributedPartitions(int availableNodeCount) { int range = rows / availableNodeCount; List<ExecutionContext> partitions = new ArrayList<>(availableNodeCount); for (int i = 0; i < availableNodeCount; i++) { ExecutionContext context = new ExecutionContext(); context.putInt("startRow", i * range + 1); // Skip header context.putInt("endRow", (i + 1) * range); context.putInt("partitionId", i); partitions.add(context); } return partitions; } @Override public PartitionTransferableProp arePartitionsTransferableWhenNodeFailed() { return PartitionTransferableProp.YES; } @Override public PartitionStrategy buildPartitionStrategy() { return PartitionStrategy.builder().partitioningMode(PartitioningMode.ROUND_ROBIN).build(); } }; } }
⚙️ Step 7: Launching Job with parameters
JobParameters parameters = new JobParametersBuilder() .addString("RUN_TIME", LocalDateTime.now().toString(), true) .addLong("rows", rows) .addString("inputFile", ETL_JOB_INPUT_FILE) .addString("outputDir", ETL_JOB_OUTPUT_DIR) .toJobParameters(); Job job = applicationContext.getBean("etlClusteredJob", Job.class); try { JobExecution jobExecution = jobLauncher.run(job, parameters); } catch(Exception e) { throw new RuntimeException("Exception occurred when launching the Job", e); }
✅ What’s Next (Part 5 Preview)
In the next part, we’ll cover:
- 📈 Monitoring batch executions (e.g., via
/actuator/batch-cluster
) - ⚠️ Failure handling and retries
- 📊 Best practices for large-scale ingestion
- 🚀 Future roadmap (e.g., metrics, better dashboards)
Top comments (0)