Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ itertools.workspace = true
log.workspace = true
memmap2 = "0.9.4"
nix = { workspace = true, optional = true, features = ["fs"] }
scopeguard.workspace = true
serde = { workspace = true, optional = true }
spacetimedb-fs-utils.workspace = true
spacetimedb-paths.workspace = true
Expand Down
46 changes: 20 additions & 26 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,27 @@ impl<R: Repo, T> Generic<R, T> {
if !tail.is_empty() {
debug!("segments: {tail:?}");
}
let head = if let Some(last) = tail.pop() {
debug!("resuming last segment: {last}");
// Resume the last segment for writing, or create a new segment
// starting from the last good commit + 1.
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
// The first commit in the last segment being corrupt is an
// edge case: we'd try to start a new segment with an offset
// equal to the already existing one, which would fail.
//
// We cannot just skip it either, as we don't know the reason
// for the corruption (there could be more, potentially
// recoverable commits in the segment).
//
// Thus, provide some context about what is wrong and refuse to
// start.
if meta.tx_range.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("repo {}: first commit in resumed segment {} is corrupt", repo, last),
));

// Resume the last segment for writing, or
// create a new segment starting from the last good commit + 1.
let head = loop {
if let Some(last) = tail.pop() {
info!("repo {}: resuming last segment: {}", repo, last);
match repo::resume_segment_writer(&repo, opts, last)? {
repo::ResumedSegment::Empty => {
repo.remove_segment(last)?;
continue;
}
repo::ResumedSegment::Resumed(writer) => break writer,
repo::ResumedSegment::Sealed(meta) | repo::ResumedSegment::Corrupted(meta) => {
tail.push(meta.tx_range.start);
break repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)?;
}
}
tail.push(meta.tx_range.start);
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
})?
} else {
debug!("starting fresh log");
repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
} else {
info!("repo {}: starting fresh log", repo);
break repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?;
}
};

