Rodrigo Mansueli for Supabase

Posted on • Originally published at blog.mansueli.com

Building a Queue System with Supabase and PostgreSQL

Supabase is an open-source alternative to Firebase that offers a wide range of capabilities for building robust and scalable applications. One of the key advantages of Supabase is its integration with PostgreSQL, a powerful and reliable database management system. In this blog post, we will explore how to leverage Supabase and PostgreSQL to create a highly efficient queue system.

A queue system is crucial for handling asynchronous tasks and managing job processing. By utilizing a queue, we can ensure that tasks are processed in a controlled and organized manner, improving performance and scalability. Let's dive into the process of setting up a queue system with Supabase and PostgreSQL.

Setting up the Queue System

To set up the queue system, we need to enable the required extensions in PostgreSQL. Specifically, we need to enable the pg_net and pg_cron extensions, which provide the necessary functionality for our queue system.

Please navigate to the Extensions page in your Supabase project and enable both the pg_net and pg_cron extensions.

Next, we can define the necessary table structure. The three key tables we'll be working with are job_queue, current_jobs, and workers. The job_queue table stores the pending jobs, while the current_jobs table keeps track of the jobs currently being processed. The workers table maintains records of the available worker instances.

To create these tables, execute the following SQL code:

```sql
CREATE TABLE job_queue (
  job_id serial PRIMARY KEY,
  http_verb TEXT NOT NULL CHECK (http_verb IN ('GET', 'POST', 'DELETE')),
  payload jsonb,
  status TEXT NOT NULL DEFAULT '',
  retry_count INTEGER DEFAULT 0,
  retry_limit INTEGER DEFAULT 10,
  url_path TEXT DEFAULT '',
  content TEXT DEFAULT '',
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE current_jobs (
  request_id BIGINT NOT NULL,
  job_id BIGINT NOT NULL
);

CREATE TABLE workers (
  id SERIAL PRIMARY KEY,
  locked BOOLEAN NOT NULL DEFAULT FALSE
);

-- Adding two workers:
INSERT INTO workers (locked) VALUES (FALSE), (FALSE);

-- You can also add a single worker with:
INSERT INTO workers DEFAULT VALUES;

-- Adding 10 workers:
DO $$
BEGIN
  EXECUTE (
    SELECT string_agg('INSERT INTO workers DEFAULT VALUES', ';')
    FROM generate_series(1, 10)
  );
END;
$$;
```

Functions for Job Processing

To process jobs in the queue system, we must define several functions that work together. Let's discuss the purpose and functionality of each function:

  • process_current_jobs_if_unlocked: This function checks whether a worker is available by selecting an unlocked worker from the workers table. If one is found, it locks the worker, processes the current jobs via the process_current_jobs function, and unlocks the worker afterward.

  • process_current_jobs: This function iterates over the records in the current_jobs table and collects each request's response from the pg_net extension via net._http_collect_response. Based on the response status and status code, it takes one of three actions. On success, it marks the job as complete in job_queue, stores the response body in content, and deletes the record from current_jobs. On failure, it marks the job as failed, increments the retry count, and deletes the record from current_jobs. If the request is still in progress or not found, it raises a notice.

  • process_job: This trigger function is called after a record is inserted into the job_queue table. It updates the status of the job to "processing" and retrieves the API key from the vault.decrypted_secrets table. It then calls the request_wrapper function to process the job by making an HTTP request to the specified URL. The request ID is inserted into the current_jobs table for tracking purposes.

  • request_wrapper: This function is a convenience wrapper around the pg_net extension functions (net.http_delete, net.http_post, and net.http_get) for making HTTP requests. It accepts the method, URL, parameters, body, and headers as input and returns the request ID.

  • retry_failed_jobs: This function is responsible for retrying failed jobs. It selects the failed jobs from the job_queue table based on the status and retry count. It then updates the retry count and calls the request_wrapper function to process the job again. The new request ID is inserted into the current_jobs table.

The trigger function process_job() plays a vital role in kicking off job processing whenever a new job is added to the queue. Scheduled functions then update each job's status with the response collected from its request.

```sql
--
-- Execute current jobs function if there are available workers
--
CREATE OR REPLACE FUNCTION process_current_jobs_if_unlocked()
RETURNS VOID AS $$
DECLARE
  worker RECORD;
BEGIN
  -- Find an unlocked worker
  SELECT * INTO worker
  FROM workers
  FOR UPDATE SKIP LOCKED
  LIMIT 1;

  IF worker IS NOT NULL THEN
    RAISE NOTICE 'Using worker_id: %', worker.id;
    -- Lock the worker (this is already done by the SELECT ... FOR UPDATE)
    -- Process current jobs
    PERFORM process_current_jobs();
    -- Unlock the worker
    UPDATE workers SET locked = FALSE WHERE id = worker.id;
  ELSE
    RAISE NOTICE 'No unlocked workers available';
  END IF;
END;
$$ LANGUAGE plpgsql;

--
-- Loop through records in current_jobs and get response from pg_net
--
CREATE OR REPLACE FUNCTION process_current_jobs()
RETURNS VOID
SECURITY DEFINER
SET search_path = public, extensions, net, vault
AS $$
DECLARE
  current_job RECORD;
  response_result RECORD;
BEGIN
  FOR current_job IN
    SELECT * FROM current_jobs
    FOR UPDATE SKIP LOCKED
  LOOP
    RAISE NOTICE 'Processing job_id: %, request_id: %',
      current_job.job_id, current_job.request_id;

    SELECT status,
           (response).status_code AS status_code,
           (response).body AS body
    INTO response_result
    FROM net._http_collect_response(current_job.request_id);

    IF response_result.status = 'SUCCESS'
       AND response_result.status_code BETWEEN 200 AND 299 THEN
      RAISE NOTICE 'Job completed (job_id: %)', current_job.job_id;
      UPDATE job_queue
      SET status = 'complete',
          content = response_result.body::TEXT
      WHERE job_id = current_job.job_id;
      DELETE FROM current_jobs WHERE request_id = current_job.request_id;
    ELSIF response_result.status = 'ERROR' THEN
      RAISE NOTICE 'Job failed (job_id: %)', current_job.job_id;
      UPDATE job_queue
      SET status = 'failed',
          retry_count = retry_count + 1
      WHERE job_id = current_job.job_id;
      DELETE FROM current_jobs WHERE request_id = current_job.request_id;
    ELSE
      RAISE NOTICE 'Job still in progress or not found (job_id: %)', current_job.job_id;
    END IF;
  END LOOP;
END;
$$ LANGUAGE plpgsql;

--
-- Called after a record is inserted into the queue
--
CREATE OR REPLACE FUNCTION process_job()
RETURNS TRIGGER
SECURITY DEFINER
SET search_path = public, extensions, net, vault
AS $$
DECLARE
  request_id BIGINT;
  api_key TEXT;
BEGIN
  RAISE NOTICE 'Processing job_id: %', NEW.job_id;
  UPDATE job_queue SET status = 'processing' WHERE job_id = NEW.job_id;

  -- Get the API key
  SELECT decrypted_secret
  INTO api_key
  FROM vault.decrypted_secrets
  WHERE name = 'service_role';

  -- Call the request_wrapper to process the job
  request_id := request_wrapper(
    method := NEW.http_verb,
    url := 'https://contoso.supabase.co/functions/v1/consume_job' || COALESCE(NEW.url_path, ''),
    body := COALESCE(NEW.payload::jsonb, '{}'::jsonb),
    headers := jsonb_build_object('Authorization', 'Bearer ' || api_key,
                                  'Content-Type', 'application/json')
  );

  INSERT INTO current_jobs (request_id, job_id) VALUES (request_id, NEW.job_id);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

--
-- Convenience function around pg_net:
--
CREATE OR REPLACE FUNCTION request_wrapper(
  method TEXT,
  url TEXT,
  params JSONB DEFAULT '{}'::JSONB,
  body JSONB DEFAULT '{}'::JSONB,
  headers JSONB DEFAULT '{}'::JSONB
)
RETURNS BIGINT
SECURITY DEFINER
SET search_path = public, extensions, net
LANGUAGE plpgsql
AS $$
DECLARE
  request_id BIGINT;
  timeout INT;
BEGIN
  timeout := 3000;

  IF method = 'DELETE' THEN
    SELECT net.http_delete(
      url := url,
      params := params,
      headers := headers,
      timeout_milliseconds := timeout
    ) INTO request_id;
  ELSIF method = 'POST' THEN
    SELECT net.http_post(
      url := url,
      body := body,
      params := params,
      headers := headers,
      timeout_milliseconds := timeout
    ) INTO request_id;
  ELSIF method = 'GET' THEN
    SELECT net.http_get(
      url := url,
      params := params,
      headers := headers,
      timeout_milliseconds := timeout
    ) INTO request_id;
  ELSE
    RAISE EXCEPTION 'Method must be DELETE, POST, or GET';
  END IF;

  RETURN request_id;
END;
$$;

--
-- Retrying jobs flagged as failures to increase reliability
--
CREATE OR REPLACE FUNCTION retry_failed_jobs()
RETURNS VOID
SECURITY DEFINER
SET search_path = public, extensions, net, vault
AS $$
DECLARE
  r RECORD;
  request_id BIGINT;
  api_key TEXT;
BEGIN
  RAISE NOTICE 'Retrying failed jobs';

  -- Get the API key
  SELECT decrypted_secret
  INTO api_key
  FROM vault.decrypted_secrets
  WHERE name = 'service_role';

  FOR r IN (
    SELECT * FROM job_queue
    WHERE status = 'failed'
      AND retry_count < retry_limit
    FOR UPDATE SKIP LOCKED
  )
  LOOP
    RAISE NOTICE 'Retrying job_id: %', r.job_id;
    UPDATE job_queue SET retry_count = retry_count + 1 WHERE job_id = r.job_id;

    -- Call the request_wrapper to process the job
    -- (an Edge Function call, similar to an AWS Lambda)
    request_id := request_wrapper(
      method := r.http_verb,
      url := 'https://contoso.supabase.co/functions/v1/consume_job' || COALESCE(r.url_path, ''),
      body := COALESCE(r.payload::jsonb, '{}'::jsonb),
      headers := jsonb_build_object('Authorization', 'Bearer ' || api_key,
                                    'Content-Type', 'application/json')
    );

    INSERT INTO current_jobs (request_id, job_id) VALUES (request_id, r.job_id);
  END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Adding the trigger to the queue table:
CREATE TRIGGER process_job_trigger
AFTER INSERT ON job_queue
FOR EACH ROW
EXECUTE FUNCTION process_job();
```

These functions work together to process jobs in the queue system, handle retries for failed jobs, and ensure a proper flow of job execution. Note that retry_failed_jobs(), process_current_jobs(), and process_current_jobs_if_unlocked() all rely on Postgres's FOR UPDATE SKIP LOCKED feature, which prevents concurrent runs from picking up the same rows.
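To see the pipeline in action, a job can be enqueued with a plain INSERT; the process_job trigger fires immediately and dispatches the HTTP request. The url_path and payload values below are purely illustrative — use whatever route and body your Edge Function expects:

```sql
-- Enqueue a job; the AFTER INSERT trigger picks it up right away.
-- '/send_email' and the payload are hypothetical example values.
INSERT INTO job_queue (http_verb, url_path, payload)
VALUES ('POST', '/send_email', '{"user_id": 42}'::jsonb);

-- Inspect recent jobs and their statuses afterwards:
SELECT job_id, status, retry_count, content
FROM job_queue
ORDER BY created_at DESC
LIMIT 5;
```

The status column will move from 'processing' to 'complete' (or 'failed') once one of the scheduled process_current_jobs runs collects the response.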

Scheduling Tasks

Scheduling tasks is essential for regular job processing and retrying failed jobs. In this part, we'll explore how to schedule tasks using the cron.schedule() function in PostgreSQL.

Scheduling Retry Jobs

To automatically retry failed jobs at specific intervals, we can use the cron.schedule() function. Here's an example:

```sql
SELECT cron.schedule(
  'retry_failed_jobs',
  '*/10 * * * *',
  $$ SELECT retry_failed_jobs(); $$
);
```

In this example, retry_failed_jobs() is scheduled to run every 10 minutes (*/10 * * * *), so failed jobs are periodically resubmitted until they reach their retry limit.

Scheduling Current Job Processing

To process current jobs periodically, we can schedule the process_current_jobs_if_unlocked function. Here's an example:

```sql
SELECT cron.schedule(
  'process_current_jobs_if_unlocked',
  '* * * * *',
  $$ SELECT process_current_jobs_if_unlocked(); $$
);
```

In this example, the process_current_jobs_if_unlocked function is scheduled to run every minute (* * * * *). This function checks if there is an unlocked worker available and processes current jobs if there is one.

Setting Up Sub-Minute Polling

For an optimal setup that distributes the workload evenly, we can use sub-minute polling with the help of pg_sleep(). Here's an example:

```sql
-- Run this in psql to schedule polling workers 20 seconds apart:
SET statement_timeout TO 0;

-- Note: the outer body uses the $func$ tag so it does not clash with the
-- inner $$ ... $$ command strings passed to cron.schedule().
CREATE OR REPLACE FUNCTION schedule_jobs()
RETURNS VOID AS $func$
BEGIN
  -- Schedule the retry job
  PERFORM cron.schedule(
    'retry_failed_jobs',
    '*/10 * * * *',
    $$ SELECT retry_failed_jobs(); $$
  );

  -- Schedule the first polling job
  PERFORM cron.schedule(
    'process_current_jobs_if_unlocked_job_1',
    '* * * * *',
    $$ SELECT process_current_jobs_if_unlocked(); $$
  );

  -- Schedule the second job with a 20-second delay
  PERFORM pg_sleep(20);
  PERFORM cron.schedule(
    'process_current_jobs_if_unlocked_job_2',
    '* * * * *',
    $$ SELECT process_current_jobs_if_unlocked(); $$
  );

  -- Schedule the third job with another 20-second delay
  PERFORM pg_sleep(20);
  PERFORM cron.schedule(
    'process_current_jobs_if_unlocked_job_3',
    '* * * * *',
    $$ SELECT process_current_jobs_if_unlocked(); $$
  );
END;
$func$ LANGUAGE plpgsql;
```

In this example, we define the schedule_jobs() function to set up the scheduling. First, we set the statement timeout to 0 so the function is not cancelled mid-run (the pg_sleep calls make it take about 40 seconds). When called, it schedules the retry job and the first polling job, then introduces a 20-second delay with pg_sleep(20) before scheduling the second and third jobs. This delay helps distribute the polling work evenly across multiple workers.

By using the cron.schedule() function and setting up sub-minute polling, we can ensure that failed jobs are retried periodically and parallel jobs are processed efficiently.
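Once defined, the function needs to be invoked once, and the resulting schedule can be verified in pg_cron's metadata table, cron.job:

```sql
-- Run the scheduler once (takes ~40 seconds because of the pg_sleep calls):
SELECT schedule_jobs();

-- Verify the registered jobs and their cron expressions:
SELECT jobid, jobname, schedule FROM cron.job;
```

If you ever need to undo a schedule, pg_cron's cron.unschedule() accepts the job name or job ID.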

Consuming the Queue with Supabase Edge Function

Supabase Edge Functions play a crucial role in serverless execution at the edge. They can be used to consume tasks from the job queue. Here's an example of an Edge Function that fetches and processes jobs from the queue:

```typescript
import { serve } from "https://deno.land/std@0.192.0/http/server.ts";

console.log("Function that will handle the tasks!");

serve(async (req) => {
  const payload = await req.text();
  const url = new URL(req.url);
  const headers = req.headers;
  const method = req.method;

  // Bundle the request details into a single JSON object
  const data = { payload, url, headers, method };

  return new Response(
    JSON.stringify(data),
    { headers: { "Content-Type": "application/json" } },
  );
});
```

Securing the SQL Functions

Securing the SQL functions is of utmost importance to prevent unauthorized access. To revoke the execution permission on the functions from anonymous and authenticated users, you can use the following SQL code:

```sql
-- Revoke execute permission on the queue functions from client-facing roles:
REVOKE ALL PRIVILEGES ON FUNCTION process_current_jobs_if_unlocked FROM anon, authenticated;
REVOKE ALL PRIVILEGES ON FUNCTION process_job FROM anon, authenticated;
REVOKE ALL PRIVILEGES ON FUNCTION process_current_jobs FROM anon, authenticated;
REVOKE ALL PRIVILEGES ON FUNCTION request_wrapper FROM anon, authenticated;
REVOKE ALL PRIVILEGES ON FUNCTION retry_failed_jobs FROM anon, authenticated;
```
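To confirm the revocation took effect, you can query Postgres's built-in has_function_privilege(); the argument list in the signature must match the function's declared parameter types:

```sql
-- Should return false for the anon role after the REVOKE:
SELECT has_function_privilege(
  'anon',
  'request_wrapper(text, text, jsonb, jsonb, jsonb)',
  'EXECUTE'
);
```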

Conclusion

Building a queue system with Supabase and PostgreSQL offers a powerful solution for handling asynchronous tasks and managing job processing. By leveraging the table structure, functions, scheduled tasks, and edge functions, you can create a robust and scalable queue system.

In this blog post, we explored the process of setting up the queue system: enabling the necessary PostgreSQL extensions, defining functions for job processing, scheduling tasks, securing the functions, and consuming the queue with Supabase Edge Functions. Note that this example routes requests based on each job's url_path, and you could expand it with similar ideas to the RESTful service example.

With a complete understanding of both processing and consuming the jobs in the queue, you now have the knowledge to implement a highly efficient queue system in your applications. Happy queuing!

Top comments (4)

Marke-DEV

Sometimes I do get stuck in "Processing". When PROCESS_CURRENT_JOBS checks in, the response is (SUCCESS,,). However successful ones are (SUCCESS,200,true).

Rodrigo Mansueli

Have you considered changing retry_failed_jobs() to also check for jobs that are stuck in processing?

```sql
FOR r IN (
  SELECT * FROM job_queue
  WHERE (status = 'failed' AND retry_count < retry_limit)
     OR (status = 'processing' AND created_at < NOW() - INTERVAL '10 minutes')
  FOR UPDATE SKIP LOCKED
)
LOOP
  RAISE NOTICE 'Retrying job_id: %', r.job_id;
```
Marke-DEV

okay so I believe I solved my problem, there was not enough timeout, I am on vercel and possibly why. In request_wrapper i increased my timeout from 3000 to 15000. So far so good.

Marke-DEV

Yes it attempts retrying over the allowed retry amounts as well, it is interesting. I will need to investigate more. It has only happened once and is similar to all other records.