Implement the basics of a bootstrap-status API.

The purpose of a this API is to tell the user how far along Arti is
in getting bootstrapped, and if it's stuck, what it's stuck on.

This API doesn't yet expose any useful information: by the time it's
observable to a client, it's always "100% bootstrapped."  But I'm
putting it in a MR now so that we can review the basic idea, and to
avoid conflicts with later work on tickets like #293 and #278.

This is part of #96.
This commit is contained in:
Nick Mathewson 2022-01-12 12:46:26 -05:00
parent b4761f8cfd
commit 1bd2790d51
5 changed files with 224 additions and 1 deletions

2
Cargo.lock generated
View File

@ -108,11 +108,13 @@ version = "0.0.3"
dependencies = [
"anyhow",
"derive_builder",
"derive_more",
"directories",
"futures",
"humantime-serde",
"hyper",
"pin-project",
"postage",
"serde",
"thiserror",
"tokio",

View File

@ -33,8 +33,10 @@ tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3"}
humantime-serde = "1"
derive_builder = "0.10"
derive_more = "0.99"
directories = "4"
futures = "0.3"
postage = { version = "0.4", default-features = false, features = ["futures-traits"] }
tracing = "0.1.18"
serde = { version = "1.0.103", features = ["derive"] }
thiserror = "1"

View File

@ -7,6 +7,7 @@
use crate::address::IntoTorAddr;
use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig};
use futures::channel::oneshot;
use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort};
use tor_config::MutCfg;
use tor_dirmgr::DirEvent;
@ -22,7 +23,7 @@ use std::net::IpAddr;
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use crate::{Error, Result};
use crate::{status, Error, Result};
#[cfg(feature = "async-std")]
use tor_rtcompat::async_std::AsyncStdRuntime;
#[cfg(feature = "tokio")]
@ -61,6 +62,13 @@ pub struct TorClient<R: Runtime> {
///
/// See [`TorClient::reconfigure`] for more information on its use.
reconfigure_lock: Arc<Mutex<()>>,
/// A stream of bootstrap messages that we can clone when a client asks for
/// it.
///
/// (We don't need to observe this stream ourselves, since it drops each
/// unobserved status change when the next status change occurs.)
status_receiver: status::BootstrapEvents,
}
/// Preferences for how to route a stream over the Tor network.
@ -227,6 +235,16 @@ impl<R: Runtime> TorClient<R> {
}
let addr_cfg = config.address_filter.clone();
let timeout_cfg = config.stream_timeouts.clone();
let (status_sender, status_receiver) = postage::watch::channel();
let status_receiver = status::BootstrapEvents {
inner: status_receiver,
};
// TODO(nickm): we should use a real set of information sources here.
// This is just temporary.
let (tor_ready_sender, tor_ready_receiver) = oneshot::channel();
runtime.spawn(status::report_status(status_sender, tor_ready_receiver))?;
let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(runtime.clone()));
let circmgr =
tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))?;
@ -267,6 +285,9 @@ impl<R: Runtime> TorClient<R> {
let client_isolation = IsolationToken::new();
// At this point, we're bootstrapped.
let _ = tor_ready_sender.send(());
Ok(TorClient {
runtime,
client_isolation,
@ -276,6 +297,7 @@ impl<R: Runtime> TorClient<R> {
addrcfg: Arc::new(addr_cfg.into()),
timeoutcfg: Arc::new(timeout_cfg.into()),
reconfigure_lock: Arc::new(Mutex::new(())),
status_receiver,
})
}
@ -505,6 +527,27 @@ impl<R: Runtime> TorClient<R> {
Ok(circ)
}
/// Return a current [`status::BootstrapStatus`] describing how close this client
/// is to being ready for user traffic.
pub fn bootstrap_status(&self) -> status::BootstrapStatus {
self.status_receiver.inner.borrow().clone()
}
/// Return a stream of [`status::BootstrapStatus`] events that will be updated
/// whenever the client's status changes.
///
/// The receiver might not receive every update sent to this stream, though
/// when it does poll the stream it should get the most recent one.
//
// TODO(nickm): will this also need to implement Send and 'static?
//
// TODO(nickm): by the time the `TorClient` is visible to the user, this
// status is always true. That will change with #293, however, and will
// also change as BootstrapStatus becomes more complex with #96.
pub fn bootstrap_events(&self) -> status::BootstrapEvents {
self.status_receiver.clone()
}
}
/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update

View File

@ -176,6 +176,7 @@ mod address;
mod client;
pub mod config;
pub mod status;
pub use address::{DangerouslyIntoTorAddr, IntoTorAddr, TorAddr, TorAddrError};
pub use client::{ConnectPrefs, TorClient};

View File

