@@ -56,6 +56,44 @@ def get_excluded_fields(self, model_name):
5656 }
5757 return excluded_fields_map .get (model_name , [])
5858
def process_batch(self, event_model, event_records, model_name, dry_run,
                  batch_start_time, processed, backfill_count, *, is_final_batch=False):
    """Bulk-create one batch of prepared event instances and report timing.

    Args:
        event_model: Django model class whose manager performs the bulk_create.
        event_records: List of unsaved model instances to insert.
        model_name: Human-readable model name used in log/progress messages.
        dry_run: When True, skip the database write and count all records as created.
        batch_start_time: Timestamp (time.time()) taken when this batch began.
        processed: Number of records already created before this batch.
        backfill_count: Total records needing backfill, used for the progress
            percentage. May be 0; progress is then reported as 0 instead of
            raising ZeroDivisionError.
        is_final_batch: Keyword-only flag selecting the final-batch log format.

    Returns:
        Tuple ``(actually_created, batch_end_time)`` — the count of records
        created and a timestamp suitable as the next batch's start time.

    Raises:
        Exception: any bulk_create failure is logged and re-raised.
    """
    if not event_records:
        return 0, batch_start_time

    # len() cannot raise; keep the try body limited to the DB operation.
    attempted = len(event_records)
    if dry_run:
        actually_created = attempted
    else:
        try:
            # No need to pass batch_size since we're already batching ourselves.
            created_objects = event_model.objects.bulk_create(event_records)
            actually_created = len(created_objects) if created_objects else 0

            # bulk_create can silently skip rows (e.g. with ignore_conflicts);
            # surface any mismatch so the backfill totals stay trustworthy.
            if actually_created != attempted:
                logger.warning(
                    f"bulk_create for {model_name}: attempted {attempted}, "
                    f"actually created {actually_created} ({attempted - actually_created} skipped)",
                )
        except Exception:
            logger.exception(f"Failed to bulk create events for {model_name}")
            raise

    # Calculate timing after the actual database operation.
    batch_end_time = time.time()
    batch_duration = batch_end_time - batch_start_time
    batch_records_per_second = attempted / batch_duration if batch_duration > 0 else 0

    # Log batch timing.
    if is_final_batch:
        self.stdout.write(f"  Final batch: {batch_duration:.2f}s ({batch_records_per_second:.1f} records/sec)")
    else:
        total_done = processed + actually_created
        # Guard the percentage the same way the records/sec division is guarded.
        progress = (total_done / backfill_count * 100) if backfill_count > 0 else 0
        self.stdout.write(f"  Processed {total_done:,}/{backfill_count:,} records needing backfill ({progress:.1f}%) - "
                          f"Last batch: {batch_duration:.2f}s ({batch_records_per_second:.1f} records/sec)")

    return actually_created, batch_end_time
96+
5997 def enable_db_logging (self ):
6098 """Enable database query logging for this command."""
6199 # Store original DEBUG setting
@@ -257,70 +295,32 @@ def handle(self, *args, **options):
257295
258296 event_records .append (EventModel (** event_data ))
259297
260- except Exception as e :
298+ except Exception :
261299 failed_records .append (instance .id )
262300 logger .exception (
263301 f"Failed to prepare event for { model_name } ID { instance .id } " ,
264302 )
265303
266304 # Bulk create when we hit batch_size records
267305 if len (event_records ) >= batch_size :
268- batch_end_time = time .time ()
269- batch_duration = batch_end_time - batch_start_time
270- batch_records_per_second = len (event_records ) / batch_duration if batch_duration > 0 else 0
271-
272- if not dry_run and event_records :
273- try :
274- attempted = len (event_records )
275- created_objects = EventModel .objects .bulk_create (event_records , batch_size = batch_size )
276- actually_created = len (created_objects ) if created_objects else 0
277- processed += actually_created
278-
279- if actually_created != attempted :
280- logger .warning (
281- f"bulk_create for { model_name } : attempted { attempted } , "
282- f"actually created { actually_created } ({ attempted - actually_created } skipped)" ,
283- )
284- except Exception as e :
285- logger .exception (f"Failed to bulk create events for { model_name } " )
286- raise
287- elif dry_run :
288- processed += len (event_records )
306+ # Process the batch
307+ batch_processed , batch_start_time = self .process_batch (
308+ EventModel , event_records , model_name , dry_run ,
309+ batch_start_time , processed , backfill_count ,
310+ )
311+ processed += batch_processed
289312
290313 event_records = [] # Reset for next batch
291314 batch_start_time = time .time () # Reset batch timer
292315
293- # Progress update with batch timing
294- progress = (processed / backfill_count ) * 100
295- self .stdout .write (f" Processed { processed :,} /{ backfill_count :,} records needing backfill ({ progress :.1f} %) - "
296- f"Last batch: { batch_duration :.2f} s ({ batch_records_per_second :.1f} records/sec)" )
297-
298316 # Handle remaining records
299317 if event_records :
300- batch_end_time = time .time ()
301- batch_duration = batch_end_time - batch_start_time
302- batch_records_per_second = len (event_records ) / batch_duration if batch_duration > 0 else 0
303-
304- if not dry_run :
305- try :
306- attempted = len (event_records )
307- created_objects = EventModel .objects .bulk_create (event_records , batch_size = batch_size )
308- actually_created = len (created_objects ) if created_objects else 0
309- processed += actually_created
310-
311- if actually_created != attempted :
312- logger .warning (
313- f"bulk_create final batch for { model_name } : attempted { attempted } , "
314- f"actually created { actually_created } ({ attempted - actually_created } skipped)" ,
315- )
316- except Exception as e :
317- logger .exception (f"Failed to bulk create final batch for { model_name } " )
318- raise
319- else :
320- processed += len (event_records )
321-
322- # Log final batch timing
323- self .stdout .write (f" Final batch: { batch_duration :.2f} s ({ batch_records_per_second :.1f} records/sec)" )
318+ # Process the final batch
319+ batch_processed , _ = self .process_batch (
320+ EventModel , event_records , model_name , dry_run ,
321+ batch_start_time , processed , backfill_count , is_final_batch = True ,
322+ )
323+ processed += batch_processed
324324
325325 # Final progress update
326326 if backfill_count > 0 :
0 commit comments