Skip to content

Commit 807b45c

Browse files
authored
Fix configuration not being synced with replicas (#1071)
1 parent b3c6863 commit 807b45c

3 files changed

Lines changed: 48 additions & 9 deletions

File tree

libsql-server/src/namespace/meta_store.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::{collections::HashMap, fs::read_dir};
55

66
use bottomless::bottomless_wal::BottomlessWalWrapper;
77
use bottomless::replicator::CompressionKind;
8+
use futures_core::Future;
89
use libsql_replication::rpc::metadata;
910
use libsql_sys::wal::{
1011
wrapper::{WalWrapper, WrappedWal},
@@ -409,6 +410,18 @@ impl MetaStoreHandle {
409410
}
410411
}
411412

413+
pub fn changed(&self) -> impl Future<Output = ()> {
414+
let mut rcv = match &self.inner {
415+
HandleState::Internal(_) => panic!("can't wait for change on internal handle"),
416+
HandleState::External(_, rcv) => rcv.clone(),
417+
};
418+
// ack the current value.
419+
rcv.borrow_and_update();
420+
async move {
421+
let _ = rcv.changed().await;
422+
}
423+
}
424+
412425
pub async fn store(&self, new_config: impl Into<Arc<DatabaseConfig>>) -> Result<()> {
413426
match &self.inner {
414427
HandleState::Internal(config) => {
@@ -418,6 +431,8 @@ impl MetaStoreHandle {
418431
let new_config = new_config.into();
419432
tracing::debug!(?new_config, "storing new namespace config");
420433
let mut c = config.clone();
434+
// ack the current value.
435+
c.borrow_and_update();
421436
let changed = c.changed();
422437

423438
changes_tx

libsql-server/src/namespace/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,10 @@ impl<T: Database> Namespace<T> {
840840
pub fn stats(&self) -> Arc<Stats> {
841841
self.stats.clone()
842842
}
843+
844+
pub fn config_changed(&self) -> impl Future<Output = ()> {
845+
self.db_config_store.changed()
846+
}
843847
}
844848

845849
pub struct ReplicaNamespaceConfig {

libsql-server/src/rpc/replication_log.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ use std::sync::{Arc, RwLock};
66
use bytes::Bytes;
77
use chrono::{DateTime, Utc};
88
use futures::stream::BoxStream;
9+
use futures_core::Future;
910
pub use libsql_replication::rpc::replication as rpc;
1011
use libsql_replication::rpc::replication::replication_log_server::ReplicationLog;
1112
use libsql_replication::rpc::replication::{
1213
Frame, Frames, HelloRequest, HelloResponse, LogOffset, NAMESPACE_DOESNT_EXIST,
1314
NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY,
1415
};
1516
use md5::{Digest, Md5};
16-
use tokio_stream::StreamExt;
17+
use tokio_stream::StreamExt as _;
1718
use tonic::transport::server::TcpConnectInfo;
1819
use tonic::Status;
1920
use uuid::Uuid;
@@ -139,18 +140,20 @@ impl ReplicationLogService {
139140
Arc<DatabaseConfig>,
140141
usize,
141142
Arc<Stats>,
143+
impl Future<Output = ()>,
142144
),
143145
Status,
144146
> {
145-
let (logger, config, version, stats) = self
147+
let (logger, config, version, stats, config_changed) = self
146148
.namespaces
147149
.with(namespace, |ns| {
148150
let logger = ns.db.wal_manager.wrapped().logger().clone();
151+
let config_changed = ns.config_changed();
149152
let config = ns.config();
150153
let version = ns.config_version();
151154
let stats = ns.stats();
152155

153-
(logger, config, version, stats)
156+
(logger, config, version, stats, config_changed)
154157
})
155158
.await
156159
.map_err(|e| {
@@ -165,7 +168,7 @@ impl ReplicationLogService {
165168
self.verify_session_token(req, version)?;
166169
}
167170

168-
Ok((logger, config, version, stats))
171+
Ok((logger, config, version, stats, config_changed))
169172
}
170173

171174
fn encode_session_token(&self, version: usize) -> Uuid {
@@ -249,7 +252,8 @@ impl ReplicationLog for ReplicationLogService {
249252

250253
self.authenticate(&req, namespace.clone()).await?;
251254

252-
let (logger, _, _, stats) = self.logger_from_namespace(namespace, &req, true).await?;
255+
let (logger, _, _, stats, config_changed) =
256+
self.logger_from_namespace(namespace, &req, true).await?;
253257

254258
let stats = if self.collect_stats {
255259
Some(stats)
@@ -259,13 +263,29 @@ impl ReplicationLog for ReplicationLogService {
259263

260264
let req = req.into_inner();
261265

262-
let stream = StreamGuard::new(
266+
let mut stream = StreamGuard::new(
263267
FrameStream::new(logger, req.next_offset, true, None, stats)
264268
.map_err(|e| Status::internal(e.to_string()))?,
265269
self.idle_shutdown_layer.clone(),
266270
)
267271
.map(map_frame_stream_output);
268272

273+
// if only tokio_stream had futures::Stream::take_until...
274+
let stream = async_stream::stream! {
275+
tokio::pin!(config_changed);
276+
loop {
277+
tokio::select! {
278+
_ = &mut config_changed => {
279+
break
280+
},
281+
Some(next) = stream.next() => {
282+
yield next
283+
}
284+
else => break,
285+
}
286+
}
287+
};
288+
269289
Ok(tonic::Response::new(Box::pin(stream)))
270290
}
271291

@@ -276,7 +296,7 @@ impl ReplicationLog for ReplicationLogService {
276296
let namespace = super::extract_namespace(self.disable_namespaces, &req)?;
277297
self.authenticate(&req, namespace.clone()).await?;
278298

279-
let (logger, _, _, stats) = self.logger_from_namespace(namespace, &req, true).await?;
299+
let (logger, _, _, stats, _) = self.logger_from_namespace(namespace, &req, true).await?;
280300

281301
let stats = if self.collect_stats {
282302
Some(stats)
@@ -324,7 +344,7 @@ impl ReplicationLog for ReplicationLogService {
324344
}
325345
}
326346

327-
let (logger, config, version, _) =
347+
let (logger, config, version, _, _) =
328348
self.logger_from_namespace(namespace, &req, false).await?;
329349

330350
let session_hash = self.encode_session_token(version);
@@ -348,7 +368,7 @@ impl ReplicationLog for ReplicationLogService {
348368
let namespace = super::extract_namespace(self.disable_namespaces, &req)?;
349369
self.authenticate(&req, namespace.clone()).await?;
350370

351-
let (logger, _, _, stats) = self.logger_from_namespace(namespace, &req, true).await?;
371+
let (logger, _, _, stats, _) = self.logger_from_namespace(namespace, &req, true).await?;
352372

353373
let stats = if self.collect_stats {
354374
Some(stats)

0 commit comments

Comments
 (0)