Ok(Self {
Expand Down
118 changes: 86 additions & 32 deletions crates/commitlog/src/repo/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::sync::Arc;

use log::{debug, warn};
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
use spacetimedb_fs_utils::lockfile;
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
use tempfile::NamedTempFile;

use crate::segment::FileLike;
use crate::segment::{self, FileLike};

use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};

Expand Down Expand Up @@ -160,6 +161,7 @@ impl FileLike for NamedTempFile {
/// [Self::sealed] returns `true` if the segment is compressed.
pub struct ReadOnlySegment {
// Underlying reader; transparently decompresses if the segment file is
// zstd-compressed (see `CompressReader::is_compressed`).
inner: CompressReader,
// Length in bytes of the segment file, captured from file metadata when
// the reader was opened. Only authoritative for compressed (and thus
// immutable) segments -- uncompressed segments may still be appended to.
len: u64,
}

impl SegmentReader for ReadOnlySegment {
Expand Down Expand Up @@ -195,45 +197,98 @@ impl io::Seek for ReadOnlySegment {
}
}

impl SegmentLen for ReadOnlySegment {
    /// Determine the current length in bytes of this segment.
    ///
    /// # Errors
    ///
    /// Returns any I/O error encountered while seeking on an uncompressed
    /// segment.
    fn segment_len(&mut self) -> io::Result<u64> {
        // If the segment is compressed, we guarantee that it is immutable,
        // so use the file length as determined when opening the reader.
        // Seeking would be somewhat expensive in this case, as the zstd reader
        // translates to uncompressed offsets and thus must decompress at least
        // some frames.
        //
        // If the segment is not compressed, we may be reading the active
        // segment, so immutability is not guaranteed. Use the default seek
        // strategy thus.
        if self.inner.is_compressed() {
            Ok(self.len)
        } else {
            use io::Seek as _;

            let old_pos = self.stream_position()?;
            let len = self.seek(io::SeekFrom::End(0))?;

            // Avoid seeking a third time when we were already at the end of the
            // stream. The branch is usually way cheaper than a seek operation.
            if old_pos != len {
                self.seek(io::SeekFrom::Start(old_pos))?;
            }

            Ok(len)
        }
    }
}

impl Repo for Fs {
type SegmentWriter = File;
type SegmentReader = ReadOnlySegment;

fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
File::options()
.read(true)
.append(true)
.create_new(true)
.open(self.segment_path(offset))
.or_else(|e| {
if e.kind() == io::ErrorKind::AlreadyExists {
debug!("segment {offset} already exists");
// If the segment is completely empty, we can resume writing.
let file = self.open_segment_writer(offset)?;
if file.metadata()?.len() == 0 {
debug!("segment {offset} is empty");
return Ok(file);
}

// Otherwise, provide some context.
fn create_segment(&self, offset: u64, header: segment::Header) -> io::Result<Self::SegmentWriter> {
let path = self.segment_path(offset);

// We need to check if the segment already exists,
// so use file locking to prevent a TOCTOU race.
// Using `flock` means we don't need to worry about stale lockfiles.
let lock_path = path.0.with_extension("lock");
let _lock = scopeguard::guard(
lockfile::advisory::LockedFile::lock(&lock_path)
.map_err(|e| io::Error::new(e.source.kind(), format!("repo {}: {}: {}", self, e, e.source)))?,
|lockfile| {
if let Err(e) = lockfile.release(true) {
// It's ok if removing the file fails, but print a warning
// anyways.
warn!("repo {}: failed to remove {}: {}", self, lock_path.display(), e);
}
},
);

// Check whether the segment already exists.
// Overwrite it if its length is zero.
match fs::metadata(&path) {
Ok(stat) => {
if stat.len() > 0 {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("repo {}: segment {} already exists and is non-empty", self, offset),
));
}

Err(e)
})
.inspect(|_| {
// We're rotating commitlog segments, so we should also take a snapshot at the earliest opportunity.
if let Some(on_new_segment) = self.on_new_segment.as_ref() {
// No need to handle the error here: if the snapshot worker is closed we'll eventually close too,
// and we don't want to die prematurely if there are still TXes to write.
on_new_segment();
}
Err(e) => {
if e.kind() != io::ErrorKind::NotFound {
return Err(io::Error::new(
e.kind(),
format!(
"repo {}: error getting file metadata for segment {}: {}",
self, offset, e
),
));
}
})
}
}

// The segment file either does not exist, or is of length zero.
// Write the header to a temporary file and atomically move it into place.
let mut tmp = tempfile::Builder::new().make_in(&self.root.0, |tmp_path| {
File::options().read(true).append(true).create_new(true).open(tmp_path)
})?;
header.write(&mut tmp)?;
tmp.as_file_mut().sync_all()?;
let segment = tmp.persist(path)?;

// Notify subscribers.
if let Some(on_new_segment) = self.on_new_segment.as_ref() {
on_new_segment();
}

Ok(segment)
}

fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
Expand All @@ -248,7 +303,8 @@ impl Repo for Fs {
let path = self.segment_path(offset);
debug!("fs: open segment at {}", path.display());
let file = File::open(&path)?;
CompressReader::new(file).map(|inner| ReadOnlySegment { inner })
let len = file.metadata()?.len();
CompressReader::new(file).map(|inner| ReadOnlySegment { inner, len })
}

fn remove_segment(&self, offset: u64) -> io::Result<()> {
Expand Down Expand Up @@ -315,8 +371,6 @@ impl Repo for Fs {
}
}

impl SegmentLen for CompressReader {}

#[cfg(feature = "streaming")]
impl crate::stream::AsyncRepo for Fs {
type AsyncSegmentWriter = tokio::io::BufWriter<tokio::fs::File>;
Expand Down
22 changes: 12 additions & 10 deletions crates/commitlog/src/repo/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,28 @@ impl Repo for Memory {
type SegmentWriter = Segment;
type SegmentReader = ReadOnlySegment;

fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
fn create_segment(&self, offset: u64, header: crate::segment::Header) -> io::Result<Self::SegmentWriter> {
let mut inner = self.segments.write().unwrap();
match inner.entry(offset) {
let mut segment = match inner.entry(offset) {
btree_map::Entry::Occupied(entry) => {
let entry = entry.get();
let read_guard = entry.read().unwrap();
if read_guard.is_empty() {
Ok(Segment::from_shared(self.space.clone(), entry.clone()))
if entry.read().unwrap().is_empty() {
Segment::from_shared(self.space.clone(), entry.clone())
} else {
Err(io::Error::new(
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("segment {offset} already exists"),
))
));
}
}
btree_map::Entry::Vacant(entry) => {
let segment = entry.insert(Arc::new(RwLock::new(Storage::new())));
Ok(Segment::from_shared(self.space.clone(), segment.clone()))
let storage = entry.insert(Arc::new(RwLock::new(Storage::new())));
Segment::from_shared(self.space.clone(), storage.clone())
}
}
};
header.write(&mut segment)?;

Ok(segment)
}

fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
Expand Down
Loading
Loading