Merge branch 'stalled_directory' into 'main'

Detect and report stalled directory downloads

Closes #468

See merge request tpo/core/arti!587
This commit is contained in:
Ian Jackson 2022-06-21 19:13:40 +00:00
commit 9b4ba4893e
6 changed files with 592 additions and 208 deletions

View File

@ -9,7 +9,7 @@ use futures::{Stream, StreamExt};
use tor_basic_utils::skip_fmt;
use tor_chanmgr::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_circmgr::{ClockSkewEvents, SkewEstimate};
use tor_dirmgr::DirBootstrapStatus;
use tor_dirmgr::{DirBlockage, DirBootstrapStatus};
use tracing::debug;
/// Information about how ready a [`crate::TorClient`] is to handle requests.
@ -85,6 +85,10 @@ impl BootstrapStatus {
} else {
Some(Blockage { kind, message })
}
} else if let Some(b) = self.dir_status.blockage(SystemTime::now()) {
let message = b.to_string().into();
let kind = b.into();
Some(Blockage { kind, message })
} else {
None
}
@ -140,6 +144,11 @@ pub enum BlockageKind {
/// successfully with relays and/or from finding a directory that we trust.
#[display(fmt = "Clock is skewed.")]
ClockSkewed,
/// We've encounted some kind of problem downloading directory
/// information, and it doesn't seem to be caused by any particular
/// connection problem.
#[display(fmt = "Can't bootstrap a Tor directory.")]
CantBootstrap,
}
impl From<ConnBlockage> for BlockageKind {
@ -153,6 +162,12 @@ impl From<ConnBlockage> for BlockageKind {
}
}
impl From<DirBlockage> for BlockageKind {
fn from(_: DirBlockage) -> Self {
BlockageKind::CantBootstrap
}
}
impl fmt::Display for BootstrapStatus {
/// Format this [`BootstrapStatus`].
///
@ -195,6 +210,7 @@ pub(crate) async fn report_status(
skew_status: ClockSkewEvents,
) {
/// Internal enumeration to combine incoming status changes.
#[allow(clippy::large_enum_variant)]
enum Event {
/// A connection status change
Conn(ConnStatus),

View File

@ -1,3 +1,5 @@
MODIFIED: DirProvider now has download_task_handle().
There's a default implementation, so this isn't a breaking change.
MODIFIED: DirBootstrapStatus now has a blockage() method.
BREAKING: DirStatus is no longer a public type.
(Nothing would actually give you one.)

View File

@ -1,6 +1,7 @@
//! Functions to download or load directory objects, using the
//! state machines in the `states` module.
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::{
collections::HashMap,
@ -48,6 +49,36 @@ macro_rules! propagate_fatal_errors {
};
}
/// Identifier for an attempt to bootstrap a directory.
///
/// Every time that we decide to download a new directory, _despite already
/// having one_, counts as a new attempt.
///
/// These are used to track the progress of each attempt independently.
#[derive(Copy, Clone, Debug, derive_more::Display, Eq, PartialEq, Ord, PartialOrd)]
#[display(fmt = "{0}", id)]
pub(crate) struct AttemptId {
/// Which attempt at downloading a directory is this?
id: NonZeroUsize,
}
impl AttemptId {
/// Return a new unused AtomicUsize that will be greater than any previous
/// one.
///
/// # Panics
///
/// Panics if we have exhausted the possible space of AtomicIds.
pub(crate) fn next() -> Self {
use std::sync::atomic::{AtomicUsize, Ordering};
/// atomic used to generate the next attempt.
static NEXT: AtomicUsize = AtomicUsize::new(1);
let id = NEXT.fetch_add(1, Ordering::Relaxed);
let id = id.try_into().expect("Allocated too many AttemptIds");
Self { id }
}
}
/// If there were errors from a peer in `outcome`, record those errors by
/// marking the circuit (if any) as needing retirement, and noting the peer
/// (if any) as having failed.
@ -313,9 +344,11 @@ async fn fetch_multiple<R: Runtime>(
async fn load_once<R: Runtime>(
dirmgr: &Arc<DirMgr<R>>,
state: &mut Box<dyn DirState>,
changed: &mut bool,
attempt_id: AttemptId,
changed_out: &mut bool,
) -> Result<()> {
let missing = state.missing_docs();
let mut changed = false;
let outcome: Result<()> = if missing.is_empty() {
trace!("Found no missing documents; can't advance current state");
Ok(())
@ -330,13 +363,16 @@ async fn load_once<R: Runtime>(
load_documents_from_store(&missing, store.deref())?
};
state.add_from_cache(documents, changed)
state.add_from_cache(documents, &mut changed)
};
// We have to update the status here regardless of the outcome: even if
// there was an error, we might have received partial information that
// changed our status.
dirmgr.update_status(state.bootstrap_status());
// We have to update the status here regardless of the outcome, if we got
// any information: even if there was an error, we might have received
// partial information that changed our status.
if changed {
dirmgr.update_progress(attempt_id, state.bootstrap_progress());
*changed_out = true;
}
outcome
}
@ -348,12 +384,13 @@ async fn load_once<R: Runtime>(
pub(crate) async fn load<R: Runtime>(
dirmgr: Arc<DirMgr<R>>,
mut state: Box<dyn DirState>,
attempt_id: AttemptId,
) -> Result<Box<dyn DirState>> {
let mut safety_counter = 0_usize;
loop {
trace!(state=%state.describe(), "Loading from cache");
let mut changed = false;
let outcome = load_once(&dirmgr, &mut state, &mut changed).await;
let outcome = load_once(&dirmgr, &mut state, attempt_id, &mut changed).await;
{
let mut store = dirmgr.store.lock().expect("store lock poisoned");
dirmgr.apply_netdir_changes(&mut state, store.deref_mut())?;
@ -399,9 +436,11 @@ async fn download_attempt<R: Runtime>(
dirmgr: &Arc<DirMgr<R>>,
state: &mut Box<dyn DirState>,
parallelism: usize,
attempt_id: AttemptId,
) -> Result<()> {
let missing = state.missing_docs();
let fetched = fetch_multiple(Arc::clone(dirmgr), &missing, parallelism).await?;
let mut n_errors = 0;
for (client_req, dir_response) in fetched {
let source = dir_response.source().map(Clone::clone);
let text = match String::from_utf8(dir_response.into_output())
@ -410,6 +449,7 @@ async fn download_attempt<R: Runtime>(
Ok(t) => t,
Err(e) => {
if let Some(source) = source {
n_errors += 1;
note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
}
continue;
@ -435,6 +475,7 @@ async fn download_attempt<R: Runtime>(
if let Some(source) = source {
if let Err(e) = &outcome {
n_errors += 1;
note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
} else {
note_cache_success(dirmgr.circmgr()?.deref(), &source);
@ -442,6 +483,7 @@ async fn download_attempt<R: Runtime>(
}
if let Err(e) = &outcome {
dirmgr.note_errors(attempt_id, 1);
warn!("error while adding directory info: {}", e);
}
propagate_fatal_errors!(outcome);
@ -449,14 +491,17 @@ async fn download_attempt<R: Runtime>(
Err(e) => {
warn!("Error when expanding directory text: {}", e);
if let Some(source) = source {
n_errors += 1;
note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
}
propagate_fatal_errors!(Err(e));
}
}
}
dirmgr.update_status(state.bootstrap_status());
if n_errors != 0 {
dirmgr.note_errors(attempt_id, n_errors);
}
dirmgr.update_progress(attempt_id, state.bootstrap_progress());
Ok(())
}
@ -474,6 +519,7 @@ pub(crate) async fn download<R: Runtime>(
dirmgr: Weak<DirMgr<R>>,
state: &mut Box<dyn DirState>,
schedule: &mut TaskSchedule<R>,
attempt_id: AttemptId,
on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
@ -488,11 +534,12 @@ pub(crate) async fn download<R: Runtime>(
let mut now = {
let dirmgr = upgrade_weak_ref(&dirmgr)?;
let mut changed = false;
let load_result = load_once(&dirmgr, state, &mut changed).await;
let load_result = load_once(&dirmgr, state, attempt_id, &mut changed).await;
if let Err(e) = &load_result {
// If the load failed but the error can be blamed on a directory
// cache, do so.
if let Some(source) = e.responsible_cache() {
dirmgr.note_errors(attempt_id, 1);
note_cache_error(dirmgr.circmgr()?.deref(), source, e);
}
}
@ -552,7 +599,7 @@ pub(crate) async fn download<R: Runtime>(
now = {
let dirmgr = upgrade_weak_ref(&dirmgr)?;
futures::select_biased! {
outcome = download_attempt(&dirmgr, state, parallelism.into()).fuse() => {
outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
if let Err(e) = outcome {
warn!("Error while downloading: {}", e);
propagate_fatal_errors!(Err(e));
@ -704,8 +751,8 @@ mod test {
fn describe(&self) -> String {
format!("{:?}", &self)
}
fn bootstrap_status(&self) -> crate::event::DirStatus {
crate::event::DirStatus::default()
fn bootstrap_progress(&self) -> crate::event::DirProgress {
crate::event::DirProgress::default()
}
fn is_ready(&self, ready: Readiness) -> bool {
match (ready, self.second_time_around) {
@ -801,10 +848,13 @@ mod test {
}
}
let mgr = Arc::new(mgr);
let attempt_id = AttemptId::next();
// Try just a load.
let state = Box::new(DemoState::new1());
let result = super::load(Arc::clone(&mgr), state).await.unwrap();
let result = super::load(Arc::clone(&mgr), state, attempt_id)
.await
.unwrap();
assert!(result.is_ready(Readiness::Complete));
// Try a bootstrap that could (but won't!) download.
@ -815,6 +865,7 @@ mod test {
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
attempt_id,
&mut on_usable,
)
.await
@ -849,12 +900,14 @@ mod test {
}
let mgr = Arc::new(mgr);
let mut on_usable = None;
let attempt_id = AttemptId::next();
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
super::download(
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
attempt_id,
&mut on_usable,
)
.await

View File

@ -23,6 +23,8 @@ use tor_basic_utils::skip_fmt;
use tor_netdir::DirEvent;
use tor_netdoc::doc::netstatus;
use crate::bootstrap::AttemptId;
/// A trait to indicate something that can be published with [`FlagPublisher`].
///
/// Since the implementation of `FlagPublisher` requires that its events be
@ -226,32 +228,82 @@ impl<F: FlagEvent> Stream for FlagListener<F> {
/// This status does not necessarily increase monotonically: it can go backwards
/// if (for example) our directory information expires before we're able to get
/// new information.
//
// TODO(nickm): This type has gotten a bit large for being the type we send over
// a `postage::watch`: perhaps we'd be better off having this information stored
// in the guardmgr, and having only a summary of it sent over the
// `postage::watch`. But for now, let's not, unless it shows up in profiles.
#[derive(Clone, Debug, Default)]
pub struct DirBootstrapStatus {
/// The status for the current directory that we're using right now.
pub(crate) current: DirStatus,
/// The status for a directory that we're downloading to replace the current
/// directory.
pub struct DirBootstrapStatus(StatusEnum);
/// The contents of a DirBootstrapStatus.
///
/// This is a separate type since we don't want to make these variables public.
#[derive(Clone, Debug, educe::Educe)]
#[educe(Default)]
enum StatusEnum {
/// There is no active attempt to load or fetch a directory.
#[educe(Default)]
NoActivity,
/// We have only one attempt to fetch a directory.
Single {
/// The currently active directory attempt.
///
/// We're either using this directory now, or we plan to use it as soon
/// as it's complete enough.
current: StatusEntry,
},
/// We have an existing directory attempt, but it's stale, and we're
/// fetching a new one to replace it.
///
/// This is "None" if we haven't started fetching the next consensus yet.
pub(crate) next: Option<DirStatus>,
/// Invariant: `current.id < next.id`
Replacing {
/// The previous attempt's status. It may still be trying to fetch
/// information if it has descriptors left to download.
current: StatusEntry,
/// The current attempt's status. We are not yet using this directory
/// for our activity, since it does not (yet) have enough information.
next: StatusEntry,
},
}
/// The status and identifier of a single attempt to download a full directory.
#[derive(Clone, Debug)]
struct StatusEntry {
/// The identifier for this attempt.
id: AttemptId,
/// The latest status.
status: DirStatus,
}
/// The status for a single directory.
#[derive(Clone, Debug, Default)]
pub struct DirStatus(DirStatusInner);
#[derive(Clone, Debug, Default, derive_more::Display)]
#[display(fmt = "{0}", progress)]
pub(crate) struct DirStatus {
/// How much of the directory do we currently have?
progress: DirProgress,
/// How many resets have been forced while fetching this directory?
n_resets: usize,
/// How many errors have we encountered since last we advanced the
/// 'progress' on this directory?
n_errors: usize,
/// How many times has an `update_progress` call not actually moved us
/// forward since we last advanced the 'progress' on this directory?
n_stalls: usize,
}
/// The contents of a single DirStatus.
/// How much progress have we made in downloading a given directory?
///
/// This is a separate type so that we don't make the variants public.
#[derive(Clone, Debug, Educe)]
#[educe(Default)]
pub(crate) enum DirStatusInner {
pub(crate) enum DirProgress {
/// We don't have any information yet.
#[educe(Default)]
NoConsensus {
/// If present, we are fetching a consensus whose valid-after time
/// postdates this time.
#[allow(dead_code)]
after: Option<SystemTime>,
},
/// We've downloaded a consensus, but we haven't validated it yet.
@ -284,13 +336,53 @@ pub(crate) enum DirStatusInner {
},
}
impl From<DirStatusInner> for DirStatus {
fn from(inner: DirStatusInner) -> DirStatus {
DirStatus(inner)
}
/// A reported diagnostic for what kind of trouble we've seen while trying to
/// bootstap a directory.
///
/// These blockages types are not yet terribly specific: if you encounter one,
/// it's probably a good idea to check the logs to see what's really going on.
///
/// If you encounter connection blockage _and_ directory blockage at the same
/// time, the connection blockage is almost certainly the real problem.
//
// TODO(nickm): At present these diagnostics aren't very helpful; they say too
// much about _how we know_ that the process has gone wrong, but not so much
// about _what the problem is_. In the future, we may wish to look more closely
// at what _kind_ of errors or resets we've seen, so we can report better
// information. Probably, however, we should only do that after we get some
// experience with which problems people encounter in practice, and what
// diagnostics would be useful for them.
#[derive(Clone, Debug, derive_more::Display)]
#[non_exhaustive]
pub enum DirBlockage {
/// We've been downloading information without error, but we haven't
/// actually been getting anything that we want.
///
/// This might indicate that there's a problem with information propagating
/// through the Tor network, or it might indicate that a bogus consensus or
/// a bad clock has tricked us into asking for something that nobody has.
#[display(fmt = "Can't make progress.")]
Stalled,
/// We've gotten a lot of errors without making forward progress on our
/// bootstrap attempt.
///
/// This might indicate that something's wrong with the Tor network, or that
/// there's something buggy with our ability to handle directory responses.
/// It might also indicate a malfunction on our directory guards, or a bug
/// on our retry logic.
#[display(fmt = "Too many errors without making progress.")]
TooManyErrors,
/// We've reset our bootstrap attempt a lot of times.
///
/// This either indicates that we have been failing a lot for one of the
/// other reasons above, or that we keep getting served a consensus which
/// turns out, upon trying to fetch certificates, not to be usable. It can
/// also indicate a bug in our retry logic.
#[display(fmt = "Had to reset bootstrapping too many times.")]
TooManyResets,
}
impl fmt::Display for DirStatus {
impl fmt::Display for DirProgress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// Format this time in a format useful for displaying
/// lifetime boundaries.
@ -310,19 +402,19 @@ impl fmt::Display for DirStatus {
.unwrap_or_else(|_| "(could not format)".into())
}
match &self.0 {
DirStatusInner::NoConsensus { .. } => write!(f, "fetching a consensus"),
DirStatusInner::FetchingCerts { n_certs, .. } => write!(
match &self {
DirProgress::NoConsensus { .. } => write!(f, "fetching a consensus"),
DirProgress::FetchingCerts { n_certs, .. } => write!(
f,
"fetching authority certificates ({}/{})",
n_certs.0, n_certs.1
),
DirStatusInner::Validated {
DirProgress::Validated {
usable: false,
n_mds,
..
} => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
DirStatusInner::Validated {
DirProgress::Validated {
usable: true,
lifetime,
..
@ -338,15 +430,43 @@ impl fmt::Display for DirStatus {
impl fmt::Display for DirBootstrapStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "directory is {}", self.current)?;
if let Some(ref next) = self.next {
write!(f, "; next directory is {}", next)?;
match &self.0 {
StatusEnum::NoActivity => write!(f, "not downloading")?,
StatusEnum::Single { current } => write!(f, "directory is {}", current.status)?,
StatusEnum::Replacing { current, next } => write!(
f,
"directory is {}; next directory is {}",
current.status, next.status
)?,
}
Ok(())
}
}
impl DirBootstrapStatus {
/// Return the current DirStatus.
///
/// This is the _most complete_ status. If we have any usable status, it is
/// this one.
fn current(&self) -> Option<&DirStatus> {
match &self.0 {
StatusEnum::NoActivity => None,
StatusEnum::Single { current } => Some(&current.status),
StatusEnum::Replacing { current, .. } => Some(&current.status),
}
}
/// Return the next DirStatus, if there is one.
///
/// Testing-only.
#[cfg(test)]
fn next(&self) -> Option<&DirStatus> {
match &self.0 {
StatusEnum::Replacing { next, .. } => Some(&next.status),
_ => None,
}
}
/// Return the fraction of completion for directory download, in a form
/// suitable for a progress bar at some particular time.
///
@ -356,41 +476,146 @@ impl DirBootstrapStatus {
/// Callers _should not_ depend on the specific meaning of any particular
/// fraction; we may change these fractions in the future.
pub fn frac_at(&self, when: SystemTime) -> f32 {
self.current
.frac_at(when)
.or_else(|| self.next.as_ref().and_then(|next| next.frac_at(when)))
.unwrap_or(0.0)
match &self.0 {
StatusEnum::NoActivity => 0.0,
StatusEnum::Single { current } => current.status.frac_at(when).unwrap_or(0.0),
StatusEnum::Replacing { current, next } => current
.status
.frac_at(when)
.or_else(|| next.status.frac_at(when))
.unwrap_or(0.0),
}
}
/// Return true if this status indicates that we have a current usable
/// directory.
pub fn usable_at(&self, now: SystemTime) -> bool {
self.current.usable() && self.current.okay_to_use_at(now)
if let Some(current) = self.current() {
current.progress.usable() && current.okay_to_use_at(now)
} else {
false
}
}
/// Update this status by replacing its current status (or its next status)
/// with `new_status`, as appropriate.
pub(crate) fn update(&mut self, new_status: DirStatus) {
if new_status.usable() {
// This is a usable directory, but it might be a stale one still
// getting updated. Make sure that it is at least as new as the one
// in `current` before we set `current`.
if new_status.at_least_as_new_as(&self.current) {
// This one will be `current`. Should we clear `next`? Only if
// this one is at least as recent as `next` too.
if let Some(ref next) = self.next {
if new_status.at_least_as_new_as(next) {
self.next = None;
}
}
self.current = new_status;
/// If there is a problem with our attempts to bootstrap, return a
/// corresponding DirBlockage.
pub fn blockage(&self, now: SystemTime) -> Option<DirBlockage> {
if let Some(current) = self.current() {
if current.progress.usable() && current.declared_live_at(now) {
// The current directory is sufficient, and not even a little bit
// expired. There is no problem.
return None;
}
} else if !self.current.usable() {
// Not a usable directory, but we don't _have_ a usable directory. This is therefore current.
self.current = new_status;
} else {
// This is _not_ a usable directory, so it can only be `next`.
self.next = Some(new_status);
}
match &self.0 {
// We're not trying to fetch anything, so it can't be blocked.
StatusEnum::NoActivity => None,
// We have only one attempt: its blockage is the only relevant one.
StatusEnum::Single { current } => current.status.blockage(),
// We know about two attempts: any blockage in "current" is more
// serious.
StatusEnum::Replacing { current, next } => {
current.status.blockage().or_else(|| next.status.blockage())
}
}
}
/// Return the appropriate DirStatus for `AttemptId`, constructing it if
/// necessary.
///
/// Return None if all relevant attempts are more recent than this Id.
fn mut_status_for(&mut self, attempt_id: AttemptId) -> Option<&mut DirStatus> {
// First, we add a status for this attempt_id if appropriate.
//
// TODO: should make sure that the compiler is smart enough to optimize
// this mem::take() and replacement away, and turn it into a conditional
// replacement?
self.0 = match std::mem::take(&mut self.0) {
StatusEnum::NoActivity => StatusEnum::Single {
current: StatusEntry::new(attempt_id),
},
StatusEnum::Single { current } if current.id < attempt_id => StatusEnum::Replacing {
current,
next: StatusEntry::new(attempt_id),
},
StatusEnum::Replacing { current, next } if next.id < attempt_id => {
StatusEnum::Replacing {
current,
next: StatusEntry::new(attempt_id),
}
}
other => other,
};
// Now return the correct status.
match &mut self.0 {
StatusEnum::Single { current } if current.id == attempt_id => Some(&mut current.status),
StatusEnum::Replacing { current, .. } if current.id == attempt_id => {
Some(&mut current.status)
}
StatusEnum::Replacing { next, .. } if next.id == attempt_id => Some(&mut next.status),
_ => None,
}
}
/// If the "next" status is usable, replace the current status with it.
fn advance_status(&mut self) {
// TODO: should make sure that the compiler is smart enough to optimize
// this mem::take() and replacement away, and turn it into a conditional
// replacement?
self.0 = match std::mem::take(&mut self.0) {
StatusEnum::Replacing { next, .. } if next.status.progress.usable() => {
StatusEnum::Single { current: next }
}
other => other,
};
}
/// Update this status by replacing the `DirProgress` in its current status
/// (or its next status) with `new_status`, as appropriate.
pub(crate) fn update_progress(&mut self, attempt_id: AttemptId, new_progress: DirProgress) {
if let Some(status) = self.mut_status_for(attempt_id) {
let old_frac = status.frac();
status.progress = new_progress;
let new_frac = status.frac();
if new_frac > old_frac {
// This download has made progress: clear our count of errors
// and stalls.
status.n_errors = 0;
status.n_stalls = 0;
} else {
// This download didn't make progress; increment the stall
// count.
status.n_stalls += 1;
}
self.advance_status();
}
}
/// Update this status by noting that some errors have occurred in a given
/// download attempt.
pub(crate) fn note_errors(&mut self, attempt_id: AttemptId, n_errors: usize) {
if let Some(status) = self.mut_status_for(attempt_id) {
status.n_errors += n_errors;
}
}
/// Update this status by noting that we had to reset a given download attempt;
pub(crate) fn note_reset(&mut self, attempt_id: AttemptId) {
if let Some(status) = self.mut_status_for(attempt_id) {
status.n_resets += 1;
}
}
}
impl StatusEntry {
/// Construct a new StatusEntry with a given attempt id, and no progress
/// reported.
fn new(id: AttemptId) -> Self {
Self {
id,
status: DirStatus::default(),
}
}
}
@ -398,22 +623,22 @@ impl DirBootstrapStatus {
impl DirStatus {
/// Return the declared consensus lifetime for this directory, if we have one.
fn declared_lifetime(&self) -> Option<&netstatus::Lifetime> {
match &self.0 {
DirStatusInner::NoConsensus { .. } => None,
DirStatusInner::FetchingCerts { lifetime, .. } => Some(lifetime),
DirStatusInner::Validated { lifetime, .. } => Some(lifetime),
match &self.progress {
DirProgress::NoConsensus { .. } => None,
DirProgress::FetchingCerts { lifetime, .. } => Some(lifetime),
DirProgress::Validated { lifetime, .. } => Some(lifetime),
}
}
/// Return the consensus lifetime for this directory, if we have one, as
/// modified by our skew-tolerance settings.
fn usable_lifetime(&self) -> Option<&netstatus::Lifetime> {
match &self.0 {
DirStatusInner::NoConsensus { .. } => None,
DirStatusInner::FetchingCerts {
match &self.progress {
DirProgress::NoConsensus { .. } => None,
DirProgress::FetchingCerts {
usable_lifetime, ..
} => Some(usable_lifetime),
DirStatusInner::Validated {
DirProgress::Validated {
usable_lifetime, ..
} => Some(usable_lifetime),
}
@ -427,6 +652,14 @@ impl DirStatus {
.unwrap_or(false)
}
/// Return true if the directory is valid at the given time, _unmodified_ by our
/// clock skew settings.
fn declared_live_at(&self, when: SystemTime) -> bool {
self.declared_lifetime()
.map(|lt| lt.valid_at(when))
.unwrap_or(false)
}
/// As `frac`, but return None if this consensus is not valid at the given time,
/// and down-rate expired consensuses that we're still willing to use.
fn frac_at(&self, when: SystemTime) -> Option<f32> {
@ -445,11 +678,6 @@ impl DirStatus {
}
}
/// Return true if this status indicates a usable directory.
fn usable(&self) -> bool {
matches!(self.0, DirStatusInner::Validated { usable: true, .. })
}
/// Return the fraction of completion for directory download, in a form
/// suitable for a progress bar.
///
@ -463,46 +691,53 @@ impl DirStatus {
// downloading the certificates, and the remaining 65% is downloading
// the microdescriptors until we become usable. We may want to re-tune that in the future, but
// the documentation of this function should allow us to do so.
match &self.0 {
DirStatusInner::NoConsensus { .. } => 0.0,
DirStatusInner::FetchingCerts { n_certs, .. } => {
match &self.progress {
DirProgress::NoConsensus { .. } => 0.0,
DirProgress::FetchingCerts { n_certs, .. } => {
0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
}
DirStatusInner::Validated {
DirProgress::Validated {
usable: false,
n_mds,
..
} => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
DirStatusInner::Validated { usable: true, .. } => 1.0,
DirProgress::Validated { usable: true, .. } => 1.0,
}
}
/// Return true if the consensus in this DirStatus (if any) is at least as
/// new as the one in `other`.
fn at_least_as_new_as(&self, other: &DirStatus) -> bool {
/// return a candidate "valid after" time for a DirStatus, for comparison purposes.
fn start_time(st: &DirStatus) -> Option<SystemTime> {
match &st.0 {
DirStatusInner::NoConsensus { after: Some(t) } => {
Some(*t + std::time::Duration::new(1, 0)) // Make sure this sorts _after_ t.
}
DirStatusInner::FetchingCerts { lifetime, .. } => Some(lifetime.valid_after()),
DirStatusInner::Validated { lifetime, .. } => Some(lifetime.valid_after()),
_ => None,
}
}
/// If we think there is a problem with our bootstrapping process, return a
/// [`DirBlockage`] to describe it.
///
/// The caller may want to also check `usable_at` to avoid reporting trouble
/// if the directory is currently usable.
fn blockage(&self) -> Option<DirBlockage> {
/// How many resets are sufficient for us to report a blockage?
const RESET_THRESHOLD: usize = 2;
/// How many errors are sufficient for us to report a blockage?
const ERROR_THRESHOLD: usize = 6;
/// How many no-progress download attempts are sufficient for us to
/// report a blockage?
const STALL_THRESHOLD: usize = 8;
match (start_time(self), start_time(other)) {
// If both have a lifetime, compare their valid_after times.
(Some(l1), Some(l2)) => l1 >= l2,
// Any consensus is newer than none.
(Some(_), None) => true,
// No consensus is never newer than anything.
(None, _) => false,
if self.n_resets >= RESET_THRESHOLD {
Some(DirBlockage::TooManyResets)
} else if self.n_errors >= ERROR_THRESHOLD {
Some(DirBlockage::TooManyErrors)
} else if self.n_stalls >= STALL_THRESHOLD {
Some(DirBlockage::Stalled)
} else {
None
}
}
}
impl DirProgress {
/// Return true if this progress indicates a usable directory.
fn usable(&self) -> bool {
matches!(self, DirProgress::Validated { usable: true, .. })
}
}
/// A stream of [`DirBootstrapStatus`] events.
#[derive(Clone, Educe)]
#[educe(Debug)]
@ -664,21 +899,30 @@ mod test {
let now = SystemTime::now();
let hour = Duration::new(3600, 0);
let nothing = DirStatus(DirStatusInner::NoConsensus { after: None });
let nothing = DirStatus {
progress: DirProgress::NoConsensus { after: None },
..Default::default()
};
let lifetime = netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap();
let unval = DirStatus(DirStatusInner::FetchingCerts {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_certs: (3, 5),
});
let unval = DirStatus {
progress: DirProgress::FetchingCerts {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_certs: (3, 5),
},
..Default::default()
};
let lifetime =
netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap();
let with_c = DirStatus(DirStatusInner::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_mds: (30, 40),
usable: false,
});
let with_c = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_mds: (30, 40),
usable: false,
},
..Default::default()
};
// lifetime()
assert!(nothing.usable_lifetime().is_none());
@ -688,14 +932,6 @@ mod test {
now + hour * 3
);
// at_least_as_new_as()
assert!(!nothing.at_least_as_new_as(&nothing));
assert!(unval.at_least_as_new_as(&nothing));
assert!(unval.at_least_as_new_as(&unval));
assert!(!unval.at_least_as_new_as(&with_c));
assert!(with_c.at_least_as_new_as(&unval));
assert!(with_c.at_least_as_new_as(&with_c));
// frac() (It's okay if we change the actual numbers here later; the
// current ones are more or less arbitrary.)
const TOL: f32 = 0.00001;
@ -721,30 +957,42 @@ mod test {
let hour = Duration::new(3600, 0);
let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
let ds = DirStatus(DirStatusInner::NoConsensus { after: None });
let ds = DirStatus {
progress: DirProgress::NoConsensus { after: None },
..Default::default()
};
assert_eq!(ds.to_string(), "fetching a consensus");
let ds = DirStatus(DirStatusInner::FetchingCerts {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_certs: (3, 5),
});
let ds = DirStatus {
progress: DirProgress::FetchingCerts {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_certs: (3, 5),
},
..Default::default()
};
assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
let ds = DirStatus(DirStatusInner::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_mds: (30, 40),
usable: false,
});
let ds = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_mds: (30, 40),
usable: false,
},
..Default::default()
};
assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
let ds = DirStatus(DirStatusInner::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_mds: (30, 40),
usable: true,
});
let ds = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime,
n_mds: (30, 40),
usable: true,
},
..Default::default()
};
assert_eq!(
ds.to_string(),
"usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
@ -759,25 +1007,37 @@ mod test {
let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
let ds1: DirStatus = DirStatusInner::Validated {
let dp1 = DirProgress::Validated {
lifetime: lifetime.clone(),
usable_lifetime: lifetime.clone(),
n_mds: (3, 40),
usable: true,
}
.into();
let ds2: DirStatus = DirStatusInner::Validated {
};
let dp2 = DirProgress::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (5, 40),
usable: false,
}
.into();
let bs = DirBootstrapStatus {
current: ds1.clone(),
next: Some(ds2.clone()),
};
let attempt1 = AttemptId::next();
let attempt2 = AttemptId::next();
let bs = DirBootstrapStatus(StatusEnum::Replacing {
current: StatusEntry {
id: attempt1,
status: DirStatus {
progress: dp1.clone(),
..Default::default()
},
},
next: StatusEntry {
id: attempt2,
status: DirStatus {
progress: dp2.clone(),
..Default::default()
},
},
});
assert_eq!(bs.to_string(),
"directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
@ -795,48 +1055,63 @@ mod test {
// Case 1: we have a usable directory and the updated status isn't usable.
let mut bs = bs;
let ds3 = DirStatus(DirStatusInner::Validated {
let dp3 = DirProgress::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (10, 40),
usable: false,
});
bs.update(ds3);
};
bs.update_progress(attempt2, dp3);
assert!(matches!(
bs.next.as_ref().unwrap().0,
DirStatusInner::Validated {
n_mds: (10, 40),
bs.next().unwrap(),
DirStatus {
progress: DirProgress::Validated {
n_mds: (10, 40),
..
},
..
}
));
// Case 2: The new directory _is_ usable and newer. It will replace the old one.
let ds4 = DirStatus(DirStatusInner::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (20, 40),
usable: true,
});
bs.update(ds4);
assert!(bs.next.as_ref().is_none());
let ds4 = DirStatus {
progress: DirProgress::Validated {
lifetime: lifetime2.clone(),
usable_lifetime: lifetime2.clone(),
n_mds: (20, 40),
usable: true,
},
..Default::default()
};
bs.update_progress(attempt2, ds4.progress);
assert!(bs.next().is_none());
assert_eq!(
bs.current.usable_lifetime().unwrap().valid_after(),
bs.current()
.unwrap()
.usable_lifetime()
.unwrap()
.valid_after(),
lifetime2.valid_after()
);
// Case 3: The new directory is usable but older. Nothing will happen.
bs.update(ds1);
assert!(bs.next.as_ref().is_none());
bs.update_progress(attempt1, dp1);
assert!(bs.next().as_ref().is_none());
assert_ne!(
bs.current.usable_lifetime().unwrap().valid_after(),
bs.current()
.unwrap()
.usable_lifetime()
.unwrap()
.valid_after(),
lifetime.valid_after()
);
// Case 4: starting with an unusable directory, we always replace.
let mut bs = DirBootstrapStatus::default();
assert!(!ds2.usable());
assert!(bs.current.usable_lifetime().is_none());
bs.update(ds2);
assert!(bs.current.usable_lifetime().is_some());
assert!(!dp2.usable());
assert!(bs.current().is_none());
bs.update_progress(attempt2, dp2);
assert!(bs.current().unwrap().usable_lifetime().is_some());
}
}

View File

@ -79,6 +79,8 @@ use crate::shared_ref::SharedMutArc;
#[cfg(feature = "experimental-api")]
pub use crate::shared_ref::SharedMutArc;
use crate::storage::{DynStore, Store};
use bootstrap::AttemptId;
use event::DirProgress;
use postage::watch;
pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
use scopeguard::ScopeGuard;
@ -108,7 +110,7 @@ pub use config::{
};
pub use docid::DocId;
pub use err::Error;
pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirStatus};
pub use event::{DirBlockage, DirBootstrapEvents, DirBootstrapStatus};
pub use storage::DocumentText;
pub use tor_guardmgr::fallback::{FallbackDir, FallbackDirBuilder};
@ -291,7 +293,7 @@ impl<R: Runtime> DirMgr<R> {
let dirmgr = Arc::new(Self::from_config(config, runtime, None, true)?);
// TODO: add some way to return a directory that isn't up-to-date
let _success = dirmgr.load_directory().await?;
let _success = dirmgr.load_directory(AttemptId::next()).await?;
dirmgr.opt_netdir().ok_or(Error::DirectoryNotPresent)
}
@ -388,7 +390,8 @@ impl<R: Runtime> DirMgr<R> {
};
// Try to load from the cache.
let have_directory = self.load_directory().await?;
let attempt_id = AttemptId::next();
let have_directory = self.load_directory(attempt_id).await?;
let (mut sender, receiver) = if have_directory {
info!("Loaded a good directory from cache.");
@ -418,14 +421,16 @@ impl<R: Runtime> DirMgr<R> {
// Don't warn when these are Error::ManagerDropped: that
// means that the DirMgr has been shut down.
if let Err(e) =
Self::reload_until_owner(&dirmgr_weak, &mut schedule, &mut sender).await
Self::reload_until_owner(&dirmgr_weak, &mut schedule, attempt_id, &mut sender)
.await
{
match e {
Error::ManagerDropped => {}
_ => warn!("Unrecovered error while waiting for bootstrap: {}", e),
}
} else if let Err(e) =
Self::download_forever(dirmgr_weak.clone(), &mut schedule, sender).await
Self::download_forever(dirmgr_weak.clone(), &mut schedule, attempt_id, sender)
.await
{
match e {
Error::ManagerDropped => {}
@ -480,6 +485,7 @@ impl<R: Runtime> DirMgr<R> {
async fn reload_until_owner(
weak: &Weak<Self>,
schedule: &mut TaskSchedule<R>,
attempt_id: AttemptId,
on_complete: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut logged = false;
@ -527,7 +533,7 @@ impl<R: Runtime> DirMgr<R> {
{
let dirmgr = upgrade_weak_ref(weak)?;
trace!("Trying to load from the directory cache");
if dirmgr.load_directory().await? {
if dirmgr.load_directory(attempt_id).await? {
// Successfully loaded a bootstrapped directory.
if let Some(send_done) = on_complete.take() {
let _ = send_done.send(());
@ -548,6 +554,7 @@ impl<R: Runtime> DirMgr<R> {
async fn download_forever(
weak: Weak<Self>,
schedule: &mut TaskSchedule<R>,
mut attempt_id: AttemptId,
mut on_complete: Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut state: Box<dyn DirState> = {
@ -578,9 +585,14 @@ impl<R: Runtime> DirMgr<R> {
let mut retry_delay = retry_config.schedule();
'retry_attempt: for _ in retry_config.attempts() {
let outcome =
bootstrap::download(Weak::clone(&weak), &mut state, schedule, &mut on_complete)
.await;
let outcome = bootstrap::download(
Weak::clone(&weak),
&mut state,
schedule,
attempt_id,
&mut on_complete,
)
.await;
if let Err(err) = outcome {
if state.is_ready(Readiness::Usable) {
@ -605,6 +617,10 @@ impl<R: Runtime> DirMgr<R> {
"Unable to download a usable directory: {}. We will restart in {:?}.",
err, delay
);
{
let dirmgr = upgrade_weak_ref(&weak)?;
dirmgr.note_reset(attempt_id);
}
schedule.sleep(delay).await;
state = state.reset();
} else {
@ -633,6 +649,7 @@ impl<R: Runtime> DirMgr<R> {
Some(t) => schedule.sleep_until_wallclock(t).await,
None => return Ok(()),
}
attempt_id = bootstrap::AttemptId::next();
state = state.reset();
}
}
@ -695,14 +712,34 @@ impl<R: Runtime> DirMgr<R> {
self.receive_status.clone()
}
/// Replace the latest status with `new_status` and broadcast to anybody
/// Replace the latest status with `progress` and broadcast to anybody
/// watching via a [`DirBootstrapEvents`] stream.
fn update_status(&self, new_status: DirStatus) {
fn update_progress(&self, attempt_id: AttemptId, progress: DirProgress) {
// TODO(nickm): can I kill off this lock by having something else own the sender?
let mut sender = self.send_status.lock().expect("poisoned lock");
let mut status = sender.borrow_mut();
status.update(new_status);
status.update_progress(attempt_id, progress);
}
/// Update our status tracker to note that some number of errors has
/// occurred.
fn note_errors(&self, attempt_id: AttemptId, n_errors: usize) {
if n_errors == 0 {
return;
}
let mut sender = self.send_status.lock().expect("poisoned lock");
let mut status = sender.borrow_mut();
status.note_errors(attempt_id, n_errors);
}
/// Update our status tracker to note that we've needed to reset our download attempt.
fn note_reset(&self, attempt_id: AttemptId) {
let mut sender = self.send_status.lock().expect("poisoned lock");
let mut status = sender.borrow_mut();
status.note_reset(attempt_id);
}
/// Try to make this a directory manager with read-write access to its
@ -782,7 +819,7 @@ impl<R: Runtime> DirMgr<R> {
/// cache, if it is newer than the one we have.
///
/// Return false if there is no such consensus.
async fn load_directory(self: &Arc<Self>) -> Result<bool> {
async fn load_directory(self: &Arc<Self>, attempt_id: AttemptId) -> Result<bool> {
let state = state::GetConsensusState::new(
self.runtime.clone(),
self.config.get(),
@ -793,7 +830,7 @@ impl<R: Runtime> DirMgr<R> {
.clone()
.unwrap_or_else(|| Arc::new(crate::filter::NilFilter)),
);
let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;
let _ = bootstrap::load(Arc::clone(self), Box::new(state), attempt_id).await?;
Ok(self.netdir.get().is_some())
}

View File

@ -23,7 +23,7 @@ use tor_netdoc::doc::authcert::UncheckedAuthCert;
use tor_netdoc::doc::netstatus::Lifetime;
use tracing::{info, warn};
use crate::event::{DirStatus, DirStatusInner};
use crate::event::DirProgress;
use crate::storage::DynStore;
use crate::{
@ -138,8 +138,8 @@ pub(crate) trait DirState: Send {
storage: Option<&Mutex<DynStore>>,
changed: &mut bool,
) -> Result<()>;
/// Return a summary of this state as a [`DirStatus`].
fn bootstrap_status(&self) -> event::DirStatus;
/// Return a summary of this state as a [`DirProgress`].
fn bootstrap_progress(&self) -> event::DirProgress;
/// Return a configuration for attempting downloads.
fn dl_config(&self) -> DownloadSchedule;
/// If possible, advance to the next state.
@ -265,11 +265,11 @@ impl<R: Runtime> DirState for GetConsensusState<R> {
fn can_advance(&self) -> bool {
self.next.is_some()
}
fn bootstrap_status(&self) -> DirStatus {
fn bootstrap_progress(&self) -> DirProgress {
if let Some(next) = &self.next {
next.bootstrap_status()
next.bootstrap_progress()
} else {
DirStatusInner::NoConsensus { after: self.after }.into()
DirProgress::NoConsensus { after: self.after }
}
}
fn dl_config(&self) -> DownloadSchedule {
@ -559,11 +559,11 @@ impl<R: Runtime> DirState for GetCertsState<R> {
fn can_advance(&self) -> bool {
matches!(self.consensus, GetCertsConsensus::Validated(_))
}
fn bootstrap_status(&self) -> DirStatus {
fn bootstrap_progress(&self) -> DirProgress {
let n_certs = self.certs.len();
let n_missing_certs = self.missing_certs.len();
let total_certs = n_missing_certs + n_certs;
DirStatusInner::FetchingCerts {
DirProgress::FetchingCerts {
lifetime: self.consensus_meta.lifetime().clone(),
usable_lifetime: self
.config
@ -572,7 +572,6 @@ impl<R: Runtime> DirState for GetCertsState<R> {
n_certs: (n_certs as u16, total_certs as u16),
}
.into()
}
fn dl_config(&self) -> DownloadSchedule {
self.config.schedule.retry_certs
@ -984,15 +983,14 @@ impl<R: Runtime> DirState for GetMicrodescsState<R> {
fn can_advance(&self) -> bool {
false
}
fn bootstrap_status(&self) -> DirStatus {
fn bootstrap_progress(&self) -> DirProgress {
let n_present = self.n_microdescs - self.partial.n_missing();
DirStatusInner::Validated {
DirProgress::Validated {
lifetime: self.meta.lifetime().clone(),
usable_lifetime: self.config.tolerance.extend_lifetime(self.meta.lifetime()),
n_mds: (n_present as u32, self.n_microdescs as u32),
usable: self.is_ready(Readiness::Usable),
}
.into()
}
fn dl_config(&self) -> DownloadSchedule {
self.config.schedule.retry_microdescs
@ -1218,7 +1216,7 @@ impl DirState for PoisonedState {
) -> Result<()> {
unimplemented!()
}
fn bootstrap_status(&self) -> event::DirStatus {
fn bootstrap_progress(&self) -> event::DirProgress {
unimplemented!()
}
fn dl_config(&self) -> DownloadSchedule {
@ -1392,7 +1390,10 @@ mod test {
assert!(state.reset_time().is_none());
// Its starting DirStatus is "fetching a consensus".
assert_eq!(state.bootstrap_status().to_string(), "fetching a consensus");
assert_eq!(
state.bootstrap_progress().to_string(),
"fetching a consensus"
);
// Download configuration is simple: only 1 request can be done in
// parallel. It uses a consensus retry schedule.
@ -1551,7 +1552,7 @@ mod test {
// Bootstrap status okay?
assert_eq!(
state.bootstrap_status().to_string(),
state.bootstrap_progress().to_string(),
"fetching authority certificates (0/2)"
);
@ -1578,7 +1579,7 @@ mod test {
assert_eq!(missing.len(), 1); // Now we're only missing one!
assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
assert_eq!(
state.bootstrap_status().to_string(),
state.bootstrap_progress().to_string(),
"fetching authority certificates (1/2)"
);
@ -1693,7 +1694,7 @@ mod test {
let retry = state.dl_config();
assert_eq!(retry, DownloadScheduleConfig::default().retry_microdescs);
assert_eq!(
state.bootstrap_status().to_string(),
state.bootstrap_progress().to_string(),
"fetching microdescriptors (0/4)"
);
@ -1730,7 +1731,7 @@ mod test {
assert_eq!(missing.len(), 3);
assert!(!missing.contains(&DocId::Microdesc(md1)));
assert_eq!(
state.bootstrap_status().to_string(),
state.bootstrap_progress().to_string(),
"fetching microdescriptors (1/4)"
);