Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ itertools.workspace = true
log.workspace = true
memmap2 = "0.9.4"
nix = { workspace = true, optional = true, features = ["fs"] }
scopeguard.workspace = true
serde = { workspace = true, optional = true }
spacetimedb-fs-utils.workspace = true
spacetimedb-paths.workspace = true
Expand Down
46 changes: 20 additions & 26 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,27 @@ impl<R: Repo, T> Generic<R, T> {
if !tail.is_empty() {
debug!("segments: {tail:?}");
}
let head = if let Some(last) = tail.pop() {
debug!("resuming last segment: {last}");
// Resume the last segment for writing, or create a new segment
// starting from the last good commit + 1.
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
// The first commit in the last segment being corrupt is an
// edge case: we'd try to start a new segment with an offset
// equal to the already existing one, which would fail.
//
// We cannot just skip it either, as we don't know the reason
// for the corruption (there could be more, potentially
// recoverable commits in the segment).
//
// Thus, provide some context about what is wrong and refuse to
// start.
if meta.tx_range.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("repo {}: first commit in resumed segment {} is corrupt", repo, last),
));

// Resume the last segment for writing, or
// create a new segment starting from the last good commit + 1.
let head = loop {
if let Some(last) = tail.pop() {
info!("repo {}: resuming last segment: {}", repo, last);
match repo::resume_segment_writer(&repo, opts, last)? {
repo::ResumedSegment::Empty => {
repo.remove_segment(last)?;
continue;
}
repo::ResumedSegment::Resumed(writer) => break writer,
repo::ResumedSegment::Sealed(meta) | repo::ResumedSegment::Corrupted(meta) => {
tail.push(meta.tx_range.start);
break repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)?;
}
}
tail.push(meta.tx_range.start);
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
})?
} else {
debug!("starting fresh log");
repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?
} else {
info!("repo {}: starting fresh log", repo);
break repo::create_segment_writer(&repo, opts, Commit::DEFAULT_EPOCH, 0)?;
}
};

