Skip to content

Commit b4e57d6

Browse files
committed
Add background reaper for stalled DLC worker
Introduce STOP_WORKER_TIMEOUT and a background reaper to handle worker threads that don't terminate within the stop timeout. Increase join timeout to STOP_WORKER_TIMEOUT, set _pending_reset when a reset is requested while the worker is alive, and schedule _schedule_reap to join the thread in the background instead of immediately marking the processor as FAULTED. The reaper performs final cleanup (clearing thread, queue, state, stop event) and applies a pending DLCLive reset once the thread is reaped, with guards to ensure only one reaper runs at a time.
1 parent 5c06b5c commit b4e57d6

1 file changed

Lines changed: 38 additions & 3 deletions

File tree

dlclivegui/services/dlc_processor.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released
2222

2323
logger = logging.getLogger(__name__)
24+
STOP_WORKER_TIMEOUT = 10.0 # seconds _stop_worker waits in join() for the worker thread; if it is still alive after that, it is handed to the background reaper
2425

2526
try: # pragma: no cover - optional dependency
2627
from dlclive import (
@@ -155,6 +156,9 @@ def __init__(self) -> None:
155156
self._lifecycle_lock = threading.Lock()
156157
self._stop_event = threading.Event()
157158
self._initialized = False
159+
## Worker cleanup
160+
self._reaping = False
161+
self._pending_reset = False
158162

159163
# Statistics tracking
160164
self._frames_enqueued = 0
@@ -184,6 +188,8 @@ def reset(self) -> None:
184188
"""Stop the worker thread and drop the current DLCLive instance."""
185189
stopped = self._stop_worker()
186190
if not stopped:
191+
with self._lifecycle_lock:
192+
self._pending_reset = True
187193
logger.warning(
188194
"Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues."
189195
)
@@ -330,21 +336,50 @@ def _stop_worker(self) -> bool:
330336
self._state = WorkerState.STOPPING
331337
self._stop_event.set()
332338

333-
t.join(timeout=2.0)
339+
t.join(timeout=STOP_WORKER_TIMEOUT)
334340
if t.is_alive():
335341
qsize = self._queue.qsize() if self._queue is not None else -1
336342
logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize)
337-
with self._lifecycle_lock:
338-
self._state = WorkerState.FAULTED
343+
self._schedule_reap(t)
339344
return False
340345

346+
# Normal cleanup
341347
with self._lifecycle_lock:
342348
self._worker_thread = None
343349
self._queue = None
344350
self._state = WorkerState.STOPPED
345351
self._stop_event.clear()
346352
return True
347353

354+
def _schedule_reap(self, t: threading.Thread) -> None:
    """Join a stalled worker thread in the background and finish its cleanup.

    Called when ``t`` is still alive after the bounded join in the stop
    path. A daemon thread waits for ``t`` without a timeout; once it
    exits, the worker bookkeeping is cleared and any reset that was
    requested while the thread was stalled is applied.

    Only one reaper runs at a time: while ``self._reaping`` is set,
    further calls return without doing anything.

    Args:
        t: The worker thread that did not terminate within the timeout.

    Raises:
        RuntimeError: If the reaper thread cannot be started. The
            ``_reaping`` flag is released first so a later call can retry.
    """
    with self._lifecycle_lock:
        if self._reaping:
            # A reaper is already active; never start a second one.
            return
        self._reaping = True

    def reap() -> None:
        cleaned = False
        try:
            t.join()  # wait without timeout in background
            with self._lifecycle_lock:
                # Only clean up if the processor is still tracking this
                # exact thread; otherwise the worker slot belongs to
                # someone else and must not be touched.
                if self._worker_thread is t:
                    self._worker_thread = None
                    self._queue = None
                    self._state = WorkerState.STOPPED
                    self._stop_event.clear()
                    cleaned = True

                # Apply a reset that was deferred because the worker was
                # still alive when it was requested.
                # NOTE(review): this runs even when the identity check
                # above failed - confirm that is intended.
                if self._pending_reset:
                    self._dlc = None
                    self._initialized = False
                    self._pending_reset = False
        finally:
            # Always release the guard so a future stall can be reaped.
            with self._lifecycle_lock:
                self._reaping = False

        if cleaned:
            logger.debug("[Stop worker] DLC worker thread reaped; processor is STOPPED again")
        else:
            # Do not claim STOPPED when the state was left untouched.
            logger.debug("[Stop worker] DLC worker thread reaped; worker state left unchanged")

    reaper = threading.Thread(target=reap, name="DLCLiveReaper", daemon=True)
    try:
        reaper.start()
    except RuntimeError:
        # Without this the guard would stay set forever and every later
        # reap request would be dropped silently.
        with self._lifecycle_lock:
            self._reaping = False
        raise
382+
348383
@contextmanager
349384
def _timed_processor(self):
350385
"""

0 commit comments

Comments
 (0)