`TaskSchedule`: give error on `sleep*()` if last handle is dropped
This fixes a busy-loop. When the last `TaskHandle` on a `TaskSchedule` is dropped, the schedule is permanently canceled: whatever operation it was scheduling should no longer be performed. But our code was broken: the `sleep()` and `sleep_until_wallclock()` functions don't verify whether the handles are dropped or not. This breakage caused a CPU-eating busy-loop in `sleep_until_wallclock`. With this patch, we now return a `Result<(), SleepError>` from these functions. Fixes #572.
This commit is contained in:
parent
239c1f044f
commit
86e479ae13
|
@ -3870,6 +3870,7 @@ dependencies = [
|
||||||
"native-tls",
|
"native-tls",
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"rustls",
|
"rustls",
|
||||||
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"x509-signature",
|
"x509-signature",
|
||||||
|
|
|
@ -583,7 +583,7 @@ pub(crate) async fn download<R: Runtime>(
|
||||||
.duration_since(now)
|
.duration_since(now)
|
||||||
.unwrap_or(Duration::from_secs(0))
|
.unwrap_or(Duration::from_secs(0))
|
||||||
};
|
};
|
||||||
schedule.sleep(delay.min(time_until_reset)).await;
|
schedule.sleep(delay.min(time_until_reset)).await?;
|
||||||
|
|
||||||
now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
|
now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
|
||||||
if now >= reset_time {
|
if now >= reset_time {
|
||||||
|
|
|
@ -145,6 +145,16 @@ impl From<signature::Error> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<tor_rtcompat::scheduler::SleepError> for Error {
|
||||||
|
fn from(err: tor_rtcompat::scheduler::SleepError) -> Self {
|
||||||
|
use tor_rtcompat::scheduler::SleepError::*;
|
||||||
|
match err {
|
||||||
|
ScheduleDropped => Error::ManagerDropped,
|
||||||
|
e => tor_error::into_internal!("Unexpected sleep error")(e).into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The effect that a given error has on our bootstrapping process
|
/// The effect that a given error has on our bootstrapping process
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub(crate) enum BootstrapAction {
|
pub(crate) enum BootstrapAction {
|
||||||
|
|
|
@ -581,7 +581,7 @@ impl<R: Runtime> DirMgr<R> {
|
||||||
} else {
|
} else {
|
||||||
std::time::Duration::new(5, 0)
|
std::time::Duration::new(5, 0)
|
||||||
};
|
};
|
||||||
schedule.sleep(pause).await;
|
schedule.sleep(pause).await?;
|
||||||
// TODO: instead of loading the whole thing we should have a
|
// TODO: instead of loading the whole thing we should have a
|
||||||
// database entry that says when the last update was, or use
|
// database entry that says when the last update was, or use
|
||||||
// our state functions.
|
// our state functions.
|
||||||
|
@ -676,7 +676,7 @@ impl<R: Runtime> DirMgr<R> {
|
||||||
let dirmgr = upgrade_weak_ref(&weak)?;
|
let dirmgr = upgrade_weak_ref(&weak)?;
|
||||||
dirmgr.note_reset(attempt_id);
|
dirmgr.note_reset(attempt_id);
|
||||||
}
|
}
|
||||||
schedule.sleep(delay).await;
|
schedule.sleep(delay).await?;
|
||||||
state = state.reset();
|
state = state.reset();
|
||||||
} else {
|
} else {
|
||||||
info!("Directory is complete.");
|
info!("Directory is complete.");
|
||||||
|
@ -701,7 +701,7 @@ impl<R: Runtime> DirMgr<R> {
|
||||||
|
|
||||||
let reset_at = state.reset_time();
|
let reset_at = state.reset_time();
|
||||||
match reset_at {
|
match reset_at {
|
||||||
Some(t) => schedule.sleep_until_wallclock(t).await,
|
Some(t) => schedule.sleep_until_wallclock(t).await?,
|
||||||
None => return Ok(()),
|
None => return Ok(()),
|
||||||
}
|
}
|
||||||
attempt_id = bootstrap::AttemptId::next();
|
attempt_id = bootstrap::AttemptId::next();
|
||||||
|
|
|
@ -38,6 +38,7 @@ pin-project = "1"
|
||||||
rustls-crate = { package = "rustls", version = "0.19", optional = true, features = [
|
rustls-crate = { package = "rustls", version = "0.19", optional = true, features = [
|
||||||
"dangerous_configuration",
|
"dangerous_configuration",
|
||||||
] }
|
] }
|
||||||
|
thiserror = "1"
|
||||||
tokio-crate = { package = "tokio", version = "1.7", optional = true, features = [
|
tokio-crate = { package = "tokio", version = "1.7", optional = true, features = [
|
||||||
"rt",
|
"rt",
|
||||||
"rt-multi-thread",
|
"rt-multi-thread",
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
BREAKING: Schedule::sleep{,_until_wallclock} now return Result().
|
|
@ -11,6 +11,23 @@ use std::time::{Duration, Instant, SystemTime};
|
||||||
|
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
|
|
||||||
|
/// An error returned while telling a [`TaskSchedule`] to sleep.
|
||||||
|
///
|
||||||
|
/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
|
||||||
|
/// can fail because there are no [`TaskHandle`]s left.
|
||||||
|
///
|
||||||
|
/// Note that it is *not* an error if the `sleep` function is interrupted,
|
||||||
|
/// cancelled, or rescheduled for a later time: See [`TaskSchedule::sleep`]
|
||||||
|
/// for more information.
|
||||||
|
#[derive(Clone, Debug, thiserror::Error)]
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub enum SleepError {
|
||||||
|
/// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
|
||||||
|
/// task should exit.
|
||||||
|
#[error("All task handles dropped: task exiting.")]
|
||||||
|
ScheduleDropped,
|
||||||
|
}
|
||||||
|
|
||||||
/// A command sent from task handles to schedule objects.
|
/// A command sent from task handles to schedule objects.
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
enum SchedulerCommand {
|
enum SchedulerCommand {
|
||||||
|
@ -51,6 +68,9 @@ pub struct TaskSchedule<R: SleepProvider> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A handle used to control a [`TaskSchedule`].
|
/// A handle used to control a [`TaskSchedule`].
|
||||||
|
///
|
||||||
|
/// When the final handle is dropped, the computation governed by the
|
||||||
|
/// `TaskSchedule` should terminate.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TaskHandle {
|
pub struct TaskHandle {
|
||||||
/// Sender of scheduler commands to the corresponding schedule.
|
/// Sender of scheduler commands to the corresponding schedule.
|
||||||
|
@ -101,20 +121,23 @@ impl<R: SleepProvider> TaskSchedule<R> {
|
||||||
/// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
|
/// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
|
||||||
/// method will not return at all, until the schedule is re-activated by
|
/// method will not return at all, until the schedule is re-activated by
|
||||||
/// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
|
/// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
|
||||||
pub async fn sleep(&mut self, dur: Duration) {
|
///
|
||||||
|
/// Finally, if every associated [`TaskHandle`] has been dropped, then this
|
||||||
|
/// method will return an error.
|
||||||
|
pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
|
||||||
self.fire_in(dur);
|
self.fire_in(dur);
|
||||||
self.next().await;
|
self.next().await.ok_or(SleepError::ScheduleDropped)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// As
|
/// As
|
||||||
/// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
|
/// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
|
||||||
/// but respect messages from this schedule's associated [`TaskHandle`].
|
/// but respect messages from this schedule's associated [`TaskHandle`].
|
||||||
pub async fn sleep_until_wallclock(&mut self, when: SystemTime) {
|
pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
|
||||||
loop {
|
loop {
|
||||||
let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
|
let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
|
||||||
self.sleep(delay).await;
|
self.sleep(delay).await?;
|
||||||
if finished {
|
if finished {
|
||||||
return;
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue