Message Queue - Postgres
Overview
Message Queue implementation using PostgreSQL
Github: https://github.com/gitorko/project81
Message Queue
PostgreSQL can be used a messaging queue, it also offers features like LISTEN/NOTIFY which make it a suitable to support message queue.
Advantages
- Reuse existing infrastructure - Use an existing database keeping the tech stack simple.
- Low messages throughput - Not every system needs high volume of messages to process per second.
- Persistent Store - You can query the db to check the messages if they are processed and manually trigger re-queue.
This command notifies the channel of a new message in the queue
1NOTIFY new_task_channel, 'New task added'; This command listens for these notifications
1LISTEN new_task_channel; You also need to lock the row being read to avoid the same row from being updated by 2 different transactions
select * from table FOR SHARE - This clause locks the selected rows for read, other threads can read but cant modify. select * from table FOR UPDATE - This clause locks the selected rows for update. This prevents other transactions from reading/modifying these rows until the current transaction is completed (committed or rolled back) select * from table FOR UPDATE SKIP LOCKED clause - This clause tells the database to skip rows that are already locked by another transaction. Instead of waiting for the lock to be released
select * from table FOR NO KEY SHARE - Use this when you want to ensure that no other transaction can obtain locks that would conflict with your current transaction’s updates, but you do not need to prevent other transactions from acquiring FOR SHARE locks. select * from table FOR NO KEY UPDATE - Use this when you need to prevent all types of locks that could conflict with your updates, providing a more restrictive lock compared to FOR NO KEY SHARE
Disadvantages
- Missing notifications if a worker is disconnected.
- Row-level locking is needed to prevent multiple workers from picking up the same message.
Code
1package com.demo.project81.config; 2 3import com.demo.project81.service.NotificationHandler; 4import com.demo.project81.service.NotifierService; 5import lombok.RequiredArgsConstructor; 6import lombok.extern.slf4j.Slf4j; 7import org.springframework.boot.CommandLineRunner; 8import org.springframework.context.annotation.Bean; 9import org.springframework.context.annotation.Configuration; 10 11@Configuration 12@RequiredArgsConstructor 13@Slf4j 14public class ListenerConfiguration { 15 16 @Bean 17 CommandLineRunner startListener(NotifierService notifier, NotificationHandler handler) { 18 return (args) -> { 19 log.info("Starting task listener thread..."); 20 Thread.ofVirtual().name("task-listener").start(notifier.createNotificationHandler(handler)); 21 }; 22 } 23} 1package com.demo.project81.service; 2 3import java.time.LocalDateTime; 4import java.util.function.Consumer; 5 6import com.demo.project81.domain.Task; 7import lombok.RequiredArgsConstructor; 8import lombok.extern.slf4j.Slf4j; 9import org.postgresql.PGNotification; 10import org.springframework.stereotype.Component; 11 12@Component 13@Slf4j 14@RequiredArgsConstructor 15public class NotificationHandler implements Consumer<PGNotification> { 16 17 final TaskService taskService; 18 19 @Override 20 public void accept(PGNotification t) { 21 log.info("Notification received: pid={}, name={}, param={}", t.getPID(), t.getName(), t.getParameter()); 22 Task task = taskService.findByIdWithLock(Long.valueOf(t.getParameter())); 23 task.setProcessedAt(LocalDateTime.now()); 24 task.setProcessedBy(taskService.getHostName() + "_" + Thread.currentThread().getName()); 25 taskService.save(task); 26 log.info("Processed Task: {}", task); 27 } 28 29} 1package com.demo.project81.service; 2 3import java.sql.Connection; 4import java.util.function.Consumer; 5 6import com.demo.project81.domain.Task; 7import lombok.RequiredArgsConstructor; 8import lombok.extern.slf4j.Slf4j; 9import org.postgresql.PGConnection; 10import org.postgresql.PGNotification; 11import org.springframework.jdbc.core.JdbcTemplate; 12import org.springframework.stereotype.Service; 13import org.springframework.transaction.annotation.Transactional; 14 15@Service 16@RequiredArgsConstructor 17@Slf4j 18public class NotifierService { 19 20 static final String TASK_CHANNEL = "tasks"; 21 final JdbcTemplate jdbcTemplate; 22 23 @Transactional 24 public void notifyTaskCreated(Task task) { 25 log.info("Notifying task channel!"); 26 jdbcTemplate.execute("NOTIFY " + TASK_CHANNEL + ", '" + task.getId() + "'"); 27 } 28 29 public Runnable createNotificationHandler(Consumer<PGNotification> consumer) { 30 return () -> { 31 jdbcTemplate.execute((Connection connection) -> { 32 log.info("notificationHandler: sending LISTEN command..."); 33 connection.createStatement().execute("LISTEN " + TASK_CHANNEL); 34 35 PGConnection pgConnection = connection.unwrap(PGConnection.class); 36 37 while (!Thread.currentThread().isInterrupted()) { 38 PGNotification[] notifications = pgConnection.getNotifications(10000); 39 if (notifications == null || notifications.length == 0) { 40 continue; 41 } 42 for (PGNotification nt : notifications) { 43 consumer.accept(nt); 44 } 45 } 46 return 0; 47 }); 48 49 }; 50 } 51} 1package com.demo.project81.service; 2 3import java.net.InetAddress; 4import java.time.LocalDateTime; 5 6import com.demo.project81.domain.Task; 7import com.demo.project81.repository.TaskRepository; 8import lombok.RequiredArgsConstructor; 9import lombok.SneakyThrows; 10import org.springframework.stereotype.Service; 11import org.springframework.transaction.annotation.Transactional; 12 13@Service 14@RequiredArgsConstructor 15public class TaskService { 16 17 final TaskRepository taskRepository; 18 final NotifierService notifier; 19 20 @Transactional(readOnly = true) 21 public Task findById(Long id) { 22 return taskRepository.findById(id).orElseThrow(); 23 } 24 25 @Transactional 26 public Task findByIdWithLock(Long id) { 27 return taskRepository.findByIdWithLock(id); 28 } 29 30 @Transactional 31 public Task queueTask(Task task) { 32 task.setCreatedAt(LocalDateTime.now()); 33 task.setCreatedBy(getHostName() + "_" + Thread.currentThread().getName()); 34 task = taskRepository.save(task); 35 notifier.notifyTaskCreated(task); 36 return task; 37 } 38 39 @SneakyThrows 40 public String getHostName() { 41 return InetAddress.getLocalHost().getHostName(); 42 } 43 44 @Transactional 45 public Task save(Task task) { 46 return taskRepository.save(task); 47 } 48} Postman
Import the postman collection to postman
Setup
1# Project 81 2 3Message Queue - Postgres 4 5[https://gitorko.github.io/post/message-queue-postgres](https://gitorko.github.io/post/message-queue-postgres) 6 7### Version 8 9Check version 10 11```bash 12$java --version 13openjdk 21.0.3 2024-04-16 LTS 14``` 15 16### Postgres DB 17 18```bash 19docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:14 20docker ps 21docker exec -it pg-container psql -U postgres -W postgres 22CREATE USER test WITH PASSWORD 'test@123'; 23CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0; 24grant all PRIVILEGES ON DATABASE "test-db" to test; 25 26docker stop pg-container 27docker start pg-container 28``` 29 30### Dev 31 32To run the backend in dev mode. 33 34```bash 35./gradlew clean build 36./gradlew bootRun 37``` 38 References
https://www.postgresql.org/docs/current/sql-listen.html