Merge branch 'schedule_drop' into 'main'
`TaskSchedule`: give error on `sleep*()` if last handle is dropped Closes #572 See merge request tpo/core/arti!725
This commit is contained in:
commit
c4bc426be3
|
@ -3870,6 +3870,7 @@ dependencies = [
|
|||
"native-tls",
|
||||
"pin-project",
|
||||
"rustls",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"x509-signature",
|
||||
|
|
|
@ -583,7 +583,7 @@ pub(crate) async fn download<R: Runtime>(
|
|||
.duration_since(now)
|
||||
.unwrap_or(Duration::from_secs(0))
|
||||
};
|
||||
schedule.sleep(delay.min(time_until_reset)).await;
|
||||
schedule.sleep(delay.min(time_until_reset)).await?;
|
||||
|
||||
now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
|
||||
if now >= reset_time {
|
||||
|
|
|
@ -145,6 +145,16 @@ impl From<signature::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<tor_rtcompat::scheduler::SleepError> for Error {
|
||||
fn from(err: tor_rtcompat::scheduler::SleepError) -> Self {
|
||||
use tor_rtcompat::scheduler::SleepError::*;
|
||||
match err {
|
||||
ScheduleDropped => Error::ManagerDropped,
|
||||
e => tor_error::into_internal!("Unexpected sleep error")(e).into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The effect that a given error has on our bootstrapping process
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub(crate) enum BootstrapAction {
|
||||
|
|
|
@ -581,7 +581,7 @@ impl<R: Runtime> DirMgr<R> {
|
|||
} else {
|
||||
std::time::Duration::new(5, 0)
|
||||
};
|
||||
schedule.sleep(pause).await;
|
||||
schedule.sleep(pause).await?;
|
||||
// TODO: instead of loading the whole thing we should have a
|
||||
// database entry that says when the last update was, or use
|
||||
// our state functions.
|
||||
|
@ -676,7 +676,7 @@ impl<R: Runtime> DirMgr<R> {
|
|||
let dirmgr = upgrade_weak_ref(&weak)?;
|
||||
dirmgr.note_reset(attempt_id);
|
||||
}
|
||||
schedule.sleep(delay).await;
|
||||
schedule.sleep(delay).await?;
|
||||
state = state.reset();
|
||||
} else {
|
||||
info!("Directory is complete.");
|
||||
|
@ -701,7 +701,7 @@ impl<R: Runtime> DirMgr<R> {
|
|||
|
||||
let reset_at = state.reset_time();
|
||||
match reset_at {
|
||||
Some(t) => schedule.sleep_until_wallclock(t).await,
|
||||
Some(t) => schedule.sleep_until_wallclock(t).await?,
|
||||
None => return Ok(()),
|
||||
}
|
||||
attempt_id = bootstrap::AttemptId::next();
|
||||
|
|
|
@ -38,6 +38,7 @@ pin-project = "1"
|
|||
rustls-crate = { package = "rustls", version = "0.19", optional = true, features = [
|
||||
"dangerous_configuration",
|
||||
] }
|
||||
thiserror = "1"
|
||||
tokio-crate = { package = "tokio", version = "1.7", optional = true, features = [
|
||||
"rt",
|
||||
"rt-multi-thread",
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
BREAKING: Schedule::sleep{,_until_wallclock} now return Result().
|
|
@ -11,6 +11,23 @@ use std::time::{Duration, Instant, SystemTime};
|
|||
|
||||
use pin_project::pin_project;
|
||||
|
||||
/// An error returned while telling a [`TaskSchedule`] to sleep.
|
||||
///
|
||||
/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
|
||||
/// can fail because there are no [`TaskHandle`]s left.
|
||||
///
|
||||
/// Note that it is *not* an error if the `sleep` function is interrupted,
|
||||
/// cancelled, or or rescheduled for a later time: See [`TaskSchedule::sleep`]
|
||||
/// for more information.
|
||||
#[derive(Clone, Debug, thiserror::Error)]
|
||||
#[non_exhaustive]
|
||||
pub enum SleepError {
|
||||
/// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
|
||||
/// task should exit.
|
||||
#[error("All task handles dropped: task exiting.")]
|
||||
ScheduleDropped,
|
||||
}
|
||||
|
||||
/// A command sent from task handles to schedule objects.
|
||||
#[derive(Copy, Clone)]
|
||||
enum SchedulerCommand {
|
||||
|
@ -51,6 +68,9 @@ pub struct TaskSchedule<R: SleepProvider> {
|
|||
}
|
||||
|
||||
/// A handle used to control a [`TaskSchedule`].
|
||||
///
|
||||
/// When the final handle is dropped, the computation governed by the
|
||||
/// `TaskSchedule` should terminate.
|
||||
#[derive(Clone)]
|
||||
pub struct TaskHandle {
|
||||
/// Sender of scheduler commands to the corresponding schedule.
|
||||
|
@ -101,20 +121,23 @@ impl<R: SleepProvider> TaskSchedule<R> {
|
|||
/// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
|
||||
/// method will not return at all, until the schedule is re-activated by
|
||||
/// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
|
||||
pub async fn sleep(&mut self, dur: Duration) {
|
||||
///
|
||||
/// Finally, if every associated [`TaskHandle`] has been dropped, then this
|
||||
/// method will return an error.
|
||||
pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
|
||||
self.fire_in(dur);
|
||||
self.next().await;
|
||||
self.next().await.ok_or(SleepError::ScheduleDropped)
|
||||
}
|
||||
|
||||
/// As
|
||||
/// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
|
||||
/// but respect messages from this schedule's associated [`TaskHandle`].
|
||||
pub async fn sleep_until_wallclock(&mut self, when: SystemTime) {
|
||||
pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
|
||||
loop {
|
||||
let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
|
||||
self.sleep(delay).await;
|
||||
self.sleep(delay).await?;
|
||||
if finished {
|
||||
return;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue