Janardhan Chejarla
Distributed Spring Batch Coordination, Part 4: Getting Started with the Framework

In this part of the series, we'll walk through how to set up and run a distributed Spring Batch job using the open-source database-backed coordination framework we introduced earlier.

✅ By the end of this guide, you’ll have a working multi-node Spring Batch cluster that can process large datasets in parallel — with no messaging queues or external coordination middleware required.


📦 Step 1: Add the Framework Dependency

First, include the published artifact from Maven Central.

<dependency>
    <groupId>io.github.jchejarla</groupId>
    <artifactId>spring-batch-db-cluster-core</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- XML Writing Support -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.21</version>
</dependency>

Make sure you're using Spring Batch 5+ and Spring Boot 3+.
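If you're starting from scratch, one way to pin compatible versions is the Spring Boot parent POM together with the Batch starter. This is only a sketch; the 3.x version shown is an example, not a requirement of the framework:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.5</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
</dependencies>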


🧠 Step 2: Configure Your Cluster

Each node in the cluster is a regular Spring Boot application that shares access to the same relational database (PostgreSQL, Oracle, or MySQL).

In your application.yml, configure node identity and coordination settings:

spring:
  batch:
    enabled: true
    node-id: worker-${random.uuid}
    grid-size: 4
    heartbeat-interval: 5000
    task-polling-interval: 2000

📝 Note: While using worker-${random.uuid} helps during local testing, it's recommended to assign static node IDs (e.g., worker-1, worker-2, etc.) in production environments. This improves observability, debugging, and partition tracking across runs.
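For example, a per-node override could look like the sketch below, assuming the same property path as the config above; NODE_ID is a hypothetical environment variable supplied per host or container by your deployment tooling:

spring:
  batch:
    node-id: ${NODE_ID:worker-1}   # each node exports a different NODE_ID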

Make sure all nodes point to the same database and that the required tables have been created in it.
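Since all coordination happens through this shared database, every node needs an identical datasource configuration. A minimal sketch (PostgreSQL shown; host, database name, and credentials are placeholders):

spring:
  datasource:
    url: jdbc:postgresql://db-host:5432/batch_cluster   # same database on every node
    username: batch_user
    password: ${DB_PASSWORD}
    driver-class-name: org.postgresql.Driver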

⚠️ Important: This framework extends Spring Batch — so make sure to initialize the core Spring Batch schema before applying the coordination schema.

You can find the official Spring Batch schema files here.

Then, execute the coordination framework's schema available here.
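If you'd rather not run the core schema scripts by hand, Spring Boot can create the standard Spring Batch metadata tables on startup; the coordination schema can then be applied afterwards with your database client of choice. A sketch:

spring:
  batch:
    jdbc:
      initialize-schema: always   # creates the BATCH_* metadata tables on first run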

📝 Note: We currently support Oracle, MySQL, PostgreSQL, and H2 (for testing purposes).

If you need support for another database, please open an issue on the Feature Request page.


⚙️ Step-by-Step: CSV to XML Job

⚙️ Step 1: Sample CSV File

We’ll use a sample file with 10,000 customer records. You can download the CSV from the GitHub example repo:

📄 customers_example_data.csv

customer_id,first_name,last_name,email,signup_date
C00001,Michael,Miller,michael.miller1@test.net,2023-01-03
C00002,David,Brown,david.brown2@test.net,2023-10-01
...
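If you'd rather generate the sample data locally than download it, a quick sketch (the file name, record count, and generated values are arbitrary):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class SampleCsvGenerator {

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add("customer_id,first_name,last_name,email,signup_date");
        for (int i = 1; i <= 10_000; i++) {
            // Rows follow the same shape as the example file above
            lines.add(String.format("C%05d,First%d,Last%d,user%d@test.net,2023-01-%02d",
                    i, i, i, i, (i % 28) + 1));
        }
        Files.write(Path.of("customers_example_data.csv"), lines);
    }
}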

⚙️ Step 2: Define the Customer POJO

@Setter
@Getter
@ToString
public class Customer {
    private String customerId;
    private String firstName;
    private String lastName;
    private String email;
    private String signupDate;
}

⚙️ Step 3: ItemReader Setup

 @Bean("customerReader") @StepScope public FlatFileItemReader<Customer> customerReader( @Value("#{stepExecutionContext['startRow']}") long startIndex, @Value("#{stepExecutionContext['endRow']}") long endIndex, @Value("#{jobParameters['inputFile']}") String inputFile, LineMapper<Customer> lineMapper) { FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>() { private int currentLine = 0; @Override public Customer read() throws Exception { Customer customer; while ((customer = super.read()) != null) { currentLine++; if (currentLine < startIndex) { continue; } if (currentLine > endIndex) { return null; } return customer; } return null; } }; reader.setResource(new FileSystemResource(inputFile)); reader.setLinesToSkip(1); // Skip header reader.setLineMapper(lineMapper); return reader; } @Bean public LineMapper<Customer> lineMapper() { DefaultLineMapper<Customer> mapper = new DefaultLineMapper<Customer>(); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames("customer_id", "first_name", "last_name", "email", "signup_date"); BeanWrapperFieldSetMapper<Customer> fieldSetMapper = new BeanWrapperFieldSetMapper<Customer>(); fieldSetMapper.setTargetType(Customer.class); mapper.setLineTokenizer(tokenizer); mapper.setFieldSetMapper(fieldSetMapper); return mapper; } 

⚙️ Step 4: Optional Processor

@Bean
public ItemProcessor<Customer, Customer> customerProcessor() {
    return item -> item; // No-op for now
}

⚙️ Step 5: XML Writer

@StepScope
@Bean
public StaxEventItemWriter<Customer> customerXmlWriter(
        @Value("#{jobParameters['outputDir']}") String outputDir,
        @Value("#{stepExecutionContext['partitionId']}") String partitionId) {

    StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
    writer.setRootTagName("customers");
    writer.setResource(new FileSystemResource(outputDir + "/customers-part-" + partitionId + ".xml"));

    XStreamMarshaller marshaller = new XStreamMarshaller();
    marshaller.setAliases(Map.of("customer", Customer.class));
    writer.setMarshaller(marshaller);
    return writer;
}

⚙️ Step 6: Job Config with Partitioning Strategy

Use round-robin or fixed-node allocation. Each ExecutionContext contains:

 context.putInt("startRow", i * range + 1); // Skip header context.putInt("endRow", (i + 1) * range); context.putInt("partitionId", i); 

Complete Job Config:

import io.github.jchejarla.springbatch.clustering.api.ClusterAwarePartitioner;
import io.github.jchejarla.springbatch.clustering.api.PartitionStrategy;
import io.github.jchejarla.springbatch.clustering.autoconfigure.conditions.ConditionalOnClusterEnabled;
import io.github.jchejarla.springbatch.clustering.partition.ClusterAwarePartitionHandler;
import io.github.jchejarla.springbatch.clustering.partition.PartitionTransferableProp;
import io.github.jchejarla.springbatch.clustering.partition.PartitioningMode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.StepExecutionAggregator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Configuration
@EnableBatchProcessing
@ConditionalOnClusterEnabled
public class ETLJobConfig {

    @Autowired
    private FlatFileItemReader<Customer> customerItemReader;

    @Autowired
    private ItemProcessor<Customer, Customer> customerProcessor;

    @Autowired
    private StaxEventItemWriter<Customer> customerXmlWriter;

    @Autowired
    @Qualifier("etlJobPartitioner")
    private Partitioner partitioner;

    @Autowired
    private ClusterAwarePartitionHandler partitionHandler;

    @Bean("etlClusteredJob")
    public Job clusteredJob(JobRepository jobRepository,
                            PlatformTransactionManager platformTransactionManager,
                            @Qualifier("multiStepAggregator") StepExecutionAggregator clusterAwareAggregator) {
        return new JobBuilder("etl-clustered-job", jobRepository)
                .incrementer(new RunIdIncrementer())
                .preventRestart()
                .start(multiNodeExecutionStep(jobRepository, platformTransactionManager, clusterAwareAggregator))
                .build();
    }

    public Step multiNodeExecutionStep(JobRepository jobRepository,
                                       PlatformTransactionManager txnManager,
                                       StepExecutionAggregator clusterAwareAggregator) {
        return new StepBuilder("etlStep.manager", jobRepository)
                .partitioner("multiNodeExecStep", partitioner)
                .partitionHandler(partitionHandler)
                .step(etlReaderWriterStep(jobRepository, txnManager))
                .aggregator(clusterAwareAggregator)
                .build();
    }

    @Bean
    public Step etlReaderWriterStep(JobRepository jobRepository, PlatformTransactionManager txnManager) {
        return new StepBuilder("etlReaderWriterStep", jobRepository)
                .<Customer, Customer>chunk(100, txnManager)
                .reader(customerItemReader)
                .processor(customerProcessor)
                .writer(customerXmlWriter)
                .build();
    }

    @Bean("etlJobPartitioner")
    @StepScope
    public Partitioner etlJobPartitioner(@Value("#{jobParameters['rows']}") Integer rows) {
        return new ClusterAwarePartitioner() {

            @Override
            public List<ExecutionContext> createDistributedPartitions(int availableNodeCount) {
                int range = rows / availableNodeCount;
                List<ExecutionContext> partitions = new ArrayList<>(availableNodeCount);
                for (int i = 0; i < availableNodeCount; i++) {
                    ExecutionContext context = new ExecutionContext();
                    context.putInt("startRow", i * range + 1); // Skip header
                    context.putInt("endRow", (i + 1) * range);
                    context.putInt("partitionId", i);
                    partitions.add(context);
                }
                return partitions;
            }

            @Override
            public PartitionTransferableProp arePartitionsTransferableWhenNodeFailed() {
                return PartitionTransferableProp.YES;
            }

            @Override
            public PartitionStrategy buildPartitionStrategy() {
                return PartitionStrategy.builder().partitioningMode(PartitioningMode.ROUND_ROBIN).build();
            }
        };
    }
}

⚙️ Step 7: Launching the Job with Parameters

JobParameters parameters = new JobParametersBuilder()
        .addString("RUN_TIME", LocalDateTime.now().toString(), true)
        .addLong("rows", rows)
        .addString("inputFile", ETL_JOB_INPUT_FILE)
        .addString("outputDir", ETL_JOB_OUTPUT_DIR)
        .toJobParameters();

Job job = applicationContext.getBean("etlClusteredJob", Job.class);
try {
    JobExecution jobExecution = jobLauncher.run(job, parameters);
} catch (Exception e) {
    throw new RuntimeException("Exception occurred when launching the Job", e);
}

✅ What’s Next (Part 5 Preview)

In the next part, we’ll cover:

  • 📈 Monitoring batch executions (e.g., via /actuator/batch-cluster)
  • ⚠️ Failure handling and retries
  • 📊 Best practices for large-scale ingestion
  • 🚀 Future roadmap (e.g., metrics, better dashboards)
