Merge branch 'bootstrap_reporting_api' into 'main'
Implement the basics of a bootstrap-status API. See merge request tpo/core/arti!237
This commit is contained in:
commit
ae9bea5d94
|
@ -108,11 +108,13 @@ version = "0.0.3"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"derive_builder",
|
||||
"derive_more",
|
||||
"directories",
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
"hyper",
|
||||
"pin-project",
|
||||
"postage",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
|
|
|
@ -33,8 +33,10 @@ tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3"}
|
|||
|
||||
humantime-serde = "1"
|
||||
derive_builder = "0.10"
|
||||
derive_more = "0.99"
|
||||
directories = "4"
|
||||
futures = "0.3"
|
||||
postage = { version = "0.4", default-features = false, features = ["futures-traits"] }
|
||||
tracing = "0.1.18"
|
||||
serde = { version = "1.0.103", features = ["derive"] }
|
||||
thiserror = "1"
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
use crate::address::IntoTorAddr;
|
||||
|
||||
use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig};
|
||||
use futures::channel::oneshot;
|
||||
use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort};
|
||||
use tor_config::MutCfg;
|
||||
use tor_dirmgr::DirEvent;
|
||||
|
@ -22,7 +23,7 @@ use std::net::IpAddr;
|
|||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::{Error, Result};
|
||||
use crate::{status, Error, Result};
|
||||
#[cfg(feature = "async-std")]
|
||||
use tor_rtcompat::async_std::AsyncStdRuntime;
|
||||
#[cfg(feature = "tokio")]
|
||||
|
@ -61,6 +62,13 @@ pub struct TorClient<R: Runtime> {
|
|||
///
|
||||
/// See [`TorClient::reconfigure`] for more information on its use.
|
||||
reconfigure_lock: Arc<Mutex<()>>,
|
||||
|
||||
/// A stream of bootstrap messages that we can clone when a client asks for
|
||||
/// it.
|
||||
///
|
||||
/// (We don't need to observe this stream ourselves, since it drops each
|
||||
/// unobserved status change when the next status change occurs.)
|
||||
status_receiver: status::BootstrapEvents,
|
||||
}
|
||||
|
||||
/// Preferences for how to route a stream over the Tor network.
|
||||
|
@ -227,6 +235,16 @@ impl<R: Runtime> TorClient<R> {
|
|||
}
|
||||
let addr_cfg = config.address_filter.clone();
|
||||
let timeout_cfg = config.stream_timeouts.clone();
|
||||
|
||||
let (status_sender, status_receiver) = postage::watch::channel();
|
||||
let status_receiver = status::BootstrapEvents {
|
||||
inner: status_receiver,
|
||||
};
|
||||
// TODO(nickm): we should use a real set of information sources here.
|
||||
// This is just temporary.
|
||||
let (tor_ready_sender, tor_ready_receiver) = oneshot::channel();
|
||||
runtime.spawn(status::report_status(status_sender, tor_ready_receiver))?;
|
||||
|
||||
let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(runtime.clone()));
|
||||
let circmgr =
|
||||
tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))?;
|
||||
|
@ -267,6 +285,9 @@ impl<R: Runtime> TorClient<R> {
|
|||
|
||||
let client_isolation = IsolationToken::new();
|
||||
|
||||
// At this point, we're bootstrapped.
|
||||
let _ = tor_ready_sender.send(());
|
||||
|
||||
Ok(TorClient {
|
||||
runtime,
|
||||
client_isolation,
|
||||
|
@ -276,6 +297,7 @@ impl<R: Runtime> TorClient<R> {
|
|||
addrcfg: Arc::new(addr_cfg.into()),
|
||||
timeoutcfg: Arc::new(timeout_cfg.into()),
|
||||
reconfigure_lock: Arc::new(Mutex::new(())),
|
||||
status_receiver,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -505,6 +527,27 @@ impl<R: Runtime> TorClient<R> {
|
|||
|
||||
Ok(circ)
|
||||
}
|
||||
|
||||
/// Return a current [`status::BootstrapStatus`] describing how close this client
|
||||
/// is to being ready for user traffic.
|
||||
pub fn bootstrap_status(&self) -> status::BootstrapStatus {
|
||||
self.status_receiver.inner.borrow().clone()
|
||||
}
|
||||
|
||||
/// Return a stream of [`status::BootstrapStatus`] events that will be updated
|
||||
/// whenever the client's status changes.
|
||||
///
|
||||
/// The receiver might not receive every update sent to this stream, though
|
||||
/// when it does poll the stream it should get the most recent one.
|
||||
//
|
||||
// TODO(nickm): will this also need to implement Send and 'static?
|
||||
//
|
||||
// TODO(nickm): by the time the `TorClient` is visible to the user, this
|
||||
// status is always true. That will change with #293, however, and will
|
||||
// also change as BootstrapStatus becomes more complex with #96.
|
||||
pub fn bootstrap_events(&self) -> status::BootstrapEvents {
|
||||
self.status_receiver.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
|
||||
|
|
|
@ -176,6 +176,7 @@ mod address;
|
|||
mod client;
|
||||
|
||||
pub mod config;
|
||||
pub mod status;
|
||||
|
||||
pub use address::{DangerouslyIntoTorAddr, IntoTorAddr, TorAddr, TorAddrError};
|
||||
pub use client::{ConnectPrefs, TorClient};
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
//! Code to collect and publish information about a client's bootstrapping
|
||||
//! status.
|
||||
|
||||
use std::{borrow::Cow, fmt};
|
||||
|
||||
use derive_more::Display;
|
||||
use futures::{channel::oneshot, future, Stream, StreamExt};
|
||||
|
||||
/// Information about how ready a [`crate::TorClient`] is to handle requests.
|
||||
///
|
||||
/// Note that this status does not change monotonically: a `TorClient` can
|
||||
/// become more _or less_ bootstrapped over time. (For example, a client can
|
||||
/// become less bootstrapped if it loses its internet connectivity, or if its
|
||||
/// directory information expires before it's able to replace it.)
|
||||
//
|
||||
// # Note
|
||||
//
|
||||
// We need to keep this type fairly small, since it will get cloned whenever
|
||||
// it's observed on a stream. If it grows large, we can add an Arc<> around
|
||||
// its data.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct BootstrapStatus {
|
||||
/// A placeholder field: we'll be replacing this as the branch gets support
|
||||
/// for more information sources.
|
||||
ready: bool,
|
||||
}
|
||||
|
||||
impl BootstrapStatus {
|
||||
/// Return a rough fraction (from 0.0 to 1.0) representing how far along
|
||||
/// the client's bootstrapping efforts are.
|
||||
///
|
||||
/// 0 is defined as "just started"; 1 is defined as "ready to use."
|
||||
pub fn as_frac(&self) -> f32 {
|
||||
if self.ready {
|
||||
1.0
|
||||
} else {
|
||||
0.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Return true if the status indicates that the client is ready for
|
||||
/// traffic.
|
||||
///
|
||||
/// For the purposes of this function, the client is "ready for traffic" if,
|
||||
/// as far as we know, we can start acting on a new client request immediately.
|
||||
pub fn ready_for_traffic(&self) -> bool {
|
||||
self.ready
|
||||
}
|
||||
|
||||
/// If the client is unable to make forward progress for some reason, return
|
||||
/// that reason.
|
||||
///
|
||||
/// (Returns None if the client doesn't seem to be stuck.)
|
||||
///
|
||||
/// # Caveats
|
||||
///
|
||||
/// This function provides a "best effort" diagnostic: there
|
||||
/// will always be some blockage types that it can't diagnose
|
||||
/// correctly. It may declare that Arti is stuck for reasons that
|
||||
/// are incorrect; or it may declare that the client is not stuck
|
||||
/// when in fact no progress is being made.
|
||||
///
|
||||
/// Therefore, the caller should always use a certain amount of
|
||||
/// modesty when reporting these values to the user. For example,
|
||||
/// it's probably better to say "Arti says it's stuck because it
|
||||
/// can't make connections to the internet" rather than "You are
|
||||
/// not on the internet."
|
||||
pub fn blocked(&self) -> Option<Blockage> {
|
||||
// TODO(nickm): implement this or remove it.
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// A reason why a client believes it is stuck.
|
||||
#[derive(Clone, Debug, Display)]
|
||||
#[display(fmt = "{} ({})", "kind", "message")]
|
||||
pub struct Blockage {
|
||||
/// Why do we think we're blocked?
|
||||
kind: BlockageKind,
|
||||
/// A human-readable message about the blockage.
|
||||
message: Cow<'static, str>,
|
||||
}
|
||||
|
||||
/// A specific type of blockage that a client believes it is experiencing.
|
||||
///
|
||||
/// Used to distinguish among instances of [`Blockage`].
|
||||
#[derive(Clone, Debug, Display)]
|
||||
#[non_exhaustive]
|
||||
pub enum BlockageKind {
|
||||
/// It looks like we can't make connections to the internet.
|
||||
#[display(fmt = "Unable to connect to the internet")]
|
||||
NoInternet,
|
||||
/// It looks like we can't reach any Tor relays.
|
||||
#[display(fmt = "Unable to reach Tor")]
|
||||
CantReachTor,
|
||||
/// We've been unable to download our directory information for some reason.
|
||||
#[display(fmt = "Stalled fetching a Tor directory")]
|
||||
DirectoryStalled,
|
||||
}
|
||||
|
||||
impl fmt::Display for BootstrapStatus {
|
||||
/// Format this [`BootstrapStatus`].
|
||||
///
|
||||
/// Note that the string returned by this function is designed for human
|
||||
/// readability, not for machine parsing. Other code *should not* depend
|
||||
/// on particular elements of this string.
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let percent = (self.as_frac() * 100.0).round() as u32;
|
||||
if let Some(problem) = self.blocked() {
|
||||
write!(f, "Stuck at {}%: {}", percent, problem)
|
||||
} else {
|
||||
// TODO(nickm): describe what we're doing.
|
||||
write!(f, "{}%", percent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Task that runs forever, updating a client's status via the provided
|
||||
/// `sender`.
|
||||
///
|
||||
/// TODO(nickm): Eventually this will use real stream of events to see when we
|
||||
/// are bootstrapped or not. For now, it just says that we're not-ready until
|
||||
/// the given Receiver fires.
|
||||
///
|
||||
/// TODO(nickm): This should eventually close the stream when the client is
|
||||
/// dropped.
|
||||
pub(crate) async fn report_status(
|
||||
mut sender: postage::watch::Sender<BootstrapStatus>,
|
||||
ready: oneshot::Receiver<()>,
|
||||
) {
|
||||
{
|
||||
sender.borrow_mut().ready = false;
|
||||
}
|
||||
if ready.await.is_ok() {
|
||||
sender.borrow_mut().ready = true;
|
||||
}
|
||||
|
||||
// wait forever.
|
||||
future::pending::<()>().await;
|
||||
}
|
||||
|
||||
/// A [`Stream`] of [`BootstrapStatus`] events.
|
||||
///
|
||||
/// This stream isn't guaranteed to receive every change in bootstrap status; if
|
||||
/// changes happen more frequently than the receiver can observe, some of them
|
||||
/// will be dropped.
|
||||
//
|
||||
// Note: We use a wrapper type around watch::Receiver here, in order to hide its
|
||||
// implementation type. We do that because we might want to change the type in
|
||||
// the future, and because some of the functionality exposed by Receiver (like
|
||||
// `borrow()` and the postage::Stream trait) are extraneous to the API we want.
|
||||
#[derive(Clone)]
|
||||
pub struct BootstrapEvents {
|
||||
/// The receiver that implements this stream.
|
||||
pub(crate) inner: postage::watch::Receiver<BootstrapStatus>,
|
||||
}
|
||||
|
||||
// We can't derive(Debug) since postage::watch::Receiver doesn't implement
|
||||
// Debug.
|
||||
impl std::fmt::Debug for BootstrapEvents {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("BootstrapEvents").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for BootstrapEvents {
|
||||
type Item = BootstrapStatus;
|
||||
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.inner.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue