Merge branch 'dormant_directory' into 'main'

Use TaskSchedule to sleep in directory bootstrapping

Closes #497

See merge request tpo/core/arti!571
Nick Mathewson 2022-06-13 13:59:26 +00:00
commit d44dd6f44f
8 changed files with 219 additions and 90 deletions
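
The change, in brief: the directory bootstrap path used to sleep with bare runtime.sleep() and sleep_until_wallclock() calls; it now sleeps through a TaskSchedule, whose paired TaskHandle can suspend, resume, fire, or cancel those waits from outside. A minimal usage sketch of that API, assuming PreferredRuntime and the BlockOn trait from tor-rtcompat (neither is part of this diff):

    use std::time::Duration;
    use tor_rtcompat::scheduler::TaskSchedule;
    use tor_rtcompat::{BlockOn, PreferredRuntime};

    fn main() {
        let rt = PreferredRuntime::create().expect("could not create runtime");
        // TaskSchedule::new returns the schedule itself plus a handle
        // that can control it from another task.
        let (mut schedule, handle) = TaskSchedule::new(rt.clone());
        rt.block_on(async {
            // Behaves like rt.sleep(...), but honors suspend(), resume(),
            // and cancel() commands sent through the handle.
            schedule.sleep(Duration::from_millis(10)).await;
        });
        // Anything holding the handle can park the schedule's sleeps and
        // release them later.
        handle.suspend();
        handle.resume();
    }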

Cargo.lock (generated)
View File

@ -3617,6 +3617,7 @@ dependencies = [
"rand 0.8.5",
"retry-error",
"rusqlite",
"scopeguard",
"serde",
"signature",
"tempfile",

View File

@ -384,6 +384,7 @@ impl<R: Runtime> TorClient<R> {
let mut periodic_task_handles = circmgr
.launch_background_tasks(&runtime, &dirmgr, statemgr.clone())
.map_err(ErrorDetail::CircMgrSetup)?;
periodic_task_handles.extend(dirmgr.download_task_handle());
periodic_task_handles.extend(
chanmgr

View File

@ -45,6 +45,7 @@ postage = { version = "0.5.0", default-features = false, features = ["futures-tr
rand = "0.8"
retry-error = { path = "../retry-error", version = "0.2.0" }
rusqlite = { version = "0.27.0", features = ["time"] }
scopeguard = "1"
serde = { version = "1.0.103", features = ["derive"] }
signature = "1"
thiserror = "1"

View File

@ -0,0 +1,3 @@
MODIFIED: DirProvider now has download_task_handle().
There's a default implementation, so this isn't a breaking change.
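
Why this is not a breaking change, in miniature. The sketch below uses stand-in types, not the arti definitions: a trait method with a default body leaves every existing implementor compiling unchanged.

    // Stand-in for tor_rtcompat::scheduler::TaskHandle.
    struct TaskHandle;

    trait DirProvider {
        // The new method ships with a default body, so implementors
        // written before it existed still satisfy the trait.
        fn download_task_handle(&self) -> Option<TaskHandle> {
            None
        }
    }

    // A pre-existing implementor: no changes needed, it simply
    // reports "no handle".
    struct OldProvider;
    impl DirProvider for OldProvider {}

    fn main() {
        assert!(OldProvider.download_task_handle().is_none());
    }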

View File

@ -21,7 +21,8 @@ use futures::channel::oneshot;
use futures::FutureExt;
use futures::StreamExt;
use tor_dirclient::DirResponse;
use tor_rtcompat::{Runtime, SleepProviderExt};
use tor_rtcompat::scheduler::TaskSchedule;
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};
use crate::storage::Store;
@ -472,6 +473,7 @@ async fn download_attempt<R: Runtime>(
pub(crate) async fn download<R: Runtime>(
dirmgr: Weak<DirMgr<R>>,
state: &mut Box<dyn DirState>,
schedule: &mut TaskSchedule<R>,
on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
@ -515,7 +517,6 @@ pub(crate) async fn download<R: Runtime>(
}
let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());
let mut reset_timeout_future = runtime.sleep_until_wallclock(reset_time).fuse();
let mut retry = retry_config.schedule();
let mut delay = None;
@ -530,14 +531,19 @@ pub(crate) async fn download<R: Runtime>(
let next_delay = retry.next_delay(&mut rand::thread_rng());
if let Some(delay) = delay.replace(next_delay) {
debug!("Waiting {:?} for next download attempt...", delay);
futures::select_biased! {
_ = reset_timeout_future => {
info!("Download attempt timed out completely; resetting download state.");
reset(state);
continue 'next_state;
}
_ = FutureExt::fuse(runtime.sleep(delay)) => {}
let time_until_reset = {
reset_time
.duration_since(now)
.unwrap_or(Duration::from_secs(0))
};
schedule.sleep(delay.min(time_until_reset)).await;
now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
if now >= reset_time {
info!("Download attempt timed out completely; resetting download state.");
reset(state);
continue 'next_state;
}
}
info!("{}: {}", attempt + 1, state.describe());
@ -553,7 +559,7 @@ pub(crate) async fn download<R: Runtime>(
continue 'next_attempt;
}
}
_ = runtime.sleep_until_wallclock(reset_time).fuse() => {
_ = schedule.sleep_until_wallclock(reset_time).fuse() => {
// We need to reset. This can happen if (for
// example) we're downloading the last few
// microdescriptors on a consensus that now
@ -785,7 +791,8 @@ mod test {
// Let's try bootstrapping when everything is in the cache.
tor_rtcompat::test_with_one_runtime!(|rt| async {
let now = rt.wallclock();
let (_tempdir, mgr) = new_mgr(rt);
let (_tempdir, mgr) = new_mgr(rt.clone());
let (mut schedule, _handle) = TaskSchedule::new(rt);
{
let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@ -804,9 +811,14 @@ mod test {
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
let mut on_usable = None;
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
super::download(
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
&mut on_usable,
)
.await
.unwrap();
assert!(state.is_ready(Readiness::Complete));
});
}
@ -817,7 +829,8 @@ mod test {
// phase 2 in cache.
tor_rtcompat::test_with_one_runtime!(|rt| async {
let now = rt.wallclock();
let (_tempdir, mgr) = new_mgr(rt);
let (_tempdir, mgr) = new_mgr(rt.clone());
let (mut schedule, _handle) = TaskSchedule::new(rt);
{
let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@ -838,9 +851,14 @@ mod test {
let mut on_usable = None;
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
super::download(
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
&mut on_usable,
)
.await
.unwrap();
assert!(state.is_ready(Readiness::Complete));
});
}
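
The core of the bootstrap.rs change is visible above: the old select_biased! race between the retry sleep and the reset timer becomes one sleep bounded by min(delay, time_until_reset), followed by a wallclock check. The same pattern as a standalone sketch; the function name and signature are illustrative, not from the diff:

    use std::time::{Duration, SystemTime};
    use tor_rtcompat::scheduler::TaskSchedule;
    use tor_rtcompat::{Runtime, SleepProvider};

    // Sleep for the smaller of the retry delay and the time left until
    // the reset deadline; report whether the deadline has now passed,
    // in which case the caller should reset its download state rather
    // than retry.
    async fn sleep_or_reset<R: Runtime>(
        runtime: &R,
        schedule: &mut TaskSchedule<R>,
        delay: Duration,
        reset_time: SystemTime,
    ) -> bool {
        let now = runtime.wallclock();
        let time_until_reset = reset_time
            .duration_since(now)
            .unwrap_or(Duration::from_secs(0));
        // Bounded sleep: never oversleeps the reset deadline, and (unlike
        // a plain runtime.sleep) respects the schedule's TaskHandle.
        schedule.sleep(delay.min(time_until_reset)).await;
        runtime.wallclock() >= reset_time
    }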

View File

@ -81,6 +81,7 @@ pub use crate::shared_ref::SharedMutArc;
use crate::storage::{DynStore, Store};
use postage::watch;
pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
use scopeguard::ScopeGuard;
use tor_circmgr::CircMgr;
use tor_dirclient::SourceInfo;
use tor_error::into_internal;
@ -88,7 +89,8 @@ use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
use async_trait::async_trait;
use futures::{channel::oneshot, stream::BoxStream, task::SpawnExt};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};
use std::ops::Deref;
@ -134,6 +136,11 @@ pub trait DirProvider: NetDirProvider {
/// Note that this stream can be lossy: the caller will not necessarily
/// observe every event on the stream
fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;
/// Return a [`TaskHandle`] that can be used to manage the download process.
fn download_task_handle(&self) -> Option<TaskHandle> {
None
}
}
// NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
@ -165,6 +172,10 @@ impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
Box::pin(DirMgr::bootstrap_events(self))
}
fn download_task_handle(&self) -> Option<TaskHandle> {
Some(self.task_handle.clone())
}
}
/// A directory manager to download, fetch, and cache a Tor directory.
@ -231,29 +242,13 @@ pub struct DirMgr<R: Runtime> {
/// A filter that gets applied to directory objects before we use them.
#[cfg(feature = "dirfilter")]
filter: crate::filter::FilterConfig,
}
/// RAII guard to reset an AtomicBool on drop.
struct BoolResetter<'a> {
/// The bool to reset.
inner: &'a AtomicBool,
/// What value to store.
reset_to: bool,
/// What atomic ordering to use.
ordering: Ordering,
}
/// A task schedule that can be used if we're bootstrapping. If this is
/// None, then there's currently a scheduled task in progress.
task_schedule: Mutex<Option<TaskSchedule<R>>>,
impl<'a> Drop for BoolResetter<'a> {
fn drop(&mut self) {
self.inner.store(self.reset_to, self.ordering);
}
}
impl<'a> BoolResetter<'a> {
/// Disarm the guard, consuming it to make it not reset any more.
fn disarm(self) {
std::mem::forget(self);
}
/// A task handle that we return to anybody who needs to manage our download process.
task_handle: TaskHandle,
}
/// The possible origins of a document.
@ -377,10 +372,16 @@ impl<R: Runtime> DirMgr<R> {
// Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
// completing bootstrap.
let resetter = BoolResetter {
inner: &self.bootstrap_started,
reset_to: false,
ordering: Ordering::SeqCst,
let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
v.store(false, Ordering::SeqCst);
});
let schedule = match self.task_schedule.lock().expect("poisoned lock").take() {
Some(sched) => sched,
None => {
debug!("Attempted to bootstrap twice; ignoring.");
return Ok(());
}
};
// Try to load from the cache.
@ -399,17 +400,30 @@ impl<R: Runtime> DirMgr<R> {
let dirmgr_weak = Arc::downgrade(self);
self.runtime
.spawn(async move {
// NOTE: This is a daemon task. It should eventually get
// treated as one.
// Use an RAII guard to make sure that when this task exits, the
// TaskSchedule object is put back.
//
// TODO(nick): Putting the schedule back isn't actually useful
// if the task exits _after_ we've bootstrapped for the first
// time, because of how bootstrap_started works.
let mut schedule = scopeguard::guard(schedule, |schedule| {
if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
*dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
}
});
// Don't warn when these are Error::ManagerDropped: that
// means that the DirMgr has been shut down.
if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
if let Err(e) =
Self::reload_until_owner(&dirmgr_weak, &mut schedule, &mut sender).await
{
match e {
Error::ManagerDropped => {}
_ => warn!("Unrecovered error while waiting for bootstrap: {}", e),
}
} else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
} else if let Err(e) =
Self::download_forever(dirmgr_weak.clone(), &mut schedule, sender).await
{
match e {
Error::ManagerDropped => {}
_ => warn!("Unrecovered error while downloading: {}", e),
@ -422,8 +436,8 @@ impl<R: Runtime> DirMgr<R> {
match receiver.await {
Ok(()) => {
info!("We have enough information to build circuits.");
// Disarm the RAII guard, since we succeeded.
resetter.disarm();
// Disarm the RAII guard, since we succeeded. Now bootstrap_started will remain true.
let _ = ScopeGuard::into_inner(reset_bootstrap_started);
}
Err(_) => {
warn!("Bootstrapping task exited before finishing.");
@ -462,14 +476,13 @@ impl<R: Runtime> DirMgr<R> {
/// If we eventually become the owner, return Ok().
async fn reload_until_owner(
weak: &Weak<Self>,
schedule: &mut TaskSchedule<R>,
on_complete: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut logged = false;
let mut bootstrapped;
let runtime;
{
let dirmgr = upgrade_weak_ref(weak)?;
runtime = dirmgr.runtime.clone();
bootstrapped = dirmgr.netdir.get().is_some();
}
@ -504,7 +517,7 @@ impl<R: Runtime> DirMgr<R> {
} else {
std::time::Duration::new(5, 0)
};
runtime.sleep(pause).await;
schedule.sleep(pause).await;
// TODO: instead of loading the whole thing we should have a
// database entry that says when the last update was, or use
// our state functions.
@ -531,6 +544,7 @@ impl<R: Runtime> DirMgr<R> {
/// message using `on_complete`.
async fn download_forever(
weak: Weak<Self>,
schedule: &mut TaskSchedule<R>,
mut on_complete: Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut state: Box<dyn DirState> = {
@ -548,11 +562,6 @@ impl<R: Runtime> DirMgr<R> {
))
};
let runtime = {
let dirmgr = upgrade_weak_ref(&weak)?;
dirmgr.runtime.clone()
};
loop {
let mut usable = false;
@ -567,7 +576,8 @@ impl<R: Runtime> DirMgr<R> {
'retry_attempt: for _ in retry_config.attempts() {
let outcome =
bootstrap::download(Weak::clone(&weak), &mut state, &mut on_complete).await;
bootstrap::download(Weak::clone(&weak), &mut state, schedule, &mut on_complete)
.await;
if let Err(err) = outcome {
if state.is_ready(Readiness::Usable) {
@ -592,7 +602,7 @@ impl<R: Runtime> DirMgr<R> {
"Unable to download a usable directory: {}. We will restart in {:?}.",
err, delay
);
runtime.sleep(delay).await;
schedule.sleep(delay).await;
state = state.reset();
} else {
info!("Directory is complete.");
@ -617,7 +627,7 @@ impl<R: Runtime> DirMgr<R> {
let reset_at = state.reset_time();
match reset_at {
Some(t) => runtime.sleep_until_wallclock(t).await,
Some(t) => schedule.sleep_until_wallclock(t).await,
None => return Ok(()),
}
state = state.reset();
@ -743,6 +753,10 @@ impl<R: Runtime> DirMgr<R> {
#[cfg(feature = "dirfilter")]
let filter = config.extensions.filter.clone();
// We create these early so the client code can access task_handle before bootstrap() returns.
let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
let task_schedule = Mutex::new(Some(task_schedule));
Ok(DirMgr {
config: config.into(),
store,
@ -756,6 +770,8 @@ impl<R: Runtime> DirMgr<R> {
bootstrap_started: AtomicBool::new(false),
#[cfg(feature = "dirfilter")]
filter,
task_schedule,
task_handle,
})
}
@ -1365,31 +1381,4 @@ replacement line
assert!(expanded.is_err());
});
}
#[test]
#[allow(clippy::bool_assert_comparison)]
fn bool_resetter_works() {
let bool = AtomicBool::new(false);
fn example(bool: &AtomicBool) {
bool.store(true, Ordering::SeqCst);
let _resetter = BoolResetter {
inner: bool,
reset_to: false,
ordering: Ordering::SeqCst,
};
}
fn example_disarm(bool: &AtomicBool) {
bool.store(true, Ordering::SeqCst);
let resetter = BoolResetter {
inner: bool,
reset_to: false,
ordering: Ordering::SeqCst,
};
resetter.disarm();
}
example(&bool);
assert_eq!(bool.load(Ordering::SeqCst), false);
example_disarm(&bool);
assert_eq!(bool.load(Ordering::SeqCst), true);
}
}
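
For reference, the scopeguard idiom that replaces the hand-rolled BoolResetter, reduced to a self-contained sketch (try_bootstrap and do_bootstrap are hypothetical names):

    use std::sync::atomic::{AtomicBool, Ordering};
    use scopeguard::ScopeGuard;

    fn do_bootstrap() -> Result<(), ()> {
        Ok(())
    }

    fn try_bootstrap(started: &AtomicBool) -> Result<(), ()> {
        started.store(true, Ordering::SeqCst);
        // The guard's closure runs on drop, so any early return (or
        // panic) resets the flag automatically.
        let reset_on_exit = scopeguard::guard(started, |flag| {
            flag.store(false, Ordering::SeqCst);
        });

        do_bootstrap()?; // on Err, the guard drops and the flag resets

        // Success: defuse the guard so the flag stays true.
        let _ = ScopeGuard::into_inner(reset_on_exit);
        Ok(())
    }

    fn main() {
        let flag = AtomicBool::new(false);
        let _ = try_bootstrap(&flag);
        assert!(flag.load(Ordering::SeqCst));
    }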

View File

@ -7,7 +7,7 @@ use futures::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime};
use pin_project::pin_project;
@ -20,6 +20,12 @@ enum SchedulerCommand {
FireAt(Instant),
/// Cancel a pending execution, if there is one.
Cancel,
/// Pause execution without cancelling any running timers. (Those timers
/// will fire after we resume execution.)
Suspend,
/// Resume execution. If there is a pending timer, start waiting for it again;
/// otherwise, fire immediately.
Resume,
}
/// A remotely-controllable trigger for recurring tasks.
@ -39,6 +45,9 @@ pub struct TaskSchedule<R: SleepProvider> {
/// This is used to avoid having to create a `SleepFuture` with zero duration,
/// which is potentially a bit wasteful.
instant_fire: bool,
/// Whether we are currently "suspended". If we are suspended, we won't
/// start executing again till we're explicitly "resumed".
suspended: bool,
}
/// A handle used to control a [`TaskSchedule`].
@ -59,6 +68,7 @@ impl<R: SleepProvider> TaskSchedule<R> {
rt,
// Start off ready.
instant_fire: true,
suspended: false,
},
TaskHandle { tx },
)
@ -75,6 +85,39 @@ impl<R: SleepProvider> TaskSchedule<R> {
self.instant_fire = true;
self.sleep = None;
}
/// Wait until `dur` has elapsed.
///
/// This call is equivalent to [`SleepProvider::sleep`], except that the
/// resulting future will respect calls to the functions on this schedule's
/// associated [`TaskHandle`].
///
/// Alternatively, you can view this function as equivalent to
/// `self.fire_in(dur); self.next().await;`, only with the intent made more
/// explicit.
///
/// If the associated [`TaskHandle`] for this schedule is suspended, then
/// this method will not return until the schedule is unsuspended _and_ the
/// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
/// method will not return at all until the schedule is re-activated by
/// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
pub async fn sleep(&mut self, dur: Duration) {
self.fire_in(dur);
self.next().await;
}
/// As
/// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
/// but respect messages from this schedule's associated [`TaskHandle`].
pub async fn sleep_until_wallclock(&mut self, when: SystemTime) {
loop {
let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
self.sleep(delay).await;
if finished {
return;
}
}
}
}
impl TaskHandle {
@ -98,6 +141,31 @@ impl TaskHandle {
pub fn cancel(&self) -> bool {
self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
}
/// Suspend execution of the corresponding schedule.
///
/// If the schedule is ready now, it will become pending; it won't become
/// ready again until `resume()` is called. If the schedule is waiting for a
/// timer, the timer will keep counting, but the schedule won't become ready
/// until the timer has elapsed _and_ `resume()` has been called.
///
/// Returns `true` if the schedule still exists, and `false` otherwise.
pub fn suspend(&self) -> bool {
self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
}
/// Resume execution of the corresponding schedule.
///
/// This method undoes the effect of a call to `suspend()`: the schedule
/// will fire again if it is ready (or when it becomes ready).
///
/// This method won't cause the schedule to fire if it was already
/// cancelled. For that, use the `fire()` or `fire_at()` methods.
///
/// Returns `true` if the schedule still exists, and `false` otherwise.
pub fn resume(&self) -> bool {
self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
}
}
// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
@ -120,6 +188,12 @@ impl<R: SleepProvider> TaskScheduleP<'_, R> {
*self.instant_fire = false;
*self.sleep = None;
}
SchedulerCommand::Suspend => {
*self.suspended = true;
}
SchedulerCommand::Resume => {
*self.suspended = false;
}
}
}
}
@ -138,6 +212,9 @@ impl<R: SleepProvider> Stream for TaskSchedule<R> {
}
}
}
if *this.suspended {
return Poll::Pending;
}
if *this.instant_fire {
*this.instant_fire = false;
return Poll::Ready(Some(()));
@ -283,4 +360,43 @@ mod test {
assert!(sch.next().now_or_never().is_none());
});
}
#[test]
fn suspend_and_resume_with_fire() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
hdl.fire();
hdl.suspend();
assert!(sch.next().now_or_never().is_none());
hdl.resume();
assert!(sch.next().now_or_never().is_some());
});
}
#[test]
fn suspend_and_resume_with_sleep() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
sch.fire_in(Duration::from_micros(100));
hdl.suspend();
assert!(sch.next().now_or_never().is_none());
hdl.resume();
assert!(sch.next().now_or_never().is_none());
assert!(sch.next().await.is_some());
});
}
#[test]
fn suspend_and_resume_with_nothing() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
assert!(sch.next().now_or_never().is_some());
hdl.suspend();
assert!(sch.next().now_or_never().is_none());
hdl.resume();
});
}
}
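
TaskSchedule also implements Stream, which is what these tests poll via next(). A usage sketch of the suspend/resume semantics outside the test harness, again assuming PreferredRuntime and BlockOn:

    use std::time::Duration;
    use futures::StreamExt;
    use tor_rtcompat::scheduler::TaskSchedule;
    use tor_rtcompat::{BlockOn, PreferredRuntime};

    fn main() {
        let rt = PreferredRuntime::create().expect("could not create runtime");
        let (mut schedule, handle) = TaskSchedule::new(rt.clone());
        rt.block_on(async {
            // A fresh schedule starts off ready, so the first poll
            // fires immediately.
            schedule.next().await;
            // Arm a timer, then suspend: the stream stays pending even
            // after the timer elapses, until resume() is called.
            schedule.fire_in(Duration::from_micros(100));
            handle.suspend();
            handle.resume();
            schedule.next().await;
        });
    }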

View File

@ -180,7 +180,7 @@ const MAX_SLEEP: Duration = Duration::from_secs(600);
/// expect this to be the final delay.
///
/// (This is a separate function for testing.)
fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
pub(crate) fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
let remainder = when
.duration_since(now)
.unwrap_or_else(|_| Duration::from_secs(0));
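
The hunk cuts off mid-body; for context, the remainder of calc_next_delay presumably bounds each chunk at MAX_SLEEP, consistent with the retry loop in TaskSchedule::sleep_until_wallclock above. A reconstruction offered as a sketch, not a verbatim copy:

    use std::time::{Duration, SystemTime};

    /// Longest amount of time to sleep in one chunk, per the constant above.
    const MAX_SLEEP: Duration = Duration::from_secs(600);

    // Cap each chunk at MAX_SLEEP so wallclock jumps are noticed at
    // least that often; the bool reports whether this chunk reaches
    // the target time, i.e. whether it is the final delay.
    fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
        let remainder = when
            .duration_since(now)
            .unwrap_or_else(|_| Duration::from_secs(0));
        if remainder > MAX_SLEEP {
            (false, MAX_SLEEP)
        } else {
            (true, remainder)
        }
    }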