Skip to content

Commit bbcda39

Browse files
authored
Clean up blocking until deadline (#4217)
1 parent f5c28c3 commit bbcda39

File tree

6 files changed

+85
-134
lines changed

6 files changed

+85
-134
lines changed

esp-rtos/src/esp_radio/queue.rs

Lines changed: 18 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use alloc::{boxed::Box, vec};
22
use core::ptr::NonNull;
33

4-
use esp_hal::time::{Duration, Instant};
54
use esp_radio_rtos_driver::{
65
queue::{QueueImplementation, QueuePtr},
76
register_queue_implementation,
@@ -131,10 +130,8 @@ impl Queue {
131130
}
132131

133132
unsafe fn send_to_back(&self, item: *const u8, timeout_us: Option<u32>) -> bool {
134-
let deadline = timeout_us.map(|us| Instant::now() + Duration::from_micros(us as u64));
135-
136-
loop {
137-
let enqueued = self.inner.with(|queue| {
133+
if crate::with_deadline(timeout_us, |deadline| {
134+
self.inner.with(|queue| {
138135
if unsafe { queue.try_enqueue(item) } {
139136
trace!("Queue - notify with item");
140137
queue.waiting_for_item.notify();
@@ -145,25 +142,13 @@ impl Queue {
145142
queue.waiting_for_space.wait_with_deadline(deadline);
146143
false
147144
}
148-
});
149-
150-
if enqueued {
151-
return true;
152-
}
153-
154-
// We are here because the queue was full. Now we've either timed out, or an item has
155-
// been removed from the queue. However, any higher priority task can wake up
156-
// and preempt us still. Let's just check for the timeout, and try the whole process
157-
// again.
158-
159-
if let Some(deadline) = deadline
160-
&& deadline < Instant::now()
161-
{
162-
debug!("Queue - send to back - timed out");
163-
// We have a deadline and we've timed out.
164-
return false;
165-
}
166-
// We can block more, so let's attempt to enqueue again.
145+
})
146+
}) {
147+
debug!("Queue - send to back - success");
148+
true
149+
} else {
150+
debug!("Queue - send to back - timed out");
151+
false
167152
}
168153
}
169154

@@ -179,11 +164,8 @@ impl Queue {
179164
}
180165

181166
unsafe fn receive(&self, item: *mut u8, timeout_us: Option<u32>) -> bool {
182-
let deadline = timeout_us.map(|us| Instant::now() + Duration::from_micros(us as u64));
183-
184-
loop {
185-
// Attempt to dequeue an item from the queue
186-
let dequeued = self.inner.with(|queue| {
167+
if crate::with_deadline(timeout_us, |deadline| {
168+
self.inner.with(|queue| {
187169
if unsafe { queue.try_dequeue(item) } {
188170
trace!("Queue - notify with space");
189171
queue.waiting_for_space.notify();
@@ -194,25 +176,13 @@ impl Queue {
194176
queue.waiting_for_item.wait_with_deadline(deadline);
195177
false
196178
}
197-
});
198-
199-
if dequeued {
200-
return true;
201-
}
202-
203-
// We are here because we weren't able to dequeue from the queue previously. We've
204-
// either timed out, or the queue has an item. However, any higher priority
205-
// task can wake up and preempt us still. Let's just check for the timeout,
206-
// and try the whole process again.
207-
208-
if let Some(deadline) = deadline
209-
&& deadline < Instant::now()
210-
{
211-
// We have a deadline and we've timed out.
212-
debug!("Queue - timed out waiting for item");
213-
return false;
214-
}
215-
// We can block more, so let's attempt to dequeue again.
179+
})
180+
}) {
181+
debug!("Queue - dequeued item");
182+
true
183+
} else {
184+
debug!("Queue - timed out waiting for item");
185+
false
216186
}
217187
}
218188

esp-rtos/src/lib.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ use esp_hal::interrupt::software::SoftwareInterrupt;
9898
use esp_hal::{
9999
Blocking,
100100
system::Cpu,
101+
time::{Duration, Instant},
101102
timer::{AnyTimer, OneShotTimer},
102103
};
103104
#[cfg(multi_core)]
104105
use esp_hal::{
105106
peripherals::CPU_CTRL,
106107
system::{CpuControl, Stack},
107-
time::{Duration, Instant},
108108
};
109109
#[cfg_attr(docsrs, doc(cfg(feature = "embassy")))]
110110
pub use macros::rtos_main as main;
@@ -336,10 +336,33 @@ pub fn start_second_core_with_stack_guard_offset<const STACK_SIZE: usize>(
336336
const TICK_RATE: u32 = esp_config::esp_config_int!(u32, "ESP_RTOS_CONFIG_TICK_RATE_HZ");
337337

338338
pub(crate) fn now() -> u64 {
339-
esp_hal::time::Instant::now()
340-
.duration_since_epoch()
341-
.as_micros()
339+
Instant::now().duration_since_epoch().as_micros()
342340
}
343341

344342
#[cfg(feature = "embassy")]
345343
embassy_time_driver::time_driver_impl!(static TIMER_QUEUE: crate::timer::embassy::TimerQueue = crate::timer::embassy::TimerQueue::new());
344+
345+
/// Waits for a condition to be met or a timeout to occur.
346+
///
347+
/// This function is meant to simplify implementation of blocking primitives. Upon failure the
348+
/// `attempt` function should enqueue the task in a wait queue and put the task to sleep.
349+
fn with_deadline(timeout_us: Option<u32>, attempt: impl Fn(Instant) -> bool) -> bool {
350+
let deadline = timeout_us
351+
.map(|us| Instant::now() + Duration::from_micros(us as u64))
352+
.unwrap_or(Instant::EPOCH + Duration::MAX);
353+
354+
while !attempt(deadline) {
355+
// We are here because the operation failed. We've either timed out, or the operation is
356+
// ready to be attempted again. However, any higher priority task can wake up and
357+
// preempt us still. Let's just check for the timeout, and try the whole process
358+
// again.
359+
360+
if timeout_us.is_some() && deadline < Instant::now() {
361+
// We have a deadline and we've timed out.
362+
return false;
363+
}
364+
// We can block more, so let's attempt the operation again.
365+
}
366+
367+
true
368+
}

esp-rtos/src/scheduler.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -359,18 +359,9 @@ impl SchedulerState {
359359
is_current
360360
}
361361

362-
pub(crate) fn sleep_until(&mut self, at: Instant) -> bool {
363-
let current_cpu = Cpu::current() as usize;
364-
let current_task = unwrap!(self.per_cpu[current_cpu].current_task, "No current task");
362+
pub(crate) fn sleep_task_until(&mut self, task: TaskPtr, at: Instant) -> bool {
365363
let timer_queue = unwrap!(self.time_driver.as_mut());
366-
if timer_queue.schedule_wakeup(current_task, at) {
367-
// The task has been scheduled for wakeup.
368-
task::yield_task();
369-
true
370-
} else {
371-
// The task refuses to sleep.
372-
false
373-
}
364+
timer_queue.schedule_wakeup(task, at)
374365
}
375366

376367
pub(crate) fn resume_task(&mut self, task: TaskPtr) {
@@ -448,7 +439,19 @@ impl Scheduler {
448439
}
449440

450441
pub(crate) fn sleep_until(&self, wake_at: Instant) -> bool {
451-
self.with(|scheduler| scheduler.sleep_until(wake_at))
442+
self.with(|scheduler| {
443+
let current_cpu = Cpu::current() as usize;
444+
let current_task = unwrap!(
445+
scheduler.per_cpu[current_cpu].current_task,
446+
"No current task"
447+
);
448+
if scheduler.sleep_task_until(current_task, wake_at) {
449+
task::yield_task();
450+
true
451+
} else {
452+
false
453+
}
454+
})
452455
}
453456
}
454457

esp-rtos/src/semaphore.rs

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@
22
//!
33
//! This module provides the [`Semaphore`] type, which implements counting semaphores and mutexes.
44
5-
use esp_hal::{
6-
system::Cpu,
7-
time::{Duration, Instant},
8-
};
5+
use esp_hal::{system::Cpu, time::Instant};
96
use esp_sync::NonReentrantMutex;
107

118
use crate::{
@@ -118,7 +115,7 @@ impl SemaphoreInner {
118115
}
119116
}
120117

121-
fn wait_with_deadline(&mut self, deadline: Option<Instant>) {
118+
fn wait_with_deadline(&mut self, deadline: Instant) {
122119
trace!("Semaphore wait_with_deadline - {:?}", deadline);
123120
match self {
124121
SemaphoreInner::Counting { waiting, .. } => waiting.wait_with_deadline(deadline),
@@ -181,36 +178,22 @@ impl Semaphore {
181178
/// If the semaphore is already taken, the task will be blocked until the semaphore is released.
182179
/// Recursive mutexes can be locked multiple times by the mutex owner task.
183180
pub fn take(&self, timeout_us: Option<u32>) -> bool {
184-
let deadline = timeout_us.map(|us| Instant::now() + Duration::from_micros(us as u64));
185-
loop {
186-
let taken = self.inner.with(|sem| {
181+
if crate::with_deadline(timeout_us, |deadline| {
182+
self.inner.with(|sem| {
187183
if sem.try_take() {
188184
true
189185
} else {
190186
// The task will go to sleep when the above critical section is released.
191187
sem.wait_with_deadline(deadline);
192188
false
193189
}
194-
});
195-
196-
if taken {
197-
debug!("Semaphore - take - success");
198-
return true;
199-
}
200-
201-
// We are here because we weren't able to take the semaphore previously. We've either
202-
// timed out, or the semaphore is ready for taking. However, any higher priority task
203-
// can wake up and preempt us still. Let's just check for the timeout, and
204-
// try the whole process again.
205-
206-
if let Some(deadline) = deadline
207-
&& deadline < Instant::now()
208-
{
209-
// We have a deadline and we've timed out.
210-
trace!("Semaphore - take - timed out");
211-
return false;
212-
}
213-
// We can block more, so let's attempt to take the semaphore again.
190+
})
191+
}) {
192+
debug!("Semaphore - take - success");
193+
true
194+
} else {
195+
debug!("Semaphore - take - timed out");
196+
false
214197
}
215198
}
216199

esp-rtos/src/task/mod.rs

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,6 @@ impl<E: TaskListElement> TaskQueue<E> {
233233

234234
#[cfg(feature = "embassy")]
235235
pub(crate) mod flags {
236-
237-
use esp_hal::time::{Duration, Instant};
238236
use esp_sync::NonReentrantMutex;
239237

240238
use crate::{
@@ -290,39 +288,21 @@ pub(crate) mod flags {
290288
}
291289

292290
pub(crate) fn wait(&self, wait_flags: u32, timeout_us: Option<u32>) -> bool {
293-
let deadline = timeout_us.map(|us| Instant::now() + Duration::from_micros(us as u64));
294-
loop {
295-
let success = self.inner.with(|inner| {
291+
if crate::with_deadline(timeout_us, |deadline| {
292+
self.inner.with(|inner| {
296293
if inner.wait(wait_flags) {
297294
true
298295
} else {
299-
let wake_at = if let Some(deadline) = deadline {
300-
deadline
301-
} else {
302-
Instant::EPOCH + Duration::MAX
303-
};
304-
SCHEDULER.sleep_until(wake_at);
296+
SCHEDULER.sleep_until(deadline);
305297
false
306298
}
307-
});
308-
309-
if success {
310-
debug!("Flags - wait - success");
311-
return true;
312-
}
313-
314-
// We are here because the required flags were not set previously. We've
315-
// either timed out, or the flags are now set. However,
316-
// any higher priority task can wake up and preempt us still. Let's
317-
// just check for the timeout, and try the whole process again.
318-
319-
if let Some(deadline) = deadline
320-
&& deadline < Instant::now()
321-
{
322-
// We have a deadline and we've timed out.
323-
trace!("Flags - wait - timed out");
324-
return false;
325-
}
299+
})
300+
}) {
301+
debug!("Flags - wait - success");
302+
true
303+
} else {
304+
trace!("Flags - wait - timed out");
305+
false
326306
}
327307
}
328308
}

esp-rtos/src/wait_queue.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use core::ptr::NonNull;
22

3-
use esp_hal::{
4-
system::Cpu,
5-
time::{Duration, Instant},
6-
};
3+
use esp_hal::{system::Cpu, time::Instant};
74

85
use crate::{
96
SCHEDULER,
@@ -33,22 +30,17 @@ impl WaitQueue {
3330
});
3431
}
3532

36-
pub(crate) fn wait_with_deadline(&mut self, deadline: Option<Instant>) {
33+
pub(crate) fn wait_with_deadline(&mut self, deadline: Instant) {
3734
SCHEDULER.with(|scheduler| {
3835
let current_cpu = Cpu::current() as usize;
3936
let mut task = unwrap!(scheduler.per_cpu[current_cpu].current_task);
4037

41-
let wake_at = if let Some(deadline) = deadline {
42-
deadline
43-
} else {
44-
Instant::EPOCH + Duration::MAX
45-
};
46-
47-
if scheduler.sleep_until(wake_at) {
38+
if scheduler.sleep_task_until(task, deadline) {
4839
self.waiting_tasks.push(task);
4940
unsafe {
5041
task.as_mut().current_queue = Some(NonNull::from(self));
5142
}
43+
crate::task::yield_task();
5244
}
5345
});
5446
}

0 commit comments

Comments
 (0)