diff --git a/Cargo.lock b/Cargo.lock index 2dc9a13ce..6ca499168 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3469,6 +3469,7 @@ dependencies = [ "bitflags", "derive_builder", "derive_more", + "futures", "hex", "hex-literal", "rand 0.8.5", diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index 813845c5e..ec93c586c 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -8,9 +8,8 @@ use crate::address::IntoTorAddr; use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig}; use tor_circmgr::isolation::Isolation; -use tor_circmgr::{isolation::StreamIsolationBuilder, DirInfo, IsolationToken, TargetPort}; +use tor_circmgr::{isolation::StreamIsolationBuilder, IsolationToken, TargetPort}; use tor_config::MutCfg; -use tor_dirmgr::DirEvent; use tor_persist::{FsStateMgr, StateMgr}; use tor_proto::circuit::ClientCirc; use tor_proto::stream::{DataStream, IpVersionPreference, StreamParameters}; @@ -18,19 +17,17 @@ use tor_rtcompat::{PreferredRuntime, Runtime, SleepProviderExt}; use educe::Educe; use futures::lock::Mutex as AsyncMutex; -use futures::stream::StreamExt; use futures::task::SpawnExt; use std::convert::TryInto; use std::net::IpAddr; use std::result::Result as StdResult; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, Weak}; -use std::time::Duration; +use std::sync::{Arc, Mutex}; use crate::err::ErrorDetail; use crate::{status, util, TorClientBuilder}; -use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule}; -use tracing::{debug, error, info, warn}; +use tor_rtcompat::scheduler::TaskHandle; +use tracing::{debug, info}; /// An active client session on the Tor network. /// @@ -378,7 +375,16 @@ impl TorClient { .build(runtime.clone(), Arc::clone(&circmgr), dir_cfg) .map_err(crate::Error::into_detail)?; - let mut periodic_task_handles = vec![]; + let mut periodic_task_handles = circmgr + .launch_background_tasks(&runtime, &dirmgr, statemgr.clone()) + .map_err(ErrorDetail::CircMgrSetup)?; + + periodic_task_handles.extend( + chanmgr + .launch_background_tasks(&runtime) + .map_err(ErrorDetail::ChanMgrSetup)? + .into_iter(), + ); let conn_status = chanmgr.bootstrap_events(); let dir_status = dirmgr.bootstrap_events(); @@ -390,59 +396,6 @@ impl TorClient { )) .map_err(|e| ErrorDetail::from_spawn("top-level status reporter", e))?; - let (expiry_sched, expiry_handle) = TaskSchedule::new(runtime.clone()); - periodic_task_handles.push(expiry_handle); - - runtime - .spawn(continually_expire_channels( - expiry_sched, - Arc::downgrade(&chanmgr), - )) - .map_err(|e| ErrorDetail::from_spawn("channel expiration task", e))?; - - // Launch a daemon task to inform the circmgr about new - // network parameters. - runtime - .spawn(keep_circmgr_params_updated( - dirmgr.events(), - Arc::downgrade(&circmgr), - Arc::downgrade(&dirmgr), - )) - .map_err(|e| ErrorDetail::from_spawn("circmgr parameter updater", e))?; - - let (persist_sched, persist_handle) = TaskSchedule::new(runtime.clone()); - periodic_task_handles.push(persist_handle); - - runtime - .spawn(update_persistent_state( - persist_sched, - Arc::downgrade(&circmgr), - statemgr.clone(), - )) - .map_err(|e| ErrorDetail::from_spawn("persistent state updater", e))?; - - let (timeout_sched, timeout_handle) = TaskSchedule::new(runtime.clone()); - periodic_task_handles.push(timeout_handle); - - runtime - .spawn(continually_launch_timeout_testing_circuits( - timeout_sched, - Arc::downgrade(&circmgr), - Arc::downgrade(&dirmgr), - )) - .map_err(|e| ErrorDetail::from_spawn("timeout-probe circuit launcher", e))?; - - let (preempt_sched, preempt_handle) = TaskSchedule::new(runtime.clone()); - periodic_task_handles.push(preempt_handle); - - runtime - .spawn(continually_preemptively_build_circuits( - preempt_sched, - Arc::downgrade(&circmgr), - Arc::downgrade(&dirmgr), - )) - .map_err(|e| ErrorDetail::from_spawn("preemptive circuit launcher", e))?; - let client_isolation = IsolationToken::new(); Ok(TorClient { @@ -940,206 +893,6 @@ where ErrorDetail::from(err).into() } -/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update -/// `circmgr` with the consensus parameters from `dirmgr`. -/// -/// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes -/// dangling. -/// -/// This is a daemon task: it runs indefinitely in the background. -async fn keep_circmgr_params_updated( - mut events: impl futures::Stream + Unpin, - circmgr: Weak>, - dirmgr: Weak, -) { - use DirEvent::*; - while let Some(event) = events.next().await { - match event { - NewConsensus => { - if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) { - let netdir = dm - .latest_netdir() - .expect("got new consensus event, without a netdir?"); - cm.update_network_parameters(netdir.params()); - cm.update_network(&netdir); - } else { - debug!("Circmgr or dirmgr has disappeared; task exiting."); - break; - } - } - NewDescriptors => { - if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) { - let netdir = dm - .latest_netdir() - .expect("got new descriptors event, without a netdir?"); - cm.update_network(&netdir); - } else { - debug!("Circmgr or dirmgr has disappeared; task exiting."); - break; - } - } - _ => { - // Nothing we recognize. - } - } - } -} - -/// Run forever, periodically telling `circmgr` to update its persistent -/// state. -/// -/// Exit when we notice that `circmgr` has been dropped. -/// -/// This is a daemon task: it runs indefinitely in the background. -async fn update_persistent_state( - mut sched: TaskSchedule, - circmgr: Weak>, - statemgr: FsStateMgr, -) { - // TODO: Consider moving this function into tor-circmgr after we have more - // experience with the state system. - - while sched.next().await.is_some() { - if let Some(circmgr) = Weak::upgrade(&circmgr) { - use tor_persist::LockStatus::*; - - match statemgr.try_lock() { - Err(e) => { - error!("Problem with state lock file: {}", e); - break; - } - Ok(NewlyAcquired) => { - info!("We now own the lock on our state files."); - if let Err(e) = circmgr.upgrade_to_owned_persistent_state() { - error!("Unable to upgrade to owned state files: {}", e); - break; - } - } - Ok(AlreadyHeld) => { - if let Err(e) = circmgr.store_persistent_state() { - error!("Unable to flush circmgr state: {}", e); - break; - } - } - Ok(NoLock) => { - if let Err(e) = circmgr.reload_persistent_state() { - error!("Unable to reload circmgr state: {}", e); - break; - } - } - } - } else { - debug!("Circmgr has disappeared; task exiting."); - return; - } - // TODO(nickm): This delay is probably too small. - // - // Also, we probably don't even want a fixed delay here. Instead, - // we should be updating more frequently when the data is volatile - // or has important info to save, and not at all when there are no - // changes. - sched.fire_in(Duration::from_secs(60)); - } - - error!("State update task is exiting prematurely."); -} - -/// Run indefinitely, launching circuits as needed to get a good -/// estimate for our circuit build timeouts. -/// -/// Exit when we notice that `circmgr` or `dirmgr` has been dropped. -/// -/// This is a daemon task: it runs indefinitely in the background. -/// -/// # Note -/// -/// I'd prefer this to be handled entirely within the tor-circmgr crate; -/// see [`tor_circmgr::CircMgr::launch_timeout_testing_circuit_if_appropriate`] -/// for more information. -async fn continually_launch_timeout_testing_circuits( - mut sched: TaskSchedule, - circmgr: Weak>, - dirmgr: Weak, -) { - while sched.next().await.is_some() { - if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) { - if let Some(netdir) = dm.latest_netdir() { - if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) { - warn!("Problem launching a timeout testing circuit: {}", e); - } - let delay = netdir - .params() - .cbt_testing_delay - .try_into() - .expect("Out-of-bounds value from BoundedInt32"); - - drop((cm, dm)); - sched.fire_in(delay); - } else { - // TODO(eta): ideally, this should wait until we successfully bootstrap using - // the bootstrap status API - sched.fire_in(Duration::from_secs(10)); - } - } else { - return; - } - } -} - -/// Run indefinitely, launching circuits where the preemptive circuit -/// predictor thinks it'd be a good idea to have them. -/// -/// Exit when we notice that `circmgr` or `dirmgr` has been dropped. -/// -/// This is a daemon task: it runs indefinitely in the background. -/// -/// # Note -/// -/// This would be better handled entirely within `tor-circmgr`, like -/// other daemon tasks. -async fn continually_preemptively_build_circuits( - mut sched: TaskSchedule, - circmgr: Weak>, - dirmgr: Weak, -) { - while sched.next().await.is_some() { - if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) { - if let Some(netdir) = dm.latest_netdir() { - cm.launch_circuits_preemptively(DirInfo::Directory(&netdir)) - .await; - sched.fire_in(Duration::from_secs(10)); - } else { - // TODO(eta): ideally, this should wait until we successfully bootstrap using - // the bootstrap status API - sched.fire_in(Duration::from_secs(10)); - } - } else { - return; - } - } -} -/// Periodically expire any channels that have been unused beyond -/// the maximum duration allowed. -/// -/// Exist when we find that `chanmgr` is dropped -/// -/// This is a daemon task that runs indefinitely in the background -async fn continually_expire_channels( - mut sched: TaskSchedule, - chanmgr: Weak>, -) { - while sched.next().await.is_some() { - let delay = if let Some(cm) = Weak::upgrade(&chanmgr) { - cm.expire_channels() - } else { - // channel manager is closed. - return; - }; - // This will sometimes be an underestimate, but it's no big deal; we just sleep some more. - sched.fire_in(Duration::from_secs(delay.as_secs())); - } -} - #[cfg(test)] mod test { #![allow(clippy::unwrap_used)] diff --git a/crates/arti-client/src/err.rs b/crates/arti-client/src/err.rs index a72a72c44..94a32f5e7 100644 --- a/crates/arti-client/src/err.rs +++ b/crates/arti-client/src/err.rs @@ -108,6 +108,10 @@ pub_if_error_detail! { #[derive(Error, Clone, Debug)] #[non_exhaustive] enum ErrorDetail { + /// Error setting up the channel manager + #[error("Error setting up the channel manager {0}")] + ChanMgrSetup(#[source] tor_chanmgr::Error), // TODO should this be its own type? + /// Error setting up the circuit manager #[error("Error setting up the circuit manager {0}")] CircMgrSetup(#[source] tor_circmgr::Error), // TODO should this be its own type? @@ -253,6 +257,7 @@ impl tor_error::HasKind for ErrorDetail { E::OnionAddressNotSupported => EK::NotImplemented, E::Address(_) | E::InvalidHostname => EK::InvalidStreamTarget, E::LocalAddress => EK::ForbiddenStreamTarget, + E::ChanMgrSetup(e) => e.kind(), } } } diff --git a/crates/tor-chanmgr/src/lib.rs b/crates/tor-chanmgr/src/lib.rs index 576ec2694..a61ceab6a 100644 --- a/crates/tor-chanmgr/src/lib.rs +++ b/crates/tor-chanmgr/src/lib.rs @@ -52,6 +52,9 @@ mod mgr; #[cfg(test)] mod testing; +use futures::task::SpawnExt; +use futures::StreamExt; +use std::sync::{Arc, Weak}; use std::time::Duration; use tor_linkspec::{ChanTarget, OwnedChanTarget}; use tor_proto::channel::Channel; @@ -64,6 +67,7 @@ use tor_rtcompat::Runtime; pub type Result = std::result::Result; pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents}; +use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule}; /// A Type that remembers a set of live channels, and launches new /// ones on request. @@ -80,6 +84,10 @@ pub struct ChanMgr { impl ChanMgr { /// Construct a new channel manager. + /// + /// # Usage note + /// + /// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`. pub fn new(runtime: R) -> Self { let (sender, receiver) = event::channel(); let builder = builder::ChanBuilder::new(runtime, sender); @@ -90,6 +98,20 @@ impl ChanMgr { } } + /// Launch the periodic daemon task required by the manager to function properly. + /// + /// Returns a [`TaskHandle`] that can be used to manage the daemon task. + pub fn launch_background_tasks(self: &Arc, runtime: &R) -> Result> { + let (sched, handle) = TaskSchedule::new(runtime.clone()); + runtime + .spawn(Self::continually_expire_channels( + sched, + Arc::downgrade(self), + )) + .map_err(|e| Error::from_spawn("channel expiration task", e))?; + Ok(vec![handle]) + } + /// Try to get a suitable channel to the provided `target`, /// launching one if one does not exist. /// @@ -122,4 +144,23 @@ impl ChanMgr { pub fn expire_channels(&self) -> Duration { self.mgr.expire_channels() } + + /// Periodically expire any channels that have been unused beyond + /// the maximum duration allowed. + /// + /// Exist when we find that `chanmgr` is dropped + /// + /// This is a daemon task that runs indefinitely in the background + async fn continually_expire_channels(mut sched: TaskSchedule, chanmgr: Weak) { + while sched.next().await.is_some() { + let delay = if let Some(cm) = Weak::upgrade(&chanmgr) { + cm.expire_channels() + } else { + // channel manager is closed. + return; + }; + // This will sometimes be an underestimate, but it's no big deal; we just sleep some more. + sched.fire_in(Duration::from_secs(delay.as_secs())); + } + } } diff --git a/crates/tor-circmgr/src/lib.rs b/crates/tor-circmgr/src/lib.rs index ceb60a7da..26cd7a866 100644 --- a/crates/tor-circmgr/src/lib.rs +++ b/crates/tor-circmgr/src/lib.rs @@ -51,13 +51,15 @@ #![deny(clippy::unwrap_used)] use tor_chanmgr::ChanMgr; -use tor_netdir::{fallback::FallbackDir, NetDir}; +use tor_netdir::{fallback::FallbackDir, DirEvent, NetDir, NetDirProvider}; use tor_proto::circuit::{CircParameters, ClientCirc, UniqId}; use tor_rtcompat::Runtime; +use futures::task::SpawnExt; +use futures::StreamExt; use std::convert::TryInto; -use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::sync::{Arc, Mutex, Weak}; +use std::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; pub mod build; @@ -85,6 +87,8 @@ use crate::preemptive::PreemptiveCircuitPredictor; use usage::TargetCircUsage; pub use tor_guardmgr::{ExternalFailure, GuardId}; +use tor_persist::{FsStateMgr, StateMgr}; +use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule}; /// A Result type as returned from this crate. pub type Result = std::result::Result; @@ -162,6 +166,10 @@ pub struct CircMgr { impl CircMgr { /// Construct a new circuit manager. + /// + /// # Usage note + /// + /// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`. pub fn new( config: &CFG, storage: SM, @@ -196,6 +204,66 @@ impl CircMgr { Ok(circmgr) } + /// Launch the periodic daemon tasks required by the manager to function properly. + /// + /// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks. + // + // NOTE(eta): The ?Sized on D is so we can pass a trait object in. + pub fn launch_background_tasks( + self: &Arc, + runtime: &R, + dir_provider: &Arc, + state_mgr: FsStateMgr, + ) -> Result> + where + D: NetDirProvider + Send + Sync + 'static + ?Sized, + { + let mut ret = vec![]; + + runtime + .spawn(Self::keep_circmgr_params_updated( + dir_provider.events(), + Arc::downgrade(self), + Arc::downgrade(dir_provider), + )) + .map_err(|e| Error::from_spawn("circmgr parameter updater", e))?; + + let (sched, handle) = TaskSchedule::new(runtime.clone()); + ret.push(handle); + + runtime + .spawn(Self::update_persistent_state( + sched, + Arc::downgrade(self), + state_mgr, + )) + .map_err(|e| Error::from_spawn("persistent state updater", e))?; + + let (sched, handle) = TaskSchedule::new(runtime.clone()); + ret.push(handle); + + runtime + .spawn(Self::continually_launch_timeout_testing_circuits( + sched, + Arc::downgrade(self), + Arc::downgrade(dir_provider), + )) + .map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?; + + let (sched, handle) = TaskSchedule::new(runtime.clone()); + ret.push(handle); + + runtime + .spawn(Self::continually_preemptively_build_circuits( + sched, + Arc::downgrade(self), + Arc::downgrade(dir_provider), + )) + .map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?; + + Ok(ret) + } + /// Try to change our configuration settings to `new_config`. /// /// The actual behavior here will depend on the value of `how`. @@ -432,6 +500,186 @@ impl CircMgr { Ok(()) } + /// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update + /// `circmgr` with the consensus parameters from `dirmgr`. + /// + /// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes + /// dangling. + /// + /// This is a daemon task: it runs indefinitely in the background. + async fn keep_circmgr_params_updated( + mut events: impl futures::Stream + Unpin, + circmgr: Weak, + dirmgr: Weak, + ) where + D: NetDirProvider + Send + Sync + 'static + ?Sized, + { + use DirEvent::*; + while let Some(event) = events.next().await { + match event { + NewConsensus => { + if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) + { + let netdir = dm + .latest_netdir() + .expect("got new consensus event, without a netdir?"); + cm.update_network_parameters(netdir.params()); + cm.update_network(&netdir); + } else { + debug!("Circmgr or dirmgr has disappeared; task exiting."); + break; + } + } + NewDescriptors => { + if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) + { + let netdir = dm + .latest_netdir() + .expect("got new descriptors event, without a netdir?"); + cm.update_network(&netdir); + } else { + debug!("Circmgr or dirmgr has disappeared; task exiting."); + break; + } + } + _ => { + // Nothing we recognize. + } + } + } + } + + /// Run indefinitely, launching circuits as needed to get a good + /// estimate for our circuit build timeouts. + /// + /// Exit when we notice that `circmgr` or `dirmgr` has been dropped. + /// + /// This is a daemon task: it runs indefinitely in the background. + async fn continually_launch_timeout_testing_circuits( + mut sched: TaskSchedule, + circmgr: Weak, + dirmgr: Weak, + ) where + D: NetDirProvider + Send + Sync + 'static + ?Sized, + { + while sched.next().await.is_some() { + if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) { + if let Some(netdir) = dm.latest_netdir() { + if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) { + warn!("Problem launching a timeout testing circuit: {}", e); + } + let delay = netdir + .params() + .cbt_testing_delay + .try_into() + .expect("Out-of-bounds value from BoundedInt32"); + + drop((cm, dm)); + sched.fire_in(delay); + } else { + // wait for the provider to announce some event, which will probably be + // NewConsensus; this is therefore a decent yardstick for rechecking + let _ = dm.events().next().await; + sched.fire(); + } + } else { + return; + } + } + } + + /// Run forever, periodically telling `circmgr` to update its persistent + /// state. + /// + /// Exit when we notice that `circmgr` has been dropped. + /// + /// This is a daemon task: it runs indefinitely in the background. + async fn update_persistent_state( + mut sched: TaskSchedule, + circmgr: Weak, + statemgr: FsStateMgr, + ) { + while sched.next().await.is_some() { + if let Some(circmgr) = Weak::upgrade(&circmgr) { + use tor_persist::LockStatus::*; + + match statemgr.try_lock() { + Err(e) => { + error!("Problem with state lock file: {}", e); + break; + } + Ok(NewlyAcquired) => { + info!("We now own the lock on our state files."); + if let Err(e) = circmgr.upgrade_to_owned_persistent_state() { + error!("Unable to upgrade to owned state files: {}", e); + break; + } + } + Ok(AlreadyHeld) => { + if let Err(e) = circmgr.store_persistent_state() { + error!("Unable to flush circmgr state: {}", e); + break; + } + } + Ok(NoLock) => { + if let Err(e) = circmgr.reload_persistent_state() { + error!("Unable to reload circmgr state: {}", e); + break; + } + } + } + } else { + debug!("Circmgr has disappeared; task exiting."); + return; + } + // TODO(nickm): This delay is probably too small. + // + // Also, we probably don't even want a fixed delay here. Instead, + // we should be updating more frequently when the data is volatile + // or has important info to save, and not at all when there are no + // changes. + sched.fire_in(Duration::from_secs(60)); + } + + debug!("State update task exiting (potentially due to handle drop)."); + } + + /// Run indefinitely, launching circuits where the preemptive circuit + /// predictor thinks it'd be a good idea to have them. + /// + /// Exit when we notice that `circmgr` or `dirmgr` has been dropped. + /// + /// This is a daemon task: it runs indefinitely in the background. + /// + /// # Note + /// + /// This would be better handled entirely within `tor-circmgr`, like + /// other daemon tasks. + async fn continually_preemptively_build_circuits( + mut sched: TaskSchedule, + circmgr: Weak, + dirmgr: Weak, + ) where + D: NetDirProvider + Send + Sync + 'static + ?Sized, + { + while sched.next().await.is_some() { + if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) { + if let Some(netdir) = dm.latest_netdir() { + cm.launch_circuits_preemptively(DirInfo::Directory(&netdir)) + .await; + sched.fire_in(Duration::from_secs(10)); + } else { + // wait for the provider to announce some event, which will probably be + // NewConsensus; this is therefore a decent yardstick for rechecking + let _ = dm.events().next().await; + sched.fire(); + } + } else { + return; + } + } + } + /// Record that a failure occurred on a circuit with a given guard, in a way /// that makes us unwilling to use that guard for future circuits. pub fn note_external_failure(&self, id: &GuardId, external_failure: ExternalFailure) { diff --git a/crates/tor-dirmgr/src/event.rs b/crates/tor-dirmgr/src/event.rs index b68f92a9a..dda2e5836 100644 --- a/crates/tor-dirmgr/src/event.rs +++ b/crates/tor-dirmgr/src/event.rs @@ -20,28 +20,9 @@ use educe::Educe; use futures::{stream::Stream, Future, StreamExt}; use time::OffsetDateTime; use tor_basic_utils::skip_fmt; +use tor_netdir::DirEvent; use tor_netdoc::doc::netstatus; -/// An event that a DirMgr can broadcast to indicate that a change in -/// the status of its directory. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[non_exhaustive] -pub enum DirEvent { - /// A new consensus has been received, and has enough information to be - /// used. - /// - /// This event is also broadcast when a new set of consensus parameters is - /// available, even if that set of parameters comes from a configuration - /// change rather than from the latest consensus. - NewConsensus, - - /// New descriptors have been received for the current consensus. - /// - /// (This event is _not_ broadcast when receiving new descriptors for a - /// consensus which is not yet ready to replace the current consensus.) - NewDescriptors, -} - /// A trait to indicate something that can be published with [`FlagPublisher`]. /// /// Since the implementation of `FlagPublisher` requires that its events be @@ -69,6 +50,8 @@ impl FlagEvent for DirEvent { match self { DirEvent::NewConsensus => 0, DirEvent::NewDescriptors => 1, + // HACK(eta): This is an unfortunate consequence of marking DirEvent #[non_exhaustive]. + _ => panic!("DirEvent updated without updating its FlagEvent impl"), } } fn from_index(flag: u16) -> Option { diff --git a/crates/tor-dirmgr/src/lib.rs b/crates/tor-dirmgr/src/lib.rs index 15f12cbb5..707d6b3b6 100644 --- a/crates/tor-dirmgr/src/lib.rs +++ b/crates/tor-dirmgr/src/lib.rs @@ -78,7 +78,7 @@ use crate::storage::DynStore; use postage::watch; pub use retry::DownloadSchedule; use tor_circmgr::CircMgr; -use tor_netdir::NetDir; +use tor_netdir::{DirEvent, NetDir, NetDirProvider}; use tor_netdoc::doc::netstatus::ConsensusFlavor; use async_trait::async_trait; @@ -99,7 +99,7 @@ pub use config::{ }; pub use docid::DocId; pub use err::Error; -pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirEvent, DirStatus}; +pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirStatus}; pub use storage::DocumentText; pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder}; @@ -108,18 +108,7 @@ pub type Result = std::result::Result; /// Trait for DirMgr implementations #[async_trait] -pub trait DirProvider { - /// Return a handle to our latest directory, if we have one. - fn latest_netdir(&self) -> Option>; - - /// Return a new asynchronous stream that will receive notification - /// whenever the consensus has changed. - /// - /// Multiple events may be batched up into a single item: each time - /// this stream yields an event, all you can assume is that the event has - /// occurred at least once. - fn events(&self) -> BoxStream<'static, DirEvent>; - +pub trait DirProvider: NetDirProvider { /// Try to change our configuration to `new_config`. /// /// Actual behavior will depend on the value of `how`. @@ -140,8 +129,9 @@ pub trait DirProvider { fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>; } -#[async_trait] -impl DirProvider for Arc> { +// NOTE(eta): We can't implement this for Arc> due to trait coherence rules, so instead +// there's a blanket impl for Arc in tor-netdir. +impl NetDirProvider for DirMgr { fn latest_netdir(&self) -> Option> { self.opt_netdir() } @@ -149,7 +139,10 @@ impl DirProvider for Arc> { fn events(&self) -> BoxStream<'static, DirEvent> { Box::pin(self.events.subscribe()) } +} +#[async_trait] +impl DirProvider for Arc> { fn reconfigure( &self, new_config: &DirMgrConfig, diff --git a/crates/tor-netdir/Cargo.toml b/crates/tor-netdir/Cargo.toml index 4ee908512..2be3e9335 100644 --- a/crates/tor-netdir/Cargo.toml +++ b/crates/tor-netdir/Cargo.toml @@ -42,6 +42,7 @@ serde = { version = "1.0.103", features = ["derive"] } signature = "1" thiserror = "1" tracing = "0.1.18" +futures = "0.3.14" [dev-dependencies] hex = "0.4" diff --git a/crates/tor-netdir/src/lib.rs b/crates/tor-netdir/src/lib.rs index 6b8ce8a4d..2db0a5982 100644 --- a/crates/tor-netdir/src/lib.rs +++ b/crates/tor-netdir/src/lib.rs @@ -72,9 +72,11 @@ use tor_netdoc::doc::microdesc::{MdDigest, Microdesc}; use tor_netdoc::doc::netstatus::{self, MdConsensus, RouterStatus}; use tor_netdoc::types::policy::PortPolicy; +use futures::stream::BoxStream; use serde::Deserialize; use std::collections::HashMap; use std::net::IpAddr; +use std::ops::Deref; use std::sync::Arc; use tracing::warn; @@ -253,6 +255,54 @@ pub struct NetDir { weights: weight::WeightSet, } +/// An event that a [`NetDirProvider`] can broadcast to indicate that a change in +/// the status of its directory. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum DirEvent { + /// A new consensus has been received, and has enough information to be + /// used. + /// + /// This event is also broadcast when a new set of consensus parameters is + /// available, even if that set of parameters comes from a configuration + /// change rather than from the latest consensus. + NewConsensus, + + /// New descriptors have been received for the current consensus. + /// + /// (This event is _not_ broadcast when receiving new descriptors for a + /// consensus which is not yet ready to replace the current consensus.) + NewDescriptors, +} + +/// An object that can provide [`NetDir`]s, as well as inform consumers when +/// they might have changed. +pub trait NetDirProvider { + /// Return a handle to our latest directory, if we have one. + fn latest_netdir(&self) -> Option>; + + /// Return a new asynchronous stream that will receive notification + /// whenever the consensus has changed. + /// + /// Multiple events may be batched up into a single item: each time + /// this stream yields an event, all you can assume is that the event has + /// occurred at least once. + fn events(&self) -> BoxStream<'static, DirEvent>; +} + +impl NetDirProvider for Arc +where + T: NetDirProvider, +{ + fn latest_netdir(&self) -> Option> { + self.deref().latest_netdir() + } + + fn events(&self) -> BoxStream<'static, DirEvent> { + self.deref().events() + } +} + /// A partially build NetDir -- it can't be unwrapped until it has /// enough information to build safe paths. #[derive(Debug, Clone)] diff --git a/crates/tor-rtcompat/src/scheduler.rs b/crates/tor-rtcompat/src/scheduler.rs index 5512b4ee3..686b4f47a 100644 --- a/crates/tor-rtcompat/src/scheduler.rs +++ b/crates/tor-rtcompat/src/scheduler.rs @@ -69,6 +69,12 @@ impl TaskSchedule { self.instant_fire = false; self.sleep = Some(Box::pin(self.rt.sleep(dur))); } + + /// Trigger the schedule instantly. + pub fn fire(&mut self) { + self.instant_fire = true; + self.sleep = None; + } } impl TaskHandle { diff --git a/doc/semver_status.md b/doc/semver_status.md index 744ce6b18..42603e32e 100644 --- a/doc/semver_status.md +++ b/doc/semver_status.md @@ -63,6 +63,7 @@ tor-dirmgr: new-api: DirMgrConfig object now has accessors. DirMgrCfg: totally changed, builder abolished. Authority, NetworkConfig: removed several accessors for these config elements. + api-break: DirEvent is now in tor-netdir instead tor-circmgr: CircMgrCfg: totally changed, builder abolished.