From f11f1b0f3eea3b74ad6c714bfa00049e4ef04af2 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 21 Apr 2026 12:52:38 +0200 Subject: [PATCH 1/4] commitlog: Handle empty tail segments upon resumption In #4338, the read-only path was made resilient against empty segments at the end of the log, but corresponding logic was not applied to re-opening the commitlog for writing. This patch rectifies that by ignoring and removing segments from the tail of the log if they contain no more than `segment::Header::LEN` bytes. Additionally, zero-sized segments are eliminated entirely by ensuring that the header is written before moving the segment into place atomically. The benefit of this is not huge, but could simplify commitlog-consuming code by not having to worry about empty (zero-sized) segments. Happy to revert if that is deemed too small a benefit. --- Cargo.lock | 1 + crates/commitlog/Cargo.toml | 1 + crates/commitlog/src/commitlog.rs | 46 ++++---- crates/commitlog/src/repo/fs.rs | 118 ++++++++++++++----- crates/commitlog/src/repo/mem.rs | 22 ++-- crates/commitlog/src/repo/mod.rs | 100 ++++++++++++---- crates/commitlog/src/stream/writer.rs | 9 +- crates/commitlog/src/tests/partial.rs | 4 +- crates/commitlog/tests/random_payload/mod.rs | 35 ++++++ 9 files changed, 238 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 301a89331dc..ce25e178ae7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7957,6 +7957,7 @@ dependencies = [ "proptest", "proptest-derive", "rand 0.9.2", + "scopeguard", "serde", "spacetimedb-commitlog", "spacetimedb-fs-utils", diff --git a/crates/commitlog/Cargo.toml b/crates/commitlog/Cargo.toml index 1f538453311..03cd60fc37d 100644 --- a/crates/commitlog/Cargo.toml +++ b/crates/commitlog/Cargo.toml @@ -26,6 +26,7 @@ itertools.workspace = true log.workspace = true memmap2 = "0.9.4" nix = { workspace = true, optional = true, features = ["fs"] } +scopeguard.workspace = true serde = { workspace = true, optional = true }
spacetimedb-fs-utils.workspace = true spacetimedb-paths.workspace = true diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index dd46ab42ea1..dfb7cf3d3ec 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -59,33 +59,27 @@ impl Generic { if !tail.is_empty() { debug!("segments: {tail:?}"); } - let head = if let Some(last) = tail.pop() { - debug!("resuming last segment: {last}"); - // Resume the last segment for writing, or create a new segment - // starting from the last good commit + 1. - repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| { - // The first commit in the last segment being corrupt is an - // edge case: we'd try to start a new segment with an offset - // equal to the already existing one, which would fail. - // - // We cannot just skip it either, as we don't know the reason - // for the corruption (there could be more, potentially - // recoverable commits in the segment). - // - // Thus, provide some context about what is wrong and refuse to - // start. - if meta.tx_range.is_empty() { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("repo {}: first commit in resumed segment {} is corrupt", repo, last), - )); + + // Resume the last segment for writing, or + // create a new segment starting from the last good commit + 1. + let head = loop { + if let Some(last) = tail.pop() { + info!("repo {}: resuming last segment: {}", repo, last); + match repo::resume_segment_writer(&repo, opts, last)? { + repo::ResumedSegment::Empty => { + repo.remove_segment(last)?; + continue; + } + repo::ResumedSegment::Resumed(writer) => break writer, + repo::ResumedSegment::Sealed(meta) | repo::ResumedSegment::Corrupted(meta) => { + tail.push(meta.tx_range.start); + break repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)?; + } } - tail.push(meta.tx_range.start); - repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end) - })? 
- } else { - debug!("starting fresh log"); - repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)? + } else { + info!("repo {}: starting fresh log", repo); + break repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?; + } }; Ok(Self { diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 506cde5fb41..55e96fe40d9 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -5,10 +5,11 @@ use std::sync::Arc; use log::{debug, warn}; use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader}; +use spacetimedb_fs_utils::lockfile; use spacetimedb_paths::server::{CommitLogDir, SegmentFile}; use tempfile::NamedTempFile; -use crate::segment::FileLike; +use crate::segment::{self, FileLike}; use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut}; @@ -160,6 +161,7 @@ impl FileLike for NamedTempFile { /// [Self::sealed] returns `true` if the segment is compressed. pub struct ReadOnlySegment { inner: CompressReader, + len: u64, } impl SegmentReader for ReadOnlySegment { @@ -195,45 +197,98 @@ impl io::Seek for ReadOnlySegment { } } -impl SegmentLen for ReadOnlySegment {} +impl SegmentLen for ReadOnlySegment { + fn segment_len(&mut self) -> io::Result { + // If the segment is compressed, we guarantee that it is immutable, + // so use the file length as determined when opening the reader. + // Seeking would be somewhat expensive in this case, as the zstd reader + // translates to uncompressed offsets and thus must decompress at least + // some frames. + // + // If the segment is not compressed, we may be reading the active + // segment, so immutability is not guaranteed. Use the default seek + // strategy thus. 
+ if self.inner.is_compressed() { + Ok(self.len) + } else { + use io::Seek as _; + + let old_pos = self.stream_position()?; + let len = self.seek(io::SeekFrom::End(0))?; + + // Avoid seeking a third time when we were already at the end of the + // stream. The branch is usually way cheaper than a seek operation. + if old_pos != len { + self.seek(io::SeekFrom::Start(old_pos))?; + } + + Ok(len) + } + } +} impl Repo for Fs { type SegmentWriter = File; type SegmentReader = ReadOnlySegment; - fn create_segment(&self, offset: u64) -> io::Result { - File::options() - .read(true) - .append(true) - .create_new(true) - .open(self.segment_path(offset)) - .or_else(|e| { - if e.kind() == io::ErrorKind::AlreadyExists { - debug!("segment {offset} already exists"); - // If the segment is completely empty, we can resume writing. - let file = self.open_segment_writer(offset)?; - if file.metadata()?.len() == 0 { - debug!("segment {offset} is empty"); - return Ok(file); - } - - // Otherwise, provide some context. + fn create_segment(&self, offset: u64, header: segment::Header) -> io::Result { + let path = self.segment_path(offset); + + // We need to check if the segment already exists, + // so use file locking to prevent a TOCTOU race. + // Using `flock` means we don't need to worry about stale lockfiles. + let lock_path = path.0.with_extension("lock"); + let _lock = scopeguard::guard( + lockfile::advisory::LockedFile::lock(&lock_path) + .map_err(|e| io::Error::new(e.source.kind(), format!("repo {}: {}: {}", self, e, e.source)))?, + |lockfile| { + if let Err(e) = lockfile.release(true) { + // It's ok if removing the file fails, but print a warning + // anyways. + warn!("repo {}: failed to remove {}: {}", self, lock_path.display(), e); + } + }, + ); + + // Check whether the segment already exists. + // Overwrite it if its length is zero. 
+ match fs::metadata(&path) { + Ok(stat) => { + if stat.len() > 0 { return Err(io::Error::new( io::ErrorKind::AlreadyExists, format!("repo {}: segment {} already exists and is non-empty", self, offset), )); } - - Err(e) - }) - .inspect(|_| { - // We're rotating commitlog segments, so we should also take a snapshot at the earliest opportunity. - if let Some(on_new_segment) = self.on_new_segment.as_ref() { - // No need to handle the error here: if the snapshot worker is closed we'll eventually close too, - // and we don't want to die prematurely if there are still TXes to write. - on_new_segment(); + } + Err(e) => { + if e.kind() != io::ErrorKind::NotFound { + return Err(io::Error::new( + e.kind(), + format!( + "repo {}: error getting file metadata for segment {}: {}", + self, offset, e + ), + )); } - }) + } + } + + // The segment file either does not exist, or is of length zero. + // Write the header to a temporary file and atomically move it into place. + let mut tmp = tempfile::Builder::new().make_in(&self.root.0, |tmp_path| { + File::options().read(true).append(true).create_new(true).open(tmp_path) + })?; + header.write(&mut tmp)?; + tmp.as_file_mut().sync_all()?; + let segment = tmp.persist(path)?; + + // Notify subscribers. 
+ if let Some(on_new_segment) = self.on_new_segment.as_ref() { + on_new_segment(); + } + + Ok(segment) } fn open_segment_writer(&self, offset: u64) -> io::Result { @@ -248,7 +303,8 @@ impl Repo for Fs { let path = self.segment_path(offset); debug!("fs: open segment at {}", path.display()); let file = File::open(&path)?; - CompressReader::new(file).map(|inner| ReadOnlySegment { inner }) + let len = file.metadata()?.len(); + CompressReader::new(file).map(|inner| ReadOnlySegment { inner, len }) } fn remove_segment(&self, offset: u64) -> io::Result<()> { @@ -315,8 +371,6 @@ impl Repo for Fs { } } -impl SegmentLen for CompressReader {} - #[cfg(feature = "streaming")] impl crate::stream::AsyncRepo for Fs { type AsyncSegmentWriter = tokio::io::BufWriter; diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index d2b9bfb8d86..dd9fa91f9aa 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -54,26 +54,28 @@ impl Repo for Memory { type SegmentWriter = Segment; type SegmentReader = ReadOnlySegment; - fn create_segment(&self, offset: u64) -> io::Result { + fn create_segment(&self, offset: u64, header: crate::segment::Header) -> io::Result { let mut inner = self.segments.write().unwrap(); - match inner.entry(offset) { + let mut segment = match inner.entry(offset) { btree_map::Entry::Occupied(entry) => { let entry = entry.get(); - let read_guard = entry.read().unwrap(); - if read_guard.is_empty() { - Ok(Segment::from_shared(self.space.clone(), entry.clone())) + if entry.read().unwrap().is_empty() { + Segment::from_shared(self.space.clone(), entry.clone()) } else { - Err(io::Error::new( + return Err(io::Error::new( io::ErrorKind::AlreadyExists, format!("segment {offset} already exists"), - )) + )); } } btree_map::Entry::Vacant(entry) => { - let segment = entry.insert(Arc::new(RwLock::new(Storage::new()))); - Ok(Segment::from_shared(self.space.clone(), segment.clone())) + let storage = 
entry.insert(Arc::new(RwLock::new(Storage::new()))); + Segment::from_shared(self.space.clone(), storage.clone()) } - } + }; + header.write(&mut segment)?; + + Ok(segment) } fn open_segment_writer(&self, offset: u64) -> io::Result { diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 3d1968c00b2..2b54216bad3 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -9,7 +9,7 @@ use crate::{ commit::Commit, error, index::{IndexFile, IndexFileMut}, - segment::{FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, + segment::{self, FileLike, Header, Metadata, OffsetIndexWriter, Reader, Writer}, Options, }; @@ -82,12 +82,12 @@ pub trait Repo: Clone + fmt::Display { /// Create a new segment with the minimum transaction offset `offset`. /// /// This **must** create the segment atomically, and return - /// [`io::ErrorKind::AlreadyExists`] if the segment already exists. + /// [`io::ErrorKind::AlreadyExists`] if the segment already exists (it is + /// permissible to overwrite an existing segment if it is zero-length). /// - /// It is permissible, however, to successfully return the new segment if - /// it is completely empty (i.e. [`create_segment_writer`] did not previously - /// succeed in writing the segment header). - fn create_segment(&self, offset: u64) -> io::Result; + /// If the method returns successfully, the `header` **must** have been + /// durably written to the segment. + fn create_segment(&self, offset: u64, header: segment::Header) -> io::Result; /// Open an existing segment at the minimum transaction offset `offset`. 
/// @@ -148,8 +148,8 @@ impl Repo for &T { type SegmentWriter = T::SegmentWriter; type SegmentReader = T::SegmentReader; - fn create_segment(&self, offset: u64) -> io::Result { - T::create_segment(self, offset) + fn create_segment(&self, offset: u64, header: segment::Header) -> io::Result { + T::create_segment(self, offset, header) } fn open_segment_reader(&self, offset: u64) -> io::Result { @@ -214,15 +214,15 @@ pub fn create_segment_writer( epoch: u64, offset: u64, ) -> io::Result> { - let mut storage = repo.create_segment(offset)?; + let mut storage = repo.create_segment( + offset, + Header { + log_format_version: opts.log_format_version, + checksum_algorithm: Commit::CHECKSUM_ALGORITHM, + }, + )?; // Ensure we have enough space for this segment. fallocate(&mut storage, &opts)?; - Header { - log_format_version: opts.log_format_version, - checksum_algorithm: Commit::CHECKSUM_ALGORITHM, - } - .write(&mut storage)?; - storage.fsync()?; Ok(Writer { commit: Commit { @@ -240,6 +240,27 @@ pub fn create_segment_writer( }) } +/// Outcome of [resume_segment_writer]. +pub enum ResumedSegment { + /// The segment contains at most the header bytes. + /// + /// It is not safe to resume without first checking integrity of + /// the preceding segment. The empty segment should be removed. + Empty, + /// The successfully resumed segment writer. + Resumed(Writer), + /// The segment is valid, but should not be resumed as it is already sealed. + /// + /// The [Metadata] is guaranteed to contain at least one valid commit. + /// A new segment should be created at `Metadata::tx_range.end`. + Sealed(Metadata), + /// The segment contains corrupted data and should not be resumed. + /// + /// The [Metadata] is guaranteed to contain at least one valid commit. + /// A new segment should be created at `Metadata::tx_range.end`. + Corrupted(Metadata), +} + /// Open the existing segment at `offset` for writing.
/// /// This will traverse the segment in order to find the offset of the next @@ -253,22 +274,55 @@ pub fn create_segment_writer( /// /// If only a (non-empty) prefix of the segment could be read due to a failure /// to decode a [`Commit`], the segment [`Metadata`] read up to the faulty -/// commit is returned in an `Err`. In this case, a new segment should be -/// created for writing. +/// commit is returned. In this case, a new segment should be created for +/// writing. Similarly if the segment is sealed. pub fn resume_segment_writer( repo: &R, opts: Options, offset: u64, -) -> io::Result, Metadata>> { +) -> io::Result> { let mut reader = repo .open_segment_reader(offset) .map_err(|source| with_segment_context("opening segment for resume", repo, offset, source))?; + + // If the segment at `offset` holds at most the header bytes, report it as + // empty; the caller removes it and resumes from the previous segment. + let len = reader + .segment_len() + .map_err(|source| with_segment_context("determining segment file size for resume", repo, offset, source))?; + if len <= segment::Header::LEN as u64 { + debug!("repo {}: segment {} is empty", repo, offset); + return Ok(ResumedSegment::Empty); + } + + let guard_non_empty = |meta: &Metadata| match meta.tx_range.is_empty() { + true => Err(with_segment_context( + "checking metadata", + repo, + offset, + io::Error::new(io::ErrorKind::InvalidData, "no valid commits in segment"), + )), + false => Ok(()), + }; + + // The segment is now guaranteed to be non-empty, i.e. contain more bytes + // than the segment header.
+ // + // Traverse it to gather the `Metadata` and ensure that the segment is safe + // to resume, which is the case if: + // + // - it contains at least one commit + // - it does not contain corrupted commits + // - the existing segment passes the compatibility check + // - the existing segment's version is the same as + // the one requested in `opts` let offset_index = repo.get_offset_index(offset).ok(); let meta = match Metadata::extract(offset, &mut reader, offset_index.as_ref()) { Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => { - warn!("invalid commit in segment {offset}: {source}"); + warn!("{repo}: invalid commit in segment {offset}: {source}"); debug!("sofar={sofar:?}"); - return Ok(Err(sofar)); + guard_non_empty(&sofar)?; + return Ok(ResumedSegment::Corrupted(sofar)); } Err(error::SegmentMetadata::Io(e)) => { return Err(with_segment_context("extracting segment metadata", repo, offset, e)); @@ -285,7 +339,6 @@ pub fn resume_segment_writer( io::Error::new(io::ErrorKind::InvalidData, msg), ) })?; - // When resuming, the log format version must be equal. if meta.header.log_format_version != opts.log_format_version { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -297,9 +350,10 @@ pub fn resume_segment_writer( ), )); } + guard_non_empty(&meta)?; if reader.sealed() { - Ok(Err(meta)) + Ok(ResumedSegment::Sealed(meta)) } else { let Metadata { header: _, @@ -318,7 +372,7 @@ pub fn resume_segment_writer( // We use `O_APPEND`, but make the file offset consistent regardless. 
writer.seek(io::SeekFrom::End(0))?; - Ok(Ok(Writer { + Ok(ResumedSegment::Resumed(Writer { commit: Commit { min_tx_offset: tx_range.end, n: 0, diff --git a/crates/commitlog/src/stream/writer.rs b/crates/commitlog/src/stream/writer.rs index b2f2b0c279a..ca3fefd894f 100644 --- a/crates/commitlog/src/stream/writer.rs +++ b/crates/commitlog/src/stream/writer.rs @@ -210,17 +210,15 @@ where .map(|range| range.end) .unwrap_or_default() ); - let (mut segment, index) = spawn_blocking({ + let (segment, index) = spawn_blocking({ let repo = self.repo.clone(); let last_written_tx_range = self.last_written_tx_range.clone(); let commitlog_options = self.commitlog_options; - move || create_segment(repo, last_written_tx_range, commitlog_options) + move || create_segment(repo, last_written_tx_range, commitlog_options, header) }) .await .unwrap() .map(|(segment, index)| (segment.into_async_writer(), index))?; - - segment.write_all(&buf[..segment::Header::LEN]).await?; stream.consume(segment::Header::LEN as _); CurrentSegment { @@ -445,12 +443,13 @@ fn create_segment( repo: R, last_written_tx_range: Option>, commitlog_options: Options, + header: segment::Header, ) -> io::Result<(R::SegmentWriter, Option)> { let segment_offset = last_written_tx_range .as_ref() .map(|range| range.end) .unwrap_or_default(); - let mut segment = repo.create_segment(segment_offset).or_else(|e| { + let mut segment = repo.create_segment(segment_offset, header).or_else(|e| { if e.kind() == io::ErrorKind::AlreadyExists { trace!("segment already exists"); let mut s = repo.open_segment_writer(segment_offset)?; diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index 69f54241874..3f17b6cfa64 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -273,8 +273,8 @@ impl Repo for ShortMem { type SegmentWriter = ShortSegment; type SegmentReader = repo::mem::ReadOnlySegment; - fn create_segment(&self, offset: u64) -> io::Result { - 
self.inner.create_segment(offset).map(|inner| ShortSegment { + fn create_segment(&self, offset: u64, header: segment::Header) -> io::Result { + self.inner.create_segment(offset, header).map(|inner| ShortSegment { inner, max_len: self.max_len, }) diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index 2d109734eac..752ab885836 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -161,3 +161,38 @@ fn all_segments_sealed() { assert_eq!(num_segments + 1, segments.len()); assert_eq!(segments.last().copied(), Some(num_commits as u64)); } + +#[test] +fn resume_empty_segment() { + enable_logging(); + + let root = tempdir().unwrap(); + let path = CommitLogDir::from_path_unchecked(root.path()); + let opts = Options { + max_segment_size: 64 * 1024, + ..<_>::default() + }; + let num_commits = 1024; + let repo = repo::Fs::new(path, None).unwrap(); + { + let mut clog = commitlog::Generic::open(&repo, opts).unwrap(); + for (i, payload) in compressible_payloads().take(num_commits).enumerate() { + clog.commit([(i as u64, payload)]).unwrap(); + } + clog.flush().unwrap(); + clog.sync(); + } + + let mut segments = repo.existing_offsets().unwrap(); + while let Some(last_segment) = segments.pop() { + repo.open_segment_writer(last_segment).unwrap().set_len(0).unwrap(); + + let _ = commitlog::Generic::<_, [u8; 256]>::open(&repo, opts).unwrap(); + let segments1 = repo.existing_offsets().unwrap(); + if segments.is_empty() { + assert_eq!([0], segments1.as_slice()); + } else { + assert_eq!(segments, segments1); + } + } +} From a94e006f55ca6c67f88568c8df79ad195f441ad1 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 22 Apr 2026 13:48:12 +0200 Subject: [PATCH 2/4] Fix test --- crates/durability/tests/io/fallocate.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index 
ae8071a53bd..64e50faf4cc 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -33,7 +33,6 @@ use scopeguard::ScopeGuard; use spacetimedb_commitlog::{ payload::txdata::{Mutations, Ops}, repo::{self, OnNewSegmentFn, Repo}, - segment, tests::helpers::enable_logging, }; use spacetimedb_durability::{local::OpenError, Durability, Transaction, Txdata}; @@ -135,9 +134,7 @@ async fn local_durability_crashes_on_resume_with_insuffient_space() -> anyhow::R // Write a segment with only a header and no `fallocate` reservation. { let repo = repo::Fs::new(replica_dir.commit_log(), None)?; - let mut segment = repo.create_segment(0)?; - segment::Header::default().write(&mut segment)?; - segment.sync_data()?; + repo.create_segment(0, <_>::default())?; } // Try to open local durability with a 1GiB segment size, From 7e7f5923e8d567bcdf061f6ab556cfaf28c4b3e2 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 24 Apr 2026 09:02:05 +0200 Subject: [PATCH 3/4] Resurrect `LockedFile::release` --- crates/fs-utils/src/lockfile.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/crates/fs-utils/src/lockfile.rs b/crates/fs-utils/src/lockfile.rs index 26f2bc5415e..3c8abe0d6f8 100644 --- a/crates/fs-utils/src/lockfile.rs +++ b/crates/fs-utils/src/lockfile.rs @@ -96,7 +96,7 @@ pub mod advisory { use std::{ error::Error as StdError, fmt, - fs::File, + fs::{self, File}, io::{self, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, process, @@ -169,6 +169,19 @@ pub mod advisory { Ok(()) } + /// Release the lock and optionally remove the locked file. + /// + /// By default, dropping [LockedFile] will release the lock, but not + /// remove the file. If `remove` is `true`, this method will also remove + /// the file. 
+ pub fn release(self, remove: bool) -> io::Result<()> { + if remove { + fs::remove_file(&self.path)?; + } + + Ok(()) + } + fn lock_inner(path: &Path) -> Result { create_parent_dir(path).map_err(|source| LockError { path: path.to_path_buf(), From a95d24414c8bb2a7c7cc0ca05e12551e2cdc5c8c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 24 Apr 2026 15:32:05 +0200 Subject: [PATCH 4/4] Fix segment creation in stream writer --- crates/commitlog/src/stream/writer.rs | 36 +++++++++++++++++---------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/crates/commitlog/src/stream/writer.rs b/crates/commitlog/src/stream/writer.rs index ca3fefd894f..c7cc2ed5132 100644 --- a/crates/commitlog/src/stream/writer.rs +++ b/crates/commitlog/src/stream/writer.rs @@ -449,21 +449,31 @@ fn create_segment( .as_ref() .map(|range| range.end) .unwrap_or_default(); - let mut segment = repo.create_segment(segment_offset, header).or_else(|e| { - if e.kind() == io::ErrorKind::AlreadyExists { - trace!("segment already exists"); - let mut s = repo.open_segment_writer(segment_offset)?; - let len = s.segment_len()?; - trace!("segment len: {len}"); - if len <= segment::Header::LEN as _ { - trace!("overwriting existing segment"); - s.ftruncate(0, 0)?; - return Ok(s); + let mut segment = loop { + match repo.create_segment(segment_offset, header) { + Ok(segment) => break segment, + Err(e) if e.kind() == io::ErrorKind::AlreadyExists => { + trace!("segment already exists"); + let mut s = repo.open_segment_writer(segment_offset)?; + let len = s.segment_len()?; + trace!("segment len: {len}"); + if len <= segment::Header::LEN as _ { + trace!("overwriting existing segment"); + repo.remove_segment(segment_offset)?; + continue; + } + + return Err(io::Error::new( + e.kind(), + format!( + "repo {}: segment {} already exists and is non-empty: {}", + repo, segment_offset, e + ), + )); } + Err(e) => return Err(e), } - - Err(e) - })?; + }; fallocate(&mut segment, &commitlog_options)?; let 
index_writer = repo