Have you ever wanted to dynamically add schedules for a large base of users in your service, only to find out that celery-beat is limited? You then stumble onto django-celery-beat, but you are using fastapi or flask? This article is made just for you!
TLDR: Use the rdbbeat library to persist dynamic schedules in your RDB.

- Install it: `pip install rdbbeat`
- Run celery-beat with the custom scheduler: `python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler`
- Run a celery worker and use rdbbeat models and controller to add/get schedules.
Introduction
celery-beat runs periodic tasks from schedules that are defined before run time. We want a tool that can add schedules to our systems dynamically, during operation.
rdbbeat extends the celery-beat scheduling system to solve this problem in a generic pattern. The library is built with sqlalchemy models and persists the schedules in your RDB. It implements listeners on the database models to capture addition, deletion, and modification of schedules at run time.
Usage Example
We can look at this example:
Build a company's admin tool that sends "Happy Birthday" messages to employees on their birthdays via email.
We can use rdbbeat to add schedules for each employee's birthday dynamically during their on-boarding. The schedules are persisted in the database and can be modified or deleted at any time.
Complete code for this example can be found on Github.
1. Basic service setup with flask and celery

```shell
mkdir rdbbeat-flask-example
cd rdbbeat-flask-example
python -m venv venv
source venv/bin/activate
pip install flask celery  # view requirements.txt for other dependencies
```
2. Basic model and DB setup
- Create and run a database server (I used postgres)
- Create an employee model in SQLAlchemy
```python
# server/models.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class Employee(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), nullable=False)
    surname = db.Column(db.String(80), nullable=False)
    date_of_birth = db.Column(db.Date, nullable=False)

    def to_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "surname": self.surname,
            "date_of_birth": self.date_of_birth.strftime("%Y-%m-%d"),
        }
```
3. Basic Flask app setup
- A simple flask app with a blueprint for employee routes
```python
# server/app.py
import os

from dotenv import load_dotenv
from flask import Flask
from flask_cors import CORS
from flask_migrate import Migrate

from server.db_connection import DATABASE_URL
from server.models import db

load_dotenv()

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = DATABASE_URL
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")

# Celery configuration
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app.config["CELERY_RESULT_BACKEND"] = "database"
app.config["CELERY_RESULT_DBURI"] = DATABASE_URL
app.config["CELERY_TRACK_STARTED"] = True
app.config["CELERY_SEND_EVENTS"] = True
app.config["BROKER_TRANSPORT_OPTIONS"] = {"visibility_timeout": 3600}
app.config["CELERY_DEFAULT_QUEUE"] = "default"

migrate = Migrate(app, db, directory="server/migrations", compare_type=True)
CORS(app)
db.init_app(app)

from server.views import employee_router  # noqa isort:skip

app.register_blueprint(employee_router)


@app.route("/")
def index():
    return "Learn to use the celery-rdbbeat scheduler!"
```
NB: We are just adding the celery config that we will use later.
The db_connection.py file holds the creation of DB connections with session management, and the views.py file holds the employee routes.
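The db_connection.py file itself isn't shown in this article, so here is a minimal sketch of what its `session_scope` context manager might look like. This is my own assumption of the file's contents, not the article's actual code, and the sqlite fallback URL is made up (the article uses postgres):

```python
# server/db_connection.py -- a minimal sketch (assumed, not from the article)
import os
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# The article uses postgres; the sqlite default here is only a runnable fallback.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///example.db")

engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)


@contextmanager
def session_scope():
    """Provide a transactional scope around a series of operations."""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
```

This is the standard "transactional scope" pattern from the SQLAlchemy docs: commit on success, rollback on error, always close.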
```python
# server/views.py
from dateutil.parser import parse
from flask import Blueprint, jsonify, request

from server.db_connection import session_scope
from server.models import Employee

employee_router = Blueprint("employee_router", __name__, url_prefix="/api/v1")


@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()
    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()
        return jsonify(employee.to_dict()), 201


@employee_router.get("/employees/<int:employee_id>")
def get_employee(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        if not employee:
            return jsonify({"error": "Employee not found"}), 404
        return jsonify(employee.to_dict())
```
Test Point
- Use flask-migrate or pure alembic to create migrations and run them

```shell
# with flask-migrate
flask db init
flask db migrate -m "create employee table"  # create migrations
flask db upgrade  # create the table in the DB
```
- Check that your table is created in the DB (I used TablePlus)
- Run the flask app
```shell
export FLASK_APP=server/app.py
flask run  # (or python -m flask run)
```
- At this point, you should be able to create employees and get them from the DB using the routes we created above. You can use Postman or Insomnia here; I just used curl.
```shell
# Create an employee
curl -X POST -H "Content-Type: application/json" \
  -d '{"name": "John", "surname": "Doe", "date_of_birth": "1990-01-01"}' \
  http://localhost:5000/api/v1/employees/
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
```
```shell
# Get the employee with id=1
curl http://localhost:5000/api/v1/employees/1
# {"date_of_birth":"1990-01-01","id":1,"name":"John","surname":"Doe"}
```
Now, let the scheduling fun begin!
4. Running celery-beat with rdbbeat as a custom scheduler
- Run the migrations to create the rdbbeat tables (note that they live in the scheduler schema, not the public schema)
```shell
python -m alembic -n scheduler upgrade head
```
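The `-n scheduler` flag tells alembic to read its configuration from a `[scheduler]` section of your alembic.ini. A sketch of what that section might contain; the placeholder path and URL are assumptions, so check the rdbbeat README for the exact `script_location` in your environment:

```ini
; alembic.ini -- sketch only; path and URL are assumptions
[scheduler]
; point this at rdbbeat's bundled migration scripts
script_location = <path-to-rdbbeat>/migrations
sqlalchemy.url = postgresql://postgres:postgres@localhost:5432/rdbbeat_example
```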
- Then run the celery-beat (in a separate terminal)
```shell
python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
```
5. Create a simple celery app and link it to the flask app
```python
# server/celery_worker.py
from celery import Celery

from server.app import app as flask_app
from server.db_connection import session_scope

REDIS_URL = "redis://localhost:6379/0"


def create_celery(flask_app=flask_app):
    celery_app = Celery("flask", broker=REDIS_URL)
    celery_app.conf.task_default_queue = "default"
    celery_app.conf.broker_transport_options = {
        "max_retries": 3,
        "interval_start": 0,
        "interval_step": 0.2,
        "interval_max": 0.2,
    }
    # Provide session scope to `rdbbeat`
    celery_app.conf.session_scope = session_scope
    celery_app.conf.update(flask_app.config)

    TaskBase = celery_app.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery_app.Task = ContextTask
    return celery_app


app = create_celery(flask_app=flask_app)
```
NB: Be sure to provide the session scope to rdbbeat so that it can access the DB.
- Create a simple task
```python
# server/tasks.py
from server.celery_worker import app
from server.db_connection import session_scope
from server.models import Employee


@app.task(name="birthday_greeting")
def birthday_greeting(employee_id):
    with session_scope() as session:
        employee = session.query(Employee).get(employee_id)
        print(f"Happy birthday, {employee.name} {employee.surname}!")
        # Send email to employee
        # email_service.send_email(template="birthday_greeting", to=employee.email, context={"employee": employee.to_dict()})
        # Remind their manager too, in case they forgot :)
```
- Now, modify the views.py file to add a schedule for each employee's birthday message
```python
# server/views.py
...
from rdbbeat.controller import Schedule, ScheduledTask, schedule_task


@employee_router.post("/employees/")
def create_employee():
    employee_data = request.get_json()
    date_of_birth = parse(employee_data["date_of_birth"]).date()
    employee = Employee(
        name=employee_data["name"],
        surname=employee_data["surname"],
        date_of_birth=date_of_birth,
    )
    with session_scope() as session:
        session.add(employee)
        session.commit()

        # Create birthday greeting task
        db_employee = session.query(Employee).get(employee.id)
        schedule = Schedule(
            minute="*",
            hour="*",
            day_of_week="*",
            day_of_month=str(date_of_birth.day),
            month_of_year=str(date_of_birth.month),
            timezone="UTC",  # FIX-ME: get timezone from employee
        )
        task_to_schedule = ScheduledTask(
            name=f"{db_employee.id}_birthday_greeting",  # all tasks must have a unique name
            task="birthday_greeting",
            schedule=schedule,
        )
        # Provide task kwargs for when the task is executed
        task_kwargs = {"employee_id": db_employee.id}
        schedule_task(session=session, scheduled_task=task_to_schedule, **task_kwargs)

        return jsonify(db_employee.to_dict()), 201
...
```
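The crontab-style fields in the Schedule above are derived from the employee's date of birth. A small illustrative helper (my own, not part of rdbbeat) that maps a date of birth to those fields, pinning the greeting to 09:00 rather than firing every minute of the day:

```python
import datetime


def birthday_cron_fields(date_of_birth: datetime.date) -> dict:
    """Map a date of birth to crontab-style fields for a yearly birthday task.

    Fires at 09:00 on the employee's birthday instead of every minute.
    """
    return {
        "minute": "0",
        "hour": "9",
        "day_of_week": "*",
        "day_of_month": str(date_of_birth.day),
        "month_of_year": str(date_of_birth.month),
    }
```

You could then build the schedule with `Schedule(**birthday_cron_fields(date_of_birth), timezone="UTC")`.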
6. Run everything and test!
- For testing sanity, let's modify the schedule to run every minute
```python
# server/views.py
...
schedule = Schedule(
    minute="*",
    hour="*",
    day_of_week="*",
    day_of_month="*",
    month_of_year="*",
    timezone="UTC",
)
...
```
- Check that the flask app is running in one terminal
- Check that celery-beat is running in another terminal
- Then run the celery worker in a third terminal
```shell
python -m celery --app=server.tasks worker --loglevel=info
```
- Run those curl commands again to create an employee.
7. It works!!
- You should see logs like these in terminal 2:
```
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
[2021-03-01 16:00:00,000: INFO/MainProcess] Scheduler: Sending due task birthday_greeting (birthday_greeting)
```
- And the celery worker logs should be happy like:
```
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]
[2021-03-01 16:00:00,000: INFO/MainProcess] Received task: birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b]
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
[2021-03-01 16:00:00,000: INFO/MainProcess] Task birthday_greeting[3a3b1b1b-1b1b-3a3a-1b1b-1b1b1b1b1b1b] succeeded in 0.000s: None
```
8. Advanced stuff
8.1. Single-beat
You can use single-beat to ensure that only one celery-beat process runs at a time. This safeguards against duplicate task dispatches from multiple celery-beat processes running simultaneously.
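For illustration, a hedged sketch of wrapping the beat command with single-beat (the env var and invocation are from single-beat's README as I recall them; verify against the project docs before use):

```
# single-beat uses a Redis lock so only one wrapped process runs at a time
export SINGLE_BEAT_REDIS_SERVER="redis://localhost:6379/0"
single-beat python -m celery --app=server.tasks beat --loglevel=info --scheduler=rdbbeat.schedulers:DatabaseScheduler
```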
8.2. Deploy with k8s
If you are running your system in kubernetes, you can start celery-beat in one pod, the celery worker in another pod, and your server application in a third pod. As long as they all have access to the same DB and broker, they will work together.
9. Alternatives
9.1. Redbeat
According to its GitHub docs, RedBeat is a Celery Beat Scheduler that stores the scheduled tasks and runtime metadata in Redis. The trade-off is that your task schedules are stored in Redis, so you have to ensure that your Redis instance is highly available to avoid losing your schedules.
9.2. Django Celery Beat
rdbbeat does what Django Celery Beat does for django, but for SQLAlchemy. If you are using the django framework for your server application, you can use Django Celery Beat to schedule your tasks.