11use std:: ffi:: { c_int, c_void} ;
22use std:: path:: { Path , PathBuf } ;
3- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
43use std:: sync:: Arc ;
54
65use metrics:: { histogram, increment_counter} ;
@@ -292,14 +291,16 @@ impl<W: WalHook> std::fmt::Debug for Connection<W> {
292291}
293292
294293/// A slot for holding the state of a transaction lock permit
294+ #[ allow( unused) ]
295295struct TxnSlot < T : WalHook > {
296296 /// Pointer to the connection holding the lock. Used to rollback the transaction when the lock
297297 /// is stolen.
298298 conn : Arc < Mutex < Connection < T > > > ,
299299 /// Time at which the transaction can be stolen
300300 created_at : tokio:: time:: Instant ,
301301 /// The transaction lock was stolen
302- is_stolen : AtomicBool ,
302+ is_stolen : parking_lot:: Mutex < bool > ,
303+ txn_timeout : Duration ,
303304}
304305
305306impl < T : WalHook > TxnSlot < T > {
@@ -321,7 +322,7 @@ impl<T: WalHook> TxnSlot<T> {
321322
322323impl < T : WalHook > std:: fmt:: Debug for TxnSlot < T > {
323324 fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
324- let stolen = self . is_stolen . load ( Ordering :: Relaxed ) ;
325+ let stolen = self . is_stolen . lock ( ) ;
325326 let time_left = self . expires_at ( ) . duration_since ( Instant :: now ( ) ) ;
326327 write ! (
327328 f,
@@ -365,15 +366,24 @@ impl<W: WalHook> Default for TxnState<W> {
365366/// - If the handler waits until the txn timeout and isn't notified of the termination of the txn, it will attempt to steal the lock.
366367/// This is done by calling rollback on the slot's txn, and marking the slot as stolen.
367368/// - When a connection notices that it's slot has been stolen, it returns a timedout error to the next request.
368- unsafe extern "C" fn busy_handler < W : WalHook > ( state : * mut c_void , _retries : c_int ) -> c_int {
369- let state = & * ( state as * mut TxnState < W > ) ;
369+ const MAX_BUSY_RETRIES : c_int = 512 ;
370+
371+ unsafe extern "C" fn busy_handler < T : WalHook > ( state : * mut c_void , retries : c_int ) -> c_int {
372+ let state = & * ( state as * mut TxnState < T > ) ;
370373 let lock = state. slot . read ( ) ;
371374 // we take a reference to the slot we will attempt to steal. this is to make sure that we
372375 // actually steal the correct lock.
373376 let slot = match & * lock {
374377 Some ( slot) => slot. clone ( ) ,
375378 // fast path: there is no slot, try to acquire the lock again
376- None => return 1 ,
379+ None if retries < 512 => {
380+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 10 ) ) ;
381+ return 1 ;
382+ }
383+ None => {
384+ tracing:: info!( "Failed to steal connection lock after {MAX_BUSY_RETRIES} retries." ) ;
385+ return 0 ;
386+ }
377387 } ;
378388
379389 tokio:: runtime:: Handle :: current ( ) . block_on ( async move {
@@ -396,9 +406,17 @@ unsafe extern "C" fn busy_handler<W: WalHook>(state: *mut c_void, _retries: c_in
396406 if let Some ( ref s) = * lock {
397407 // The state contains the same lock as the one we're attempting to steal
398408 if Arc :: ptr_eq( s, & slot) {
399- // We check that slot wasn't already stolen, and that their is still a slot.
400- // The ordering is relaxed because the atomic is only set under the slot lock.
401- if slot. is_stolen. compare_exchange( false , true , Ordering :: Relaxed , Ordering :: Relaxed ) . is_ok( ) {
409+ let can_steal = {
410+ let mut can_steal = false ;
411+ let mut is_stolen = slot. is_stolen. lock( ) ;
412+ if !* is_stolen {
413+ can_steal = true ;
414+ * is_stolen = true ;
415+ }
416+ can_steal
417+ } ;
418+
419+ if can_steal {
402420 // The connection holding the current txn will set itself as stolen when it
403421 // detects a timeout, so if we arrive to this point, then there is
404422 // necessarily a slot, and this slot has to be the one we attempted to
@@ -419,7 +437,6 @@ unsafe extern "C" fn busy_handler<W: WalHook>(state: *mut c_void, _retries: c_in
419437 1
420438 }
421439 }
422-
423440 } )
424441}
425442
@@ -500,9 +517,7 @@ impl<W: WalHook> Connection<W> {
500517 pgm : Program ,
501518 mut builder : B ,
502519 ) -> Result < B > {
503- use rusqlite:: TransactionState as Tx ;
504-
505- let state = this. lock ( ) . state . clone ( ) ;
520+ let txn_timeout = TXN_TIMEOUT ;
506521
507522 let mut results = Vec :: with_capacity ( pgm. steps . len ( ) ) ;
508523 builder. init ( & this. lock ( ) . builder_config ) ?;
@@ -515,12 +530,17 @@ impl<W: WalHook> Connection<W> {
515530 for step in pgm. steps ( ) {
516531 let mut lock = this. lock ( ) ;
517532
518- if let Some ( slot) = & lock. slot {
519- if slot. is_stolen . load ( Ordering :: Relaxed ) || Instant :: now ( ) > slot. expires_at ( ) {
520- // we mark ourselves as stolen to notify any waiting lock thief.
521- slot. is_stolen . store ( true , Ordering :: Relaxed ) ;
522- lock. rollback ( ) ;
523- has_timeout = true ;
533+ if !has_timeout {
534+ if let Some ( slot) = & lock. slot {
535+ let mut is_stolen = slot. is_stolen . lock ( ) ;
536+ if * is_stolen || Instant :: now ( ) > slot. expires_at ( ) {
537+ // we mark ourselves as stolen to notify any waiting lock thief.
538+ if !* is_stolen {
539+ lock. rollback ( ) ;
540+ }
541+ * is_stolen = true ;
542+ has_timeout = true ;
543+ }
524544 }
525545 }
526546
@@ -533,60 +553,71 @@ impl<W: WalHook> Connection<W> {
533553 continue ;
534554 }
535555
536- let res = lock. execute_step ( step, & results, & mut builder) ?;
537-
538- let new_state = lock. conn . transaction_state ( Some ( DatabaseName :: Main ) ) ?;
539- match ( previous_state, new_state) {
540- // lock was upgraded, claim the slot
541- ( Tx :: None | Tx :: Read , Tx :: Write ) => {
542- let slot = Arc :: new ( TxnSlot {
543- conn : this. clone ( ) ,
544- created_at : Instant :: now ( ) ,
545- is_stolen : AtomicBool :: new ( false ) ,
546- } ) ;
547-
548- lock. slot . replace ( slot. clone ( ) ) ;
549- state. slot . write ( ) . replace ( slot) ;
550- }
551- // lock was downgraded, notify a waiter
552- ( Tx :: Write , Tx :: None | Tx :: Read ) => {
553- let old_slot = lock
554- . slot
555- . take ( )
556- . expect ( "there should be a slot right after downgrading a txn" ) ;
557- let mut maybe_state_slot = state. slot . write ( ) ;
558- // We need to make sure that the state slot is our slot before removing it.
559- if let Some ( ref state_slot) = * maybe_state_slot {
560- if Arc :: ptr_eq ( state_slot, & old_slot) {
561- maybe_state_slot. take ( ) ;
562- }
563- }
564-
565- drop ( maybe_state_slot) ;
566-
567- state. notify . notify_waiters ( ) ;
568- }
569- // nothing to do
570- ( _, _) => ( ) ,
571- }
572-
573- previous_state = new_state;
556+ let ret = lock. execute_step ( step, & results, & mut builder) ;
557+ // /!\ always make sure that the state is updated before returning
558+ previous_state = lock. update_state ( this. clone ( ) , previous_state, txn_timeout) ?;
559+ let res = ret?;
574560
575561 results. push ( res) ;
576562 }
577563
578564 {
579565 let mut lock = this. lock ( ) ;
580566 let is_autocommit = lock. conn . is_autocommit ( ) ;
581- builder. finish (
582- * ( lock. current_frame_no_receiver . borrow_and_update ( ) ) ,
583- is_autocommit,
584- ) ?;
567+ let current_fno = * lock. current_frame_no_receiver . borrow_and_update ( ) ;
568+ builder. finish ( current_fno, is_autocommit) ?;
585569 }
586570
587571 Ok ( builder)
588572 }
589573
574+ fn update_state (
575+ & mut self ,
576+ arc_this : Arc < Mutex < Self > > ,
577+ previous_state : TransactionState ,
578+ txn_timeout : Duration ,
579+ ) -> Result < TransactionState > {
580+ use rusqlite:: TransactionState as Tx ;
581+
582+ let new_state = self . conn . transaction_state ( Some ( DatabaseName :: Main ) ) ?;
583+ match ( previous_state, new_state) {
584+ // lock was upgraded, claim the slot
585+ ( Tx :: None | Tx :: Read , Tx :: Write ) => {
586+ let slot = Arc :: new ( TxnSlot {
587+ conn : arc_this,
588+ created_at : Instant :: now ( ) ,
589+ is_stolen : false . into ( ) ,
590+ txn_timeout,
591+ } ) ;
592+
593+ self . slot . replace ( slot. clone ( ) ) ;
594+ self . state . slot . write ( ) . replace ( slot) ;
595+ }
596+ // lock was downgraded, notify a waiter
597+ ( Tx :: Write , Tx :: None | Tx :: Read ) => {
598+ let old_slot = self
599+ . slot
600+ . take ( )
601+ . expect ( "there should be a slot right after downgrading a txn" ) ;
602+ let mut maybe_state_slot = self . state . slot . write ( ) ;
603+ // We need to make sure that the state slot is our slot before removing it.
604+ if let Some ( ref state_slot) = * maybe_state_slot {
605+ if Arc :: ptr_eq ( state_slot, & old_slot) {
606+ maybe_state_slot. take ( ) ;
607+ }
608+ }
609+
610+ drop ( maybe_state_slot) ;
611+
612+ self . state . notify . notify_waiters ( ) ;
613+ }
614+ // nothing to do
615+ ( _, _) => ( ) ,
616+ }
617+
618+ Ok ( new_state)
619+ }
620+
590621 fn execute_step (
591622 & mut self ,
592623 step : & Step ,
@@ -1053,8 +1084,12 @@ mod test {
10531084 TestBuilder :: default ( ) ,
10541085 )
10551086 . unwrap ( ) ;
1056- assert_eq ! ( conn. txn_status( ) . unwrap( ) , TxnStatus :: Txn ) ;
1057- assert ! ( builder. into_ret( ) [ 0 ] . is_ok( ) ) ;
1087+ let ret = & builder. into_ret ( ) [ 0 ] ;
1088+ assert ! (
1089+ ( ret. is_ok( ) && matches!( conn. txn_status( ) . unwrap( ) , TxnStatus :: Txn ) )
1090+ || ( matches!( ret, Err ( Error :: RusqliteErrorExtended ( _, 5 ) ) )
1091+ && matches!( conn. txn_status( ) . unwrap( ) , TxnStatus :: Init ) )
1092+ ) ;
10581093 } ) ;
10591094 }
10601095
0 commit comments