Introduction to Step Listeners in Spring Batch
Listeners are an important part of Spring Batch's event-driven architecture. They allow you to hook into the lifecycle of a batch job and execute custom logic at specific points, such as before a step starts, after a chunk is processed, or when an item is skipped due to an error.
Here's a breakdown of the key listeners, their purpose, and examples.
StepExecutionListener
This listener is invoked at the start and end of a step. It's primarily used for step-level setup and teardown, like initializing a resource or performing final logging.
- Purpose: To perform logic before a step begins and after it ends.
- Methods:
-
beforeStep(StepExecution stepExecution)
: Called before a step's execution. -
afterStep(StepExecution stepExecution)
: Called after a step's execution. This method can return anExitStatus
to override the step's final status.
-
Example: Logging the start and end time of a step.
import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import java.util.Date; public class MyStepListener implements StepExecutionListener { @Override public void beforeStep(StepExecution stepExecution) { System.out.println("Step '" + stepExecution.getStepName() + "' is starting at " + new Date()); } @Override public ExitStatus afterStep(StepExecution stepExecution) { System.out.println("Step '" + stepExecution.getStepName() + "' finished with status " + stepExecution.getExitStatus().getExitCode() + " at " + new Date()); return stepExecution.getExitStatus(); } }
ChunkListener
A chunk-oriented step is where the read-process-write cycle occurs. This listener is invoked at the start and end of each chunk.
- Purpose: To perform actions for each batch or "chunk" of items being processed. A common use case is tracking the progress of a large job.
- Methods:
-
beforeChunk(ChunkContext context)
: Called before a chunk's transaction begins. -
afterChunk(ChunkContext context)
: Called after a chunk's transaction has committed successfully. -
afterChunkError(ChunkContext context)
: Called after a chunk fails.
-
Example: Counting and logging the number of items processed in each chunk.
import org.springframework.batch.core.ChunkListener; import org.springframework.batch.core.scope.context.ChunkContext; public class MyChunkListener implements ChunkListener { @Override public void beforeChunk(ChunkContext context) { // You could log a message here } @Override public void afterChunk(ChunkContext context) { int itemsWritten = context.getStepContext().getReadCount(); System.out.println("Chunk processed! Items written: " + itemsWritten); } @Override public void afterChunkError(ChunkContext context) { System.err.println("Error processing chunk!"); } }
ItemReadListener
, ItemProcessListener
, ItemWriteListener
These listeners are more granular and operate at the individual item level. They provide hooks for success and failure events for each read
, process
, and write
operation.
- Purpose: To perform specific actions on an item, like logging a failure or performing a retry.
Common Methods:
onReadError(Exception ex)
onProcessError(T item, Exception e)
onWriteError(Exception exception, List<? extends S> items)
Example:
This code snippet shows how to log a message for each successful read and log a skipped item if an error occurs.
import org.springframework.batch.core.ItemReadListener; import org.springframework.batch.core.ItemProcessListener; import org.springframework.batch.core.ItemWriteListener; import java.util.List; public class MyItemListener implements ItemReadListener<String>, ItemProcessListener<String, String>, ItemWriteListener<String> { @Override public void onReadError(Exception ex) { System.err.println("Failed to read item: " + ex.getMessage()); } @Override public void onProcessError(String item, Exception e) { System.err.println("Failed to process item: " + item + " - " + e.getMessage()); } @Override public void onWriteError(Exception exception, List<? extends String> items) { System.err.println("Failed to write chunk of " + items.size() + " items: " + exception.getMessage()); } // You can also add `after` methods for success cases, e.g., onRead, afterProcess, beforeWrite }
SkipListener
This is a specialized listener that is only invoked when a skippable exception occurs, allowing you to handle the skipped item explicitly.
- Purpose: To perform actions when an item is skipped. This is critical for data integrity and error handling.
- Methods:
-
onSkipInRead(Throwable t)
: Called when an error occurs during reading, and the item is skipped. -
onSkipInProcess(T item, Throwable t)
: Called when an error occurs during processing, and the item is skipped. -
onSkipInWrite(S item, Throwable t)
: Called when an error occurs during writing, and the item is skipped.
-
Example: Logging a skipped record to a separate file for later review.
import org.springframework.batch.core.SkipListener; public class MySkipListener implements SkipListener<String, String> { @Override public void onSkipInRead(Throwable t) { System.out.println("Skipped reading due to: " + t.getMessage()); } @Override public void onSkipInProcess(String item, Throwable t) { System.out.println("Skipped processing item: " + item + " due to: " + t.getMessage()); } @Override public void onSkipInWrite(String item, Throwable t) { System.out.println("Skipped writing item: " + item + " due to: " + t.getMessage()); } }
How to Use Listeners in a Step
To use any of these listeners, you must register them with the step configuration. This is typically done within the StepBuilder
.
Example Configuration:
import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecutionListener; import org.springframework.batch.core.Step; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.batch.core.ChunkListener; import org.springframework.batch.core.SkipListener; @Configuration @EnableBatchProcessing public class BatchConfig { @Autowired private JobRepository jobRepository; @Autowired private PlatformTransactionManager transactionManager; @Bean public Step myStep() { return new StepBuilder("myStep", jobRepository) .<String, String>chunk(10, transactionManager) .reader(...) .processor(...) .writer(...) // Registering the listeners .listener(new MyStepListener()) .listener(new MyChunkListener()) .listener(new MySkipListener()) .build(); } }
By using listeners, you can build more resilient and observable batch jobs, separating your core business logic from cross-cutting concerns like logging, monitoring, and error handling.
Top comments (0)