Skip to content

Commit e164617

Browse files
authored
libsql: WAL push conflict detection (#1989)
This adds WAL push conflict detection to the client side.
2 parents 2f2820e + 477cfc7 commit e164617

2 files changed

Lines changed: 41 additions & 3 deletions

File tree

libsql/src/sync.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ pub enum SyncError {
5252
InvalidPushFrameNoLow(u32, u32),
5353
#[error("server returned a higher frame_no: sent={0}, got={1}")]
5454
InvalidPushFrameNoHigh(u32, u32),
55+
#[error("server returned a conflict: sent={0}, got={1}")]
56+
InvalidPushFrameConflict(u32, u32),
5557
#[error("failed to pull frame: status={0}, error={1}")]
5658
PullFrame(StatusCode, String),
5759
#[error("failed to get location header for redirect: {0}")]
@@ -66,6 +68,17 @@ impl SyncError {
6668
}
6769
}
6870

71+
pub struct PushResult {
72+
status: PushStatus,
73+
generation: u32,
74+
max_frame_no: u32,
75+
}
76+
77+
pub enum PushStatus {
78+
Ok,
79+
Conflict,
80+
}
81+
6982
pub enum PullResult {
7083
/// A frame was successfully pulled.
7184
Frame(Bytes),
@@ -162,7 +175,16 @@ impl SyncContext {
162175
);
163176
tracing::debug!("pushing frame");
164177

165-
let (generation, durable_frame_num) = self.push_with_retry(uri, frames, self.max_retries).await?;
178+
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
179+
180+
match result.status {
181+
PushStatus::Conflict => {
182+
return Err(SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into());
183+
}
184+
_ => {}
185+
}
186+
let generation = result.generation;
187+
let durable_frame_num = result.max_frame_no;
166188

167189
if durable_frame_num > frame_no + frames_count - 1 {
168190
tracing::error!(
@@ -198,7 +220,7 @@ impl SyncContext {
198220
Ok(durable_frame_num)
199221
}
200222

201-
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<(u32, u32)> {
223+
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
202224
let mut nr_retries = 0;
203225
loop {
204226
let mut req = http::Request::post(uri.clone());
@@ -228,6 +250,14 @@ impl SyncContext {
228250
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
229251
.map_err(SyncError::JsonDecode)?;
230252

253+
let status = resp
254+
.get("status")
255+
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
256+
257+
let status = status
258+
.as_str()
259+
.ok_or_else(|| SyncError::JsonValue(status.clone()))?;
260+
231261
let generation = resp
232262
.get("generation")
233263
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
@@ -244,7 +274,14 @@ impl SyncContext {
244274
.as_u64()
245275
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
246276

247-
return Ok((generation as u32, max_frame_no as u32));
277+
let status = match status {
278+
"ok" => PushStatus::Ok,
279+
"conflict" => PushStatus::Conflict,
280+
_ => return Err(SyncError::JsonValue(resp.clone()).into()),
281+
};
282+
let generation = generation as u32;
283+
let max_frame_no = max_frame_no as u32;
284+
return Ok(PushResult { status, generation, max_frame_no });
248285
}
249286

250287
if res.status().is_redirection() {

libsql/src/sync/test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ impl MockServer {
376376
if req.uri().path().contains("/sync/") {
377377
// Return the max_frame_no that has been accepted
378378
let response = serde_json::json!({
379+
"status": "ok",
379380
"generation": 1,
380381
"max_frame_no": current_count
381382
});

0 commit comments

Comments
 (0)