Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/meta/raft-store/src/applier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ where SM: StateMachineApi<SysData> + 'static

// Send queued change events to subscriber
for event in self.changes.drain(..) {
debug!("send to EventSender: {:?}", event);
info!("send to EventSender: {:?}", event);
self.sm.on_change_applied(event);
}

Expand Down
26 changes: 22 additions & 4 deletions src/meta/raft-store/src/sm_v003/sm_v003.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ pub struct SMV003 {
/// A semaphore that permits at most one compactor to run.
pub(crate) compaction_semaphore: Arc<Semaphore>,

/// Get a permit to write.
/// Semaphore for exclusive write access to the state machine.
///
/// Only one writer is allowed, to achieve serialization.
/// For historical reason, inserting a tombstone does not increase the seq.
/// Thus, mvcc isolation with the seq can not completely separate two concurrent writer.
/// Capacity is 1, ensuring only one writer at a time. This achieves serialization for:
/// - Applying Raft log entries to state machine
/// - Setting up watch streams with atomic snapshot reads
/// - Any operation requiring consistent state machine view
///
/// Historical context: Inserting tombstones does not increase the seq,
/// so MVCC isolation with seq alone cannot completely separate concurrent writers.
/// This semaphore provides the necessary serialization.
pub(crate) write_semaphore: Arc<Semaphore>,

/// Since when to start cleaning expired keys.
Expand Down Expand Up @@ -215,6 +220,19 @@ impl SMV003 {
self.on_change_applied.lock().unwrap().clone()
}

/// Acquire exclusive writer permit for state machine operations.
///
/// This returns a permit that grants exclusive write access to the state machine.
/// The permit is backed by a semaphore with capacity 1, ensuring only one writer
/// can hold the permit at a time.
///
/// This is used to serialize operations that require atomicity across multiple steps,
/// such as:
/// - Applying Raft log entries (via `new_applier()`)
/// - Setting up watch streams with consistent snapshot reads
/// - Any operation that needs to prevent concurrent state machine modifications
///
/// The permit is automatically released when dropped.
pub async fn acquire_writer_permit(&self) -> WriterPermit {
let acquirer = self.new_writer_acquirer();
let permit = acquirer.acquire().await;
Expand Down
10 changes: 9 additions & 1 deletion src/meta/raft-store/src/sm_v003/writer_acquirer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ impl WriterAcquirer {
}
}

/// ApplierPermit is used to acquire a permit for applying changes to the state machine.
/// Exclusive writer permit for state machine operations.
///
/// This permit is backed by a semaphore with capacity 1, ensuring exclusive access.
/// When held, no other writers can modify the state machine, guaranteeing:
/// - Serialization of Raft log application
/// - Atomicity of multi-step operations (e.g., watch registration + snapshot read)
/// - Prevention of MVCC isolation issues from concurrent writes
///
/// The permit is automatically released when dropped.
pub struct WriterPermit {
_permit: OwnedSemaphorePermit,
_drop: DropCallback,
Expand Down
20 changes: 20 additions & 0 deletions src/meta/service/src/meta_node/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,26 @@ impl MetaNode {
let stream = {
let sm = mn.raft_store.get_sm_v003();

// Acquire exclusive writer permit to ensure atomicity of the entire operation:
//
// 1. Register the watcher to dispatcher
// 2. Read the snapshot of the key range
// 3. Queue the initialization data for forwarding
//
// This permit blocks all state machine writes (Raft log application) during this block,
// preventing the race condition where:
// - A write is applied to state machine after snapshot read but before watcher registration
// - The watcher would miss events for that write
//
// The permit is held until the end of this scope (line 1575), which includes:
// - Creating and registering the watch sender
// - Reading the snapshot (if flush=true)
// - Queuing the initialization future to dispatcher
//
// Since watch ranges are typically small and snapshot reads are fast,
// the blocking duration is acceptable for maintaining correctness.
let _permit = sm.acquire_writer_permit().await;

let sender = mn.new_watch_sender(watch, tx.clone())?;
let sender_str = sender.to_string();
let weak_sender = mn.insert_watch_sender(sender);
Expand Down
Loading