From bfb2353a8f627b189e104e606b3284fdfaa6d98b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 25 Mar 2022 19:19:57 -0400 Subject: [PATCH] Add status tracking to FallbackDir. We do this by creating a new FallbackSet type that includes status information, and updating the GuardMgr APIs to record success and failure about it when appropriate. We can use this to mark FallbackDirs retriable (or not). With this change, FallbackDir is now stored internally as a Guard in the GuardMgr crate. That's fine: the FallbackDir type really only matters for configuration. --- crates/tor-circmgr/src/lib.rs | 9 ++ crates/tor-dirmgr/src/bootstrap.rs | 7 +- crates/tor-dirmgr/src/lib.rs | 11 +++ crates/tor-guardmgr/src/fallback.rs | 3 + crates/tor-guardmgr/src/fallback/set.rs | 110 ++++++++++++++++++++- crates/tor-guardmgr/src/fallback/status.rs | 107 ++++++++++++++++++++ crates/tor-guardmgr/src/lib.rs | 63 ++++++++---- crates/tor-guardmgr/src/pending.rs | 1 - 8 files changed, 286 insertions(+), 25 deletions(-) create mode 100644 crates/tor-guardmgr/src/fallback/status.rs diff --git a/crates/tor-circmgr/src/lib.rs b/crates/tor-circmgr/src/lib.rs index b5617832e..4b6a48807 100644 --- a/crates/tor-circmgr/src/lib.rs +++ b/crates/tor-circmgr/src/lib.rs @@ -696,6 +696,15 @@ impl CircMgr { .guardmgr() .note_external_failure(id, external_failure); } + + /// Record that a success occurred on a circuit with a given guard, in a way + /// that makes us possibly willing to use that guard for future circuits. + pub fn note_external_success(&self, id: &GuardId, external_activity: ExternalFailure) { + self.mgr + .peek_builder() + .guardmgr() + .note_external_success(id, external_activity); + } } impl Drop for CircMgr { diff --git a/crates/tor-dirmgr/src/bootstrap.rs b/crates/tor-dirmgr/src/bootstrap.rs index 0337b1917..7c0d7f316 100644 --- a/crates/tor-dirmgr/src/bootstrap.rs +++ b/crates/tor-dirmgr/src/bootstrap.rs @@ -205,7 +205,12 @@ async fn download_attempt( Ok(text) => { let outcome = state.add_from_download(&text, &client_req, Some(&dirmgr.store)); match outcome { - Ok(b) => changed |= b, + Ok(b) => { + changed |= b; + if let Some(source) = source { + dirmgr.note_cache_success(&source); + } + } Err(e) => { warn!("error while adding directory info: {}", e); if let Some(source) = source { diff --git a/crates/tor-dirmgr/src/lib.rs b/crates/tor-dirmgr/src/lib.rs index 33e732e81..424cf55d6 100644 --- a/crates/tor-dirmgr/src/lib.rs +++ b/crates/tor-dirmgr/src/lib.rs @@ -994,6 +994,17 @@ impl DirMgr { circmgr.retire_circ(source.unique_circ_id()); } } + + /// Record that `source` has successfully given us some directory info. + fn note_cache_success(&self, source: &tor_dirclient::SourceInfo) { + use tor_circmgr::{ExternalFailure, GuardId}; + + if let Some(circmgr) = &self.circmgr { + trace!("Marking {:?} as successful", source); + let guard_id = GuardId::from_chan_target(source.cache_id()); + circmgr.note_external_success(&guard_id, ExternalFailure::DirCache); + } + } } /// A degree of readiness for a given directory state object. diff --git a/crates/tor-guardmgr/src/fallback.rs b/crates/tor-guardmgr/src/fallback.rs index d27c59f3e..d35c28e93 100644 --- a/crates/tor-guardmgr/src/fallback.rs +++ b/crates/tor-guardmgr/src/fallback.rs @@ -11,6 +11,7 @@ //! `tor-dirmgr`: any changes here must be reflected there. mod set; +mod status; use derive_builder::Builder; use tor_config::ConfigBuildError; @@ -21,6 +22,8 @@ use serde::Deserialize; use std::net::SocketAddr; pub use set::FallbackList; +pub(crate) use set::FallbackSet; +use status::Status; /// A directory whose location ships with Tor (or arti), and which we /// can use for bootstrapping when we don't know anything else about diff --git a/crates/tor-guardmgr/src/fallback/set.rs b/crates/tor-guardmgr/src/fallback/set.rs index 7cb468f2e..18fb34ff0 100644 --- a/crates/tor-guardmgr/src/fallback/set.rs +++ b/crates/tor-guardmgr/src/fallback/set.rs @@ -1,10 +1,10 @@ //! Declare the [`FallbackSet`] type, which is used to store a set of FallbackDir. use rand::seq::IteratorRandom; -use std::iter::FromIterator; +use std::{iter::FromIterator, time::Instant}; -use super::FallbackDir; -use crate::PickGuardError; +use super::{FallbackDir, Status}; +use crate::{GuardId, PickGuardError}; use serde::Deserialize; /// A list of fallback directories. @@ -53,3 +53,107 @@ impl FallbackList { .ok_or(PickGuardError::AllFallbacksDown { retry_at: None }) } } + +/// A set of fallback directories, in usable form. +#[derive(Debug, Clone)] +pub(crate) struct FallbackSet { + /// The list of fallbacks in the set. + /// + /// We require that these are sorted and unique by (ED,RSA) keys. + fallbacks: Vec, +} + +/// Wrapper type for FallbackDir converted into crate::Guard, and Status. +/// +/// Defines a sort order to ensure that we can look up fallback directories +/// by binary search on keys. +#[derive(Debug, Clone)] +pub(super) struct Entry { + /// The inner fallback directory. + pub(super) fallback: crate::Guard, + /// The status for the fallback directory. + pub(super) status: Status, +} + +impl From for Entry { + fn from(fallback: FallbackDir) -> Self { + let fallback = fallback.as_guard(); + let status = Status::default(); + Entry { fallback, status } + } +} + +impl Entry { + /// Return the identity for this fallback entry. + fn id(&self) -> &GuardId { + self.fallback.id() + } +} + +impl From for FallbackSet { + fn from(list: FallbackList) -> Self { + let mut fallbacks: Vec = list.fallbacks.into_iter().map(|fb| fb.into()).collect(); + fallbacks.sort_by(|x, y| x.id().cmp(y.id())); + fallbacks.dedup_by(|x, y| x.id() == y.id()); + FallbackSet { fallbacks } + } +} + +impl FallbackSet { + /// Return a random member of this FallbackSet that's usable at `now`. + pub(crate) fn choose( + &self, + rng: &mut R, + now: Instant, + ) -> Result<&crate::Guard, PickGuardError> { + if self.fallbacks.is_empty() { + return Err(PickGuardError::NoCandidatesAvailable); + } + + self.fallbacks + .iter() + .filter(|ent| ent.status.usable_at(now)) + .choose(rng) + .map(|ent| &ent.fallback) + .ok_or_else(|| PickGuardError::AllFallbacksDown { + retry_at: self.next_retry(), + }) + } + + /// Return the next time at which any member of this set will become ready. + /// + /// Returns None if no elements are failing. + fn next_retry(&self) -> Option { + self.fallbacks + .iter() + .filter_map(|ent| ent.status.next_retriable()) + .min() + } + + /// Return a mutable reference to the entry whose identity is `id`, if there is one. + fn lookup_mut(&mut self, id: &GuardId) -> Option<&mut Entry> { + match self.fallbacks.binary_search_by(|e| e.id().cmp(id)) { + Ok(idx) => Some(&mut self.fallbacks[idx]), + Err(_) => None, + } + } + + /// Record that a success has occurred for the fallback with the given + /// identity. + /// + /// Be aware that for fallbacks, we only count a successful directory + /// operation as a success: a circuit success is not enough. + pub(crate) fn note_success(&mut self, id: &GuardId) { + if let Some(entry) = self.lookup_mut(id) { + entry.status.note_success(); + } + } + + /// Record that a failure has occurred for the fallback with the given + /// identity. + pub(crate) fn note_failure(&mut self, id: &GuardId, now: Instant) { + if let Some(entry) = self.lookup_mut(id) { + entry.status.note_failure(now); + } + } +} diff --git a/crates/tor-guardmgr/src/fallback/status.rs b/crates/tor-guardmgr/src/fallback/status.rs new file mode 100644 index 000000000..a772fcd79 --- /dev/null +++ b/crates/tor-guardmgr/src/fallback/status.rs @@ -0,0 +1,107 @@ +//! Types and code to track the readiness status of a fallback directory. + +use std::time::{Duration, Instant}; +use tor_basic_utils::retry::RetryDelay; + +/// Status information about whether a [`FallbackDir`](super::FallbackDir) is +/// currently usable. +/// +/// This structure is used to track whether the fallback cache has recently +/// failed, and if so, when it can be retried. +#[derive(Debug, Clone)] +pub(crate) struct Status { + /// Used to decide how long to delay before retrying a fallback cache + /// that has failed. + delay: RetryDelay, + /// A time before which we should assume that this fallback cache is broken. + /// + /// If None, then this fallback cache is ready to use right away. + retry_at: Option, +} + +/// Least amount of time we'll wait before retrying a fallback cache. +// +// TODO: we may want to make this configurable to a smaller value for chutney networks. +const FALLBACK_RETRY_FLOOR: Duration = Duration::from_secs(150); + +impl Default for Status { + fn default() -> Self { + Status { + delay: RetryDelay::from_duration(FALLBACK_RETRY_FLOOR), + retry_at: None, + } + } +} + +impl Status { + /// Return true if this `Status` is usable at the time `now`. + pub(crate) fn usable_at(&self, now: Instant) -> bool { + match self.retry_at { + Some(ready) => now >= ready, + None => true, + } + } + + /// Return the time at which this `Status` can next be retried. + /// + /// A return value of `None`, or of a time in the past, indicates that this + /// status can be used immediately. + pub(crate) fn next_retriable(&self) -> Option { + self.retry_at + } + + /// Restore this `Status` to its original state. + pub(crate) fn reset(&mut self) { + self.retry_at = None; + self.delay = RetryDelay::from_duration(FALLBACK_RETRY_FLOOR); + } + + /// Record that the associated fallback directory has been used successfully. + /// + /// This should only be done after successfully handling a whole reply from the + /// directory. + pub(crate) fn note_success(&mut self) { + self.reset(); + } + + /// Record that the associated fallback directory has failed. + pub(crate) fn note_failure(&mut self, now: Instant) { + let mut rng = rand::thread_rng(); + self.retry_at = Some(now + self.delay.next_delay(&mut rng)); + } +} + +#[cfg(test)] +mod test { + #![allow(clippy::unwrap_used)] + use super::*; + + #[test] + fn status_basics() { + let now = Instant::now(); + + let mut status = Status::default(); + // newly created status is usable. + assert!(status.usable_at(now)); + + // no longer usable after a failure. + status.note_failure(now); + assert_eq!(status.next_retriable().unwrap(), now + FALLBACK_RETRY_FLOOR); + assert!(!status.usable_at(now)); + + // Not enough time has passed. + assert!(!status.usable_at(now + FALLBACK_RETRY_FLOOR / 2)); + + // Enough time has passed. + assert!(status.usable_at(now + FALLBACK_RETRY_FLOOR)); + + // Mark as failed again; the timeout will (probably) be longer. + status.note_failure(now + FALLBACK_RETRY_FLOOR); + assert!(status.next_retriable().unwrap() >= now + FALLBACK_RETRY_FLOOR * 2); + assert!(!status.usable_at(now + FALLBACK_RETRY_FLOOR)); + + // Mark as succeeded; it should be usable immediately. + status.note_success(); + assert!(status.usable_at(now + FALLBACK_RETRY_FLOOR)); + } +} diff --git a/crates/tor-guardmgr/src/lib.rs b/crates/tor-guardmgr/src/lib.rs index 65d91ae52..bcf6da9a7 100644 --- a/crates/tor-guardmgr/src/lib.rs +++ b/crates/tor-guardmgr/src/lib.rs @@ -232,7 +232,7 @@ struct GuardMgrInner { /// A list of fallback directories used to access the directory system /// when no other directory information is yet known. // TODO: reconfigure when the configuration changes. - fallbacks: fallback::FallbackList, + fallbacks: fallback::FallbackSet, /// Location in which to store persistent state. storage: DynStorageHandle, @@ -288,7 +288,7 @@ impl GuardMgr { ctrl, pending: HashMap::new(), waiting: Vec::new(), - fallbacks, + fallbacks: fallbacks.into(), storage, })); { @@ -461,7 +461,7 @@ impl GuardMgr { // it should _probably_ not hurt.) inner.guards.active_guards_mut().consider_all_retries(now); - let (origin, guard) = inner.select_guard_with_expand(&usage, netdir, wallclock)?; + let (origin, guard) = inner.select_guard_with_expand(&usage, netdir, now, wallclock)?; trace!(?guard, ?usage, "Guard selected"); let (usable, usable_sender) = if origin.usable_immediately() { @@ -510,7 +510,8 @@ impl GuardMgr { Ok((guard, monitor, usable)) } - /// Record that after we tried to connect to the guard described in `Guard` + /// Record that _after_ we built a circuit with `guard`,something described + /// in `external_failure` went wrong with it. pub fn note_external_failure(&self, guard: &GuardId, external_failure: ExternalFailure) { let now = self.runtime.now(); let mut inner = self.inner.lock().expect("Poisoned lock"); @@ -519,6 +520,21 @@ impl GuardMgr { .guards .active_guards_mut() .record_failure(guard, Some(external_failure), now); + + if external_failure == ExternalFailure::DirCache { + inner.fallbacks.note_failure(guard, now); + } + } + + /// Record that _after_ we built a circuit with `guard`, some activity described in `external_activity` was successful with it. + /// + pub fn note_external_success(&self, guard: &GuardId, external_activity: ExternalFailure) { + // XXXX: rename the ExternalFailure type, since we're using it in this way too. + let mut inner = self.inner.lock().expect("Poisoned lock"); + + if external_activity == ExternalFailure::DirCache { + inner.fallbacks.note_success(guard); + } } /// Ensure that the message queue is flushed before proceeding to @@ -540,7 +556,7 @@ impl GuardMgr { /// A reason for marking a guard as failed that can't be observed from inside /// the `GuardMgr` code. -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[non_exhaustive] pub enum ExternalFailure { /// This guard has somehow failed to operate as a good directory cache. @@ -666,10 +682,16 @@ impl GuardMgrInner { let guard_id = pending.guard_id(); trace!(?guard_id, ?status, "Received report of guard status"); - // TODO: all the branches below are no-ops (or close to it) if the - // "guard" is actually a fallback. We'll add handling for that later. - match status { - GuardStatus::Success => { + match (status, pending.source()) { + (GuardStatus::Failure, sample::ListKind::Fallback) => { + // We used a fallback, and we weren't able to build a circuit through it. + self.fallbacks.note_failure(guard_id, runtime.now()); + } + (_, sample::ListKind::Fallback) => { + // We don't record any other kind of circuit activity if we + // took the entry from the fallback list. + } + (GuardStatus::Success, _) => { // If we had gone too long without any net activity when we // gave out this guard, and now we're seeing a circuit // succeed, tell the primary guards that they might be @@ -697,19 +719,19 @@ impl GuardMgrInner { self.waiting.push(pending); } } - GuardStatus::Failure => { + (GuardStatus::Failure, _) => { self.guards .active_guards_mut() .record_failure(guard_id, None, runtime.now()); pending.reply(false); } - GuardStatus::AttemptAbandoned => { + (GuardStatus::AttemptAbandoned, _) => { self.guards .active_guards_mut() .record_attempt_abandoned(guard_id); pending.reply(false); } - GuardStatus::Indeterminate => { + (GuardStatus::Indeterminate, _) => { self.guards .active_guards_mut() .record_indeterminate_result(guard_id); @@ -810,7 +832,8 @@ impl GuardMgrInner { &mut self, usage: &GuardUsage, netdir: Option<&NetDir>, - now: SystemTime, + now: Instant, + wallclock: SystemTime, ) -> Result<(sample::ListKind, Guard), PickGuardError> { // Try to find a guard. let res1 = self.select_guard_once(usage); @@ -822,11 +845,11 @@ impl GuardMgrInner { // That didn't work. If we have a netdir, expand the sample and try again. if let Some(dir) = netdir { trace!("No guards available, trying to extend the sample."); - self.update(now, Some(dir)); + self.update(wallclock, Some(dir)); if self .guards .active_guards_mut() - .extend_sample_as_needed(now, &self.params, dir) + .extend_sample_as_needed(wallclock, &self.params, dir) { self.guards .active_guards_mut() @@ -842,7 +865,7 @@ impl GuardMgrInner { // Okay, that didn't work either. If we were asked for a directory // guard, then we may be able to use a fallback. if usage.kind == GuardUsageKind::OneHopDirectory { - return self.select_fallback(); + return self.select_fallback(now); } // Couldn't extend the sample or use a fallback; return the original error. @@ -872,9 +895,9 @@ impl GuardMgrInner { /// /// Called when we have no guard information to use. Return values are as /// for [`select_guard()`] - fn select_fallback(&self) -> Result<(sample::ListKind, Guard), PickGuardError> { - let fallback = self.fallbacks.choose(&mut rand::thread_rng())?; - Ok((sample::ListKind::Fallback, fallback.as_guard())) + fn select_fallback(&self, now: Instant) -> Result<(sample::ListKind, Guard), PickGuardError> { + let fallback = self.fallbacks.choose(&mut rand::thread_rng(), now)?; + Ok((sample::ListKind::Fallback, fallback.clone())) } } @@ -974,7 +997,7 @@ impl TryFrom<&NetParameters> for GuardParams { /// /// (This is implemented internally using both of the guard's Ed25519 /// and RSA identities.) -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd)] pub struct GuardId { /// Ed25519 identity key for a guard ed25519: pk::ed25519::Ed25519Identity, diff --git a/crates/tor-guardmgr/src/pending.rs b/crates/tor-guardmgr/src/pending.rs index c21e7fa3d..845417af9 100644 --- a/crates/tor-guardmgr/src/pending.rs +++ b/crates/tor-guardmgr/src/pending.rs @@ -311,7 +311,6 @@ impl PendingRequest { } /// Return the source from which we took this guard from. - #[allow(dead_code)] // TODO: this will get used later in this branch. pub(crate) fn source(&self) -> ListKind { self.source }