
Commit 1443f31

feat: add System Status page with Celery queue monitoring and purge (#14349)
* feat: add System Status page with Celery queue management
  - Add a dedicated System Status page (/system_status) with superuser-only access, reachable from the navigation menu alongside System Settings
  - Display Celery worker liveness, pending queue length with human-readable duration formatting, and the active task timeout/expiry configuration
  - Add a Purge queue button that POSTs to the new API endpoint and reloads the page on success
  - Fix get_celery_worker_status() to use app.control.ping() via the pidbox control channel, which works correctly even when the task queue is clogged (previously it dispatched a task that would never be picked up)
  - Add a purge_celery_queue() utility using a direct broker connection
  - Add two new superuser-only REST API endpoints:
    - GET /api/v2/celery/status/ - worker status, queue length, config
    - POST /api/v2/celery/queue/purge/ - purge all pending tasks
    Both use the same permission guards as SystemSettingsViewSet (IsSuperUser + DjangoModelPermissions against System_Settings)
  - Add DD_CELERY_TASK_TIME_LIMIT (default 12h), DD_CELERY_TASK_SOFT_TIME_LIMIT (default disabled), and DD_CELERY_TASK_DEFAULT_EXPIRES (default 12h) environment variables to settings.dist.py with explanatory comments
  - Move Celery status rendering from the server-side Django view to client-side AJAX so dojo-pro can consume the same API endpoints

* feat: add Refresh button next to the Purge button on the System Status page

* remove plan

* feat(admin): enhance celery queue status UI
  - Split the status display into separate Redis Broker and Celery Worker badges
  - Add loading spinners on page load and refresh for all status indicators
  - Add a per-task queue breakdown table behind a "View Details" button (O(N) scan with warning); shows task name, count, oldest/newest queue position, and expiry timestamps with human-readable time left
  - Add a per-row purge button to remove all queued tasks by task name
  - Add global queue purge and per-task purge API endpoints
  - Move the Celery settings table into its own panel section

* feat(admin): improve celery queue purge with logging, limits, and configurable settings
  - Add an INFO log on start/finish and a DEBUG log per batch in purge_celery_queue_by_task_name
  - Cap per-task purges at 10,000 tasks, with a WARNING when the cap is hit
  - Make batch size and max tasks configurable via DD_CELERY_QUEUE_PURGE_BATCH_SIZE and DD_CELERY_QUEUE_PURGE_MAX_TASKS
  - After a per-task purge, refresh both the status and the details table instead of removing the row
  - Refresh the details table when clicking the Refresh button while details are visible
  - Add a note below the details table referencing the two env variables

* fix(admin): use redis.from_url() consistently for queue inspection and purge
  - Replace Kombu channel.client with redis.from_url() in both get_celery_queue_details() and purge_celery_queue_by_task_name() so both functions use the same connection mechanism
  - Use a batched pipeline approach for per-task purges to avoid hitting Valkey's max query buffer limit on large queues

* add link to flower

* refactor

* exempt celery api endpoint from model method tests

* fix template
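The worker-status fix described above swaps a dispatched probe task for a pidbox ping. A minimal, self-contained sketch of that approach (the function and stub names here are illustrative; the real helper is get_celery_worker_status in dojo/utils.py and calls app.control.ping()):

```python
def get_worker_status(control, timeout=2.0):
    """Sketch of pidbox-based liveness checking (illustrative, not the
    actual dojo.utils implementation). `control` is expected to expose
    Celery's Control.ping() API, i.e. app.control in the real code.
    ping() broadcasts on the pidbox control channel, which is separate
    from the task queue, so workers can reply even when the queue is
    clogged with pending tasks."""
    try:
        replies = control.ping(timeout=timeout)
    except Exception:
        # Broker unreachable: report the worker as down rather than crash.
        return False
    # Each live worker contributes one reply dict; no replies means no workers.
    return bool(replies)


class _StubControl:
    """Stand-in for app.control so the sketch runs without a broker."""

    def __init__(self, replies):
        self._replies = replies

    def ping(self, timeout=None):
        return self._replies


alive = get_worker_status(_StubControl([{"celery@worker1": {"ok": "pong"}}]))
down = get_worker_status(_StubControl([]))
```

Injecting the control object is only for demonstration; the production helper can use the app's own control channel directly.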
1 parent f224469 commit 1443f31

11 files changed

Lines changed: 546 additions & 69 deletions


dojo/api_v2/serializers.py

Lines changed: 20 additions & 0 deletions
```diff
@@ -3102,6 +3102,26 @@ def validate(self, data):
         return data


+class CeleryStatusSerializer(serializers.Serializer):
+    worker_status = serializers.BooleanField(read_only=True)
+    broker_status = serializers.BooleanField(read_only=True)
+    queue_length = serializers.IntegerField(allow_null=True, read_only=True)
+    task_time_limit = serializers.IntegerField(allow_null=True, read_only=True)
+    task_soft_time_limit = serializers.IntegerField(allow_null=True, read_only=True)
+    task_default_expires = serializers.IntegerField(allow_null=True, read_only=True)
+
+
+class CeleryQueueTaskDetailSerializer(serializers.Serializer):
+    task_name = serializers.CharField(read_only=True)
+    count = serializers.IntegerField(read_only=True)
+    oldest_position = serializers.IntegerField(read_only=True)
+    newest_position = serializers.IntegerField(read_only=True)
+    oldest_eta = serializers.CharField(allow_null=True, read_only=True)
+    newest_eta = serializers.CharField(allow_null=True, read_only=True)
+    earliest_expires = serializers.CharField(allow_null=True, read_only=True)
+    latest_expires = serializers.CharField(allow_null=True, read_only=True)
+
+
 class FindingNoteSerializer(serializers.Serializer):
     note_id = serializers.IntegerField()
```
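The status serializer exposes the timeout values as raw seconds, and the commit message says the page renders them with human-readable duration formatting. An illustrative formatter (hypothetical; the page's actual template filter is not part of this diff):

```python
def humanize_seconds(seconds):
    """Render a seconds duration the way a status page might display it
    (illustrative helper, not the project's actual formatter). None means
    the corresponding limit is disabled."""
    if seconds is None:
        return "disabled"
    parts = []
    for label, size in (("d", 86400), ("h", 3600), ("m", 60), ("s", 1)):
        value, seconds = divmod(seconds, size)
        if value:
            parts.append(f"{value}{label}")
    return " ".join(parts) or "0s"
```

With this, the default DD_CELERY_TASK_TIME_LIMIT of 43200 seconds reads as "12h".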

dojo/api_v2/views.py

Lines changed: 78 additions & 0 deletions
```diff
@@ -183,9 +183,14 @@
 from dojo.utils import (
     async_delete,
     generate_file_response,
+    get_celery_queue_details,
+    get_celery_queue_length,
+    get_celery_worker_status,
     get_setting,
     get_system_setting,
     process_tag_notifications,
+    purge_celery_queue,
+    purge_celery_queue_by_task_name,
 )

 logger = logging.getLogger(__name__)
@@ -3260,6 +3265,79 @@ def get_queryset(self):
         return System_Settings.objects.all().order_by("id")


+class CeleryViewSet(viewsets.ViewSet):
+    permission_classes = (permissions.IsSuperUser, DjangoModelPermissions)
+    queryset = System_Settings.objects.none()
+
+    @extend_schema(
+        responses=serializers.CeleryStatusSerializer,
+        summary="Get Celery worker and queue status",
+        description=(
+            "Returns Celery worker liveness, pending queue length, and the active task "
+            "timeout/expiry configuration. Uses the Celery control channel (pidbox) for "
+            "worker status so it works correctly even when the task queue is clogged."
+        ),
+    )
+    @action(detail=False, methods=["get"], url_path="status")
+    def status(self, request):
+        queue_length = get_celery_queue_length()
+        data = {
+            "worker_status": get_celery_worker_status(),
+            "broker_status": queue_length is not None,
+            "queue_length": queue_length,
+            "task_time_limit": getattr(settings, "CELERY_TASK_TIME_LIMIT", None),
+            "task_soft_time_limit": getattr(settings, "CELERY_TASK_SOFT_TIME_LIMIT", None),
+            "task_default_expires": getattr(settings, "CELERY_TASK_DEFAULT_EXPIRES", None),
+        }
+        return Response(serializers.CeleryStatusSerializer(data).data)
+
+    @extend_schema(
+        request=None,
+        responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}},
+        summary="Purge all pending Celery tasks from the queue",
+        description=(
+            "Removes all pending tasks from the default Celery queue. Tasks already being "
+            "executed by workers are not affected. Note: if deduplication tasks were queued, "
+            "you may need to re-run deduplication manually via `python manage.py dedupe`."
+        ),
+    )
+    @action(detail=False, methods=["post"], url_path="queue/purge")
+    def queue_purge(self, request):
+        purged = purge_celery_queue()
+        return Response({"purged": purged})
+
+    @extend_schema(
+        responses=serializers.CeleryQueueTaskDetailSerializer(many=True),
+        summary="Get per-task breakdown of the Celery queue",
+        description=(
+            "Scans every message in the queue (O(N)) and returns task name, count, and "
+            "oldest/newest queue positions. May be slow for large queues."
+        ),
+    )
+    @action(detail=False, methods=["get"], url_path="queue/details")
+    def queue_details(self, request):
+        details = get_celery_queue_details()
+        if details is None:
+            return Response({"error": "Unable to read queue details."}, status=503)
+        return Response(serializers.CeleryQueueTaskDetailSerializer(details, many=True).data)
+
+    @extend_schema(
+        request={"application/json": {"type": "object", "properties": {"task_name": {"type": "string"}}, "required": ["task_name"]}},
+        responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}},
+        summary="Purge all queued tasks with a given task name",
+        description="Removes all pending tasks matching the given task name from the default Celery queue.",
+    )
+    @action(detail=False, methods=["post"], url_path="queue/task/purge")
+    def queue_task_purge(self, request):
+        task_name = request.data.get("task_name", "").strip()
+        if not task_name:
+            return Response({"error": "task_name is required."}, status=400)
+        purged = purge_celery_queue_by_task_name(task_name)
+        if purged is None:
+            return Response({"error": "Unable to purge tasks."}, status=503)
+        return Response({"purged": purged})
+
+
 # Authorization: superuser
 @extend_schema_view(**schema_with_prefetch())
 class NotificationsViewSet(
```
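The queue_details endpoint above is documented as an O(N) scan over every queued message. A simplified, runnable simulation of that single pass (an in-memory list of JSON strings stands in for the broker's Redis list; the Kombu-style headers['task'] layout is an assumption, and the real get_celery_queue_details also extracts ETA/expiry timestamps):

```python
import json


def summarize_queue(messages):
    """Single O(N) pass over queued messages, grouping by task name and
    tracking oldest/newest queue positions, mirroring the shape of the
    queue_details response. This is a simulation over an in-memory list,
    not the project's actual broker-reading helper."""
    summary = {}
    for position, raw in enumerate(messages):
        payload = json.loads(raw)
        name = payload.get("headers", {}).get("task", "<unknown>")
        entry = summary.setdefault(name, {
            "task_name": name,
            "count": 0,
            "oldest_position": position,
            "newest_position": position,
        })
        entry["count"] += 1
        entry["newest_position"] = position
    # Largest backlog first, like a details table would show it.
    return sorted(summary.values(), key=lambda e: -e["count"])


# Example queue with made-up task names:
queue = [
    json.dumps({"headers": {"task": "tasks.dedupe"}}),
    json.dumps({"headers": {"task": "tasks.notify"}}),
    json.dumps({"headers": {"task": "tasks.dedupe"}}),
]
details = summarize_queue(queue)
```

Because every message must be fetched and decoded, the cost grows linearly with queue length, which is why the endpoint warns that large queues may be slow.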

dojo/settings/settings.dist.py

Lines changed: 24 additions & 0 deletions
```diff
@@ -90,6 +90,17 @@
     DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")),
     DD_CELERY_TASK_SERIALIZER=(str, "pickle"),
     DD_CELERY_LOG_LEVEL=(str, "INFO"),
+    # Hard ceiling on task runtime. When reached, the worker process is sent SIGKILL — no cleanup
+    # code runs. Always set higher than DD_CELERY_TASK_SOFT_TIME_LIMIT. (0 = disabled, no limit)
+    DD_CELERY_TASK_TIME_LIMIT=(int, 43200),  # default: 12 hours
+    # Raises SoftTimeLimitExceeded inside the task, giving it a chance to clean up before the hard
+    # kill. Set a few seconds below DD_CELERY_TASK_TIME_LIMIT so cleanup has time to finish.
+    # (0 = disabled, no limit)
+    DD_CELERY_TASK_SOFT_TIME_LIMIT=(int, 0),
+    # If a task sits in the broker queue for longer than this without being picked up by a worker,
+    # Celery silently discards it — it is never executed and no exception is raised. Does not
+    # affect tasks that are already running. (0 = disabled, no limit)
+    DD_CELERY_TASK_DEFAULT_EXPIRES=(int, 43200),  # default: 12 hours
     DD_TAG_BULK_ADD_BATCH_SIZE=(int, 1000),
     # Tagulous slug truncate unique setting. Set to -1 to use tagulous internal default (5)
     DD_TAGULOUS_SLUG_TRUNCATE_UNIQUE=(int, -1),
@@ -288,6 +299,10 @@
     DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE=(int, 1000),
     # Batch size for import/reimport deduplication processing
     DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=(int, 1000),
+    # Batch size for Redis pipeline when purging the Celery queue by task name
+    DD_CELERY_QUEUE_PURGE_BATCH_SIZE=(int, 1000),
+    # Maximum number of tasks to purge in a single per-task purge action
+    DD_CELERY_QUEUE_PURGE_MAX_TASKS=(int, 10000),
     # Delete Auditlogs older than x month; -1 to keep all logs
     DD_AUDITLOG_FLUSH_RETENTION_PERIOD=(int, -1),
     # Batch size for flushing audit logs per task run
@@ -1249,6 +1264,13 @@ def saml2_attrib_map_format(din):
 CELERY_TASK_SERIALIZER = env("DD_CELERY_TASK_SERIALIZER")
 CELERY_LOG_LEVEL = env("DD_CELERY_LOG_LEVEL")

+if env("DD_CELERY_TASK_TIME_LIMIT") > 0:
+    CELERY_TASK_TIME_LIMIT = env("DD_CELERY_TASK_TIME_LIMIT")
+if env("DD_CELERY_TASK_SOFT_TIME_LIMIT") > 0:
+    CELERY_TASK_SOFT_TIME_LIMIT = env("DD_CELERY_TASK_SOFT_TIME_LIMIT")
+if env("DD_CELERY_TASK_DEFAULT_EXPIRES") > 0:
+    CELERY_TASK_DEFAULT_EXPIRES = env("DD_CELERY_TASK_DEFAULT_EXPIRES")
+
 if len(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS")) > 0:
     CELERY_BROKER_TRANSPORT_OPTIONS = json.loads(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS"))

@@ -1783,6 +1805,8 @@ def saml2_attrib_map_format(din):
 TRACK_IMPORT_HISTORY = env("DD_TRACK_IMPORT_HISTORY")
 IMPORT_REIMPORT_MATCH_BATCH_SIZE = env("DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE")
 IMPORT_REIMPORT_DEDUPE_BATCH_SIZE = env("DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE")
+CELERY_QUEUE_PURGE_BATCH_SIZE = env("DD_CELERY_QUEUE_PURGE_BATCH_SIZE")
+CELERY_QUEUE_PURGE_MAX_TASKS = env("DD_CELERY_QUEUE_PURGE_MAX_TASKS")

 # ------------------------------------------------------------------------------
 # JIRA
```
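The two purge settings cap how much work a single per-task purge does. A simplified simulation of the capped, batched loop (a plain Python list stands in for the broker's Redis list; the real purge_celery_queue_by_task_name issues pipelined removals against Redis/Valkey, which is why the batch size matters — one huge command can exceed the server's query buffer limit):

```python
import logging

logger = logging.getLogger(__name__)


# Defaults mirror DD_CELERY_QUEUE_PURGE_BATCH_SIZE / DD_CELERY_QUEUE_PURGE_MAX_TASKS.
def purge_by_task_name_sim(queue, task_name, batch_size=1000, max_tasks=10000):
    """Simulation of a capped, batched per-task purge over an in-memory
    list. Illustrative only: the real helper works against the broker."""
    purged = 0
    while purged < max_tasks:
        # One "batch": up to batch_size matching messages.
        batch = [m for m in queue if m == task_name][:batch_size]
        if not batch:
            break
        take = min(len(batch), max_tasks - purged)
        for _ in range(take):
            queue.remove(task_name)
        purged += take
        logger.debug("Purged batch of %d %r tasks", take, task_name)
    if purged >= max_tasks:
        logger.warning("Purge cap of %d reached for %r", max_tasks, task_name)
    return purged
```

The cap keeps a single purge request bounded; hitting it logs a WARNING, matching the behavior described in the commit message.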

dojo/system_settings/urls.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -8,4 +8,9 @@
         views.SystemSettingsView.as_view(),
         name="system_settings",
     ),
+    re_path(
+        r"^celery_status$",
+        views.CeleryStatusView.as_view(),
+        name="celery_status",
+    ),
 ]
```

dojo/system_settings/views.py

Lines changed: 13 additions & 38 deletions
```diff
@@ -1,6 +1,5 @@
 import logging

-from django.conf import settings
 from django.contrib import messages
 from django.core.exceptions import PermissionDenied
 from django.http import HttpRequest, HttpResponse
@@ -9,7 +8,7 @@

 from dojo.forms import SystemSettingsForm
 from dojo.models import System_Settings
-from dojo.utils import add_breadcrumb, get_celery_queue_length, get_celery_worker_status
+from dojo.utils import add_breadcrumb

 logger = logging.getLogger(__name__)

@@ -30,15 +29,10 @@ def get_context(
         request: HttpRequest,
     ) -> dict:
         system_settings_obj = self.get_settings_object()
-        # Set the initial context
-        context = {
+        return {
             "system_settings_obj": system_settings_obj,
             "form": self.get_form(request, system_settings_obj),
         }
-        # Check the status of celery
-        self.get_celery_status(context)
-
-        return context

     def get_form(
         self,
@@ -95,35 +89,6 @@ def validate_form(
             return request, True
         return request, False

-    def get_celery_status(
-        self,
-        context: dict,
-    ) -> None:
-        # Celery needs to be set with the setting: CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite'
-        if hasattr(settings, "CELERY_RESULT_BACKEND"):
-            # Check the status of Celery by sending calling a celery task
-            context["celery_bool"] = get_celery_worker_status()
-
-            if context["celery_bool"]:
-                context["celery_msg"] = "Celery is processing tasks."
-                context["celery_status"] = "Running"
-            else:
-                context["celery_msg"] = "Celery does not appear to be up and running. Please ensure celery is running."
-                context["celery_status"] = "Not Running"
-
-            q_len = get_celery_queue_length()
-            if q_len is None:
-                context["celery_q_len"] = " It is not possible to identify number of waiting tasks."
-            elif q_len:
-                context["celery_q_len"] = f"{q_len} tasks are waiting to be proccessed."
-            else:
-                context["celery_q_len"] = "No task is waiting to be proccessed."
-
-        else:
-            context["celery_bool"] = False
-            context["celery_msg"] = "Celery needs to have the setting CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite' set in settings.py."
-            context["celery_status"] = "Unknown"
-
     def get_template(self) -> str:
         return "dojo/system_settings.html"

@@ -148,9 +113,19 @@ def post(
         self.permission_check(request)
         # Set up the initial context
         context = self.get_context(request)
-        # Check the status of celery
         request, _ = self.validate_form(request, context)
         # Add some breadcrumbs
         add_breadcrumb(title="System settings", top_level=False, request=request)
         # Render the page
         return render(request, self.get_template(), context)
+
+
+class CeleryStatusView(View):
+    def get(
+        self,
+        request: HttpRequest,
+    ) -> HttpResponse:
+        if not request.user.is_superuser:
+            raise PermissionDenied
+        add_breadcrumb(title="Celery status", top_level=False, request=request)
+        return render(request, "dojo/celery_status.html")
```

dojo/templates/base.html

Lines changed: 5 additions & 0 deletions
```diff
@@ -613,6 +613,11 @@
                 {% trans "System Settings" %}
             </a>
         </li>
+        <li>
+            <a href="{% url 'celery_status' %}">
+                {% trans "Celery Status" %}
+            </a>
+        </li>
     {% endif %}
     {% if "dojo.view_tool_configuration"|has_configuration_permission:request %}
         <li>
```
