Skip to content

Commit 910a649

Browse files
authored
Merge branch 'main' into fix/fixup-markdown-documents
2 parents 70efc56 + c6e4e09 commit 910a649

4 files changed

Lines changed: 149 additions & 48 deletions

File tree

libsql-ffi/build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ pub fn build_bundled(out_dir: &str, out_path: &Path) {
274274
cfg.file(&sqlean);
275275

276276
cfg.include(format!("{BUNDLED_DIR}/sqlean/"));
277+
cfg.include(format!("{BUNDLED_DIR}/src/"));
277278
}
278279

279280
if cfg!(feature = "wasmtime-bindings") {

libsql/src/local/connection.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
99
use crate::TransactionBehavior;
1010

1111
use libsql_sys::ffi;
12+
use std::cell::RefCell;
1213
use std::{ffi::c_int, fmt, path::Path, sync::Arc};
1314

1415
/// A connection to a libSQL database.
@@ -451,6 +452,20 @@ impl Connection {
451452
}
452453
}
453454

455+
pub(crate) fn wal_checkpoint(&self, truncate: bool) -> Result<()> {
456+
let rc = unsafe { libsql_sys::ffi::sqlite3_wal_checkpoint_v2(self.handle(), std::ptr::null(), truncate as i32, std::ptr::null_mut(), std::ptr::null_mut()) };
457+
if rc != 0 {
458+
let err_msg = unsafe { libsql_sys::ffi::sqlite3_errmsg(self.handle()) };
459+
let err_msg = unsafe { std::ffi::CStr::from_ptr(err_msg) };
460+
let err_msg = err_msg.to_string_lossy().to_string();
461+
return Err(crate::errors::Error::SqliteFailure(
462+
rc as std::ffi::c_int,
463+
format!("Failed to checkpoint WAL: {}", err_msg),
464+
));
465+
}
466+
Ok(())
467+
}
468+
454469
pub(crate) fn wal_frame_count(&self) -> u32 {
455470
let mut max_frame_no: std::os::raw::c_uint = 0;
456471
unsafe { libsql_sys::ffi::libsql_wal_frame_count(self.handle(), &mut max_frame_no) };
@@ -537,25 +552,43 @@ impl Connection {
537552

538553
pub(crate) fn wal_insert_handle(&self) -> Result<WalInsertHandle<'_>> {
539554
self.wal_insert_begin()?;
540-
Ok(WalInsertHandle { conn: self })
555+
Ok(WalInsertHandle { conn: self, in_session: RefCell::new(true) })
541556
}
542557
}
543558

544559
pub(crate) struct WalInsertHandle<'a> {
545560
conn: &'a Connection,
561+
in_session: RefCell<bool>
546562
}
547563

548564
impl WalInsertHandle<'_> {
549565
pub fn insert(&self, frame: &[u8]) -> Result<()> {
566+
assert!(*self.in_session.borrow());
550567
self.conn.wal_insert_frame(frame)
551568
}
569+
570+
pub fn begin(&self) -> Result<()> {
571+
assert!(!*self.in_session.borrow());
572+
self.conn.wal_insert_begin()?;
573+
self.in_session.replace(true);
574+
Ok(())
575+
}
576+
577+
pub fn end(&self) -> Result<()> {
578+
assert!(*self.in_session.borrow());
579+
self.conn.wal_insert_end()?;
580+
self.in_session.replace(false);
581+
Ok(())
582+
}
552583
}
553584

554585
impl Drop for WalInsertHandle<'_> {
555586
fn drop(&mut self) {
556-
if let Err(err) = self.conn.wal_insert_end() {
557-
tracing::error!("{:?}", err);
558-
Err(err).unwrap()
587+
if *self.in_session.borrow() {
588+
if let Err(err) = self.conn.wal_insert_end() {
589+
tracing::error!("{:?}", err);
590+
Err(err).unwrap()
591+
}
559592
}
560593
}
561594
}

libsql/src/sync.rs

Lines changed: 107 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,23 @@ impl SyncError {
6161
}
6262
}
6363

64+
pub enum PullResult {
65+
/// A frame was successfully pulled.
66+
Frame(Bytes),
67+
/// We've reached the end of the generation.
68+
EndOfGeneration { max_generation: u32 },
69+
}
70+
6471
pub struct SyncContext {
6572
db_path: String,
6673
client: hyper::Client<ConnectorService, Body>,
6774
sync_url: String,
6875
auth_token: Option<HeaderValue>,
6976
max_retries: usize,
77+
/// The current durable generation.
78+
durable_generation: u32,
7079
/// Represents the max_frame_no from the server.
7180
durable_frame_num: u32,
72-
/// Represents the current checkpoint generation.
73-
generation: u32,
7481
}
7582

