diff --git a/Cargo.lock b/Cargo.lock
index 52ae8eda6..52a577fe7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3617,6 +3617,7 @@ dependencies = [
  "rand 0.8.5",
  "retry-error",
  "rusqlite",
+ "scopeguard",
  "serde",
  "signature",
  "tempfile",
diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs
index 7bee45a89..59606dc0a 100644
--- a/crates/arti-client/src/client.rs
+++ b/crates/arti-client/src/client.rs
@@ -384,6 +384,7 @@ impl<R: Runtime> TorClient<R> {
         let mut periodic_task_handles = circmgr
             .launch_background_tasks(&runtime, &dirmgr, statemgr.clone())
             .map_err(ErrorDetail::CircMgrSetup)?;
+        periodic_task_handles.extend(dirmgr.download_task_handle());
 
         periodic_task_handles.extend(
             chanmgr
diff --git a/crates/tor-dirmgr/Cargo.toml b/crates/tor-dirmgr/Cargo.toml
index ee19da16f..bcac23741 100644
--- a/crates/tor-dirmgr/Cargo.toml
+++ b/crates/tor-dirmgr/Cargo.toml
@@ -45,6 +45,7 @@ postage = { version = "0.5.0", default-features = false, features = ["futures-tr
 rand = "0.8"
 retry-error = { path = "../retry-error", version = "0.2.0" }
 rusqlite = { version = "0.27.0", features = ["time"] }
+scopeguard = "1"
 serde = { version = "1.0.103", features = ["derive"] }
 signature = "1"
 thiserror = "1"
diff --git a/crates/tor-dirmgr/semver.md b/crates/tor-dirmgr/semver.md
new file mode 100644
index 000000000..f54423030
--- /dev/null
+++ b/crates/tor-dirmgr/semver.md
@@ -0,0 +1,3 @@
+MODIFIED: DirProvider now has download_task_handle().
+  There's a default implementation, so this isn't a breaking change.
+
diff --git a/crates/tor-dirmgr/src/bootstrap.rs b/crates/tor-dirmgr/src/bootstrap.rs
index b64a8db87..4b49f413b 100644
--- a/crates/tor-dirmgr/src/bootstrap.rs
+++ b/crates/tor-dirmgr/src/bootstrap.rs
@@ -21,7 +21,8 @@ use futures::channel::oneshot;
 use futures::FutureExt;
 use futures::StreamExt;
 use tor_dirclient::DirResponse;
-use tor_rtcompat::{Runtime, SleepProviderExt};
+use tor_rtcompat::scheduler::TaskSchedule;
+use tor_rtcompat::Runtime;
 use tracing::{debug, info, trace, warn};
 
 use crate::storage::Store;
@@ -472,6 +473,7 @@ async fn download_attempt<R: Runtime>(
 pub(crate) async fn download<R: Runtime>(
     dirmgr: Weak<DirMgr<R>>,
     state: &mut Box<dyn DirState>,
+    schedule: &mut TaskSchedule<R>,
     on_usable: &mut Option<oneshot::Sender<()>>,
 ) -> Result<()> {
     let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
@@ -515,7 +517,6 @@
         }
 
         let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());
-        let mut reset_timeout_future = runtime.sleep_until_wallclock(reset_time).fuse();
 
         let mut retry = retry_config.schedule();
         let mut delay = None;
@@ -530,14 +531,19 @@
             let next_delay = retry.next_delay(&mut rand::thread_rng());
             if let Some(delay) = delay.replace(next_delay) {
                 debug!("Waiting {:?} for next download attempt...", delay);
-                futures::select_biased! {
-                    _ = reset_timeout_future => {
-                        info!("Download attempt timed out completely; resetting download state.");
-                        reset(state);
-                        continue 'next_state;
-                    }
-                    _ = FutureExt::fuse(runtime.sleep(delay)) => {}
-                };
+                let time_until_reset = {
+                    reset_time
+                        .duration_since(now)
+                        .unwrap_or(Duration::from_secs(0))
+                };
+                schedule.sleep(delay.min(time_until_reset)).await;
+
+                now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
+                if now >= reset_time {
+                    info!("Download attempt timed out completely; resetting download state.");
+                    reset(state);
+                    continue 'next_state;
+                }
             }
 
             info!("{}: {}", attempt + 1, state.describe());
@@ -553,7 +559,7 @@
                         continue 'next_attempt;
                     }
                 }
-                _ = runtime.sleep_until_wallclock(reset_time).fuse() => {
+                _ = schedule.sleep_until_wallclock(reset_time).fuse() => {
                     // We need to reset. This can happen if (for
                     // example) we're downloading the last few
                     // microdescriptors on a consensus that now
@@ -785,7 +791,8 @@ mod test {
         // Let's try bootstrapping when everything is in the cache.
         tor_rtcompat::test_with_one_runtime!(|rt| async {
             let now = rt.wallclock();
-            let (_tempdir, mgr) = new_mgr(rt);
+            let (_tempdir, mgr) = new_mgr(rt.clone());
+            let (mut schedule, _handle) = TaskSchedule::new(rt);
 
             {
                 let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@@ -804,9 +811,14 @@
             let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
 
             let mut on_usable = None;
-            super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
-                .await
-                .unwrap();
+            super::download(
+                Arc::downgrade(&mgr),
+                &mut state,
+                &mut schedule,
+                &mut on_usable,
+            )
+            .await
+            .unwrap();
             assert!(state.is_ready(Readiness::Complete));
         });
     }
@@ -817,7 +829,8 @@
         // phase 2 in cache.
         tor_rtcompat::test_with_one_runtime!(|rt| async {
             let now = rt.wallclock();
-            let (_tempdir, mgr) = new_mgr(rt);
+            let (_tempdir, mgr) = new_mgr(rt.clone());
+            let (mut schedule, _handle) = TaskSchedule::new(rt);
 
             {
                 let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@@ -838,9 +851,14 @@
 
             let mut on_usable = None;
             let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
-            super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
-                .await
-                .unwrap();
+            super::download(
+                Arc::downgrade(&mgr),
+                &mut state,
+                &mut schedule,
+                &mut on_usable,
+            )
+            .await
+            .unwrap();
             assert!(state.is_ready(Readiness::Complete));
         });
     }
diff --git a/crates/tor-dirmgr/src/lib.rs b/crates/tor-dirmgr/src/lib.rs
index 5ea63b9f4..20ac7f367 100644
--- a/crates/tor-dirmgr/src/lib.rs
+++ b/crates/tor-dirmgr/src/lib.rs
@@ -81,6 +81,7 @@ pub use crate::shared_ref::SharedMutArc;
 use crate::storage::{DynStore, Store};
 use postage::watch;
 pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
+use scopeguard::ScopeGuard;
 use tor_circmgr::CircMgr;
 use tor_dirclient::SourceInfo;
 use tor_error::into_internal;
@@ -88,7 +89,8 @@ use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
 
 use async_trait::async_trait;
 use futures::{channel::oneshot, stream::BoxStream, task::SpawnExt};
-use tor_rtcompat::{Runtime, SleepProviderExt};
+use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
+use tor_rtcompat::Runtime;
 use tracing::{debug, info, trace, warn};
 
 use std::ops::Deref;
@@ -134,6 +136,11 @@ pub trait DirProvider: NetDirProvider {
     /// Note that this stream can be lossy: the caller will not necessarily
     /// observe every event on the stream
     fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;
+
+    /// Return a [`TaskHandle`] that can be used to manage the download process.
+    fn download_task_handle(&self) -> Option<TaskHandle> {
+        None
+    }
 }
 
 // NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
@@ -165,6 +172,10 @@ impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
     fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
         Box::pin(DirMgr::bootstrap_events(self))
     }
+
+    fn download_task_handle(&self) -> Option<TaskHandle> {
+        Some(self.task_handle.clone())
+    }
 }
 
 /// A directory manager to download, fetch, and cache a Tor directory.
@@ -231,29 +242,13 @@ pub struct DirMgr<R: Runtime> {
     /// A filter that gets applied to directory objects before we use them.
     #[cfg(feature = "dirfilter")]
     filter: crate::filter::FilterConfig,
-}
 
-/// RAII guard to reset an AtomicBool on drop.
-struct BoolResetter<'a> {
-    /// The bool to reset.
-    inner: &'a AtomicBool,
-    /// What value to store.
-    reset_to: bool,
-    /// What atomic ordering to use.
-    ordering: Ordering,
-}
+    /// A task schedule that can be used if we're bootstrapping. If this is
+    /// None, then there's currently a scheduled task in progress.
+    task_schedule: Mutex<Option<TaskSchedule<R>>>,
 
-impl<'a> Drop for BoolResetter<'a> {
-    fn drop(&mut self) {
-        self.inner.store(self.reset_to, self.ordering);
-    }
-}
-
-impl<'a> BoolResetter<'a> {
-    /// Disarm the guard, consuming it to make it not reset any more.
-    fn disarm(self) {
-        std::mem::forget(self);
-    }
+    /// A task handle that we return to anybody who needs to manage our download process.
+    task_handle: TaskHandle,
 }
 
 /// The possible origins of a document.
@@ -377,10 +372,16 @@ impl<R: Runtime> DirMgr<R> {
         // Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
         // completing bootstrap.
-        let resetter = BoolResetter {
-            inner: &self.bootstrap_started,
-            reset_to: false,
-            ordering: Ordering::SeqCst,
+        let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
+            v.store(false, Ordering::SeqCst);
+        });
+
+        let schedule = match self.task_schedule.lock().expect("poisoned lock").take() {
+            Some(sched) => sched,
+            None => {
+                debug!("Attempted to bootstrap twice; ignoring.");
+                return Ok(());
+            }
         };
 
         // Try to load from the cache.
@@ -399,17 +400,30 @@
             let dirmgr_weak = Arc::downgrade(self);
             self.runtime
                 .spawn(async move {
-                    // NOTE: This is a daemon task. It should eventually get
-                    // treated as one.
+                    // Use an RAII guard to make sure that when this task exits, the
+                    // TaskSchedule object is put back.
+                    //
+                    // TODO(nick): Putting the schedule back isn't actually useful
+                    // if the task exits _after_ we've bootstrapped for the first
+                    // time, because of how bootstrap_started works.
+                    let mut schedule = scopeguard::guard(schedule, |schedule| {
+                        if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
+                            *dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
+                        }
+                    });
 
                     // Don't warn when these are Error::ManagerDropped: that
                     // means that the DirMgr has been shut down.
-                    if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
+                    if let Err(e) =
+                        Self::reload_until_owner(&dirmgr_weak, &mut schedule, &mut sender).await
+                    {
                         match e {
                             Error::ManagerDropped => {}
                             _ => warn!("Unrecovered error while waiting for bootstrap: {}", e),
                         }
-                    } else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
+                    } else if let Err(e) =
+                        Self::download_forever(dirmgr_weak.clone(), &mut schedule, sender).await
+                    {
                         match e {
                             Error::ManagerDropped => {}
                             _ => warn!("Unrecovered error while downloading: {}", e),
@@ -422,8 +436,8 @@
         match receiver.await {
             Ok(()) => {
                 info!("We have enough information to build circuits.");
-                // Disarm the RAII guard, since we succeeded.
-                resetter.disarm();
+                // Disarm the RAII guard, since we succeeded. Now bootstrap_started will remain true.
+                let _ = ScopeGuard::into_inner(reset_bootstrap_started);
             }
             Err(_) => {
                 warn!("Bootstrapping task exited before finishing.");
@@ -462,14 +476,13 @@
     /// If we eventually become the owner, return Ok(()).
     async fn reload_until_owner(
         weak: &Weak<Self>,
+        schedule: &mut TaskSchedule<R>,
         on_complete: &mut Option<oneshot::Sender<()>>,
     ) -> Result<()> {
         let mut logged = false;
         let mut bootstrapped;
-        let runtime;
         {
             let dirmgr = upgrade_weak_ref(weak)?;
-            runtime = dirmgr.runtime.clone();
             bootstrapped = dirmgr.netdir.get().is_some();
         }
 
@@ -504,7 +517,7 @@
             } else {
                 std::time::Duration::new(5, 0)
             };
-            runtime.sleep(pause).await;
+            schedule.sleep(pause).await;
             // TODO: instead of loading the whole thing we should have a
             // database entry that says when the last update was, or use
             // our state functions.
@@ -531,6 +544,7 @@
     /// message using `on_complete`.
     async fn download_forever(
         weak: Weak<Self>,
+        schedule: &mut TaskSchedule<R>,
         mut on_complete: Option<oneshot::Sender<()>>,
     ) -> Result<()> {
         let mut state: Box<dyn DirState> = {
@@ -548,11 +562,6 @@
             ))
         };
 
-        let runtime = {
-            let dirmgr = upgrade_weak_ref(&weak)?;
-            dirmgr.runtime.clone()
-        };
-
         loop {
             let mut usable = false;
 
@@ -567,7 +576,8 @@
             'retry_attempt: for _ in retry_config.attempts() {
                 let outcome =
-                    bootstrap::download(Weak::clone(&weak), &mut state, &mut on_complete).await;
+                    bootstrap::download(Weak::clone(&weak), &mut state, schedule, &mut on_complete)
+                        .await;
 
                 if let Err(err) = outcome {
                     if state.is_ready(Readiness::Usable) {
@@ -592,7 +602,7 @@
                         "Unable to download a usable directory: {}. We will restart in {:?}.",
                         err, delay
                     );
-                    runtime.sleep(delay).await;
+                    schedule.sleep(delay).await;
                     state = state.reset();
                 } else {
                     info!("Directory is complete.");
@@ -617,7 +627,7 @@
 
                 let reset_at = state.reset_time();
                 match reset_at {
-                    Some(t) => runtime.sleep_until_wallclock(t).await,
+                    Some(t) => schedule.sleep_until_wallclock(t).await,
                     None => return Ok(()),
                 }
                 state = state.reset();
@@ -743,6 +753,10 @@
         #[cfg(feature = "dirfilter")]
         let filter = config.extensions.filter.clone();
 
+        // We create these early so the client code can access task_handle before bootstrap() returns.
+        let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
+        let task_schedule = Mutex::new(Some(task_schedule));
+
         Ok(DirMgr {
             config: config.into(),
             store,
@@ -756,6 +770,8 @@
             bootstrap_started: AtomicBool::new(false),
             #[cfg(feature = "dirfilter")]
             filter,
+            task_schedule,
+            task_handle,
         })
     }
@@ -1365,31 +1381,4 @@ replacement line
             assert!(expanded.is_err());
         });
     }
-
-    #[test]
-    #[allow(clippy::bool_assert_comparison)]
-    fn bool_resetter_works() {
-        let bool = AtomicBool::new(false);
-        fn example(bool: &AtomicBool) {
-            bool.store(true, Ordering::SeqCst);
-            let _resetter = BoolResetter {
-                inner: bool,
-                reset_to: false,
-                ordering: Ordering::SeqCst,
-            };
-        }
-        fn example_disarm(bool: &AtomicBool) {
-            bool.store(true, Ordering::SeqCst);
-            let resetter = BoolResetter {
-                inner: bool,
-                reset_to: false,
-                ordering: Ordering::SeqCst,
-            };
-            resetter.disarm();
-        }
-        example(&bool);
-        assert_eq!(bool.load(Ordering::SeqCst), false);
-        example_disarm(&bool);
-        assert_eq!(bool.load(Ordering::SeqCst), true);
-    }
 }
diff --git a/crates/tor-rtcompat/src/scheduler.rs b/crates/tor-rtcompat/src/scheduler.rs
index 686b4f47a..56fca9490 100644
--- a/crates/tor-rtcompat/src/scheduler.rs
+++ b/crates/tor-rtcompat/src/scheduler.rs
@@ -7,7 +7,7 @@ use futures::{Stream, StreamExt};
 use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use std::time::{Duration, Instant};
+use std::time::{Duration, Instant, SystemTime};
 
 use pin_project::pin_project;
 
@@ -20,6 +20,12 @@ enum SchedulerCommand {
     FireAt(Instant),
     /// Cancel a pending execution, if there is one.
     Cancel,
+    /// Pause execution without cancelling any running timers. (Those timers
+    /// will fire after we resume execution.)
+    Suspend,
+    /// Resume execution. If there is a pending timer, start waiting for it again;
+    /// otherwise, fire immediately.
+    Resume,
 }
 
 /// A remotely-controllable trigger for recurring tasks.
@@ -39,6 +45,9 @@ pub struct TaskSchedule<R: SleepProvider> {
     /// This is used to avoid having to create a `SleepFuture` with zero duration,
     /// which is potentially a bit wasteful.
     instant_fire: bool,
+    /// Whether we are currently "suspended". If we are suspended, we won't
+    /// start executing again till we're explicitly "resumed".
+    suspended: bool,
 }
 
 /// A handle used to control a [`TaskSchedule`].
@@ -59,6 +68,7 @@
                 rt,
                 // Start off ready.
                 instant_fire: true,
+                suspended: false,
             },
             TaskHandle { tx },
         )
     }
@@ -75,6 +85,39 @@
         self.instant_fire = true;
         self.sleep = None;
     }
+
+    /// Wait until `dur` has elapsed.
+    ///
+    /// This call is equivalent to [`SleepProvider::sleep`], except that the
+    /// resulting future will respect calls to the functions on this schedule's
+    /// associated [`TaskHandle`].
+    ///
+    /// Alternatively, you can view this function as equivalent to
+    /// `self.fire_in(dur); self.next().await;`, only with the intent made more
+    /// explicit.
+    ///
+    /// If the associated [`TaskHandle`] for this schedule is suspended, then
+    /// this method will not return until the schedule is unsuspended _and_ the
+    /// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
+    /// method will not return at all, until the schedule is re-activated by
+    /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
+    pub async fn sleep(&mut self, dur: Duration) {
+        self.fire_in(dur);
+        self.next().await;
+    }
+
+    /// As
+    /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
+    /// but respect messages from this schedule's associated [`TaskHandle`].
+    pub async fn sleep_until_wallclock(&mut self, when: SystemTime) {
+        loop {
+            let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
+            self.sleep(delay).await;
+            if finished {
+                return;
+            }
+        }
+    }
 }
 
 impl TaskHandle {
@@ -98,6 +141,31 @@ impl TaskHandle {
     pub fn cancel(&self) -> bool {
         self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
     }
+
+    /// Suspend execution of the corresponding schedule.
+    ///
+    /// If the schedule is ready now, it will become pending; it won't become
+    /// ready again until `resume()` is called. If the schedule is waiting for a
+    /// timer, the timer will keep counting, but the schedule won't become ready
+    /// until the timer has elapsed _and_ `resume()` has been called.
+    ///
+    /// Returns `true` if the schedule still exists, and `false` otherwise.
+    pub fn suspend(&self) -> bool {
+        self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
+    }
+
+    /// Resume execution of the corresponding schedule.
+    ///
+    /// This method undoes the effect of a call to `suspend()`: the schedule
+    /// will fire again if it is ready (or when it becomes ready).
+    ///
+    /// This method won't cause the schedule to fire if it was already
+    /// cancelled. For that, use the `fire()` or `fire_at()` methods.
+    ///
+    /// Returns `true` if the schedule still exists, and `false` otherwise.
+    pub fn resume(&self) -> bool {
+        self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
+    }
 }
 
 // NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
@@ -120,6 +188,12 @@ impl<R: SleepProvider> TaskScheduleP<'_, R> {
             SchedulerCommand::Cancel => {
                 *self.instant_fire = false;
                 *self.sleep = None;
             }
+            SchedulerCommand::Suspend => {
+                *self.suspended = true;
+            }
+            SchedulerCommand::Resume => {
+                *self.suspended = false;
+            }
         }
     }
 }
@@ -138,6 +212,9 @@ impl<R: SleepProvider> Stream for TaskSchedule<R> {
                 }
             }
         }
+        if *this.suspended {
+            return Poll::Pending;
+        }
         if *this.instant_fire {
             *this.instant_fire = false;
             return Poll::Ready(Some(()));
@@ -283,4 +360,43 @@ mod test {
             assert!(sch.next().now_or_never().is_none());
         });
     }
+
+    #[test]
+    fn suspend_and_resume_with_fire() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
+            hdl.fire();
+            hdl.suspend();
+
+            assert!(sch.next().now_or_never().is_none());
+            hdl.resume();
+            assert!(sch.next().now_or_never().is_some());
+        });
+    }
+
+    #[test]
+    fn suspend_and_resume_with_sleep() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
+            sch.fire_in(Duration::from_micros(100));
+            hdl.suspend();
+
+            assert!(sch.next().now_or_never().is_none());
+            hdl.resume();
+            assert!(sch.next().now_or_never().is_none());
+            assert!(sch.next().await.is_some());
+        });
+    }
+
+    #[test]
+    fn suspend_and_resume_with_nothing() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
+            assert!(sch.next().now_or_never().is_some());
+            hdl.suspend();
+
+            assert!(sch.next().now_or_never().is_none());
+            hdl.resume();
+        });
+    }
 }
diff --git a/crates/tor-rtcompat/src/timer.rs b/crates/tor-rtcompat/src/timer.rs
index f2e0dc7e5..b817a5191 100644
--- a/crates/tor-rtcompat/src/timer.rs
+++ b/crates/tor-rtcompat/src/timer.rs
@@ -180,7 +180,7 @@ const MAX_SLEEP: Duration = Duration::from_secs(600);
 /// expect this to be the final delay.
 ///
 /// (This is a separate function for testing.)
-fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
+pub(crate) fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
     let remainder = when
         .duration_since(now)
         .unwrap_or_else(|_| Duration::from_secs(0));
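
Note for reviewers (not part of the diff): nothing above exercises the new `DirProvider::download_task_handle()` from the caller's side, so here is a minimal sketch of the intended use. It is an illustration only, assuming the APIs added in this change (`DirProvider::download_task_handle()`, `TaskHandle::suspend()`/`resume()`); the helper name `pause_dir_downloads` and the `dir_provider` parameter are hypothetical.

```rust
use tor_dirmgr::DirProvider;

/// Hypothetical helper: pause or resume directory downloads, e.g. while the
/// host has no network connectivity. (Illustrative; not part of this diff.)
fn pause_dir_downloads(dir_provider: &dyn DirProvider, pause: bool) {
    // download_task_handle() has a default implementation returning None,
    // so this is a harmless no-op for providers without a handle.
    if let Some(handle) = dir_provider.download_task_handle() {
        if pause {
            // Suspending doesn't cancel pending timers: the download task
            // won't wake until its timer elapses *and* resume() is called.
            handle.suspend();
        } else {
            handle.resume();
        }
    }
}
```

Since `TorClient` now extends its `periodic_task_handles` with this handle, the same suspend/resume calls can also be made through whatever collection of `TaskHandle`s the client code already manages.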