Building resilient scheduling in distributed systems with Spring Marek Jeszka @logic_marc
Agenda 一 Background 一 Approach 一 Results 一 Conclusion
Use-case 一 asynchronous communication 一 reliable processing 一 better visibility
@Component public class SimpleService { @Scheduled(cron = "0 * * * * *") public void runQuiteOften() { // process events } }
Distributed Systems 一 vertical scaling • actually it doesn’t scale… 一 horizontal scaling • cost-efficiency • higher reliability • easier to expand
Running on a single node 一 How to select the node? 一 Where to keep information about the selected node?
Instance 1 Instance 2 Instance 3
Instance 1 Instance 2 Instance 3 Am I a leader?
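The slides do not say how an instance answers "Am I a leader?". One common scheme, shown here purely as an assumption, is a lease with an expiry time held by the third-party service: whoever holds an unexpired lease is the leader, and a crashed leader is replaced once its lease runs out. The class `LeaderLease` and its method `tryAcquire` are hypothetical names for this sketch, and the in-memory state stands in for whatever external store would really be used.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical in-memory stand-in for the external service that tracks the leader.
final class LeaderLease {
    private final Duration ttl;
    private String holder;      // instance currently holding the lease, or null
    private Instant expiresAt;  // moment the current lease runs out

    LeaderLease(Duration ttl) {
        this.ttl = ttl;
    }

    // Returns true if instanceId is the leader after this call.
    // The lease is granted when it is free or expired, and renewed for its holder.
    synchronized boolean tryAcquire(String instanceId, Instant now) {
        boolean free = holder == null || !now.isBefore(expiresAt);
        if (free || holder.equals(instanceId)) {
            holder = instanceId;
            expiresAt = now.plus(ttl);
            return true;
        }
        return false;
    }
}

final class LeaderLeaseDemo {
    public static void main(String[] args) {
        Instant start = Instant.parse("2020-01-01T00:00:00Z");
        LeaderLease lease = new LeaderLease(Duration.ofSeconds(30));
        System.out.println(lease.tryAcquire("instance-1", start));                 // lease is free
        System.out.println(lease.tryAcquire("instance-2", start.plusSeconds(10))); // still held by instance-1
        System.out.println(lease.tryAcquire("instance-2", start.plusSeconds(31))); // instance-1 did not renew
    }
}
```

Each instance would call `tryAcquire` periodically; passing `now` as a parameter keeps the logic easy to test without a real clock.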
Leader election in Spring-based application
@Component public class SimpleService { @Scheduled(cron = "0 * * * * *") @RunIfLeader public void runQuiteOften() { // process events } }
@Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.METHOD }) public @interface RunIfLeader { }
Aspect Oriented Programming with Spring 一 Aspect - crosscutting concern • Logging • Transaction management 一 Enabled with dependency: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>
Types of advices 一 After 一 Around 一 Before
@Aspect public class RunIfLeaderAspect { @Around("@annotation(com.n26.RunIfLeader) && execution(void *(..))") public void annotatedMethod(ProceedingJoinPoint joinPoint) throws Throwable { if (isLeader()) { joinPoint.proceed(); } // otherwise skip the invocation } }
Why we didn’t like it? 一 No clear separation between business and scheduling logic 一 Hard to test 一 Scheduled jobs spread across the application
Issue with the @SqsListener @SqsListener(value = "eventsQueue", deletionPolicy = ON_SUCCESS) @RunIfLeader void onEvent(String eventAsJson) { // process event }
Selecting a leader in programmatic approach
Programmatic approach 一 SchedulingConfigurer from org.springframework.scheduling.annotation public interface SchedulingConfigurer { void configureTasks( ScheduledTaskRegistrar taskRegistrar); }
@Configuration @EnableScheduling public class SchedulingConfig implements SchedulingConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addCronTask( new CronTask(() -> { // process events }, "0 * * * * *")); } }
@Autowired private Runnable processEventsTask; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addCronTask( new CronTask(processEventsTask, "0 * * * * *")); } @Component public class ProcessEventsTask implements Runnable { @Override public void run() { // process events } }
What are the benefits of programmatic approach? 一 Tasks are scheduled in one place 一 Custom executor service
@Configuration @EnableScheduling public class SchedulingConfig implements SchedulingConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.setScheduler(taskScheduler()); } @Bean(destroyMethod = "shutdown") public ScheduledExecutorService taskScheduler() { return Executors.newScheduledThreadPool( 4, // pool size new ThreadFactoryBuilder() .setNameFormat("scheduler-thread-%d").build()); } }
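The slide uses Guava's ThreadFactoryBuilder to name the scheduler threads. If adding Guava is not desired, roughly the same naming can be achieved with the JDK alone. The sketch below is an assumption about how that could look; `NamedThreadFactory` and `SchedulerFactoryDemo` are hypothetical names, not part of the talk.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only replacement for Guava's ThreadFactoryBuilder naming.
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Threads are numbered from 0 in creation order.
        return new Thread(task, prefix + counter.getAndIncrement());
    }
}

final class SchedulerFactoryDemo {
    // Builds a scheduled pool whose threads carry a recognizable name in the logs.
    static ScheduledExecutorService newScheduler(int poolSize) {
        return Executors.newScheduledThreadPool(
                poolSize, new NamedThreadFactory("scheduler-thread-"));
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = newScheduler(4);
        try {
            // Ask a pool thread for its own name and print it.
            String name = scheduler.submit(() -> Thread.currentThread().getName()).get();
            System.out.println(name);
        } finally {
            scheduler.shutdown();
        }
    }
}
```

The returned executor can be passed to `taskRegistrar.setScheduler(...)` in the same way as the Guava-based one.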
What are the benefits of programmatic approach? 一 Tasks are scheduled in one place 一 Custom executor service 一 Convenient testing
@RunWith(MockitoJUnitRunner.class) public class SchedulingConfigTest { @InjectMocks private SchedulingConfig underTest; @Mock private ScheduledTaskRegistrar taskRegistrarMock; @Mock private ProcessEventsTask processEventsTaskMock; @Test public void schedulesCronTask() { ArgumentCaptor<CronTask> captor = forClass(CronTask.class); underTest.configureTasks(taskRegistrarMock); verify(taskRegistrarMock).addCronTask(captor.capture()); assertThat(captor.getValue().getExpression()).isEqualTo("0 * * * * *"); } }
@RunWith(MockitoJUnitRunner.class) public class SchedulingConfigTest { @InjectMocks private SchedulingConfig underTest; @Mock private ScheduledTaskRegistrar taskRegistrarMock; @Test public void usesScheduledThreadPoolExecutor() { ArgumentCaptor<ScheduledThreadPoolExecutor> captor = forClass(ScheduledThreadPoolExecutor.class); underTest.configureTasks(taskRegistrarMock); verify(taskRegistrarMock).setScheduler(captor.capture()); assertThat(captor.getValue().getCorePoolSize()).isEqualTo(4); } }
@Configuration @EnableScheduling public class SchedulingConfig implements SchedulingConfigurer { @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { taskRegistrar.addCronTask( new CronTask(() -> { if (isLeader()) { // process events } }, "0 * * * * *")); } }
@Configuration @EnableScheduling public class SchedulingConfig implements SchedulingConfigurer { @Autowired private Runnable processEventsTask; @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { Runnable leaderAwareTask = new LeaderAwareTaskDecorator(processEventsTask); taskRegistrar.addCronTask( new CronTask(leaderAwareTask, "0 * * * * *")); } }
public final class LeaderAwareTaskDecorator implements Runnable { private final Runnable delegate; public LeaderAwareTaskDecorator(Runnable delegate) { this.delegate = delegate; } @Override public void run() { if (isLeader()) { delegate.run(); } } }
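The decorator on the slide calls isLeader() without showing where it comes from. One way to keep the decorator testable, sketched here as an assumption, is to pass the leadership check in as a `BooleanSupplier`. The class name `LeaderAwareTask` and the `leaderCheck` parameter are hypothetical and only illustrate the idea.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical variant of the decorator with the leadership check injected.
final class LeaderAwareTask implements Runnable {
    private final Runnable delegate;
    private final BooleanSupplier leaderCheck; // assumed to ask the leader-election service

    LeaderAwareTask(Runnable delegate, BooleanSupplier leaderCheck) {
        this.delegate = delegate;
        this.leaderCheck = leaderCheck;
    }

    @Override
    public void run() {
        // The check happens on every trigger, so leadership changes take effect
        // on the next scheduled run.
        if (leaderCheck.getAsBoolean()) {
            delegate.run();
        }
    }
}

final class LeaderAwareTaskDemo {
    public static void main(String[] args) {
        AtomicBoolean leader = new AtomicBoolean(false);
        AtomicInteger runs = new AtomicInteger();
        Runnable task = new LeaderAwareTask(runs::incrementAndGet, leader::get);

        task.run();        // not the leader: the delegate is skipped
        leader.set(true);
        task.run();        // leader: the delegate runs once
        System.out.println(runs.get());
    }
}
```

With the check injected, a unit test can flip leadership on and off without any running election service.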
Resiliency 一 What if the response didn’t come? 一 Can we safely repeat? • Duplicate entries created 一 Is the action idempotent? • One or multiple identical requests give the same result
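To illustrate the idempotency point, here is a minimal sketch of a receiver that deduplicates by a request ID supplied by the caller, so that repeating a request after a lost response does not create a duplicate entry. The names `IdempotentReceiver`, `submit` and `requestId` are hypothetical; the talk does not describe the partner's actual API, and a real implementation would persist the IDs rather than keep them in memory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical receiver that treats the caller-provided ID as an idempotency key.
final class IdempotentReceiver {
    private final Map<String, String> entries = new ConcurrentHashMap<>();

    // Returns true if a new entry was created, false if this ID was seen before.
    boolean submit(String requestId, String payload) {
        return entries.putIfAbsent(requestId, payload) == null;
    }

    int entryCount() {
        return entries.size();
    }
}

final class IdempotentReceiverDemo {
    public static void main(String[] args) {
        IdempotentReceiver receiver = new IdempotentReceiver();
        // The scheduler derives the ID from the event itself, so a retry reuses it.
        receiver.submit("event-42", "{\"amount\": 10}");
        receiver.submit("event-42", "{\"amount\": 10}"); // retry after a lost response
        System.out.println(receiver.entryCount());
    }
}
```

Because the ID comes from the event rather than being generated per attempt, the scheduler can retry safely without knowing whether the first attempt succeeded.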
Improvements 一 Distribute the jobs SELECT * FROM events FOR UPDATE SKIP LOCKED;
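The query above can be used from each instance to claim a disjoint batch of rows: rows locked by another transaction are skipped instead of waited for. The JDBC sketch below is an assumption about how that might look; the `id` column, the `LIMIT` batch size and the class name `EventClaimer` are hypothetical. It relies on PostgreSQL 9.5 or later and must run inside a transaction (auto-commit off), because the row locks are held only until commit or rollback.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that claims a batch of events for the calling instance.
final class EventClaimer {
    // Assumes an "events" table with a numeric "id" column.
    static final String CLAIM_SQL =
            "SELECT id FROM events ORDER BY id LIMIT ? FOR UPDATE SKIP LOCKED";

    // Locks up to batchSize rows that no other transaction currently holds
    // and returns their IDs. The caller processes them and then commits.
    static List<Long> claimBatch(Connection connection, int batchSize) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(CLAIM_SQL)) {
            statement.setInt(1, batchSize);
            try (ResultSet rows = statement.executeQuery()) {
                List<Long> ids = new ArrayList<>();
                while (rows.next()) {
                    ids.add(rows.getLong("id"));
                }
                return ids;
            }
        }
    }
}
```

With this approach no leader is needed for the processing itself: every instance can run the job, and the database ensures that two instances never work on the same row at the same time.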
Further improvements SQS queue Instance 1 Instance 2 Instance 3
What have we learned? 一 Annotation-driven development is hard 一 Keep (code) consistency 一 Increase resilience & predictability 一 Think about observability
Thank you Questions?
References 一 AOP: https://docs.spring.io/spring/docs/2.5.x/reference/aop.html 一 SchedulingConfigurer: https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/annotation/SchedulingConfigurer.html 一 PostgreSQL SELECT: https://www.postgresql.org/docs/9.5/sql-select.html


Editor's Notes

  • #2 At first sight, scheduling a job seems like a trivial task, especially when the scheduler performs only a few tasks and performance is not a major factor. But is there a simple solution for handling background jobs in a distributed environment? We want to keep the number of dependencies low and avoid problems in situations where failures are more likely.
  • #3 Evolutionary approach - try simple, extend afterwards.
  • #4 Resilience as the main requirement. Benefits: fast and safe processing of the event stream and better visibility (number of items to process).
  • #5 When the service received an event, it stored it in the database; a job then picked up these items, processed them and sent them to the partner.
  • #6 Job scheduling in Unix environments
  • #7 How can we handle a growing amount of work?
  • #8 Same application on multiple nodes. Without going into details: we can use different machines or containerization.
  • #10 The Spring philosophy is to keep servers stateless, which means they are not aware of each other. Microservices are not aware of the scaling method, so how could they know where to look for another node? Another machine? Another container? Or a pod in a Kubernetes cluster?
  • #11 We need a third-party service to keep the notion of the leader. We can call it an orchestrator. Services can register themselves.
  • #13 Let’s introduce our own annotation - it will be used to verify if the method should run.
  • #14 Retention - annotations can be read from source files, class files, or reflectively at run time. Target - declaration contexts, in our case a method will be marked with this annotation. Let’s see how to “teach” Spring to understand this annotation.
  • #15 To enable this annotation, AOP is used - this looks like a cross-cutting concern similar to logging or monitoring. The AOP framework is independent of the IoC container.
  • #16 @Around is needed because inside the advice we decide whether the method should be called (@Before would not be enough - it is unconditional).
  • #17 Spring can interpret aspects defined in XML configuration or defined with AspectJ annotations. It is the developer's informed decision which one to use. The execution expression has two required patterns: the return type and the name/params.
  • #18 Simple, but there is no separation between business logic and scheduling. Adding an additional class that is responsible for scheduling and just calls this method looks like overdoing things. There is no easy way to verify whether something is scheduled.
  • #20 Let’s try a naive approach.
  • #21 Used for setting a specific task scheduler (i.e. executor service - next slide) and, of course, for registering scheduled tasks programmatically.
  • #25 An executor service is an object that executes submitted Runnable tasks; here it must be a scheduled executor. @Bean(destroyMethod="shutdown") ensures that the executor is properly shut down when the Spring application context is closed. We can control the pool size of this executor, that is, the number of threads to keep in the pool. Most importantly, we can specify the naming of the threads, which makes the logs easier to follow. (The builder is from the Guava library.)
  • #32 Network partition = split brain.
  • #33 If we are able to provide an ID from outside (from the action performed by the scheduler), repeating the request will be safe.
  • #35 Items are
  • #36 Maybe more complex setup, but with Spring-type consistency. Multiple instances, automatic switch to a new leader in case of a failure. Good visibility in logs and we can test it quite well.