Commit 725db88

Merge pull request #4694 from RalfJung/epoll-ready-set
epoll: separately track ready_set
2 parents: 16685fc + 4155266


src/tools/miri/src/shims/unix/linux_like/epoll.rs

Lines changed: 60 additions & 74 deletions
@@ -1,5 +1,5 @@
 use std::cell::RefCell;
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
 use std::io;
 use std::time::Duration;
 
@@ -18,12 +18,13 @@ type EpollEventKey = (FdId, FdNum);
 #[derive(Debug, Default)]
 struct Epoll {
     /// A map of EpollEventInterests registered under this epoll instance. Each entry is
-    /// differentiated using FdId and file descriptor value. Note that we do not have a separate
-    /// "ready" list; instead, a boolean flag in this list tracks which subset is ready. This makes
-    /// `epoll_wait` less efficient, but also requires less bookkeeping.
+    /// differentiated using FdId and file descriptor value.
     interest_list: RefCell<BTreeMap<EpollEventKey, EpollEventInterest>>,
-    /// The queue of threads blocked on this epoll instance, and how many events they'd like to get.
-    queue: RefCell<VecDeque<(ThreadId, u32)>>,
+    /// The subset of interests that is currently considered "ready". Stored separately so we
+    /// can access it more efficiently.
+    ready_set: RefCell<BTreeSet<EpollEventKey>>,
+    /// The queue of threads blocked on this epoll instance.
+    queue: RefCell<VecDeque<ThreadId>>,
 }
 
 impl VisitProvenance for Epoll {
@@ -44,8 +45,6 @@ pub struct EpollEventInterest {
     relevant_events: u32,
     /// The currently active events for this file descriptor.
     active_events: u32,
-    /// Whether this interest is in the "ready" set.
-    ready: bool,
    /// The vector clock for wakeups.
     clock: VClock,
     /// User-defined data associated with this interest.
@@ -176,13 +175,19 @@ impl EpollInterestTable {
         if let Some(epolls) = self.0.remove(&id) {
             for epoll in epolls.iter().filter_map(|(_id, epoll)| epoll.upgrade()) {
                 // This is a still-live epoll with interest in this FD. Remove all
-                // relevent interests.
+                // relevent interests (including from the ready set).
                 epoll
                     .interest_list
                     .borrow_mut()
                     .extract_if(range_for_id(id), |_, _| true)
                     // Consume the iterator.
                     .for_each(|_| ());
+                epoll
+                    .ready_set
+                    .borrow_mut()
+                    .extract_if(range_for_id(id), |_| true)
+                    // Consume the iterator.
+                    .for_each(|_| ());
             }
         }
     }
@@ -326,7 +331,6 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                 relevant_events: events,
                 data,
                 active_events: 0,
-                ready: false,
                 clock: VClock::default(),
             };
             if interest_list.try_insert(epoll_key, new_interest).is_err() {
@@ -351,19 +355,20 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                 move |callback| {
                     // Need to release the RefCell when this closure returns, so we have to move
                     // it into the closure, so we have to do a re-lookup here.
-                    callback(interest_list.get_mut(&epoll_key).unwrap())
+                    callback(epoll_key, interest_list.get_mut(&epoll_key).unwrap())
                 },
             )?;
 
             interp_ok(Scalar::from_i32(0))
         } else if op == epoll_ctl_del {
             let epoll_key = (id, fd);
 
-            // Remove epoll_event_interest from interest_list.
+            // Remove epoll_event_interest from interest_list and ready_set.
            if interest_list.remove(&epoll_key).is_none() {
                 // We did not have interest in this.
                 return this.set_last_error_and_return_i32(LibcError("ENOENT"));
             };
+            epfd.ready_set.borrow_mut().remove(&epoll_key);
             // If this was the last interest in this FD, remove us from the global list
             // of who is interested in this FD.
             if interest_list.range(range_for_id(id)).next().is_none() {
@@ -441,7 +446,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
             return this.set_last_error_and_return(LibcError("EBADF"), dest);
         };
 
-        if timeout == 0 || epfd.interest_list.borrow().values().any(|i| i.ready) {
+        if timeout == 0 || !epfd.ready_set.borrow().is_empty() {
             // If the timeout is 0 or there is a ready event, we can return immediately.
             return_ready_list(&epfd, dest, &event, this)?;
         } else {
@@ -459,9 +464,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                 }
             };
             // Record this thread as blocked.
-            epfd.queue
-                .borrow_mut()
-                .push_back((this.active_thread(), maxevents.try_into().unwrap()));
+            epfd.queue.borrow_mut().push_back(this.active_thread());
             // And block it.
             let dest = dest.clone();
             // We keep a strong ref to the underlying `Epoll` to make sure it sticks around.
@@ -487,7 +490,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
                         // Remove the current active thread_id from the blocked thread_id list.
                         epfd
                             .queue.borrow_mut()
-                            .retain(|&(id, _events)| id != this.active_thread());
+                            .retain(|&id| id != this.active_thread());
                         this.write_int(0, &dest)?;
                         interp_ok(())
                     },
@@ -526,9 +529,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
         let active_events = fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this);
         for epoll in epolls {
             update_readiness(this, &epoll, active_events, force_edge, |callback| {
-                for (_key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id))
+                for (&key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id))
                 {
-                    callback(interest)?;
+                    callback(key, interest)?;
                 }
                 interp_ok(())
             })?;
@@ -550,46 +553,34 @@ fn update_readiness<'tcx>(
     active_events: u32,
     force_edge: bool,
     for_each_interest: impl FnOnce(
-        &mut dyn FnMut(&mut EpollEventInterest) -> InterpResult<'tcx>,
+        &mut dyn FnMut(EpollEventKey, &mut EpollEventInterest) -> InterpResult<'tcx>,
     ) -> InterpResult<'tcx>,
 ) -> InterpResult<'tcx> {
-    let mut num_ready = 0u32; // how many events we have ready to deliver
-    for_each_interest(&mut |interest| {
+    let mut ready_set = epoll.ready_set.borrow_mut();
+    for_each_interest(&mut |key, interest| {
         // Update the ready events tracked in this interest.
         let new_readiness = interest.relevant_events & active_events;
         let prev_readiness = std::mem::replace(&mut interest.active_events, new_readiness);
         if new_readiness == 0 {
             // Un-trigger this, there's nothing left to report here.
-            interest.ready = false;
+            ready_set.remove(&key);
         } else if force_edge || new_readiness != prev_readiness & new_readiness {
             // Either we force an "edge" to be detected, or there's a bit set in `new`
-            // that was not set in `prev`.
-            interest.ready = true;
+            // that was not set in `prev`. In both cases, this is ready now.
+            ready_set.insert(key);
             ecx.release_clock(|clock| {
                 interest.clock.join(clock);
             })?;
-            num_ready = num_ready.saturating_add(1);
         }
         interp_ok(())
     })?;
-    // Edge-triggered notifications only wake up as many threads as are needed to deliver
-    // all the events.
-    while num_ready > 0
-        && let Some((thread_id, events)) = epoll.queue.borrow_mut().pop_front()
+    // While there are events ready to be delivered, wake up a thread to receive them.
+    while !ready_set.is_empty()
+        && let Some(thread_id) = epoll.queue.borrow_mut().pop_front()
     {
+        drop(ready_set); // release the "lock" so the unblocked thread can have it
         ecx.unblock_thread(thread_id, BlockReason::Epoll)?;
-        // Keep track of how many events we have left to deliver (except if we saturated;
-        // in that case we just wake up everybody).
-        if num_ready != u32::MAX {
-            num_ready = num_ready.saturating_sub(events);
-        }
-    }
-    // Sanity-check: if there are threads left to wake up, then there are no more ready events.
-    if !epoll.queue.borrow().is_empty() {
-        assert!(
-            epoll.interest_list.borrow().values().all(|i| !i.ready),
-            "there are unconsumed ready events and threads ready to take them"
-        );
+        ready_set = epoll.ready_set.borrow_mut();
     }
 
     interp_ok(())
@@ -604,43 +595,38 @@ fn return_ready_list<'tcx>(
     ecx: &mut MiriInterpCx<'tcx>,
 ) -> InterpResult<'tcx, i32> {
     let mut interest_list = epfd.interest_list.borrow_mut();
+    let mut ready_set = epfd.ready_set.borrow_mut();
     let mut num_of_events: i32 = 0;
     let mut array_iter = ecx.project_array_fields(events)?;
 
-    let mut interests = interest_list.iter_mut();
-    while let Some(slot) = array_iter.next(ecx)? {
-        // Search for next ready event that we are intersted in. This is an inefficient linear scan.
-        // We could make it efficient by tracking the set of triggered events in a BTreeSet or so,
-        // but since this is very unlikely to be a bottleneck we prefer cleaner code.
-        for (key, interest) in interests.by_ref() {
-            // Sanity-check to ensure that the info about this event is up-to-date.
-            if cfg!(debug_assertions) {
-                // Ensure this matches the latest readiness of this FD.
-                // We have to do an FD lookup by ID for this. The FdNum might be already closed.
-                let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
-                let current_readiness =
-                    fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx);
-                assert_eq!(interest.active_events, current_readiness & interest.relevant_events);
-            }
-            // Skip event if it has not been triggered.
-            if !interest.ready {
-                continue;
-            }
-            // Deliver event to caller.
-            ecx.write_int_fields_named(
-                &[("events", interest.active_events.into()), ("u64", interest.data.into())],
-                &slot.1,
-            )?;
-            num_of_events = num_of_events.strict_add(1);
-            // Synchronize waking thread with the event of interest.
-            ecx.acquire_clock(&interest.clock)?;
-            // Mark this interest as no-longer-ready, since it has been delivered (and we only
-            // support ET).
-            interest.ready = false;
-            // Skip out of this loop so that we go to the next slot in the array.
-            break;
+    // Sanity-check to ensure that all event info is up-to-date.
+    if cfg!(debug_assertions) {
+        for (key, interest) in interest_list.iter() {
+            // Ensure this matches the latest readiness of this FD.
+            // We have to do an FD lookup by ID for this. The FdNum might be already closed.
+            let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
+            let current_active = fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx);
+            assert_eq!(interest.active_events, current_active & interest.relevant_events);
         }
     }
+
+    // While there is a slot to store another event, and an event to store, deliver that event.
+    while let Some(slot) = array_iter.next(ecx)?
+        && let Some(&key) = ready_set.first()
+    {
+        let interest = interest_list.get_mut(&key).expect("non-existent event in ready set");
+        // Deliver event to caller.
+        ecx.write_int_fields_named(
+            &[("events", interest.active_events.into()), ("u64", interest.data.into())],
+            &slot.1,
+        )?;
+        num_of_events = num_of_events.strict_add(1);
+        // Synchronize receiving thread with the event of interest.
+        ecx.acquire_clock(&interest.clock)?;
+        // Since currently, all events are edge-triggered, we remove them from the ready set when
+        // they get delivered.
+        ready_set.remove(&key);
+    }
     ecx.write_int(num_of_events, dest)?;
     interp_ok(num_of_events)
 }
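
To make the new bookkeeping easier to follow outside of Miri's interpreter machinery, here is a minimal, self-contained Rust sketch of the pattern this diff switches to: the interest map stays authoritative, while a BTreeSet keyed the same way tracks which interests currently have an undelivered event. The names below (Key, Interest, Readiness, update, drain) are illustrative stand-ins, not Miri's actual types or functions; in particular, the real code keeps both containers in RefCells, blocks and wakes threads, and synchronizes via vector clocks, all of which this sketch omits.

use std::collections::{BTreeMap, BTreeSet};

// Stand-in for Miri's EpollEventKey, i.e. (FdId, FdNum).
type Key = (u64, i32);

// Simplified stand-in for EpollEventInterest (no vector clock).
struct Interest {
    relevant_events: u32,
    active_events: u32,
    data: u64,
}

#[derive(Default)]
struct Readiness {
    // All registered interests, keyed like Miri's interest_list.
    interest_list: BTreeMap<Key, Interest>,
    // The subset of keys that currently have an undelivered (edge-triggered) event.
    ready_set: BTreeSet<Key>,
}

impl Readiness {
    // Loosely mirrors the edge detection in update_readiness: insert the key on a
    // new edge (or when forced), remove it when nothing relevant is active any more.
    fn update(&mut self, key: Key, fd_active: u32, force_edge: bool) {
        let Some(interest) = self.interest_list.get_mut(&key) else { return };
        let new = interest.relevant_events & fd_active;
        let prev = std::mem::replace(&mut interest.active_events, new);
        if new == 0 {
            self.ready_set.remove(&key);
        } else if force_edge || new != prev & new {
            self.ready_set.insert(key);
        }
    }

    // Loosely mirrors the delivery loop in return_ready_list: pop ready keys in order,
    // report (events, data) pairs, and un-ready each delivered key (pure ET semantics).
    fn drain(&mut self, max_events: usize) -> Vec<(u32, u64)> {
        let mut out = Vec::new();
        while out.len() < max_events {
            let Some(&key) = self.ready_set.first() else { break };
            let interest = &self.interest_list[&key];
            out.push((interest.active_events, interest.data));
            self.ready_set.remove(&key);
        }
        out
    }
}

fn main() {
    let mut r = Readiness::default();
    r.interest_list.insert((1, 3), Interest { relevant_events: 0b1, active_events: 0, data: 42 });
    r.update((1, 3), 0b1, /* force_edge */ false); // 0 -> 1 is an edge: becomes ready
    assert_eq!(r.drain(8), vec![(0b1, 42)]);
    assert!(r.ready_set.is_empty()); // delivered, so no longer ready
    r.update((1, 3), 0b1, false); // still active but no new edge: stays un-ready
    assert!(r.ready_set.is_empty());
}

Keeping the ready keys in an ordered set means delivery can repeatedly take ready_set.first() instead of scanning the whole interest list for a ready flag, which is the trade-off the removed comment in return_ready_list ("We could make it efficient by tracking the set of triggered events in a BTreeSet or so") already pointed at.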
