|
183 | 183 | from dojo.utils import ( |
184 | 184 | async_delete, |
185 | 185 | generate_file_response, |
| 186 | + get_celery_queue_details, |
| 187 | + get_celery_queue_length, |
| 188 | + get_celery_worker_status, |
186 | 189 | get_setting, |
187 | 190 | get_system_setting, |
188 | 191 | process_tag_notifications, |
| 192 | + purge_celery_queue, |
| 193 | + purge_celery_queue_by_task_name, |
189 | 194 | ) |
190 | 195 |
|
191 | 196 | logger = logging.getLogger(__name__) |
@@ -3260,6 +3265,79 @@ def get_queryset(self): |
3260 | 3265 | return System_Settings.objects.all().order_by("id") |
3261 | 3266 |
|
3262 | 3267 |
|
| 3268 | +class CeleryViewSet(viewsets.ViewSet): |
| 3269 | + permission_classes = (permissions.IsSuperUser, DjangoModelPermissions) |
| 3270 | + queryset = System_Settings.objects.none() |
| 3271 | + |
| 3272 | + @extend_schema( |
| 3273 | + responses=serializers.CeleryStatusSerializer, |
| 3274 | + summary="Get Celery worker and queue status", |
| 3275 | + description=( |
| 3276 | + "Returns Celery worker liveness, pending queue length, and the active task " |
| 3277 | + "timeout/expiry configuration. Uses the Celery control channel (pidbox) for " |
| 3278 | + "worker status so it works correctly even when the task queue is clogged." |
| 3279 | + ), |
| 3280 | + ) |
| 3281 | + @action(detail=False, methods=["get"], url_path="status") |
| 3282 | + def status(self, request): |
| 3283 | + queue_length = get_celery_queue_length() |
| 3284 | + data = { |
| 3285 | + "worker_status": get_celery_worker_status(), |
| 3286 | + "broker_status": queue_length is not None, |
| 3287 | + "queue_length": queue_length, |
| 3288 | + "task_time_limit": getattr(settings, "CELERY_TASK_TIME_LIMIT", None), |
| 3289 | + "task_soft_time_limit": getattr(settings, "CELERY_TASK_SOFT_TIME_LIMIT", None), |
| 3290 | + "task_default_expires": getattr(settings, "CELERY_TASK_DEFAULT_EXPIRES", None), |
| 3291 | + } |
| 3292 | + return Response(serializers.CeleryStatusSerializer(data).data) |
| 3293 | + |
| 3294 | + @extend_schema( |
| 3295 | + request=None, |
| 3296 | + responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}}, |
| 3297 | + summary="Purge all pending Celery tasks from the queue", |
| 3298 | + description=( |
| 3299 | + "Removes all pending tasks from the default Celery queue. Tasks already being " |
| 3300 | + "executed by workers are not affected. Note: if deduplication tasks were queued, " |
| 3301 | + "you may need to re-run deduplication manually via `python manage.py dedupe`." |
| 3302 | + ), |
| 3303 | + ) |
| 3304 | + @action(detail=False, methods=["post"], url_path="queue/purge") |
| 3305 | + def queue_purge(self, request): |
| 3306 | + purged = purge_celery_queue() |
| 3307 | + return Response({"purged": purged}) |
| 3308 | + |
| 3309 | + @extend_schema( |
| 3310 | + responses=serializers.CeleryQueueTaskDetailSerializer(many=True), |
| 3311 | + summary="Get per-task breakdown of the Celery queue", |
| 3312 | + description=( |
| 3313 | + "Scans every message in the queue (O(N)) and returns task name, count, and " |
| 3314 | + "oldest/newest queue positions. May be slow for large queues." |
| 3315 | + ), |
| 3316 | + ) |
| 3317 | + @action(detail=False, methods=["get"], url_path="queue/details") |
| 3318 | + def queue_details(self, request): |
| 3319 | + details = get_celery_queue_details() |
| 3320 | + if details is None: |
| 3321 | + return Response({"error": "Unable to read queue details."}, status=503) |
| 3322 | + return Response(serializers.CeleryQueueTaskDetailSerializer(details, many=True).data) |
| 3323 | + |
| 3324 | + @extend_schema( |
| 3325 | + request={"application/json": {"type": "object", "properties": {"task_name": {"type": "string"}}, "required": ["task_name"]}}, |
| 3326 | + responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}}, |
| 3327 | + summary="Purge all queued tasks with a given task name", |
| 3328 | + description="Removes all pending tasks matching the given task name from the default Celery queue.", |
| 3329 | + ) |
| 3330 | + @action(detail=False, methods=["post"], url_path="queue/task/purge") |
| 3331 | + def queue_task_purge(self, request): |
| 3332 | + task_name = request.data.get("task_name", "").strip() |
| 3333 | + if not task_name: |
| 3334 | + return Response({"error": "task_name is required."}, status=400) |
| 3335 | + purged = purge_celery_queue_by_task_name(task_name) |
| 3336 | + if purged is None: |
| 3337 | + return Response({"error": "Unable to purge tasks."}, status=503) |
| 3338 | + return Response({"purged": purged}) |
| 3339 | + |
| 3340 | + |
3263 | 3341 | # Authorization: superuser |
3264 | 3342 | @extend_schema_view(**schema_with_prefetch()) |
3265 | 3343 | class NotificationsViewSet( |
|
0 commit comments