Ok(Self {
Expand Down
118 changes: 86 additions & 32 deletions crates/commitlog/src/repo/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use std::sync::Arc;

use log::{debug, warn};
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
use spacetimedb_fs_utils::lockfile;
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
use tempfile::NamedTempFile;

use crate::segment::FileLike;
use crate::segment::{self, FileLike};

use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};

Expand Down Expand Up @@ -160,6 +161,7 @@ impl FileLike for NamedTempFile {
/// [Self::sealed] returns `true` if the segment is compressed.
pub struct ReadOnlySegment {
inner: CompressReader,
len: u64,
}

impl SegmentReader for ReadOnlySegment {
Expand Down Expand Up @@ -195,45 +197,98 @@ impl io::Seek for ReadOnlySegment {
}
}

impl SegmentLen for ReadOnlySegment {}
impl SegmentLen for ReadOnlySegment {
fn segment_len(&mut self) -> io::Result<u64> {
// If the segment is compressed, we guarantee that it is immutable,
// so use the file length as determined when opening the reader.
// Seeking would be somewhat expensive in this case, as the zstd reader
// translates to uncompressed offsets and thus must decompress at least
// some frames.
//
// If the segment is not compressed, we may be reading the active
// segment, so immutability is not guaranteed. Use the default seek
// strategy thus.
if self.inner.is_compressed() {
Ok(self.len)
} else {
use io::Seek as _;

let old_pos = self.stream_position()?;
let len = self.seek(io::SeekFrom::End(0))?;

// Avoid seeking a third time when we were already at the end of the
// stream. The branch is usually way cheaper than a seek operation.
if old_pos != len {
self.seek(io::SeekFrom::Start(old_pos))?;
}

Ok(len)
}
}
}

impl Repo for Fs {
type SegmentWriter = File;
type SegmentReader = ReadOnlySegment;

fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
File::options()
.read(true)
.append(true)
.create_new(true)
.open(self.segment_path(offset))
.or_else(|e| {
if e.kind() == io::ErrorKind::AlreadyExists {
debug!("segment {offset} already exists");
// If the segment is completely empty, we can resume writing.
let file = self.open_segment_writer(offset)?;
if file.metadata()?.len() == 0 {
debug!("segment {offset} is empty");
return Ok(file);
}

// Otherwise, provide some context.
fn create_segment(&self, offset: u64, header: segment::Header) -> io::Result<Self::SegmentWriter> {
let path = self.segment_path(offset);

// We need to check if the segment already exists,
// so use file locking to prevent a TOCTOU race.
// Using `flock` means we don't need to worry about stale lockfiles.
let lock_path = path.0.with_extension("lock");
let _lock = scopeguard::guard(
lockfile::advisory::LockedFile::lock(&lock_path)
.map_err(|e| io::Error::new(e.source.kind(), format!("repo {}: {}: {}", self, e, e.source)))?,
|lockfile| {
if let Err(e) = lockfile.release(true) {
// It's ok if removing the file fails, but print a warning
// anyways.
warn!("repo {}: failed to remove {}: {}", self, lock_path.display(), e);
}
},
);

// Check whether the segment already exists.
// Overwrite it if its length is zero.
match fs::metadata(&path) {
Ok(stat) => {
if stat.len() > 0 {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("repo {}: segment {} already exists and is non-empty", self, offset),
));
}

Err(e)
})
.inspect(|_| {
// We're rotating commitlog segments, so we should also take a snapshot at the earliest opportunity.
if let Some(on_new_segment) = self.on_new_segment.as_ref() {
// No need to handle the error here: if the snapshot worker is closed we'll eventually close too,
// and we don't want to die prematurely if there are still TXes to write.
on_new_segment();
}
Err(e) => {
if e.kind() != io::ErrorKind::NotFound {
return Err(io::Error::new(
e.kind(),
format!(
"repo {}: error getting file metadata for segment {}: {}",
self, offset, e
),
));
}
})
}
}

// The segment file either does not exist, or is of length zero.
// Write the header to a temporary file and atomically move it into place.
let mut tmp = tempfile::Builder::new().make_in(&self.root.0, |tmp_path| {
File::options().read(true).append(true).create_new(true).open(tmp_path)
})?;
header.write(&mut tmp)?;
tmp.as_file_mut().sync_all()?;
let segment = tmp.persist(path)?;

// Notify subscribers.
if let Some(on_new_segment) = self.on_new_segment.as_ref() {
on_new_segment();
}

Ok(segment)
}

fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
Expand All @@ -248,7 +303,8 @@ impl Repo for Fs {
let path = self.segment_path(offset);
debug!("fs: open segment at {}", path.display());
let file = File::open(&path)?;
CompressReader::new(file).map(|inner| ReadOnlySegment { inner })
let len = file.metadata()?.len();
CompressReader::new(file).map(|inner| ReadOnlySegment { inner, len })
}

fn remove_segment(&self, offset: u64) -> io::Result<()> {
Expand Down Expand Up @@ -315,8 +371,6 @@ impl Repo for Fs {
}
}

impl SegmentLen for CompressReader {}

#[cfg(feature = "streaming")]
impl crate::stream::AsyncRepo for Fs {
type AsyncSegmentWriter = tokio::io::BufWriter<tokio::fs::File>;
Expand Down
22 changes: 12 additions & 10 deletions crates/commitlog/src/repo/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,28 @@ impl Repo for Memory {
type SegmentWriter = Segment;
type SegmentReader = ReadOnlySegment;

fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
fn create_segment(&self, offset: u64, header: crate::segment::Header) -> io::Result<Self::SegmentWriter> {
let mut inner = self.segments.write().unwrap();
match inner.entry(offset) {
let mut segment = match inner.entry(offset) {
btree_map::Entry::Occupied(entry) => {
let entry = entry.get();
let read_guard = entry.read().unwrap();
if read_guard.is_empty() {
Ok(Segment::from_shared(self.space.clone(), entry.clone()))
if entry.read().unwrap().is_empty() {
Segment::from_shared(self.space.clone(), entry.clone())
} else {
Err(io::Error::new(
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("segment {offset} already exists"),
))
));
}
}
btree_map::Entry::Vacant(entry) => {
let segment = entry.insert(Arc::new(RwLock::new(Storage::new())));
Ok(Segment::from_shared(self.space.clone(), segment.clone()))
let storage = entry.insert(Arc::new(RwLock::new(Storage::new())));
Segment::from_shared(self.space.clone(), storage.clone())
}
}
};
header.write(&mut segment)?;

Ok(segment)
}

fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
Expand Down
Loading
Loading