diff --git a/crates/tor-dirmgr/src/bootstrap.rs b/crates/tor-dirmgr/src/bootstrap.rs index b64a8db87..4b49f413b 100644 --- a/crates/tor-dirmgr/src/bootstrap.rs +++ b/crates/tor-dirmgr/src/bootstrap.rs @@ -21,7 +21,8 @@ use futures::channel::oneshot; use futures::FutureExt; use futures::StreamExt; use tor_dirclient::DirResponse; -use tor_rtcompat::{Runtime, SleepProviderExt}; +use tor_rtcompat::scheduler::TaskSchedule; +use tor_rtcompat::Runtime; use tracing::{debug, info, trace, warn}; use crate::storage::Store; @@ -472,6 +473,7 @@ async fn download_attempt( pub(crate) async fn download( dirmgr: Weak>, state: &mut Box, + schedule: &mut TaskSchedule, on_usable: &mut Option>, ) -> Result<()> { let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone(); @@ -515,7 +517,6 @@ pub(crate) async fn download( } let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time()); - let mut reset_timeout_future = runtime.sleep_until_wallclock(reset_time).fuse(); let mut retry = retry_config.schedule(); let mut delay = None; @@ -530,14 +531,19 @@ pub(crate) async fn download( let next_delay = retry.next_delay(&mut rand::thread_rng()); if let Some(delay) = delay.replace(next_delay) { debug!("Waiting {:?} for next download attempt...", delay); - futures::select_biased! { - _ = reset_timeout_future => { - info!("Download attempt timed out completely; resetting download state."); - reset(state); - continue 'next_state; - } - _ = FutureExt::fuse(runtime.sleep(delay)) => {} + let time_until_reset = { + reset_time + .duration_since(now) + .unwrap_or(Duration::from_secs(0)) }; + schedule.sleep(delay.min(time_until_reset)).await; + + now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock(); + if now >= reset_time { + info!("Download attempt timed out completely; resetting download state."); + reset(state); + continue 'next_state; + } } info!("{}: {}", attempt + 1, state.describe()); @@ -553,7 +559,7 @@ pub(crate) async fn download( continue 'next_attempt; } } - _ = runtime.sleep_until_wallclock(reset_time).fuse() => { + _ = schedule.sleep_until_wallclock(reset_time).fuse() => { // We need to reset. This can happen if (for // example) we're downloading the last few // microdescriptors on a consensus that now @@ -785,7 +791,8 @@ mod test { // Let's try bootstrapping when everything is in the cache. tor_rtcompat::test_with_one_runtime!(|rt| async { let now = rt.wallclock(); - let (_tempdir, mgr) = new_mgr(rt); + let (_tempdir, mgr) = new_mgr(rt.clone()); + let (mut schedule, _handle) = TaskSchedule::new(rt); { let mut store = mgr.store_if_rw().unwrap().lock().unwrap(); @@ -804,9 +811,14 @@ mod test { let mut state: Box = Box::new(DemoState::new1()); let mut on_usable = None; - super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable) - .await - .unwrap(); + super::download( + Arc::downgrade(&mgr), + &mut state, + &mut schedule, + &mut on_usable, + ) + .await + .unwrap(); assert!(state.is_ready(Readiness::Complete)); }); } @@ -817,7 +829,8 @@ mod test { // phase 2 in cache. tor_rtcompat::test_with_one_runtime!(|rt| async { let now = rt.wallclock(); - let (_tempdir, mgr) = new_mgr(rt); + let (_tempdir, mgr) = new_mgr(rt.clone()); + let (mut schedule, _handle) = TaskSchedule::new(rt); { let mut store = mgr.store_if_rw().unwrap().lock().unwrap(); @@ -838,9 +851,14 @@ mod test { let mut on_usable = None; let mut state: Box = Box::new(DemoState::new1()); - super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable) - .await - .unwrap(); + super::download( + Arc::downgrade(&mgr), + &mut state, + &mut schedule, + &mut on_usable, + ) + .await + .unwrap(); assert!(state.is_ready(Readiness::Complete)); }); } diff --git a/crates/tor-dirmgr/src/lib.rs b/crates/tor-dirmgr/src/lib.rs index 867d4e01a..d63105cf7 100644 --- a/crates/tor-dirmgr/src/lib.rs +++ b/crates/tor-dirmgr/src/lib.rs @@ -88,7 +88,8 @@ use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider}; use async_trait::async_trait; use futures::{channel::oneshot, stream::BoxStream, task::SpawnExt}; -use tor_rtcompat::{Runtime, SleepProviderExt}; +use tor_rtcompat::scheduler::TaskSchedule; +use tor_rtcompat::Runtime; use tracing::{debug, info, trace, warn}; use std::ops::Deref; @@ -383,6 +384,8 @@ impl DirMgr { ordering: Ordering::SeqCst, }; + let (mut schedule, _handle) = TaskSchedule::new(self.runtime.clone()); + // Try to load from the cache. let have_directory = self.load_directory().await?; @@ -404,12 +407,16 @@ impl DirMgr { // Don't warn when these are Error::ManagerDropped: that // means that the DirMgr has been shut down. - if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await { + if let Err(e) = + Self::reload_until_owner(&dirmgr_weak, &mut schedule, &mut sender).await + { match e { Error::ManagerDropped => {} _ => warn!("Unrecovered error while waiting for bootstrap: {}", e), } - } else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await { + } else if let Err(e) = + Self::download_forever(dirmgr_weak, &mut schedule, sender).await + { match e { Error::ManagerDropped => {} _ => warn!("Unrecovered error while downloading: {}", e), @@ -462,14 +469,13 @@ impl DirMgr { /// If we eventually become the owner, return Ok(). async fn reload_until_owner( weak: &Weak, + schedule: &mut TaskSchedule, on_complete: &mut Option>, ) -> Result<()> { let mut logged = false; let mut bootstrapped; - let runtime; { let dirmgr = upgrade_weak_ref(weak)?; - runtime = dirmgr.runtime.clone(); bootstrapped = dirmgr.netdir.get().is_some(); } @@ -504,7 +510,7 @@ impl DirMgr { } else { std::time::Duration::new(5, 0) }; - runtime.sleep(pause).await; + schedule.sleep(pause).await; // TODO: instead of loading the whole thing we should have a // database entry that says when the last update was, or use // our state functions. @@ -531,6 +537,7 @@ impl DirMgr { /// message using `on_complete`. async fn download_forever( weak: Weak, + schedule: &mut TaskSchedule, mut on_complete: Option>, ) -> Result<()> { let mut state: Box = { @@ -548,11 +555,6 @@ impl DirMgr { )) }; - let runtime = { - let dirmgr = upgrade_weak_ref(&weak)?; - dirmgr.runtime.clone() - }; - loop { let mut usable = false; @@ -567,7 +569,8 @@ impl DirMgr { 'retry_attempt: for _ in retry_config.attempts() { let outcome = - bootstrap::download(Weak::clone(&weak), &mut state, &mut on_complete).await; + bootstrap::download(Weak::clone(&weak), &mut state, schedule, &mut on_complete) + .await; if let Err(err) = outcome { if state.is_ready(Readiness::Usable) { @@ -592,7 +595,7 @@ impl DirMgr { "Unable to download a usable directory: {}. We will restart in {:?}.", err, delay ); - runtime.sleep(delay).await; + schedule.sleep(delay).await; state = state.reset(); } else { info!("Directory is complete."); @@ -617,7 +620,7 @@ impl DirMgr { let reset_at = state.reset_time(); match reset_at { - Some(t) => runtime.sleep_until_wallclock(t).await, + Some(t) => schedule.sleep_until_wallclock(t).await, None => return Ok(()), } state = state.reset();