Add status tracking to FallbackDir.

We do this by creating a new FallbackSet type that includes status
information, and updating the GuardMgr APIs to record success and
failure about it when appropriate.  We can use this to mark
FallbackDirs retriable (or not).

With this change, FallbackDir is now stored internally as a Guard in
the GuardMgr crate.  That's fine: the FallbackDir type really only
matters for configuration.
This commit is contained in:
Nick Mathewson 2022-03-25 19:19:57 -04:00
parent ea520898fd
commit bfb2353a8f
8 changed files with 286 additions and 25 deletions

View File

@ -696,6 +696,15 @@ impl<R: Runtime> CircMgr<R> {
.guardmgr()
.note_external_failure(id, external_failure);
}
/// Record that a success occurred on a circuit with a given guard, in a way
/// that makes us possibly willing to use that guard for future circuits.
pub fn note_external_success(&self, id: &GuardId, external_activity: ExternalFailure) {
self.mgr
.peek_builder()
.guardmgr()
.note_external_success(id, external_activity);
}
}
impl<R: Runtime> Drop for CircMgr<R> {

View File

@ -205,7 +205,12 @@ async fn download_attempt<R: Runtime>(
Ok(text) => {
let outcome = state.add_from_download(&text, &client_req, Some(&dirmgr.store));
match outcome {
Ok(b) => changed |= b,
Ok(b) => {
changed |= b;
if let Some(source) = source {
dirmgr.note_cache_success(&source);
}
}
Err(e) => {
warn!("error while adding directory info: {}", e);
if let Some(source) = source {

View File

@ -994,6 +994,17 @@ impl<R: Runtime> DirMgr<R> {
circmgr.retire_circ(source.unique_circ_id());
}
}
/// Record that `source` has successfully given us some directory info.
fn note_cache_success(&self, source: &tor_dirclient::SourceInfo) {
use tor_circmgr::{ExternalFailure, GuardId};
if let Some(circmgr) = &self.circmgr {
trace!("Marking {:?} as successful", source);
let guard_id = GuardId::from_chan_target(source.cache_id());
circmgr.note_external_success(&guard_id, ExternalFailure::DirCache);
}
}
}
/// A degree of readiness for a given directory state object.

View File

@ -11,6 +11,7 @@
//! `tor-dirmgr`: any changes here must be reflected there.
mod set;
mod status;
use derive_builder::Builder;
use tor_config::ConfigBuildError;
@ -21,6 +22,8 @@ use serde::Deserialize;
use std::net::SocketAddr;
pub use set::FallbackList;
pub(crate) use set::FallbackSet;
use status::Status;
/// A directory whose location ships with Tor (or arti), and which we
/// can use for bootstrapping when we don't know anything else about

View File

@ -1,10 +1,10 @@
//! Declare the [`FallbackSet`] type, which is used to store a set of FallbackDir.
use rand::seq::IteratorRandom;
use std::iter::FromIterator;
use std::{iter::FromIterator, time::Instant};
use super::FallbackDir;
use crate::PickGuardError;
use super::{FallbackDir, Status};
use crate::{GuardId, PickGuardError};
use serde::Deserialize;
/// A list of fallback directories.
@ -53,3 +53,107 @@ impl FallbackList {
.ok_or(PickGuardError::AllFallbacksDown { retry_at: None })
}
}
/// A set of fallback directories, in usable form.
#[derive(Debug, Clone)]
pub(crate) struct FallbackSet {
/// The list of fallbacks in the set.
///
/// We require that these are sorted and unique by (ED,RSA) keys.
fallbacks: Vec<Entry>,
}
/// Wrapper type for FallbackDir converted into crate::Guard, and Status.
///
/// Defines a sort order to ensure that we can look up fallback directories
/// by binary search on keys.
#[derive(Debug, Clone)]
pub(super) struct Entry {
/// The inner fallback directory.
pub(super) fallback: crate::Guard,
/// The status for the fallback directory.
pub(super) status: Status,
}
impl From<FallbackDir> for Entry {
fn from(fallback: FallbackDir) -> Self {
let fallback = fallback.as_guard();
let status = Status::default();
Entry { fallback, status }
}
}
impl Entry {
/// Return the identity for this fallback entry.
fn id(&self) -> &GuardId {
self.fallback.id()
}
}
impl From<FallbackList> for FallbackSet {
fn from(list: FallbackList) -> Self {
let mut fallbacks: Vec<Entry> = list.fallbacks.into_iter().map(|fb| fb.into()).collect();
fallbacks.sort_by(|x, y| x.id().cmp(y.id()));
fallbacks.dedup_by(|x, y| x.id() == y.id());
FallbackSet { fallbacks }
}
}
impl FallbackSet {
/// Return a random member of this FallbackSet that's usable at `now`.
pub(crate) fn choose<R: rand::Rng>(
&self,
rng: &mut R,
now: Instant,
) -> Result<&crate::Guard, PickGuardError> {
if self.fallbacks.is_empty() {
return Err(PickGuardError::NoCandidatesAvailable);
}
self.fallbacks
.iter()
.filter(|ent| ent.status.usable_at(now))
.choose(rng)
.map(|ent| &ent.fallback)
.ok_or_else(|| PickGuardError::AllFallbacksDown {
retry_at: self.next_retry(),
})
}
/// Return the next time at which any member of this set will become ready.
///
/// Returns None if no elements are failing.
fn next_retry(&self) -> Option<Instant> {
self.fallbacks
.iter()
.filter_map(|ent| ent.status.next_retriable())
.min()
}
/// Return a mutable reference to the entry whose identity is `id`, if there is one.
fn lookup_mut(&mut self, id: &GuardId) -> Option<&mut Entry> {
match self.fallbacks.binary_search_by(|e| e.id().cmp(id)) {
Ok(idx) => Some(&mut self.fallbacks[idx]),
Err(_) => None,
}
}
/// Record that a success has occurred for the fallback with the given
/// identity.
///
/// Be aware that for fallbacks, we only count a successful directory
/// operation as a success: a circuit success is not enough.
pub(crate) fn note_success(&mut self, id: &GuardId) {
if let Some(entry) = self.lookup_mut(id) {
entry.status.note_success();
}
}
/// Record that a failure has occurred for the fallback with the given
/// identity.
pub(crate) fn note_failure(&mut self, id: &GuardId, now: Instant) {
if let Some(entry) = self.lookup_mut(id) {
entry.status.note_failure(now);
}
}
}

View File

@ -0,0 +1,107 @@
//! Types and code to track the readiness status of a fallback directory.
use std::time::{Duration, Instant};
use tor_basic_utils::retry::RetryDelay;
/// Status information about whether a [`FallbackDir`](super::FallbackDir) is
/// currently usable.
///
/// This structure is used to track whether the fallback cache has recently
/// failed, and if so, when it can be retried.
#[derive(Debug, Clone)]
pub(crate) struct Status {
/// Used to decide how long to delay before retrying a fallback cache
/// that has failed.
delay: RetryDelay,
/// A time before which we should assume that this fallback cache is broken.
///
/// If None, then this fallback cache is ready to use right away.
retry_at: Option<Instant>,
}
/// Least amount of time we'll wait before retrying a fallback cache.
//
// TODO: we may want to make this configurable to a smaller value for chutney networks.
const FALLBACK_RETRY_FLOOR: Duration = Duration::from_secs(150);
impl Default for Status {
fn default() -> Self {
Status {
delay: RetryDelay::from_duration(FALLBACK_RETRY_FLOOR),
retry_at: None,
}
}
}
impl Status {
/// Return true if this `Status` is usable at the time `now`.
pub(crate) fn usable_at(&self, now: Instant) -> bool {
match self.retry_at {
Some(ready) => now >= ready,
None => true,
}
}
/// Return the time at which this `Status` can next be retried.
///
/// A return value of `None`, or of a time in the past, indicates that this
/// status can be used immediately.
pub(crate) fn next_retriable(&self) -> Option<Instant> {
self.retry_at
}
/// Restore this `Status` to its original state.
pub(crate) fn reset(&mut self) {
self.retry_at = None;
self.delay = RetryDelay::from_duration(FALLBACK_RETRY_FLOOR);
}
/// Record that the associated fallback directory has been used successfully.
///
/// This should only be done after successfully handling a whole reply from the
/// directory.
pub(crate) fn note_success(&mut self) {
self.reset();
}
/// Record that the associated fallback directory has failed.
pub(crate) fn note_failure(&mut self, now: Instant) {
let mut rng = rand::thread_rng();
self.retry_at = Some(now + self.delay.next_delay(&mut rng));
}
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
#[test]
fn status_basics() {
let now = Instant::now();
let mut status = Status::default();
// newly created status is usable.
assert!(status.usable_at(now));
// no longer usable after a failure.
status.note_failure(now);
assert_eq!(status.next_retriable().unwrap(), now + FALLBACK_RETRY_FLOOR);
assert!(!status.usable_at(now));
// Not enough time has passed.
assert!(!status.usable_at(now + FALLBACK_RETRY_FLOOR / 2));
// Enough time has passed.
assert!(status.usable_at(now + FALLBACK_RETRY_FLOOR));
// Mark as failed again; the timeout will (probably) be longer.
status.note_failure(now + FALLBACK_RETRY_FLOOR);
assert!(status.next_retriable().unwrap() >= now + FALLBACK_RETRY_FLOOR * 2);
assert!(!status.usable_at(now + FALLBACK_RETRY_FLOOR));
// Mark as succeeded; it should be usable immediately.
status.note_success();
assert!(status.usable_at(now + FALLBACK_RETRY_FLOOR));
}
}

View File

@ -232,7 +232,7 @@ struct GuardMgrInner {
/// A list of fallback directories used to access the directory system
/// when no other directory information is yet known.
// TODO: reconfigure when the configuration changes.
fallbacks: fallback::FallbackList,
fallbacks: fallback::FallbackSet,
/// Location in which to store persistent state.
storage: DynStorageHandle<GuardSets>,
@ -288,7 +288,7 @@ impl<R: Runtime> GuardMgr<R> {
ctrl,
pending: HashMap::new(),
waiting: Vec::new(),
fallbacks,
fallbacks: fallbacks.into(),
storage,
}));
{
@ -461,7 +461,7 @@ impl<R: Runtime> GuardMgr<R> {
// it should _probably_ not hurt.)
inner.guards.active_guards_mut().consider_all_retries(now);
let (origin, guard) = inner.select_guard_with_expand(&usage, netdir, wallclock)?;
let (origin, guard) = inner.select_guard_with_expand(&usage, netdir, now, wallclock)?;
trace!(?guard, ?usage, "Guard selected");
let (usable, usable_sender) = if origin.usable_immediately() {
@ -510,7 +510,8 @@ impl<R: Runtime> GuardMgr<R> {
Ok((guard, monitor, usable))
}
/// Record that after we tried to connect to the guard described in `Guard`
/// Record that _after_ we built a circuit with `guard`,something described
/// in `external_failure` went wrong with it.
pub fn note_external_failure(&self, guard: &GuardId, external_failure: ExternalFailure) {
let now = self.runtime.now();
let mut inner = self.inner.lock().expect("Poisoned lock");
@ -519,6 +520,21 @@ impl<R: Runtime> GuardMgr<R> {
.guards
.active_guards_mut()
.record_failure(guard, Some(external_failure), now);
if external_failure == ExternalFailure::DirCache {
inner.fallbacks.note_failure(guard, now);
}
}
/// Record that _after_ we built a circuit with `guard`, some activity described in `external_activity` was successful with it.
///
pub fn note_external_success(&self, guard: &GuardId, external_activity: ExternalFailure) {
// XXXX: rename the ExternalFailure type, since we're using it in this way too.
let mut inner = self.inner.lock().expect("Poisoned lock");
if external_activity == ExternalFailure::DirCache {
inner.fallbacks.note_success(guard);
}
}
/// Ensure that the message queue is flushed before proceeding to
@ -540,7 +556,7 @@ impl<R: Runtime> GuardMgr<R> {
/// A reason for marking a guard as failed that can't be observed from inside
/// the `GuardMgr` code.
#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum ExternalFailure {
/// This guard has somehow failed to operate as a good directory cache.
@ -666,10 +682,16 @@ impl GuardMgrInner {
let guard_id = pending.guard_id();
trace!(?guard_id, ?status, "Received report of guard status");
// TODO: all the branches below are no-ops (or close to it) if the
// "guard" is actually a fallback. We'll add handling for that later.
match status {
GuardStatus::Success => {
match (status, pending.source()) {
(GuardStatus::Failure, sample::ListKind::Fallback) => {
// We used a fallback, and we weren't able to build a circuit through it.
self.fallbacks.note_failure(guard_id, runtime.now());
}
(_, sample::ListKind::Fallback) => {
// We don't record any other kind of circuit activity if we
// took the entry from the fallback list.
}
(GuardStatus::Success, _) => {
// If we had gone too long without any net activity when we
// gave out this guard, and now we're seeing a circuit
// succeed, tell the primary guards that they might be
@ -697,19 +719,19 @@ impl GuardMgrInner {
self.waiting.push(pending);
}
}
GuardStatus::Failure => {
(GuardStatus::Failure, _) => {
self.guards
.active_guards_mut()
.record_failure(guard_id, None, runtime.now());
pending.reply(false);
}
GuardStatus::AttemptAbandoned => {
(GuardStatus::AttemptAbandoned, _) => {
self.guards
.active_guards_mut()
.record_attempt_abandoned(guard_id);
pending.reply(false);
}
GuardStatus::Indeterminate => {
(GuardStatus::Indeterminate, _) => {
self.guards
.active_guards_mut()
.record_indeterminate_result(guard_id);
@ -810,7 +832,8 @@ impl GuardMgrInner {
&mut self,
usage: &GuardUsage,
netdir: Option<&NetDir>,
now: SystemTime,
now: Instant,
wallclock: SystemTime,
) -> Result<(sample::ListKind, Guard), PickGuardError> {
// Try to find a guard.
let res1 = self.select_guard_once(usage);
@ -822,11 +845,11 @@ impl GuardMgrInner {
// That didn't work. If we have a netdir, expand the sample and try again.
if let Some(dir) = netdir {
trace!("No guards available, trying to extend the sample.");
self.update(now, Some(dir));
self.update(wallclock, Some(dir));
if self
.guards
.active_guards_mut()
.extend_sample_as_needed(now, &self.params, dir)
.extend_sample_as_needed(wallclock, &self.params, dir)
{
self.guards
.active_guards_mut()
@ -842,7 +865,7 @@ impl GuardMgrInner {
// Okay, that didn't work either. If we were asked for a directory
// guard, then we may be able to use a fallback.
if usage.kind == GuardUsageKind::OneHopDirectory {
return self.select_fallback();
return self.select_fallback(now);
}
// Couldn't extend the sample or use a fallback; return the original error.
@ -872,9 +895,9 @@ impl GuardMgrInner {
///
/// Called when we have no guard information to use. Return values are as
/// for [`select_guard()`]
fn select_fallback(&self) -> Result<(sample::ListKind, Guard), PickGuardError> {
let fallback = self.fallbacks.choose(&mut rand::thread_rng())?;
Ok((sample::ListKind::Fallback, fallback.as_guard()))
fn select_fallback(&self, now: Instant) -> Result<(sample::ListKind, Guard), PickGuardError> {
let fallback = self.fallbacks.choose(&mut rand::thread_rng(), now)?;
Ok((sample::ListKind::Fallback, fallback.clone()))
}
}
@ -974,7 +997,7 @@ impl TryFrom<&NetParameters> for GuardParams {
///
/// (This is implemented internally using both of the guard's Ed25519
/// and RSA identities.)
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct GuardId {
/// Ed25519 identity key for a guard
ed25519: pk::ed25519::Ed25519Identity,

View File

@ -311,7 +311,6 @@ impl PendingRequest {
}
/// Return the source from which we took this guard from.
#[allow(dead_code)] // TODO: this will get used later in this branch.
pub(crate) fn source(&self) -> ListKind {
self.source
}