Ever tried to check whether a Celery task is already running?
Here is the code I wrote to check, by name, whether a task is currently running.
```python
from celery import current_app


class CeleryHelper:
    """Contains helper functionalities to be used while interacting with Celery."""

    @staticmethod
    def is_being_executed(task_name: str) -> bool:
        """Returns whether the task with given task_name is already being executed.

        Args:
            task_name: Name of the task to check if it is running currently.

        Returns:
            A boolean indicating whether the task with the given task name is
            running currently.
        """
        # inspect().active() returns None when no worker replies, so fall back to {}.
        active_tasks = current_app.control.inspect().active() or {}
        for worker, running_tasks in active_tasks.items():
            for task in running_tasks:
                if task["name"] == task_name:
                    return True
        return False
```
I searched for this a while ago and couldn't find a direct way to do it in a Django project. The code above is tested and works in Django-based projects.
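If you also want to know which workers are running the task, the same loop can be pulled out into a small pure function that takes the inspect reply as an argument. This is only a sketch: `workers_running`, the worker names, and the task name `reports.build` are made up for illustration, and the sample dictionary is hand-written to mimic the worker-to-task-list shape the helper above iterates over. Taking the reply as a parameter means the logic can be tried without a broker, and it treats a missing reply (`None`) as "nothing running".

```python
from typing import Dict, List, Optional


def workers_running(
    active_tasks: Optional[Dict[str, List[dict]]], task_name: str
) -> List[str]:
    """Return the names of the workers currently executing `task_name`.

    `active_tasks` is expected to look like the mapping the helper above
    loops over: worker name -> list of task dicts with a "name" key.
    """
    if not active_tasks:
        # No reply from any worker (None) or an empty mapping.
        return []
    return [
        worker
        for worker, running_tasks in active_tasks.items()
        if any(task.get("name") == task_name for task in running_tasks)
    ]


# Hand-written sample data (hypothetical worker and task names).
sample_reply = {
    "celery@worker1": [{"id": "a1", "name": "reports.build"}],
    "celery@worker2": [],
}

print(workers_running(sample_reply, "reports.build"))  # ['celery@worker1']
print(workers_running(None, "reports.build"))          # []
```

In a real project you would pass in `current_app.control.inspect().active()` instead of the sample data. Keep in mind the result is a snapshot: a task can start or finish right after the check.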