DirMgr: Refactor bootstrap to use a TaskSchedule for sleeping.

This change (not yet exposed as an API) will give the TorClient a
`TaskHandle` corresponding to the directory task, so that it can
make the directory task dormant as needed.
This commit is contained in:
Nick Mathewson 2022-06-07 12:37:12 -04:00
parent 947bbe6fe7
commit 171b406b00
2 changed files with 53 additions and 32 deletions

View File

@ -21,7 +21,8 @@ use futures::channel::oneshot;
use futures::FutureExt;
use futures::StreamExt;
use tor_dirclient::DirResponse;
use tor_rtcompat::{Runtime, SleepProviderExt};
use tor_rtcompat::scheduler::TaskSchedule;
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};
use crate::storage::Store;
@ -472,6 +473,7 @@ async fn download_attempt<R: Runtime>(
pub(crate) async fn download<R: Runtime>(
dirmgr: Weak<DirMgr<R>>,
state: &mut Box<dyn DirState>,
schedule: &mut TaskSchedule<R>,
on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
@ -515,7 +517,6 @@ pub(crate) async fn download<R: Runtime>(
}
let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());
let mut reset_timeout_future = runtime.sleep_until_wallclock(reset_time).fuse();
let mut retry = retry_config.schedule();
let mut delay = None;
@ -530,14 +531,19 @@ pub(crate) async fn download<R: Runtime>(
let next_delay = retry.next_delay(&mut rand::thread_rng());
if let Some(delay) = delay.replace(next_delay) {
debug!("Waiting {:?} for next download attempt...", delay);
futures::select_biased! {
_ = reset_timeout_future => {
info!("Download attempt timed out completely; resetting download state.");
reset(state);
continue 'next_state;
}
_ = FutureExt::fuse(runtime.sleep(delay)) => {}
let time_until_reset = {
reset_time
.duration_since(now)
.unwrap_or(Duration::from_secs(0))
};
schedule.sleep(delay.min(time_until_reset)).await;
now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
if now >= reset_time {
info!("Download attempt timed out completely; resetting download state.");
reset(state);
continue 'next_state;
}
}
info!("{}: {}", attempt + 1, state.describe());
@ -553,7 +559,7 @@ pub(crate) async fn download<R: Runtime>(
continue 'next_attempt;
}
}
_ = runtime.sleep_until_wallclock(reset_time).fuse() => {
_ = schedule.sleep_until_wallclock(reset_time).fuse() => {
// We need to reset. This can happen if (for
// example) we're downloading the last few
// microdescriptors on a consensus that now
@ -785,7 +791,8 @@ mod test {
// Let's try bootstrapping when everything is in the cache.
tor_rtcompat::test_with_one_runtime!(|rt| async {
let now = rt.wallclock();
let (_tempdir, mgr) = new_mgr(rt);
let (_tempdir, mgr) = new_mgr(rt.clone());
let (mut schedule, _handle) = TaskSchedule::new(rt);
{
let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@ -804,9 +811,14 @@ mod test {
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
let mut on_usable = None;
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
super::download(
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
&mut on_usable,
)
.await
.unwrap();
assert!(state.is_ready(Readiness::Complete));
});
}
@ -817,7 +829,8 @@ mod test {
// phase 2 in cache.
tor_rtcompat::test_with_one_runtime!(|rt| async {
let now = rt.wallclock();
let (_tempdir, mgr) = new_mgr(rt);
let (_tempdir, mgr) = new_mgr(rt.clone());
let (mut schedule, _handle) = TaskSchedule::new(rt);
{
let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@ -838,9 +851,14 @@ mod test {
let mut on_usable = None;
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
super::download(
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
&mut on_usable,
)
.await
.unwrap();
assert!(state.is_ready(Readiness::Complete));
});
}

View File

@ -88,7 +88,8 @@ use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
use async_trait::async_trait;
use futures::{channel::oneshot, stream::BoxStream, task::SpawnExt};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tor_rtcompat::scheduler::TaskSchedule;
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};
use std::ops::Deref;
@ -383,6 +384,8 @@ impl<R: Runtime> DirMgr<R> {
ordering: Ordering::SeqCst,
};
let (mut schedule, _handle) = TaskSchedule::new(self.runtime.clone());
// Try to load from the cache.
let have_directory = self.load_directory().await?;
@ -404,12 +407,16 @@ impl<R: Runtime> DirMgr<R> {
// Don't warn when these are Error::ManagerDropped: that
// means that the DirMgr has been shut down.
if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
if let Err(e) =
Self::reload_until_owner(&dirmgr_weak, &mut schedule, &mut sender).await
{
match e {
Error::ManagerDropped => {}
_ => warn!("Unrecovered error while waiting for bootstrap: {}", e),
}
} else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
} else if let Err(e) =
Self::download_forever(dirmgr_weak, &mut schedule, sender).await
{
match e {
Error::ManagerDropped => {}
_ => warn!("Unrecovered error while downloading: {}", e),
@ -462,14 +469,13 @@ impl<R: Runtime> DirMgr<R> {
/// If we eventually become the owner, return Ok().
async fn reload_until_owner(
weak: &Weak<Self>,
schedule: &mut TaskSchedule<R>,
on_complete: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut logged = false;
let mut bootstrapped;
let runtime;
{
let dirmgr = upgrade_weak_ref(weak)?;
runtime = dirmgr.runtime.clone();
bootstrapped = dirmgr.netdir.get().is_some();
}
@ -504,7 +510,7 @@ impl<R: Runtime> DirMgr<R> {
} else {
std::time::Duration::new(5, 0)
};
runtime.sleep(pause).await;
schedule.sleep(pause).await;
// TODO: instead of loading the whole thing we should have a
// database entry that says when the last update was, or use
// our state functions.
@ -531,6 +537,7 @@ impl<R: Runtime> DirMgr<R> {
/// message using `on_complete`.
async fn download_forever(
weak: Weak<Self>,
schedule: &mut TaskSchedule<R>,
mut on_complete: Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut state: Box<dyn DirState> = {
@ -548,11 +555,6 @@ impl<R: Runtime> DirMgr<R> {
))
};
let runtime = {
let dirmgr = upgrade_weak_ref(&weak)?;
dirmgr.runtime.clone()
};
loop {
let mut usable = false;
@ -567,7 +569,8 @@ impl<R: Runtime> DirMgr<R> {
'retry_attempt: for _ in retry_config.attempts() {
let outcome =
bootstrap::download(Weak::clone(&weak), &mut state, &mut on_complete).await;
bootstrap::download(Weak::clone(&weak), &mut state, schedule, &mut on_complete)
.await;
if let Err(err) = outcome {
if state.is_ready(Readiness::Usable) {
@ -592,7 +595,7 @@ impl<R: Runtime> DirMgr<R> {
"Unable to download a usable directory: {}. We will restart in {:?}.",
err, delay
);
runtime.sleep(delay).await;
schedule.sleep(delay).await;
state = state.reset();
} else {
info!("Directory is complete.");
@ -617,7 +620,7 @@ impl<R: Runtime> DirMgr<R> {
let reset_at = state.reset_time();
match reset_at {
Some(t) => runtime.sleep_until_wallclock(t).await,
Some(t) => schedule.sleep_until_wallclock(t).await,
None => return Ok(()),
}
state = state.reset();