@ -0,0 +1,175 @@
//! Code to collect and publish information about a client's bootstrapping
//! status.
use std::{borrow::Cow, fmt};
use derive_more::Display;
use futures::{channel::oneshot, future, Stream, StreamExt};
/// Information about how ready a [`crate::TorClient`] is to handle requests.
///
/// Note that this status does not change monotonically: a `TorClient` can
/// become more _or less_ bootstrapped over time. (For example, a client can
/// become less bootstrapped if it loses its internet connectivity, or if its
/// directory information expires before it's able to replace it.)
//
// # Note
//
// We need to keep this type fairly small, since it will get cloned whenever
// it's observed on a stream. If it grows large, we can add an Arc<> around
// its data.
#[derive(Debug, Clone, Default)]
pub struct BootstrapStatus {
/// A placeholder field: we'll be replacing this as the branch gets support
/// for more information sources.
ready: bool,
}
impl BootstrapStatus {
/// Return a rough fraction (from 0.0 to 1.0) representing how far along
/// the client's bootstrapping efforts are.
///
/// 0 is defined as "just started"; 1 is defined as "ready to use."
pub fn as_frac(&self) -> f32 {
if self.ready {
1.0
} else {
0.0
}
}
/// Return true if the status indicates that the client is ready for
/// traffic.
///
/// For the purposes of this function, the client is "ready for traffic" if,
/// as far as we know, we can start acting on a new client request immediately.
pub fn ready_for_traffic(&self) -> bool {
self.ready
}
/// If the client is unable to make forward progress for some reason, return
/// that reason.
///
/// (Returns None if the client doesn't seem to be stuck.)
///
/// # Caveats
///
/// This function provides a "best effort" diagnostic: there
/// will always be some blockage types that it can't diagnose
/// correctly. It may declare that Arti is stuck for reasons that
/// are incorrect; or it may declare that the client is not stuck
/// when in fact no progress is being made.
///
/// Therefore, the caller should always use a certain amount of
/// modesty when reporting these values to the user. For example,
/// it's probably better to say "Arti says it's stuck because it
/// can't make connections to the internet" rather than "You are
/// not on the internet."
pub fn blocked(&self) -> Option<Blockage> {
// TODO(nickm): implement this or remove it.
None
}
}
/// A reason why a client believes it is stuck.
#[derive(Clone, Debug, Display)]
#[display(fmt = "{} ({})", "kind", "message")]
pub struct Blockage {
/// Why do we think we're blocked?
kind: BlockageKind,
/// A human-readable message about the blockage.
message: Cow<'static, str>,
}
/// A specific type of blockage that a client believes it is experiencing.
///
/// Used to distinguish among instances of [`Blockage`].
#[derive(Clone, Debug, Display)]
#[non_exhaustive]
pub enum BlockageKind {
/// It looks like we can't make connections to the internet.
#[display(fmt = "Unable to connect to the internet")]
NoInternet,
/// It looks like we can't reach any Tor relays.
#[display(fmt = "Unable to reach Tor")]
CantReachTor,
/// We've been unable to download our directory information for some reason.
#[display(fmt = "Stalled fetching a Tor directory")]
DirectoryStalled,
}
impl fmt::Display for BootstrapStatus {
/// Format this [`BootstrapStatus`].
///
/// Note that the string returned by this function is designed for human
/// readability, not for machine parsing. Other code *should not* depend
/// on particular elements of this string.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let percent = (self.as_frac() * 100.0).round() as u32;
if let Some(problem) = self.blocked() {
write!(f, "Stuck at {}%: {}", percent, problem)
} else {
// TODO(nickm): describe what we're doing.
write!(f, "{}%", percent)
}
}
}
/// Task that runs forever, updating a client's status via the provided
/// `sender`.
///
/// TODO(nickm): Eventually this will use real stream of events to see when we
/// are bootstrapped or not. For now, it just says that we're not-ready until
/// the given Receiver fires.
///
/// TODO(nickm): This should eventually close the stream when the client is
/// dropped.
pub(crate) async fn report_status(
mut sender: postage::watch::Sender<BootstrapStatus>,
ready: oneshot::Receiver<()>,
) {
{
sender.borrow_mut().ready = false;
}
if ready.await.is_ok() {
sender.borrow_mut().ready = true;
}
// wait forever.
future::pending::<()>().await;
}
/// A [`Stream`] of [`BootstrapStatus`] events.
///
/// This stream isn't guaranteed to receive every change in bootstrap status; if
/// changes happen more frequently than the receiver can observe, some of them
/// will be dropped.
//
// Note: We use a wrapper type around watch::Receiver here, in order to hide its
// implementation type. We do that because we might want to change the type in
// the future, and because some of the functionality exposed by Receiver (like
// `borrow()` and the postage::Stream trait) are extraneous to the API we want.
#[derive(Clone)]
pub struct BootstrapEvents {
/// The receiver that implements this stream.
pub(crate) inner: postage::watch::Receiver<BootstrapStatus>,
}
// We can't derive(Debug) since postage::watch::Receiver doesn't implement
// Debug.
impl std::fmt::Debug for BootstrapEvents {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BootstrapEvents").finish_non_exhaustive()
}
}
impl Stream for BootstrapEvents {
type Item = BootstrapStatus;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}