@@ -118,11 +118,13 @@ def configure_stream(self, frame_size: tuple[int, int], frame_rate: float | None
118118 self ._frame_rate = frame_rate
119119
120120 def write (self , frame : np .ndarray , timestamp : float | None = None ) -> bool :
121- if not self .is_running or self ._queue is None :
122- return False
123121 error = self ._current_error ()
124122 if error is not None :
125123 raise RuntimeError (f"Video encoding failed: { error } " ) from error
124+ if not self .is_running or self ._queue is None :
125+ return False
126+ if self ._stop_event .is_set ():
127+ return False
126128
127129 # Capture timestamp now, but only record it if frame is successfully enqueued
128130 if timestamp is None :
@@ -181,25 +183,32 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool:
181183 def stop (self ) -> None :
182184 if self ._writer is None and not self .is_running :
183185 return
186+
184187 self ._stop_event .set ()
185- if self ._queue is not None :
188+
189+ q = self ._queue
190+ if q is not None :
186191 try :
187- self . _queue .put_nowait (_SENTINEL )
192+ q .put_nowait (_SENTINEL )
188193 except queue .Full :
189194 pass
190- # self._queue.put(_SENTINEL)
191- if self ._writer_thread is not None :
192- self ._writer_thread .join (timeout = 5.0 )
193- if self ._writer_thread .is_alive ():
195+
196+ t = self ._writer_thread
197+ if t is not None :
198+ t .join (timeout = 5.0 )
199+ if t .is_alive ():
194200 logger .warning ("Video recorder thread did not terminate cleanly" )
201+ return
202+
195203 if self ._writer is not None :
196204 try :
197205 self ._writer .close ()
198206 except Exception :
199207 logger .exception ("Failed to close WriteGear cleanly" )
200208
201- # Save timestamps to JSON file
202- self ._save_timestamps ()
209+ if self ._writer_thread is None :
210+ # Save timestamps to JSON file
211+ self ._save_timestamps ()
203212
204213 self ._writer = None
205214 self ._writer_thread = None
@@ -236,45 +245,80 @@ def get_stats(self) -> RecorderStats | None:
236245 )
237246
238247 def _writer_loop (self ) -> None :
239- assert self ._queue is not None
248+ q = self ._queue
249+ if q is None :
250+ with self ._stats_lock :
251+ self ._encode_error = RuntimeError ("Writer loop started without a queue" )
252+ logger .error ("Writer loop started without a queue; exiting" )
253+ return
254+
240255 try :
241256 while True :
242257 try :
243- item = self . _queue .get (timeout = 0.1 )
258+ item = q .get (timeout = 0.1 )
244259 except queue .Empty :
245260 if self ._stop_event .is_set ():
246261 break
247262 continue
248- if item is _SENTINEL :
249- self ._queue .task_done ()
250- break
251- frame , timestamp = item
252- start = time .perf_counter ()
253- try :
254- assert self ._writer is not None
255- self ._writer .write (frame )
256- except OSError as exc :
263+ except Exception as exc :
257264 with self ._stats_lock :
258265 self ._encode_error = exc
259- logger .exception ("Video encoding failed while writing frame" )
260- self ._queue .task_done ()
266+ logger .exception ("Could not retrieve item from queue" , exc_info = exc )
261267 self ._stop_event .set ()
262268 break
263- elapsed = time .perf_counter () - start
264- now = time .perf_counter ()
265- with self ._stats_lock :
266- self ._frames_written += 1
267- self ._total_latency += elapsed
268- self ._last_latency = elapsed
269- self ._written_times .append (now )
270- self ._frame_timestamps .append (timestamp )
271- if now - self ._last_log_time >= 1.0 :
272- self ._compute_write_fps_locked ()
273- self ._queue .qsize ()
274- self ._last_log_time = now
275- self ._queue .task_done ()
269+
270+ stop_now = False
271+ try :
272+ if item is _SENTINEL :
273+ stop_now = True
274+ continue
275+
276+ frame , timestamp = item
277+ start = time .perf_counter ()
278+
279+ try :
280+ writer = self ._writer
281+ if writer is None :
282+ raise RuntimeError ("WriteGear writer is not initialized" )
283+ writer .write (frame )
284+ except Exception as exc : # <- broader than OSError
285+ with self ._stats_lock :
286+ self ._encode_error = exc
287+ logger .exception ("Video encoding failed while writing frame" , exc_info = exc )
288+ self ._stop_event .set ()
289+ stop_now = True
290+ continue
291+
292+ elapsed = time .perf_counter () - start
293+ now = time .perf_counter ()
294+ with self ._stats_lock :
295+ self ._frames_written += 1
296+ self ._total_latency += elapsed
297+ self ._last_latency = elapsed
298+ self ._written_times .append (now )
299+ self ._frame_timestamps .append (timestamp )
300+ if now - self ._last_log_time >= 1.0 :
301+ self ._compute_write_fps_locked ()
302+ self ._last_log_time = now
303+
304+ finally :
305+ # Ensure queue accounting is correct for every item pulled from q
306+ try :
307+ q .task_done ()
308+ except ValueError :
309+ pass
310+
311+ if stop_now :
312+ break
313+
276314 finally :
277315 self ._finalize_writer ()
316+ self ._save_timestamps ()
317+
318+ # Safe cleanup only once the thread is actually exiting
319+ self ._queue = None
320+ if self ._writer_thread is threading .current_thread ():
321+ self ._writer_thread = None
278322
279323 def _finalize_writer (self ) -> None :
280324 writer = self ._writer
0 commit comments