From 86e479ae1360a0cea3ad8020653c59f6aa52d338 Mon Sep 17 00:00:00 2001
From: Nick Mathewson <nickm@torproject.org>
Date: Wed, 7 Sep 2022 09:22:38 -0400
Subject: [PATCH] `TaskSchedule`: give error on `sleep*()` if last handle is
 dropped

This fixes a busy-loop.  When the last `TaskHandle` on a `TaskSchedule`
is dropped, the schedule is permanently canceled: whatever operation it
was scheduling should no longer be performed.  But our code was broken:
the `sleep()` and `sleep_until_wallclock()` functions didn't check
whether the handles had been dropped.  This breakage caused a
CPU-eating busy-loop in `sleep_until_wallclock()`.

With this patch, we now return a `Result<(), SleepError>` from these
functions.

Fixes #572.
---
 Cargo.lock                           |  1 +
 crates/tor-dirmgr/src/bootstrap.rs   |  2 +-
 crates/tor-dirmgr/src/err.rs         | 10 +++++++++
 crates/tor-dirmgr/src/lib.rs         |  6 ++---
 crates/tor-rtcompat/Cargo.toml       |  1 +
 crates/tor-rtcompat/semver.md        |  1 +
 crates/tor-rtcompat/src/scheduler.rs | 33 +++++++++++++++++++++++-----
 7 files changed, 45 insertions(+), 9 deletions(-)
 create mode 100644 crates/tor-rtcompat/semver.md

diff --git a/Cargo.lock b/Cargo.lock
index cd21fb6ac..5f794414a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3870,6 +3870,7 @@ dependencies = [
  "native-tls",
  "pin-project",
  "rustls",
+ "thiserror",
  "tokio",
  "tokio-util",
  "x509-signature",
diff --git a/crates/tor-dirmgr/src/bootstrap.rs b/crates/tor-dirmgr/src/bootstrap.rs
index 9cddb81a2..44cef5b3c 100644
--- a/crates/tor-dirmgr/src/bootstrap.rs
+++ b/crates/tor-dirmgr/src/bootstrap.rs
@@ -583,7 +583,7 @@ pub(crate) async fn download(
                 .duration_since(now)
                 .unwrap_or(Duration::from_secs(0))
         };
-        schedule.sleep(delay.min(time_until_reset)).await;
+        schedule.sleep(delay.min(time_until_reset)).await?;
 
         now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
         if now >= reset_time {
diff --git a/crates/tor-dirmgr/src/err.rs b/crates/tor-dirmgr/src/err.rs
index 6df26e5e7..d3c02df6f 100644
--- a/crates/tor-dirmgr/src/err.rs
+++ b/crates/tor-dirmgr/src/err.rs
@@ -145,6 +145,16 @@ impl From for Error {
     }
 }
 
+impl From<tor_rtcompat::scheduler::SleepError> for Error {
+    fn from(err: tor_rtcompat::scheduler::SleepError) -> Self {
+        use tor_rtcompat::scheduler::SleepError::*;
+        match err {
+            ScheduleDropped => Error::ManagerDropped,
+            e => tor_error::into_internal!("Unexpected sleep error")(e).into(),
+        }
+    }
+}
+
 /// The effect that a given error has on our bootstrapping process
 #[derive(Copy, Clone, Debug)]
 pub(crate) enum BootstrapAction {
diff --git a/crates/tor-dirmgr/src/lib.rs b/crates/tor-dirmgr/src/lib.rs
index 5bcd99ee1..c4b1bfe35 100644
--- a/crates/tor-dirmgr/src/lib.rs
+++ b/crates/tor-dirmgr/src/lib.rs
@@ -581,7 +581,7 @@ impl<R: Runtime> DirMgr<R> {
             } else {
                 std::time::Duration::new(5, 0)
             };
-            schedule.sleep(pause).await;
+            schedule.sleep(pause).await?;
             // TODO: instead of loading the whole thing we should have a
             // database entry that says when the last update was, or use
             // our state functions.
@@ -676,7 +676,7 @@ impl<R: Runtime> DirMgr<R> {
                     let dirmgr = upgrade_weak_ref(&weak)?;
                     dirmgr.note_reset(attempt_id);
                 }
-                schedule.sleep(delay).await;
+                schedule.sleep(delay).await?;
                 state = state.reset();
             } else {
                 info!("Directory is complete.");
@@ -701,7 +701,7 @@
 
             let reset_at = state.reset_time();
             match reset_at {
-                Some(t) => schedule.sleep_until_wallclock(t).await,
+                Some(t) => schedule.sleep_until_wallclock(t).await?,
                 None => return Ok(()),
             }
             attempt_id = bootstrap::AttemptId::next();
diff --git a/crates/tor-rtcompat/Cargo.toml b/crates/tor-rtcompat/Cargo.toml
index 6fa0802d6..9f7e6ddfe 100644
--- a/crates/tor-rtcompat/Cargo.toml
+++ b/crates/tor-rtcompat/Cargo.toml
@@ -38,6 +38,7 @@ pin-project = "1"
 rustls-crate = { package = "rustls", version = "0.19", optional = true, features = [
     "dangerous_configuration",
 ] }
+thiserror = "1"
 tokio-crate = { package = "tokio", version = "1.7", optional = true, features = [
     "rt",
     "rt-multi-thread",
diff --git a/crates/tor-rtcompat/semver.md b/crates/tor-rtcompat/semver.md
new file mode 100644
index 000000000..4033a920c
--- /dev/null
+++ b/crates/tor-rtcompat/semver.md
@@ -0,0 +1 @@
+BREAKING: Schedule::sleep{,_until_wallclock} now return Result<(), SleepError>.
diff --git a/crates/tor-rtcompat/src/scheduler.rs b/crates/tor-rtcompat/src/scheduler.rs
index 745d8e377..040595e74 100644
--- a/crates/tor-rtcompat/src/scheduler.rs
+++ b/crates/tor-rtcompat/src/scheduler.rs
@@ -11,6 +11,23 @@ use std::time::{Duration, Instant, SystemTime};
 
 use pin_project::pin_project;
 
+/// An error returned while telling a [`TaskSchedule`] to sleep.
+///
+/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
+/// can fail because there are no [`TaskHandle`]s left.
+///
+/// Note that it is *not* an error if the `sleep` function is interrupted,
+/// cancelled, or rescheduled for a later time: see [`TaskSchedule::sleep`]
+/// for more information.
+#[derive(Clone, Debug, thiserror::Error)]
+#[non_exhaustive]
+pub enum SleepError {
+    /// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
+    /// task should exit.
+    #[error("All task handles dropped: task exiting.")]
+    ScheduleDropped,
+}
+
 /// A command sent from task handles to schedule objects.
 #[derive(Copy, Clone)]
 enum SchedulerCommand {
@@ -51,6 +68,9 @@ pub struct TaskSchedule<R: SleepProvider> {
 }
 
 /// A handle used to control a [`TaskSchedule`].
+///
+/// When the final handle is dropped, the computation governed by the
+/// `TaskSchedule` should terminate.
 #[derive(Clone)]
 pub struct TaskHandle {
     /// Sender of scheduler commands to the corresponding schedule.
@@ -101,20 +121,23 @@ impl<R: SleepProvider> TaskSchedule<R> {
     /// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
     /// method will not return at all, until the schedule is re-activated by
     /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
-    pub async fn sleep(&mut self, dur: Duration) {
+    ///
+    /// Finally, if every associated [`TaskHandle`] has been dropped, then this
+    /// method will return an error.
+    pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
         self.fire_in(dur);
-        self.next().await;
+        self.next().await.ok_or(SleepError::ScheduleDropped)
     }
 
     /// As
     /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
     /// but respect messages from this schedule's associated [`TaskHandle`].
-    pub async fn sleep_until_wallclock(&mut self, when: SystemTime) {
+    pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
         loop {
             let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
-            self.sleep(delay).await;
+            self.sleep(delay).await?;
             if finished {
-                return Ok(());
+                return Ok(());
             }
         }
     }
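For context (not part of the patch): a minimal sketch of how a task loop
might consume the new fallible API. `run_background_task` and its
one-second cadence are hypothetical illustrations; only `TaskSchedule`,
`SleepError`, and the `sleep` signature above come from this change.

use std::time::Duration;

use tor_rtcompat::scheduler::{SleepError, TaskSchedule};
use tor_rtcompat::SleepProvider;

/// Hypothetical periodic task: does one unit of work per second until
/// every `TaskHandle` for `schedule` has been dropped.
async fn run_background_task<R: SleepProvider>(mut schedule: TaskSchedule<R>) {
    loop {
        match schedule.sleep(Duration::from_secs(1)).await {
            // The timer elapsed (or a handle re-fired the schedule early).
            Ok(()) => {
                // ... perform one unit of periodic work here ...
            }
            // Every handle is gone: with this patch we get an error here
            // and can exit cleanly, instead of busy-looping forever.
            Err(SleepError::ScheduleDropped) => break,
            // `SleepError` is #[non_exhaustive], so callers in other
            // crates need a catch-all arm; treating unknown variants as
            // fatal mirrors the `From<SleepError> for Error` impl above.
            Err(_) => break,
        }
    }
}

Dropping the last `TaskHandle` returned by `TaskSchedule::new` is then
enough to shut such a task down.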