Make daemon tasks self-contained; introduce NetDirProvider

The various background daemon tasks that `arti-client` used to spawn are
now handled inside their respective crates instead: each crate provides
a function that spawns its tasks and returns `TaskHandle`s.

This required introducing a new trait, `NetDirProvider`, which steals
some functionality from the `DirProvider` trait to enable `tor-circmgr`
to depend on it (`tor-circmgr` is a dependency of `tor-dirmgr`, so it
can't depend on `DirProvider` directly).

While we're at it, we also make some of the tasks wait for events from
the `NetDirProvider` instead of sleeping, slightly increasing
efficiency.
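
In caller terms, the new wiring amounts to roughly the following sketch
(error mapping elided; the exact calls appear in the diff below):

    let mut periodic_task_handles =
        circmgr.launch_background_tasks(&runtime, &dirmgr, statemgr.clone())?;
    periodic_task_handles.extend(chanmgr.launch_background_tasks(&runtime)?);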
Author: eta
Date:   2022-03-28 13:54:36 +01:00
Parent: fd081742fa
Commit: ac64bdea27

11 changed files with 382 additions and 300 deletions

Cargo.lock (generated)

@@ -3469,6 +3469,7 @@ dependencies = [
"bitflags",
"derive_builder",
"derive_more",
"futures",
"hex",
"hex-literal",
"rand 0.8.5",


@@ -8,9 +8,8 @@ use crate::address::IntoTorAddr;
use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig};
use tor_circmgr::isolation::Isolation;
use tor_circmgr::{isolation::StreamIsolationBuilder, DirInfo, IsolationToken, TargetPort};
use tor_circmgr::{isolation::StreamIsolationBuilder, IsolationToken, TargetPort};
use tor_config::MutCfg;
use tor_dirmgr::DirEvent;
use tor_persist::{FsStateMgr, StateMgr};
use tor_proto::circuit::ClientCirc;
use tor_proto::stream::{DataStream, IpVersionPreference, StreamParameters};
@@ -18,19 +17,17 @@ use tor_rtcompat::{PreferredRuntime, Runtime, SleepProviderExt};
use educe::Educe;
use futures::lock::Mutex as AsyncMutex;
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use std::convert::TryInto;
use std::net::IpAddr;
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use std::sync::{Arc, Mutex};
use crate::err::ErrorDetail;
use crate::{status, util, TorClientBuilder};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tracing::{debug, error, info, warn};
use tor_rtcompat::scheduler::TaskHandle;
use tracing::{debug, info};
/// An active client session on the Tor network.
///
@@ -378,7 +375,16 @@ impl<R: Runtime> TorClient<R> {
.build(runtime.clone(), Arc::clone(&circmgr), dir_cfg)
.map_err(crate::Error::into_detail)?;
let mut periodic_task_handles = vec![];
let mut periodic_task_handles = circmgr
.launch_background_tasks(&runtime, &dirmgr, statemgr.clone())
.map_err(ErrorDetail::CircMgrSetup)?;
periodic_task_handles.extend(
chanmgr
.launch_background_tasks(&runtime)
.map_err(ErrorDetail::ChanMgrSetup)?
.into_iter(),
);
let conn_status = chanmgr.bootstrap_events();
let dir_status = dirmgr.bootstrap_events();
@@ -390,59 +396,6 @@ impl<R: Runtime> TorClient<R> {
))
.map_err(|e| ErrorDetail::from_spawn("top-level status reporter", e))?;
let (expiry_sched, expiry_handle) = TaskSchedule::new(runtime.clone());
periodic_task_handles.push(expiry_handle);
runtime
.spawn(continually_expire_channels(
expiry_sched,
Arc::downgrade(&chanmgr),
))
.map_err(|e| ErrorDetail::from_spawn("channel expiration task", e))?;
// Launch a daemon task to inform the circmgr about new
// network parameters.
runtime
.spawn(keep_circmgr_params_updated(
dirmgr.events(),
Arc::downgrade(&circmgr),
Arc::downgrade(&dirmgr),
))
.map_err(|e| ErrorDetail::from_spawn("circmgr parameter updater", e))?;
let (persist_sched, persist_handle) = TaskSchedule::new(runtime.clone());
periodic_task_handles.push(persist_handle);
runtime
.spawn(update_persistent_state(
persist_sched,
Arc::downgrade(&circmgr),
statemgr.clone(),
))
.map_err(|e| ErrorDetail::from_spawn("persistent state updater", e))?;
let (timeout_sched, timeout_handle) = TaskSchedule::new(runtime.clone());
periodic_task_handles.push(timeout_handle);
runtime
.spawn(continually_launch_timeout_testing_circuits(
timeout_sched,
Arc::downgrade(&circmgr),
Arc::downgrade(&dirmgr),
))
.map_err(|e| ErrorDetail::from_spawn("timeout-probe circuit launcher", e))?;
let (preempt_sched, preempt_handle) = TaskSchedule::new(runtime.clone());
periodic_task_handles.push(preempt_handle);
runtime
.spawn(continually_preemptively_build_circuits(
preempt_sched,
Arc::downgrade(&circmgr),
Arc::downgrade(&dirmgr),
))
.map_err(|e| ErrorDetail::from_spawn("preemptive circuit launcher", e))?;
let client_isolation = IsolationToken::new();
Ok(TorClient {
@@ -940,206 +893,6 @@ where
ErrorDetail::from(err).into()
}
/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
/// `circmgr` with the consensus parameters from `dirmgr`.
///
/// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
/// dangling.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn keep_circmgr_params_updated<R: Runtime>(
mut events: impl futures::Stream<Item = DirEvent> + Unpin,
circmgr: Weak<tor_circmgr::CircMgr<R>>,
dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
use DirEvent::*;
while let Some(event) = events.next().await {
match event {
NewConsensus => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
let netdir = dm
.latest_netdir()
.expect("got new consensus event, without a netdir?");
cm.update_network_parameters(netdir.params());
cm.update_network(&netdir);
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
NewDescriptors => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
let netdir = dm
.latest_netdir()
.expect("got new descriptors event, without a netdir?");
cm.update_network(&netdir);
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
_ => {
// Nothing we recognize.
}
}
}
}
/// Run forever, periodically telling `circmgr` to update its persistent
/// state.
///
/// Exit when we notice that `circmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn update_persistent_state<R: Runtime>(
mut sched: TaskSchedule<R>,
circmgr: Weak<tor_circmgr::CircMgr<R>>,
statemgr: FsStateMgr,
) {
// TODO: Consider moving this function into tor-circmgr after we have more
// experience with the state system.
while sched.next().await.is_some() {
if let Some(circmgr) = Weak::upgrade(&circmgr) {
use tor_persist::LockStatus::*;
match statemgr.try_lock() {
Err(e) => {
error!("Problem with state lock file: {}", e);
break;
}
Ok(NewlyAcquired) => {
info!("We now own the lock on our state files.");
if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
error!("Unable to upgrade to owned state files: {}", e);
break;
}
}
Ok(AlreadyHeld) => {
if let Err(e) = circmgr.store_persistent_state() {
error!("Unable to flush circmgr state: {}", e);
break;
}
}
Ok(NoLock) => {
if let Err(e) = circmgr.reload_persistent_state() {
error!("Unable to reload circmgr state: {}", e);
break;
}
}
}
} else {
debug!("Circmgr has disappeared; task exiting.");
return;
}
// TODO(nickm): This delay is probably too small.
//
// Also, we probably don't even want a fixed delay here. Instead,
// we should be updating more frequently when the data is volatile
// or has important info to save, and not at all when there are no
// changes.
sched.fire_in(Duration::from_secs(60));
}
error!("State update task is exiting prematurely.");
}
/// Run indefinitely, launching circuits as needed to get a good
/// estimate for our circuit build timeouts.
///
/// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
///
/// # Note
///
/// I'd prefer this to be handled entirely within the tor-circmgr crate;
/// see [`tor_circmgr::CircMgr::launch_timeout_testing_circuit_if_appropriate`]
/// for more information.
async fn continually_launch_timeout_testing_circuits<R: Runtime>(
mut sched: TaskSchedule<R>,
circmgr: Weak<tor_circmgr::CircMgr<R>>,
dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
while sched.next().await.is_some() {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
if let Some(netdir) = dm.latest_netdir() {
if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
warn!("Problem launching a timeout testing circuit: {}", e);
}
let delay = netdir
.params()
.cbt_testing_delay
.try_into()
.expect("Out-of-bounds value from BoundedInt32");
drop((cm, dm));
sched.fire_in(delay);
} else {
// TODO(eta): ideally, this should wait until we successfully bootstrap using
// the bootstrap status API
sched.fire_in(Duration::from_secs(10));
}
} else {
return;
}
}
}
/// Run indefinitely, launching circuits where the preemptive circuit
/// predictor thinks it'd be a good idea to have them.
///
/// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
///
/// # Note
///
/// This would be better handled entirely within `tor-circmgr`, like
/// other daemon tasks.
async fn continually_preemptively_build_circuits<R: Runtime>(
mut sched: TaskSchedule<R>,
circmgr: Weak<tor_circmgr::CircMgr<R>>,
dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
while sched.next().await.is_some() {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
if let Some(netdir) = dm.latest_netdir() {
cm.launch_circuits_preemptively(DirInfo::Directory(&netdir))
.await;
sched.fire_in(Duration::from_secs(10));
} else {
// TODO(eta): ideally, this should wait until we successfully bootstrap using
// the bootstrap status API
sched.fire_in(Duration::from_secs(10));
}
} else {
return;
}
}
}
/// Periodically expire any channels that have been unused beyond
/// the maximum duration allowed.
///
/// Exit when we find that `chanmgr` is dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn continually_expire_channels<R: Runtime>(
mut sched: TaskSchedule<R>,
chanmgr: Weak<tor_chanmgr::ChanMgr<R>>,
) {
while sched.next().await.is_some() {
let delay = if let Some(cm) = Weak::upgrade(&chanmgr) {
cm.expire_channels()
} else {
// channel manager is closed.
return;
};
// This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
sched.fire_in(Duration::from_secs(delay.as_secs()));
}
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]


@@ -108,6 +108,10 @@ pub_if_error_detail! {
#[derive(Error, Clone, Debug)]
#[non_exhaustive]
enum ErrorDetail {
/// Error setting up the channel manager
#[error("Error setting up the channel manager {0}")]
ChanMgrSetup(#[source] tor_chanmgr::Error), // TODO should this be its own type?
/// Error setting up the circuit manager
#[error("Error setting up the circuit manager {0}")]
CircMgrSetup(#[source] tor_circmgr::Error), // TODO should this be its own type?
@@ -253,6 +257,7 @@ impl tor_error::HasKind for ErrorDetail {
E::OnionAddressNotSupported => EK::NotImplemented,
E::Address(_) | E::InvalidHostname => EK::InvalidStreamTarget,
E::LocalAddress => EK::ForbiddenStreamTarget,
E::ChanMgrSetup(e) => e.kind(),
}
}
}


@@ -52,6 +52,9 @@ mod mgr;
#[cfg(test)]
mod testing;
use futures::task::SpawnExt;
use futures::StreamExt;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_proto::channel::Channel;
@@ -64,6 +67,7 @@ use tor_rtcompat::Runtime;
pub type Result<T> = std::result::Result<T, Error>;
pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
/// A Type that remembers a set of live channels, and launches new
/// ones on request.
@@ -80,6 +84,10 @@ pub struct ChanMgr<R: Runtime> {
impl<R: Runtime> ChanMgr<R> {
/// Construct a new channel manager.
///
/// # Usage note
///
/// For the manager to work properly, you will need to call `ChanMgr::launch_background_tasks`.
pub fn new(runtime: R) -> Self {
let (sender, receiver) = event::channel();
let builder = builder::ChanBuilder::new(runtime, sender);
@@ -90,6 +98,20 @@ impl<R: Runtime> ChanMgr<R> {
}
}
/// Launch the periodic daemon task required by the manager to function properly.
///
/// Returns a [`TaskHandle`] that can be used to manage the daemon task.
pub fn launch_background_tasks(self: &Arc<Self>, runtime: &R) -> Result<Vec<TaskHandle>> {
let (sched, handle) = TaskSchedule::new(runtime.clone());
runtime
.spawn(Self::continually_expire_channels(
sched,
Arc::downgrade(self),
))
.map_err(|e| Error::from_spawn("channel expiration task", e))?;
Ok(vec![handle])
}
/// Try to get a suitable channel to the provided `target`,
/// launching one if one does not exist.
///
@@ -122,4 +144,23 @@ impl<R: Runtime> ChanMgr<R> {
pub fn expire_channels(&self) -> Duration {
self.mgr.expire_channels()
}
/// Periodically expire any channels that have been unused beyond
/// the maximum duration allowed.
///
/// Exit when we find that `chanmgr` is dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn continually_expire_channels(mut sched: TaskSchedule<R>, chanmgr: Weak<Self>) {
while sched.next().await.is_some() {
let delay = if let Some(cm) = Weak::upgrade(&chanmgr) {
cm.expire_channels()
} else {
// channel manager is closed.
return;
};
// This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
sched.fire_in(Duration::from_secs(delay.as_secs()));
}
}
}
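
// A sketch of the call pattern implied by the usage note above; it
// assumes a `runtime: R` is already in hand (`example_setup` is a
// hypothetical helper, not part of this commit):
fn example_setup<R: Runtime>(runtime: R) -> Result<Vec<TaskHandle>> {
    let chanmgr = Arc::new(ChanMgr::new(runtime.clone()));
    chanmgr.launch_background_tasks(&runtime)
}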


@@ -51,13 +51,15 @@
#![deny(clippy::unwrap_used)]
use tor_chanmgr::ChanMgr;
use tor_netdir::{fallback::FallbackDir, NetDir};
use tor_netdir::{fallback::FallbackDir, DirEvent, NetDir, NetDirProvider};
use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
use tor_rtcompat::Runtime;
use futures::task::SpawnExt;
use futures::StreamExt;
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use tracing::{debug, error, info, warn};
pub mod build;
@@ -85,6 +87,8 @@ use crate::preemptive::PreemptiveCircuitPredictor;
use usage::TargetCircUsage;
pub use tor_guardmgr::{ExternalFailure, GuardId};
use tor_persist::{FsStateMgr, StateMgr};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
/// A Result type as returned from this crate.
pub type Result<T> = std::result::Result<T, Error>;
@@ -162,6 +166,10 @@ pub struct CircMgr<R: Runtime> {
impl<R: Runtime> CircMgr<R> {
/// Construct a new circuit manager.
///
/// # Usage note
///
/// For the manager to work properly, you will need to call `CircMgr::launch_background_tasks`.
pub fn new<SM, CFG: CircMgrConfig>(
config: &CFG,
storage: SM,
@@ -196,6 +204,66 @@ impl<R: Runtime> CircMgr<R> {
Ok(circmgr)
}
/// Launch the periodic daemon tasks required by the manager to function properly.
///
/// Returns a set of [`TaskHandle`]s that can be used to manage the daemon tasks.
//
// NOTE(eta): The ?Sized on D is so we can pass a trait object in.
pub fn launch_background_tasks<D>(
self: &Arc<Self>,
runtime: &R,
dir_provider: &Arc<D>,
state_mgr: FsStateMgr,
) -> Result<Vec<TaskHandle>>
where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
{
let mut ret = vec![];
runtime
.spawn(Self::keep_circmgr_params_updated(
dir_provider.events(),
Arc::downgrade(self),
Arc::downgrade(dir_provider),
))
.map_err(|e| Error::from_spawn("circmgr parameter updater", e))?;
let (sched, handle) = TaskSchedule::new(runtime.clone());
ret.push(handle);
runtime
.spawn(Self::update_persistent_state(
sched,
Arc::downgrade(self),
state_mgr,
))
.map_err(|e| Error::from_spawn("persistent state updater", e))?;
let (sched, handle) = TaskSchedule::new(runtime.clone());
ret.push(handle);
runtime
.spawn(Self::continually_launch_timeout_testing_circuits(
sched,
Arc::downgrade(self),
Arc::downgrade(dir_provider),
))
.map_err(|e| Error::from_spawn("timeout-probe circuit launcher", e))?;
let (sched, handle) = TaskSchedule::new(runtime.clone());
ret.push(handle);
runtime
.spawn(Self::continually_preemptively_build_circuits(
sched,
Arc::downgrade(self),
Arc::downgrade(dir_provider),
))
.map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
Ok(ret)
}
/// Try to change our configuration settings to `new_config`.
///
/// The actual behavior here will depend on the value of `how`.
@@ -432,6 +500,186 @@ impl<R: Runtime> CircMgr<R> {
Ok(())
}
/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
/// `circmgr` with the consensus parameters from `dirmgr`.
///
/// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
/// dangling.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn keep_circmgr_params_updated<D>(
mut events: impl futures::Stream<Item = DirEvent> + Unpin,
circmgr: Weak<Self>,
dirmgr: Weak<D>,
) where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
{
use DirEvent::*;
while let Some(event) = events.next().await {
match event {
NewConsensus => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr))
{
let netdir = dm
.latest_netdir()
.expect("got new consensus event, without a netdir?");
cm.update_network_parameters(netdir.params());
cm.update_network(&netdir);
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
NewDescriptors => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr))
{
let netdir = dm
.latest_netdir()
.expect("got new descriptors event, without a netdir?");
cm.update_network(&netdir);
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
_ => {
// Nothing we recognize.
}
}
}
}
/// Run indefinitely, launching circuits as needed to get a good
/// estimate for our circuit build timeouts.
///
/// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn continually_launch_timeout_testing_circuits<D>(
mut sched: TaskSchedule<R>,
circmgr: Weak<Self>,
dirmgr: Weak<D>,
) where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
{
while sched.next().await.is_some() {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
if let Some(netdir) = dm.latest_netdir() {
if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
warn!("Problem launching a timeout testing circuit: {}", e);
}
let delay = netdir
.params()
.cbt_testing_delay
.try_into()
.expect("Out-of-bounds value from BoundedInt32");
drop((cm, dm));
sched.fire_in(delay);
} else {
// wait for the provider to announce some event, which will probably be
// NewConsensus; this is therefore a decent yardstick for rechecking
let _ = dm.events().next().await;
sched.fire();
}
} else {
return;
}
}
}
/// Run forever, periodically telling `circmgr` to update its persistent
/// state.
///
/// Exit when we notice that `circmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn update_persistent_state(
mut sched: TaskSchedule<R>,
circmgr: Weak<Self>,
statemgr: FsStateMgr,
) {
while sched.next().await.is_some() {
if let Some(circmgr) = Weak::upgrade(&circmgr) {
use tor_persist::LockStatus::*;
match statemgr.try_lock() {
Err(e) => {
error!("Problem with state lock file: {}", e);
break;
}
Ok(NewlyAcquired) => {
info!("We now own the lock on our state files.");
if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
error!("Unable to upgrade to owned state files: {}", e);
break;
}
}
Ok(AlreadyHeld) => {
if let Err(e) = circmgr.store_persistent_state() {
error!("Unable to flush circmgr state: {}", e);
break;
}
}
Ok(NoLock) => {
if let Err(e) = circmgr.reload_persistent_state() {
error!("Unable to reload circmgr state: {}", e);
break;
}
}
}
} else {
debug!("Circmgr has disappeared; task exiting.");
return;
}
// TODO(nickm): This delay is probably too small.
//
// Also, we probably don't even want a fixed delay here. Instead,
// we should be updating more frequently when the data is volatile
// or has important info to save, and not at all when there are no
// changes.
sched.fire_in(Duration::from_secs(60));
}
debug!("State update task exiting (potentially due to handle drop).");
}
/// Run indefinitely, launching circuits where the preemptive circuit
/// predictor thinks it'd be a good idea to have them.
///
/// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
///
/// # Note
///
/// This would be better handled entirely within `tor-circmgr`, like
/// other daemon tasks.
async fn continually_preemptively_build_circuits<D>(
mut sched: TaskSchedule<R>,
circmgr: Weak<Self>,
dirmgr: Weak<D>,
) where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
{
while sched.next().await.is_some() {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
if let Some(netdir) = dm.latest_netdir() {
cm.launch_circuits_preemptively(DirInfo::Directory(&netdir))
.await;
sched.fire_in(Duration::from_secs(10));
} else {
// wait for the provider to announce some event, which will probably be
// NewConsensus; this is therefore a decent yardstick for rechecking
let _ = dm.events().next().await;
sched.fire();
}
} else {
return;
}
}
}
/// Record that a failure occurred on a circuit with a given guard, in a way
/// that makes us unwilling to use that guard for future circuits.
pub fn note_external_failure(&self, id: &GuardId, external_failure: ExternalFailure) {


@@ -20,28 +20,9 @@ use educe::Educe;
use futures::{stream::Stream, Future, StreamExt};
use time::OffsetDateTime;
use tor_basic_utils::skip_fmt;
use tor_netdir::DirEvent;
use tor_netdoc::doc::netstatus;
/// An event that a DirMgr can broadcast to indicate a change in
/// the status of its directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DirEvent {
/// A new consensus has been received, and has enough information to be
/// used.
///
/// This event is also broadcast when a new set of consensus parameters is
/// available, even if that set of parameters comes from a configuration
/// change rather than from the latest consensus.
NewConsensus,
/// New descriptors have been received for the current consensus.
///
/// (This event is _not_ broadcast when receiving new descriptors for a
/// consensus which is not yet ready to replace the current consensus.)
NewDescriptors,
}
/// A trait to indicate something that can be published with [`FlagPublisher`].
///
/// Since the implementation of `FlagPublisher` requires that its events be
@@ -69,6 +50,8 @@ impl FlagEvent for DirEvent {
match self {
DirEvent::NewConsensus => 0,
DirEvent::NewDescriptors => 1,
// HACK(eta): This is an unfortunate consequence of marking DirEvent #[non_exhaustive].
_ => panic!("DirEvent updated without updating its FlagEvent impl"),
}
}
fn from_index(flag: u16) -> Option<Self> {


@@ -78,7 +78,7 @@ use crate::storage::DynStore;
use postage::watch;
pub use retry::DownloadSchedule;
use tor_circmgr::CircMgr;
use tor_netdir::NetDir;
use tor_netdir::{DirEvent, NetDir, NetDirProvider};
use tor_netdoc::doc::netstatus::ConsensusFlavor;
use async_trait::async_trait;
@@ -99,7 +99,7 @@ pub use config::{
};
pub use docid::DocId;
pub use err::Error;
pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirEvent, DirStatus};
pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirStatus};
pub use storage::DocumentText;
pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder};
@@ -108,18 +108,7 @@ pub type Result<T> = std::result::Result<T, Error>;
/// Trait for DirMgr implementations
#[async_trait]
pub trait DirProvider {
/// Return a handle to our latest directory, if we have one.
fn latest_netdir(&self) -> Option<Arc<NetDir>>;
/// Return a new asynchronous stream that will receive notification
/// whenever the consensus has changed.
///
/// Multiple events may be batched up into a single item: each time
/// this stream yields an event, all you can assume is that the event has
/// occurred at least once.
fn events(&self) -> BoxStream<'static, DirEvent>;
pub trait DirProvider: NetDirProvider {
/// Try to change our configuration to `new_config`.
///
/// Actual behavior will depend on the value of `how`.
@@ -140,8 +129,9 @@ pub trait DirProvider {
fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;
}
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
// NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
// there's a blanket impl for Arc<T> in tor-netdir.
impl<R: Runtime> NetDirProvider for DirMgr<R> {
fn latest_netdir(&self) -> Option<Arc<NetDir>> {
self.opt_netdir()
}
@@ -149,7 +139,10 @@ impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
fn events(&self) -> BoxStream<'static, DirEvent> {
Box::pin(self.events.subscribe())
}
}
#[async_trait]
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
fn reconfigure(
&self,
new_config: &DirMgrConfig,


@@ -42,6 +42,7 @@ serde = { version = "1.0.103", features = ["derive"] }
signature = "1"
thiserror = "1"
tracing = "0.1.18"
futures = "0.3.14"
[dev-dependencies]
hex = "0.4"


@@ -72,9 +72,11 @@ use tor_netdoc::doc::microdesc::{MdDigest, Microdesc};
use tor_netdoc::doc::netstatus::{self, MdConsensus, RouterStatus};
use tor_netdoc::types::policy::PortPolicy;
use futures::stream::BoxStream;
use serde::Deserialize;
use std::collections::HashMap;
use std::net::IpAddr;
use std::ops::Deref;
use std::sync::Arc;
use tracing::warn;
@@ -253,6 +255,54 @@ pub struct NetDir {
weights: weight::WeightSet,
}
/// An event that a [`NetDirProvider`] can broadcast to indicate a change in
/// the status of its directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DirEvent {
/// A new consensus has been received, and has enough information to be
/// used.
///
/// This event is also broadcast when a new set of consensus parameters is
/// available, even if that set of parameters comes from a configuration
/// change rather than from the latest consensus.
NewConsensus,
/// New descriptors have been received for the current consensus.
///
/// (This event is _not_ broadcast when receiving new descriptors for a
/// consensus which is not yet ready to replace the current consensus.)
NewDescriptors,
}
/// An object that can provide [`NetDir`]s, as well as inform consumers when
/// they might have changed.
pub trait NetDirProvider {
/// Return a handle to our latest directory, if we have one.
fn latest_netdir(&self) -> Option<Arc<NetDir>>;
/// Return a new asynchronous stream that will receive notification
/// whenever the consensus has changed.
///
/// Multiple events may be batched up into a single item: each time
/// this stream yields an event, all you can assume is that the event has
/// occurred at least once.
fn events(&self) -> BoxStream<'static, DirEvent>;
}
impl<T> NetDirProvider for Arc<T>
where
T: NetDirProvider,
{
fn latest_netdir(&self) -> Option<Arc<NetDir>> {
self.deref().latest_netdir()
}
fn events(&self) -> BoxStream<'static, DirEvent> {
self.deref().events()
}
}
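
// Illustrative only: the new trait is small enough that a fixed stub
// provider (say, for tests) is a few lines. `StaticNetDirProvider` is a
// hypothetical name, not part of this commit:
struct StaticNetDirProvider {
    netdir: Option<Arc<NetDir>>,
}

impl NetDirProvider for StaticNetDirProvider {
    fn latest_netdir(&self) -> Option<Arc<NetDir>> {
        self.netdir.clone()
    }
    fn events(&self) -> BoxStream<'static, DirEvent> {
        // A fixed directory never changes, so emit no events.
        Box::pin(futures::stream::empty())
    }
}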
/// A partially built NetDir -- it can't be unwrapped until it has
/// enough information to build safe paths.
#[derive(Debug, Clone)]


@@ -69,6 +69,12 @@ impl<R: SleepProvider> TaskSchedule<R> {
self.instant_fire = false;
self.sleep = Some(Box::pin(self.rt.sleep(dur)));
}
/// Trigger the schedule instantly.
pub fn fire(&mut self) {
self.instant_fire = true;
self.sleep = None;
}
}
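
// A sketch of the pattern the daemon tasks in this commit use with the
// schedule (`work_is_pending` is a hypothetical stand-in for real work):
async fn example_daemon<R: SleepProvider>(mut sched: TaskSchedule<R>) {
    use futures::StreamExt;
    use std::time::Duration;
    fn work_is_pending() -> bool { false } // hypothetical
    while sched.next().await.is_some() {
        if work_is_pending() {
            sched.fire(); // run again on the next loop iteration
        } else {
            sched.fire_in(Duration::from_secs(60)); // otherwise, back off
        }
    }
}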
impl TaskHandle {


@@ -63,6 +63,7 @@ tor-dirmgr:
new-api: DirMgrConfig object now has accessors.
DirMgrCfg: totally changed, builder abolished.
Authority, NetworkConfig: removed several accessors for these config elements.
api-break: DirEvent is now in tor-netdir instead of tor-dirmgr
tor-circmgr:
CircMgrCfg: totally changed, builder abolished.