7683
impl SyncContext {
@@ -96,8 +103,8 @@ impl SyncContext {
96103
auth_token,
97104
max_retries: DEFAULT_MAX_RETRIES,
98105
client,
106+
durable_generation: 1,
99107
durable_frame_num: 0,
100-
generation: 1,
101108
};
102109

103110
if let Err(e) = me.read_metadata().await {
@@ -115,7 +122,7 @@ impl SyncContext {
115122
&mut self,
116123
generation: u32,
117124
frame_no: u32,
118-
) -> Result<Option<Bytes>> {
125+
) -> Result<PullResult> {
119126
let uri = format!(
120127
"{}/sync/{}/{}/{}",
121128
self.sync_url,
@@ -124,13 +131,7 @@ impl SyncContext {
124131
frame_no + 1
125132
);
126133
tracing::debug!("pulling frame");
127-
match self.pull_with_retry(uri, self.max_retries).await? {
128-
Some(frame) => {
129-
self.durable_frame_num = frame_no;
130-
Ok(Some(frame))
131-
}
132-
None => Ok(None),
133-
}
134+
self.pull_with_retry(uri, self.max_retries).await
134135
}
135136

136137
#[tracing::instrument(skip(self, frame))]
@@ -149,7 +150,7 @@ impl SyncContext {
149150
);
150151
tracing::debug!("pushing frame");
151152

152-
let durable_frame_num = self.push_with_retry(uri, frame, self.max_retries).await?;
153+
let (generation, durable_frame_num) = self.push_with_retry(uri, frame, self.max_retries).await?;
153154

154155
if durable_frame_num > frame_no {
155156
tracing::error!(
@@ -178,12 +179,14 @@ impl SyncContext {
178179
tracing::debug!(?durable_frame_num, "frame successfully pushed");
179180

180181
// Update our last known max_frame_no from the server.
182+
tracing::debug!(?generation, ?durable_frame_num, "updating remote generation and durable_frame_num");
183+
self.durable_generation = generation;
181184
self.durable_frame_num = durable_frame_num;
182185

183186
Ok(durable_frame_num)
184187
}
185188

186-
async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<u32> {
189+
async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<(u32, u32)> {
187190
let mut nr_retries = 0;
188191
loop {
189192
let mut req = http::Request::post(uri.clone());
@@ -213,6 +216,14 @@ impl SyncContext {
213216
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
214217
.map_err(SyncError::JsonDecode)?;
215218

219+
let generation = resp
220+
.get("generation")
221+
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
222+
223+
let generation = generation
224+
.as_u64()
225+
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
226+
216227
let max_frame_no = resp
217228
.get("max_frame_no")
218229
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
@@ -221,7 +232,7 @@ impl SyncContext {
221232
.as_u64()
222233
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
223234

224-
return Ok(max_frame_no as u32);
235+
return Ok((generation as u32, max_frame_no as u32));
225236
}
226237

227238
// If we've retried too many times or the error is not a server error,
@@ -244,7 +255,7 @@ impl SyncContext {
244255
}
245256
}
246257

247-
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Option<Bytes>> {
258+
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<PullResult> {
248259
let mut nr_retries = 0;
249260
loop {
250261
let mut req = http::Request::builder().method("GET").uri(uri.clone());
@@ -268,10 +279,26 @@ impl SyncContext {
268279
let frame = hyper::body::to_bytes(res.into_body())
269280
.await
270281
.map_err(SyncError::HttpBody)?;
271-
return Ok(Some(frame));
282+
return Ok(PullResult::Frame(frame));
272283
}
273-
if res.status() == StatusCode::BAD_REQUEST {
274-
return Ok(None);
284+
// BUG ALERT: The server returns a 500 error if the remote database is empty.
285+
// This is a bug and should be fixed.
286+
if res.status() == StatusCode::BAD_REQUEST || res.status() == StatusCode::INTERNAL_SERVER_ERROR {
287+
let res_body = hyper::body::to_bytes(res.into_body())
288+
.await
289+
.map_err(SyncError::HttpBody)?;
290+
291+
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
292+
.map_err(SyncError::JsonDecode)?;
293+
294+
let generation = resp
295+
.get("generation")
296+
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
297+
298+
let generation = generation
299+
.as_u64()
300+
.ok_or_else(|| SyncError::JsonValue(generation.clone()))?;
301+
return Ok(PullResult::EndOfGeneration { max_generation: generation as u32 });
275302
}
276303
// If we've retried too many times or the error is not a server error,
277304
// return the error.
@@ -293,12 +320,18 @@ impl SyncContext {
293320
}
294321
}
295322

323+
324+
pub(crate) fn next_generation(&mut self) {
325+
self.durable_generation += 1;
326+
self.durable_frame_num = 0;
327+
}
328+
296329
pub(crate) fn durable_frame_num(&self) -> u32 {
297330
self.durable_frame_num
298331
}
299332

300-
pub(crate) fn generation(&self) -> u32 {
301-
self.generation
333+
pub(crate) fn durable_generation(&self) -> u32 {
334+
self.durable_generation
302335
}
303336

304337
pub(crate) async fn write_metadata(&mut self) -> Result<()> {
@@ -308,7 +341,7 @@ impl SyncContext {
308341
hash: 0,
309342
version: METADATA_VERSION,
310343
durable_frame_num: self.durable_frame_num,
311-
generation: self.generation,
344+
generation: self.durable_generation,
312345
};
313346

314347
metadata.set_hash();
@@ -350,8 +383,8 @@ impl SyncContext {
350383
metadata
351384
);
352385

386+
self.durable_generation = metadata.generation;
353387
self.durable_frame_num = metadata.durable_frame_num;
354-
self.generation = metadata.generation;
355388

356389
Ok(())
357390
}
@@ -436,10 +469,7 @@ pub async fn sync_offline(
436469
sync_ctx: &mut SyncContext,
437470
conn: &Connection,
438471
) -> Result<crate::database::Replicated> {
439-
let durable_frame_no = sync_ctx.durable_frame_num();
440-
let max_frame_no = conn.wal_frame_count();
441-
442-
if max_frame_no > durable_frame_no {
472+
if is_ahead_of_remote(&sync_ctx, &conn) {
443473
match try_push(sync_ctx, conn).await {
444474
Ok(rep) => Ok(rep),
445475
Err(Error::Sync(err)) => {
@@ -475,6 +505,11 @@ pub async fn sync_offline(
475505
})
476506
}
477507

508+
fn is_ahead_of_remote(sync_ctx: &SyncContext, conn: &Connection) -> bool {
509+
let max_local_frame = conn.wal_frame_count();
510+
max_local_frame > sync_ctx.durable_frame_num()
511+
}
512+
478513
async fn try_push(
479514
sync_ctx: &mut SyncContext,
480515
conn: &Connection,
@@ -496,7 +531,7 @@ async fn try_push(
496531
});
497532
}
498533

499-
let generation = sync_ctx.generation(); // TODO: Probe from WAL.
534+
let generation = sync_ctx.durable_generation();
500535
let start_frame_no = sync_ctx.durable_frame_num() + 1;
501536
let end_frame_no = max_frame_no;
502537

@@ -532,29 +567,60 @@ async fn try_pull(
532567
sync_ctx: &mut SyncContext,
533568
conn: &Connection,
534569
) -> Result<crate::database::Replicated> {
535-
let generation = sync_ctx.generation();
536-
let mut frame_no = sync_ctx.durable_frame_num() + 1;
537-
538570
let insert_handle = conn.wal_insert_handle()?;
539571

572+
let mut err = None;
573+
540574
loop {
575+
let generation = sync_ctx.durable_generation();
576+
let frame_no = sync_ctx.durable_frame_num() + 1;
541577
match sync_ctx.pull_one_frame(generation, frame_no).await {
542-
Ok(Some(frame)) => {
578+
Ok(PullResult::Frame(frame)) => {
543579
insert_handle.insert(&frame)?;
544-
frame_no += 1;
580+
sync_ctx.durable_frame_num = frame_no;
545581
}
546-
Ok(None) => {
582+
Ok(PullResult::EndOfGeneration { max_generation }) => {
583+
// If there are no more generations to pull, we're done.
584+
if generation >= max_generation {
585+
break;
586+
}
587+
insert_handle.end()?;
547588
sync_ctx.write_metadata().await?;
548-
return Ok(crate::database::Replicated {
549-
frame_no: None,
550-
frames_synced: 1,
551-
});
552-
}
553-
Err(err) => {
554-
tracing::debug!("pull_one_frame error: {:?}", err);
589+
590+
// TODO: Make this crash-proof.
591+
conn.wal_checkpoint(true)?;
592+
593+
sync_ctx.next_generation();
555594
sync_ctx.write_metadata().await?;
556-
return Err(err);
595+
596+
insert_handle.begin()?;
597+
}
598+
Err(e) => {
599+
tracing::debug!("pull_one_frame error: {:?}", e);
600+
err.replace(e);
601+
break;
557602
}
558603
}
559604
}
605+
// This is crash-proof because we:
606+
//
607+
// 1. Write WAL frame first
608+
// 2. Write new max frame to temporary metadata
609+
// 3. Atomically rename the temporary metadata to the real metadata
610+
//
611+
// If we crash before metadata rename completes, the old metadata still
612+
// points to last successful frame, allowing safe retry from that point.
613+
// If we happen to have the frame already in the WAL, it's fine to re-pull
614+
// because append locally is idempotent.
615+
insert_handle.end()?;
616+
sync_ctx.write_metadata().await?;
617+
618+
if let Some(err) = err {
619+
Err(err)
620+
} else {
621+
Ok(crate::database::Replicated {
622+
frame_no: None,
623+
frames_synced: 1,
624+
})
625+
}
560626
}

0 commit comments

Comments
 (0)