@@ -290,6 +290,94 @@ async fn run_periodic_compactions(logger: Arc<ReplicationLogger>) -> anyhow::Res
290290 }
291291}
292292
/// Splits SQL text into upper-cased word tokens suitable for keyword scanning.
///
/// The scanner is deliberately shallow: it only needs to surface bare words
/// such as `BEGIN`, `CASE` and `END`. Everything that could hide or fake such
/// a word is dropped:
///
/// * text between matching `'...'` or `"..."` quotes (a doubled quote inside a
///   literal closes and immediately re-opens it, so it stays skipped),
/// * `-- ...` comments up to the end of the line,
/// * `/* ... */` comments (an unterminated one runs to the end of the input).
///
/// Words are separated by whitespace, the punctuation `(){}[];,` and the
/// operator characters `=<>!+-*/%|&~`; separators themselves are not returned.
/// An opening quote also ends the word in front of it.
///
/// Returns the words in input order, each converted with `to_uppercase`.
fn tokenize_sql_keywords(text: &str) -> Vec<String> {
    // Structural punctuation plus operator characters. Operators must split
    // words, otherwise `x=CASE` or `END||'x'` would never match a keyword.
    const SEPARATORS: &str = "(){}[];,=<>!+-*/%|&~";

    /// Moves the pending word (if any) into `tokens`, upper-cased.
    fn flush(current: &mut String, tokens: &mut Vec<String>) {
        if !current.is_empty() {
            tokens.push(current.to_uppercase());
            current.clear();
        }
    }

    let mut tokens = Vec::new();
    let mut current_token = String::new();
    let mut chars = text.chars().peekable();
    // `Some(q)` while inside quoted text that was opened by `q`.
    let mut open_quote: Option<char> = None;

    while let Some(ch) = chars.next() {
        if let Some(quote) = open_quote {
            // Inside quoted text nothing is a token; only look for the closer.
            if ch == quote {
                open_quote = None;
            }
            continue;
        }

        match ch {
            '\'' | '"' => {
                // A quote ends the word before it: `select'a'from` is two words.
                flush(&mut current_token, &mut tokens);
                open_quote = Some(ch);
            }
            '-' if chars.peek() == Some(&'-') => {
                flush(&mut current_token, &mut tokens);
                // Line comment: discard everything up to and including '\n'.
                for c in chars.by_ref() {
                    if c == '\n' {
                        break;
                    }
                }
            }
            '/' if chars.peek() == Some(&'*') => {
                flush(&mut current_token, &mut tokens);
                // Consume the '*' so that "/*/" is not mistaken for a closed comment.
                chars.next();
                let mut previous = '\0';
                for c in chars.by_ref() {
                    if previous == '*' && c == '/' {
                        break;
                    }
                    previous = c;
                }
            }
            c if c.is_whitespace() || SEPARATORS.contains(c) => {
                flush(&mut current_token, &mut tokens);
            }
            c => current_token.push(c),
        }
    }

    // Words are flushed when a quote opens, so anything still pending here is
    // a genuine trailing word even if the input ended inside quoted text.
    flush(&mut current_token, &mut tokens);

    tokens
}
334+
335+ fn is_complete_sql_statement ( sql : & str ) -> bool {
336+ let tokens = tokenize_sql_keywords ( sql) ;
337+ let mut begin_end_depth = 0 ;
338+ let mut case_depth = 0 ;
339+
340+ for ( i, token) in tokens. iter ( ) . enumerate ( ) {
341+ match token. as_str ( ) {
342+ "CASE" => {
343+ case_depth += 1 ;
344+ }
345+ "BEGIN" => {
346+ let next_token = tokens. get ( i + 1 ) . map ( |s| s. as_str ( ) ) ;
347+ let is_transaction_keyword = matches ! (
348+ next_token,
349+ Some ( "TRANSACTION" ) | Some ( "IMMEDIATE" ) | Some ( "EXCLUSIVE" ) | Some ( "DEFERRED" )
350+ ) ;
351+
352+ if !is_transaction_keyword {
353+ begin_end_depth += 1 ;
354+ }
355+ }
356+ "END" => {
357+ if case_depth > 0 {
358+ case_depth -= 1 ;
359+ } else {
360+ // This is a block-ending END (BEGIN/END, IF/END IF, etc.)
361+ let is_control_flow_end = tokens. get ( i + 1 )
362+ . map ( |next| matches ! ( next. as_str( ) , "IF" | "LOOP" | "WHILE" ) )
363+ . unwrap_or ( false ) ;
364+
365+ if !is_control_flow_end {
366+ begin_end_depth -= 1 ;
367+ }
368+ }
369+ }
370+ _ => { }
371+ }
372+
373+ if begin_end_depth < 0 {
374+ return false ;
375+ }
376+ }
377+
378+ begin_end_depth == 0 && case_depth == 0
379+ }
380+
293381async fn load_dump < S > ( dump : S , conn : PrimaryConnection ) -> crate :: Result < ( ) , LoadDumpError >
294382where
295383 S : Stream < Item = std:: io:: Result < Bytes > > + Unpin ,
@@ -311,12 +399,11 @@ where
311399 curr. clear ( ) ;
312400 continue ;
313401 }
314- // FIXME: it's well known bug that comment ending with semicolon will be handled incorrectly by currend dump processing code
315- let statement_end = trimmed. ends_with ( ';' ) ;
316402
317403 // we want to concat original(non-trimmed) lines as trimming will join all them in one
318404 // single-line statement which is incorrect if comments in the end are present
319405 line. push_str ( & curr) ;
406+ let statement_end = trimmed. ends_with ( ';' ) && is_complete_sql_statement ( & line) ;
320407 curr. clear ( ) ;
321408
322409 // This is a hack to ignore the libsql_wasm_func_table table because it is already created
@@ -374,6 +461,13 @@ where
374461 }
375462 tracing:: debug!( "loaded {} lines from dump" , line_id) ;
376463
464+ if !line. trim ( ) . is_empty ( ) {
465+ return Err ( LoadDumpError :: InvalidSqlInput ( format ! (
466+ "Incomplete SQL statement at end of dump: {}" ,
467+ line. trim( )
468+ ) ) ) ;
469+ }
470+
377471 if !conn. is_autocommit ( ) . await . unwrap ( ) {
378472 tokio:: task:: spawn_blocking ( {
379473 let conn = conn. clone ( ) ;
0 commit comments