Skip to content

Commit dafd747

Browse files
JakeHedmanLucioFranco
authored andcommitted
Add --max-concurrent-requests flag (#1030)
* Add --max-concurrent-requests flag * Add missing field in test
1 parent 18ab6f6 commit dafd747

6 files changed

Lines changed: 30 additions & 2 deletions

File tree

libsql-server/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ pub struct DbConfig {
125125
pub snapshot_exec: Option<String>,
126126
pub checkpoint_interval: Option<Duration>,
127127
pub snapshot_at_shutdown: bool,
128+
pub max_concurrent_requests: u64,
128129
}
129130

130131
impl Default for DbConfig {
@@ -141,6 +142,7 @@ impl Default for DbConfig {
141142
snapshot_exec: None,
142143
checkpoint_interval: None,
143144
snapshot_at_shutdown: false,
145+
max_concurrent_requests: 128,
144146
}
145147
}
146148
}

libsql-server/src/connection/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,18 @@ pub trait MakeConnection: Send + Sync + 'static {
159159
semaphore: Arc<Semaphore>,
160160
timeout: Option<Duration>,
161161
max_total_response_size: u64,
162+
max_concurrent_requests: u64,
162163
) -> MakeThrottledConnection<Self>
163164
where
164165
Self: Sized,
165166
{
166-
MakeThrottledConnection::new(semaphore, self, timeout, max_total_response_size)
167+
MakeThrottledConnection::new(
168+
semaphore,
169+
self,
170+
timeout,
171+
max_total_response_size,
172+
max_concurrent_requests,
173+
)
167174
}
168175
}
169176

@@ -190,6 +197,7 @@ pub struct MakeThrottledConnection<F> {
190197
// will result in reducing concurrency to prevent out-of-memory errors.
191198
max_total_response_size: u64,
192199
waiters: AtomicUsize,
200+
max_concurrent_requests: u64,
193201
}
194202

195203
impl<F> MakeThrottledConnection<F> {
@@ -198,13 +206,15 @@ impl<F> MakeThrottledConnection<F> {
198206
connection_maker: F,
199207
timeout: Option<Duration>,
200208
max_total_response_size: u64,
209+
max_concurrent_requests: u64,
201210
) -> Self {
202211
Self {
203212
semaphore,
204213
connection_maker,
205214
timeout,
206215
max_total_response_size,
207216
waiters: AtomicUsize::new(0),
217+
max_concurrent_requests,
208218
}
209219
}
210220

@@ -263,7 +273,7 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
263273
);
264274
let units = self.units_to_take();
265275
let waiters_guard = WaitersGuard::new(&self.waiters);
266-
if waiters_guard.waiters.load(Ordering::Relaxed) >= 128 {
276+
if (waiters_guard.waiters.load(Ordering::Relaxed) as u64) >= self.max_concurrent_requests {
267277
return Err(Error::TooManyRequests);
268278
}
269279
let fut = self.semaphore.clone().acquire_many_owned(units);
@@ -423,6 +433,7 @@ pub mod test {
423433
Arc::new(Semaphore::new(10)),
424434
Some(Duration::from_millis(100)),
425435
u64::MAX,
436+
u64::MAX,
426437
);
427438

428439
let mut conns = Vec::with_capacity(10);

libsql-server/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ where
529529
checkpoint_interval: self.db_config.checkpoint_interval,
530530
disable_namespace: self.disable_namespaces,
531531
max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
532+
max_concurrent_requests: self.db_config.max_concurrent_requests,
532533
};
533534
let factory = PrimaryNamespaceMaker::new(conf);
534535
let namespaces = NamespaceStore::new(factory, false, self.db_config.snapshot_at_shutdown);
@@ -619,6 +620,7 @@ impl<C: Connector> Replica<C> {
619620
max_response_size: self.db_config.max_response_size,
620621
max_total_response_size: self.db_config.max_total_response_size,
621622
max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)),
623+
max_concurrent_requests: self.db_config.max_concurrent_requests,
622624
};
623625
let factory = ReplicaNamespaceMaker::new(conf);
624626
let namespaces = NamespaceStore::new(factory, true, false);

libsql-server/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,9 @@ struct Cli {
231231

232232
#[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_CONNECTIONS")]
233233
max_concurrent_connections: usize,
234+
// max number of concurrent requests across all connections
235+
#[clap(long, default_value = "128", env = "SQLD_MAX_CONCURRENT_REQUESTS")]
236+
max_concurrent_requests: u64,
234237
}
235238

236239
#[derive(clap::Subcommand, Debug)]
@@ -337,6 +340,7 @@ fn make_db_config(config: &Cli) -> anyhow::Result<DbConfig> {
337340
snapshot_exec: config.snapshot_exec.clone(),
338341
checkpoint_interval: config.checkpoint_interval_s.map(Duration::from_secs),
339342
snapshot_at_shutdown: config.snapshot_at_shutdown,
343+
max_concurrent_requests: config.max_concurrent_requests,
340344
})
341345
}
342346

libsql-server/src/namespace/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,7 @@ pub struct ReplicaNamespaceConfig {
669669
/// Stats monitor
670670
pub stats_sender: StatsSender,
671671
pub max_concurrent_connections: Arc<Semaphore>,
672+
pub max_concurrent_requests: u64,
672673
}
673674

674675
impl Namespace<ReplicaDatabase> {
@@ -789,6 +790,7 @@ impl Namespace<ReplicaDatabase> {
789790
config.max_concurrent_connections.clone(),
790791
Some(DB_CREATE_TIMEOUT),
791792
config.max_total_response_size,
793+
config.max_concurrent_requests,
792794
);
793795

794796
Ok(Self {
@@ -817,6 +819,7 @@ pub struct PrimaryNamespaceConfig {
817819
pub checkpoint_interval: Option<Duration>,
818820
pub disable_namespace: bool,
819821
pub max_concurrent_connections: Arc<Semaphore>,
822+
pub max_concurrent_requests: u64,
820823
}
821824

822825
pub type DumpStream =
@@ -991,6 +994,7 @@ impl Namespace<PrimaryDatabase> {
991994
config.max_concurrent_connections.clone(),
992995
Some(DB_CREATE_TIMEOUT),
993996
config.max_total_response_size,
997+
config.max_concurrent_requests,
994998
)
995999
.into();
9961000

libsql-server/src/test/bottomless.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ async fn configure_server(
5151
snapshot_exec: None,
5252
checkpoint_interval: Some(Duration::from_secs(3)),
5353
snapshot_at_shutdown: false,
54+
<<<<<<< HEAD
55+
=======
56+
encryption_key: None,
57+
max_concurrent_requests: 128,
58+
>>>>>>> 63a707798a (Add --max-concurrent-requests flag (#1030))
5459
},
5560
admin_api_config: None,
5661
disable_namespaces: true,

0 commit comments

Comments
 (0)