diff --git a/Cargo.lock b/Cargo.lock index 88e03ae96..d2df1a118 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,11 +108,13 @@ version = "0.0.3" dependencies = [ "anyhow", "derive_builder", + "derive_more", "directories", "futures", "humantime-serde", "hyper", "pin-project", + "postage", "serde", "thiserror", "tokio", diff --git a/crates/arti-client/Cargo.toml b/crates/arti-client/Cargo.toml index 680b7eabe..c9d9f57b0 100644 --- a/crates/arti-client/Cargo.toml +++ b/crates/arti-client/Cargo.toml @@ -33,8 +33,10 @@ tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3"} humantime-serde = "1" derive_builder = "0.10" +derive_more = "0.99" directories = "4" futures = "0.3" +postage = { version = "0.4", default-features = false, features = ["futures-traits"] } tracing = "0.1.18" serde = { version = "1.0.103", features = ["derive"] } thiserror = "1" diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index 7d7df7a1c..ffbbb82a3 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -7,6 +7,7 @@ use crate::address::IntoTorAddr; use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig}; +use futures::channel::oneshot; use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort}; use tor_config::MutCfg; use tor_dirmgr::DirEvent; @@ -22,7 +23,7 @@ use std::net::IpAddr; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; -use crate::{Error, Result}; +use crate::{status, Error, Result}; #[cfg(feature = "async-std")] use tor_rtcompat::async_std::AsyncStdRuntime; #[cfg(feature = "tokio")] @@ -61,6 +62,13 @@ pub struct TorClient { /// /// See [`TorClient::reconfigure`] for more information on its use. reconfigure_lock: Arc>, + + /// A stream of bootstrap messages that we can clone when a client asks for + /// it. + /// + /// (We don't need to observe this stream ourselves, since it drops each + /// unobserved status change when the next status change occurs.) + status_receiver: status::BootstrapEvents, } /// Preferences for how to route a stream over the Tor network. @@ -227,6 +235,16 @@ impl TorClient { } let addr_cfg = config.address_filter.clone(); let timeout_cfg = config.stream_timeouts.clone(); + + let (status_sender, status_receiver) = postage::watch::channel(); + let status_receiver = status::BootstrapEvents { + inner: status_receiver, + }; + // TODO(nickm): we should use a real set of information sources here. + // This is just temporary. + let (tor_ready_sender, tor_ready_receiver) = oneshot::channel(); + runtime.spawn(status::report_status(status_sender, tor_ready_receiver))?; + let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(runtime.clone())); let circmgr = tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))?; @@ -267,6 +285,9 @@ impl TorClient { let client_isolation = IsolationToken::new(); + // At this point, we're bootstrapped. + let _ = tor_ready_sender.send(()); + Ok(TorClient { runtime, client_isolation, @@ -276,6 +297,7 @@ impl TorClient { addrcfg: Arc::new(addr_cfg.into()), timeoutcfg: Arc::new(timeout_cfg.into()), reconfigure_lock: Arc::new(Mutex::new(())), + status_receiver, }) } @@ -505,6 +527,27 @@ impl TorClient { Ok(circ) } + + /// Return a current [`status::BootstrapStatus`] describing how close this client + /// is to being ready for user traffic. + pub fn bootstrap_status(&self) -> status::BootstrapStatus { + self.status_receiver.inner.borrow().clone() + } + + /// Return a stream of [`status::BootstrapStatus`] events that will be updated + /// whenever the client's status changes. + /// + /// The receiver might not receive every update sent to this stream, though + /// when it does poll the stream it should get the most recent one. + // + // TODO(nickm): will this also need to implement Send and 'static? + // + // TODO(nickm): by the time the `TorClient` is visible to the user, this + // status is always true. That will change with #293, however, and will + // also change as BootstrapStatus becomes more complex with #96. + pub fn bootstrap_events(&self) -> status::BootstrapEvents { + self.status_receiver.clone() + } } /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update diff --git a/crates/arti-client/src/lib.rs b/crates/arti-client/src/lib.rs index d114818e8..dac05e906 100644 --- a/crates/arti-client/src/lib.rs +++ b/crates/arti-client/src/lib.rs @@ -176,6 +176,7 @@ mod address; mod client; pub mod config; +pub mod status; pub use address::{DangerouslyIntoTorAddr, IntoTorAddr, TorAddr, TorAddrError}; pub use client::{ConnectPrefs, TorClient}; diff --git a/crates/arti-client/src/status.rs b/crates/arti-client/src/status.rs new file mode 100644 index 000000000..4eff6d3a9 --- /dev/null +++ b/crates/arti-client/src/status.rs @@ -0,0 +1,175 @@ +//! Code to collect and publish information about a client's bootstrapping +//! status. + +use std::{borrow::Cow, fmt}; + +use derive_more::Display; +use futures::{channel::oneshot, future, Stream, StreamExt}; + +/// Information about how ready a [`crate::TorClient`] is to handle requests. +/// +/// Note that this status does not change monotonically: a `TorClient` can +/// become more _or less_ bootstrapped over time. (For example, a client can +/// become less bootstrapped if it loses its internet connectivity, or if its +/// directory information expires before it's able to replace it.) +// +// # Note +// +// We need to keep this type fairly small, since it will get cloned whenever +// it's observed on a stream. If it grows large, we can add an Arc<> around +// its data. +#[derive(Debug, Clone, Default)] +pub struct BootstrapStatus { + /// A placeholder field: we'll be replacing this as the branch gets support + /// for more information sources. + ready: bool, +} + +impl BootstrapStatus { + /// Return a rough fraction (from 0.0 to 1.0) representing how far along + /// the client's bootstrapping efforts are. + /// + /// 0 is defined as "just started"; 1 is defined as "ready to use." + pub fn as_frac(&self) -> f32 { + if self.ready { + 1.0 + } else { + 0.0 + } + } + + /// Return true if the status indicates that the client is ready for + /// traffic. + /// + /// For the purposes of this function, the client is "ready for traffic" if, + /// as far as we know, we can start acting on a new client request immediately. + pub fn ready_for_traffic(&self) -> bool { + self.ready + } + + /// If the client is unable to make forward progress for some reason, return + /// that reason. + /// + /// (Returns None if the client doesn't seem to be stuck.) + /// + /// # Caveats + /// + /// This function provides a "best effort" diagnostic: there + /// will always be some blockage types that it can't diagnose + /// correctly. It may declare that Arti is stuck for reasons that + /// are incorrect; or it may declare that the client is not stuck + /// when in fact no progress is being made. + /// + /// Therefore, the caller should always use a certain amount of + /// modesty when reporting these values to the user. For example, + /// it's probably better to say "Arti says it's stuck because it + /// can't make connections to the internet" rather than "You are + /// not on the internet." + pub fn blocked(&self) -> Option { + // TODO(nickm): implement this or remove it. + None + } +} + +/// A reason why a client believes it is stuck. +#[derive(Clone, Debug, Display)] +#[display(fmt = "{} ({})", "kind", "message")] +pub struct Blockage { + /// Why do we think we're blocked? + kind: BlockageKind, + /// A human-readable message about the blockage. + message: Cow<'static, str>, +} + +/// A specific type of blockage that a client believes it is experiencing. +/// +/// Used to distinguish among instances of [`Blockage`]. +#[derive(Clone, Debug, Display)] +#[non_exhaustive] +pub enum BlockageKind { + /// It looks like we can't make connections to the internet. + #[display(fmt = "Unable to connect to the internet")] + NoInternet, + /// It looks like we can't reach any Tor relays. + #[display(fmt = "Unable to reach Tor")] + CantReachTor, + /// We've been unable to download our directory information for some reason. + #[display(fmt = "Stalled fetching a Tor directory")] + DirectoryStalled, +} + +impl fmt::Display for BootstrapStatus { + /// Format this [`BootstrapStatus`]. + /// + /// Note that the string returned by this function is designed for human + /// readability, not for machine parsing. Other code *should not* depend + /// on particular elements of this string. + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let percent = (self.as_frac() * 100.0).round() as u32; + if let Some(problem) = self.blocked() { + write!(f, "Stuck at {}%: {}", percent, problem) + } else { + // TODO(nickm): describe what we're doing. + write!(f, "{}%", percent) + } + } +} + +/// Task that runs forever, updating a client's status via the provided +/// `sender`. +/// +/// TODO(nickm): Eventually this will use real stream of events to see when we +/// are bootstrapped or not. For now, it just says that we're not-ready until +/// the given Receiver fires. +/// +/// TODO(nickm): This should eventually close the stream when the client is +/// dropped. +pub(crate) async fn report_status( + mut sender: postage::watch::Sender, + ready: oneshot::Receiver<()>, +) { + { + sender.borrow_mut().ready = false; + } + if ready.await.is_ok() { + sender.borrow_mut().ready = true; + } + + // wait forever. + future::pending::<()>().await; +} + +/// A [`Stream`] of [`BootstrapStatus`] events. +/// +/// This stream isn't guaranteed to receive every change in bootstrap status; if +/// changes happen more frequently than the receiver can observe, some of them +/// will be dropped. +// +// Note: We use a wrapper type around watch::Receiver here, in order to hide its +// implementation type. We do that because we might want to change the type in +// the future, and because some of the functionality exposed by Receiver (like +// `borrow()` and the postage::Stream trait) are extraneous to the API we want. +#[derive(Clone)] +pub struct BootstrapEvents { + /// The receiver that implements this stream. + pub(crate) inner: postage::watch::Receiver, +} + +// We can't derive(Debug) since postage::watch::Receiver doesn't implement +// Debug. +impl std::fmt::Debug for BootstrapEvents { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BootstrapEvents").finish_non_exhaustive() + } +} + +impl Stream for BootstrapEvents { + type Item = BootstrapStatus; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_next_unpin(cx) + } +}