Skip to content

Commit f82c5bf

Browse files
committed
libsql: Make Statement::reset() immutable
1 parent ebd4e78 commit f82c5bf

6 files changed

Lines changed: 24 additions & 22 deletions

File tree

libsql/src/hrana/hyper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl crate::statement::Stmt for crate::hrana::Statement<HttpSender> {
255255
))
256256
}
257257

258-
fn reset(&mut self) {}
258+
fn reset(&self) {}
259259

260260
fn parameter_count(&self) -> usize {
261261
let stmt = &self.inner;

libsql/src/hrana/stream.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use libsql_hrana::proto::{
88
GetAutocommitStreamReq, PipelineReqBody, PipelineRespBody, SequenceStreamReq,
99
StoreSqlStreamReq, StreamRequest, StreamResponse, StreamResult,
1010
};
11+
use std::cell::RefCell;
1112
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
1213
use std::sync::Arc;
1314
use tokio::sync::Mutex;
@@ -66,8 +67,8 @@ where
6667
pipeline_url,
6768
cursor_url,
6869
auth_token,
69-
sql_id_generator: 0,
70-
baton: None,
70+
sql_id_generator: RefCell::new(0),
71+
baton: RefCell::new(None),
7172
}),
7273
}),
7374
}
@@ -77,7 +78,7 @@ where
7778
/// Returns true if request was finalized correctly, false if stream was already closed.
7879
pub(super) async fn finalize(&mut self, req: StreamRequest) -> Result<bool> {
7980
let mut client = self.inner.stream.lock().await;
80-
if client.baton.is_none() {
81+
if client.baton.borrow().is_none() {
8182
tracing::trace!("baton not found - skipping finalize for {:?}", req);
8283
return Ok(false);
8384
}
@@ -298,11 +299,11 @@ where
298299
T: HttpSend,
299300
{
300301
client: T,
301-
baton: Option<String>,
302+
baton: RefCell<Option<String>>,
302303
pipeline_url: Arc<str>,
303304
cursor_url: Arc<str>,
304305
auth_token: Arc<str>,
305-
sql_id_generator: SqlId,
306+
sql_id_generator: RefCell<SqlId>,
306307
}
307308

308309
impl<T> RawStream<T>
@@ -316,7 +317,7 @@ where
316317

317318
pub async fn open_cursor(&mut self, batch: Batch) -> Result<Cursor<T::Stream>> {
318319
let msg = CursorReq {
319-
baton: self.baton.clone(),
320+
baton: self.baton.borrow().clone(),
320321
batch,
321322
};
322323
let body = serde_json::to_string(&msg).map_err(HranaError::Json)?;
@@ -336,7 +337,7 @@ where
336337
} // stream has been closed by the server
337338
Some(baton) => {
338339
tracing::trace!("client stream has been assigned with baton: `{}`", baton);
339-
self.baton = Some(baton)
340+
*self.baton.borrow_mut() = Some(baton)
340341
}
341342
}
342343
Ok(cursor)
@@ -349,11 +350,11 @@ where
349350
tracing::trace!(
350351
"client stream sending {} requests with baton `{}`: {:?}",
351352
N,
352-
self.baton.as_deref().unwrap_or_default(),
353+
self.baton.borrow().as_deref().unwrap_or_default(),
353354
requests
354355
);
355356
let msg = PipelineReqBody {
356-
baton: self.baton.clone(),
357+
baton: self.baton.borrow().clone(),
357358
requests: Vec::from(requests),
358359
};
359360
let body = serde_json::to_string(&msg).map_err(HranaError::Json)?;
@@ -375,7 +376,7 @@ where
375376
} // stream has been closed by the server
376377
Some(baton) => {
377378
tracing::trace!("client stream has been assigned with baton: `{}`", baton);
378-
self.baton = Some(baton)
379+
*self.baton.borrow_mut() = Some(baton)
379380
}
380381
}
381382

@@ -424,16 +425,17 @@ where
424425
Ok((resp, is_autocommit))
425426
}
426427

427-
fn reset(&mut self) {
428-
if let Some(baton) = self.baton.take() {
428+
fn reset(&self) {
429+
if let Some(baton) = self.baton.borrow_mut().take() {
429430
tracing::trace!("closing client stream (baton: `{}`)", baton);
430431
}
431-
self.sql_id_generator = 0;
432+
*self.sql_id_generator.borrow_mut() = 0;
432433
}
433434

434435
fn next_sql_id(&mut self) -> SqlId {
435-
self.sql_id_generator = self.sql_id_generator.wrapping_add(1);
436-
self.sql_id_generator
436+
let mut gen = self.sql_id_generator.borrow_mut();
437+
*gen = gen.wrapping_add(1);
438+
*gen
437439
}
438440
}
439441

@@ -443,7 +445,7 @@ where
443445
T: HttpSend,
444446
{
445447
fn drop(&mut self) {
446-
if let Some(baton) = self.baton.take() {
448+
if let Some(baton) = self.baton.borrow_mut().take() {
447449
// only send a close request if stream was ever used to send the data
448450
tracing::trace!("closing client stream (baton: `{}`)", baton);
449451
let req = serde_json::to_string(&PipelineReqBody {

libsql/src/local/impls.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ impl Stmt for LibsqlStmt {
133133
self.0.interrupt()
134134
}
135135

136-
fn reset(&mut self) {
136+
fn reset(&self) {
137137
self.0.reset();
138138
}
139139

libsql/src/replication/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ impl Stmt for RemoteStatement {
755755
))
756756
}
757757

758-
fn reset(&mut self) {}
758+
fn reset(&self) {}
759759

760760
fn parameter_count(&self) -> usize {
761761
if let Some(stmt) = self.local_statement.as_ref() {

libsql/src/statement.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pub(crate) trait Stmt {
1616

1717
fn interrupt(&self) -> Result<()>;
1818

19-
fn reset(&mut self);
19+
fn reset(&self);
2020

2121
fn parameter_count(&self) -> usize;
2222

@@ -83,7 +83,7 @@ impl Statement {
8383
}
8484

8585
/// Reset the state of this prepared statement.
86-
pub fn reset(&mut self) {
86+
pub fn reset(&self) {
8787
self.inner.reset();
8888
}
8989

libsql/src/sync/statement.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl Stmt for SyncedStatement {
5151
self.inner.interrupt()
5252
}
5353

54-
fn reset(&mut self) {
54+
fn reset(&self) {
5555
self.inner.reset()
5656
}
5757

0 commit comments

Comments
 (0)