@@ -2,6 +2,7 @@ use crate::{local::Connection, util::ConnectorService, Error, Result};
22
33use std:: path:: Path ;
44
5+ use crate :: replication:: remote_client:: time;
56use bytes:: Bytes ;
67use chrono:: Utc ;
78use http:: { HeaderValue , StatusCode } ;
@@ -20,6 +21,7 @@ const METADATA_VERSION: u32 = 0;
2021
2122const DEFAULT_MAX_RETRIES : usize = 5 ;
2223const DEFAULT_PUSH_BATCH_SIZE : u32 = 128 ;
24+ const DEFAULT_PULL_BATCH_SIZE : u32 = 128 ;
2325
2426#[ derive( thiserror:: Error , Debug ) ]
2527#[ non_exhaustive]
@@ -66,6 +68,8 @@ pub enum SyncError {
6668 InvalidLocalGeneration ( u32 , u32 ) ,
6769 #[ error( "invalid local state: {0}" ) ]
6870 InvalidLocalState ( String ) ,
71+ #[ error( "server returned invalid length of frames: {0}" ) ]
72+ InvalidPullFrameBytes ( usize ) ,
6973}
7074
7175impl SyncError {
@@ -98,8 +102,8 @@ pub enum PushStatus {
98102}
99103
100104pub enum PullResult {
101- /// A frame was successfully pulled.
102- Frame ( Bytes ) ,
105+ /// Frames were successfully pulled.
106+ Frames ( Bytes ) ,
103107 /// We've reached the end of the generation.
104108 EndOfGeneration { max_generation : u32 } ,
105109}
@@ -122,6 +126,7 @@ pub struct SyncContext {
122126 auth_token : Option < HeaderValue > ,
123127 max_retries : usize ,
124128 push_batch_size : u32 ,
129+ pull_batch_size : u32 ,
125130 /// The current durable generation.
126131 durable_generation : u32 ,
127132 /// Represents the max_frame_no from the server.
@@ -154,6 +159,7 @@ impl SyncContext {
154159 auth_token,
155160 max_retries : DEFAULT_MAX_RETRIES ,
156161 push_batch_size : DEFAULT_PUSH_BATCH_SIZE ,
162+ pull_batch_size : DEFAULT_PULL_BATCH_SIZE ,
157163 client,
158164 durable_generation : 0 ,
159165 durable_frame_num : 0 ,
@@ -175,7 +181,7 @@ impl SyncContext {
175181 }
176182
177183 #[ tracing:: instrument( skip( self ) ) ]
178- pub ( crate ) async fn pull_one_frame (
184+ pub ( crate ) async fn pull_frames (
179185 & mut self ,
180186 generation : u32 ,
181187 frame_no : u32 ,
@@ -185,9 +191,10 @@ impl SyncContext {
185191 self . sync_url,
186192 generation,
187193 frame_no,
188- frame_no + 1
194+ // the server expects the range of [start, end) frames, i.e. end is exclusive
195+ frame_no + self . pull_batch_size
189196 ) ;
190- tracing:: debug!( "pulling frame" ) ;
197+ tracing:: debug!( "pulling frame (uri={})" , uri ) ;
191198 self . pull_with_retry ( uri, self . max_retries ) . await
192199 }
193200
@@ -417,20 +424,39 @@ impl SyncContext {
417424 . map_err ( SyncError :: HttpDispatch ) ?;
418425
419426 if res. status ( ) . is_success ( ) {
420- let frame = hyper:: body:: to_bytes ( res. into_body ( ) )
427+ let frames = hyper:: body:: to_bytes ( res. into_body ( ) )
421428 . await
422429 . map_err ( SyncError :: HttpBody ) ?;
423- return Ok ( PullResult :: Frame ( frame) ) ;
430+ // a success result should always return some frames
431+ if frames. is_empty ( ) {
432+ tracing:: error!( "server returned empty frames in pull response" ) ;
433+ return Err ( SyncError :: InvalidPullFrameBytes ( 0 ) . into ( ) ) ;
434+ }
435+ // the minimum payload size cannot be less than a single frame
436+ if frames. len ( ) < FRAME_SIZE {
437+ tracing:: error!(
438+ "server returned frames with invalid length: {} < {}" ,
439+ frames. len( ) ,
440+ FRAME_SIZE
441+ ) ;
442+ return Err ( SyncError :: InvalidPullFrameBytes ( frames. len ( ) ) . into ( ) ) ;
443+ }
444+ return Ok ( PullResult :: Frames ( frames) ) ;
424445 }
425446 // BUG ALERT: The server returns a 500 error if the remote database is empty.
426447 // This is a bug and should be fixed.
427448 if res. status ( ) == StatusCode :: BAD_REQUEST
428449 || res. status ( ) == StatusCode :: INTERNAL_SERVER_ERROR
429450 {
451+ let status = res. status ( ) ;
430452 let res_body = hyper:: body:: to_bytes ( res. into_body ( ) )
431453 . await
432454 . map_err ( SyncError :: HttpBody ) ?;
433-
455+ tracing:: trace!(
456+ "server returned: {} body: {}" ,
457+ status,
458+ String :: from_utf8_lossy( & res_body[ ..] )
459+ ) ;
434460 let resp = serde_json:: from_slice :: < serde_json:: Value > ( & res_body[ ..] )
435461 . map_err ( SyncError :: JsonDecode ) ?;
436462
@@ -650,22 +676,33 @@ impl SyncContext {
650676
651677 let req = req. body ( Body :: empty ( ) ) . expect ( "valid request" ) ;
652678
653- let res = self
654- . client
655- . request ( req)
656- . await
657- . map_err ( SyncError :: HttpDispatch ) ?;
679+ let ( res, http_duration) = time ( self . client . request ( req) ) . await ;
680+ let res = res. map_err ( SyncError :: HttpDispatch ) ?;
658681
659682 if !res. status ( ) . is_success ( ) {
660683 let status = res. status ( ) ;
661684 let body = hyper:: body:: to_bytes ( res. into_body ( ) )
662685 . await
663686 . map_err ( SyncError :: HttpBody ) ?;
687+ tracing:: error!(
688+ "failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}" ,
689+ status,
690+ String :: from_utf8_lossy( & body) ,
691+ uri,
692+ http_duration
693+ ) ;
664694 return Err (
665695 SyncError :: PullFrame ( status, String :: from_utf8_lossy ( & body) . to_string ( ) ) . into ( ) ,
666696 ) ;
667697 }
668698
699+ tracing:: debug!(
700+ "pulled db file from remote server, status={}, url={}, duration={:?}" ,
701+ res. status( ) ,
702+ uri,
703+ http_duration
704+ ) ;
705+
669706 // todo: do streaming write to the disk
670707 let bytes = hyper:: body:: to_bytes ( res. into_body ( ) )
671708 . await
@@ -887,6 +924,11 @@ async fn try_push(
887924 } )
888925}
889926
/// Size in bytes of a database page as used by the sync / diskless server.
const PAGE_SIZE: usize = 4096;
/// Size in bytes of the per-frame header.
///
/// NOTE(review): presumably the SQLite WAL frame header (24 bytes) — confirm
/// against the server's wire format.
const FRAME_HEADER_SIZE: usize = 24;
/// Size in bytes of one complete frame: one page plus its frame header.
///
/// Pull responses are validated against this size and split into chunks of
/// exactly this length before being inserted.
const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE;
931+
890932pub async fn try_pull (
891933 sync_ctx : & mut SyncContext ,
892934 conn : & Connection ,
@@ -898,10 +940,39 @@ pub async fn try_pull(
898940 loop {
899941 let generation = sync_ctx. durable_generation ( ) ;
900942 let frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
901- match sync_ctx. pull_one_frame ( generation, frame_no) . await {
902- Ok ( PullResult :: Frame ( frame) ) => {
903- insert_handle. insert ( & frame) ?;
904- sync_ctx. durable_frame_num = frame_no;
943+ match sync_ctx. pull_frames ( generation, frame_no) . await {
944+ Ok ( PullResult :: Frames ( frames) ) => {
945+ tracing:: debug!(
946+ "pull_frames: generation={}, start_frame_no={} (batch_size={}), frame_size={}" ,
947+ generation,
948+ frame_no,
949+ sync_ctx. pull_batch_size,
950+ frames. len( ) ,
951+ ) ;
952+ if frames. len ( ) % FRAME_SIZE != 0 {
953+ tracing:: error!(
954+ "frame size {} is not a multiple of the expected size {}" ,
955+ frames. len( ) ,
956+ FRAME_SIZE ,
957+ ) ;
958+ return Err ( SyncError :: InvalidPullFrameBytes ( frames. len ( ) ) . into ( ) ) ;
959+ }
960+ for chunk in frames. chunks ( FRAME_SIZE ) {
961+ tracing:: debug!(
962+ "inserting frame (frame_no={})" ,
963+ sync_ctx. durable_frame_num + 1
964+ ) ;
965+ let r = insert_handle. insert ( & chunk) ;
966+ if let Err ( e) = r {
967+ tracing:: debug!(
968+ "insert error (frame= {}) : {:?}" ,
969+ sync_ctx. durable_frame_num + 1 ,
970+ e
971+ ) ;
972+ return Err ( e) ;
973+ }
974+ sync_ctx. durable_frame_num += 1 ;
975+ }
905976 }
906977 Ok ( PullResult :: EndOfGeneration { max_generation } ) => {
907978 // If there are no more generations to pull, we're done.
@@ -920,7 +991,7 @@ pub async fn try_pull(
920991 insert_handle. begin ( ) ?;
921992 }
922993 Err ( e) => {
923- tracing:: debug!( "pull_one_frame error: {:?}" , e) ;
994+ tracing:: debug!( "pull_frames error: {:?}" , e) ;
924995 err. replace ( e) ;
925996 break ;
926997 }
0 commit comments