Merge branch 'netdir_provider_in_guardmgr_v2' into 'main'

Use NetDirProvider in GuardMgr

Closes #93

See merge request tpo/core/arti!568
This commit is contained in:
Nick Mathewson 2022-06-07 20:46:27 +00:00
commit b1b7f30de0
7 changed files with 190 additions and 66 deletions

View File

@ -19,7 +19,7 @@ pub trait DirProviderBuilder<R: Runtime> {
runtime: R,
circmgr: Arc<tor_circmgr::CircMgr<R>>,
config: DirMgrConfig,
) -> Result<Arc<dyn tor_dirmgr::DirProvider + Send + Sync + 'static>>;
) -> Result<Arc<dyn tor_dirmgr::DirProvider + 'static>>;
}
/// A DirProviderBuilder that constructs a regular DirMgr.
@ -32,7 +32,7 @@ impl<R: Runtime> DirProviderBuilder<R> for DirMgrBuilder {
runtime: R,
circmgr: Arc<tor_circmgr::CircMgr<R>>,
config: DirMgrConfig,
) -> Result<Arc<dyn tor_dirmgr::DirProvider + Send + Sync + 'static>> {
) -> Result<Arc<dyn tor_dirmgr::DirProvider + 'static>> {
let dirmgr = tor_dirmgr::DirMgr::create_unbootstrapped(config, runtime, circmgr)
.map_err(ErrorDetail::from)?;
Ok(Arc::new(dirmgr))

View File

@ -56,7 +56,7 @@ pub struct TorClient<R: Runtime> {
/// them on-demand.
circmgr: Arc<tor_circmgr::CircMgr<R>>,
/// Directory manager for keeping our directory material up to date.
dirmgr: Arc<dyn tor_dirmgr::DirProvider + Send + Sync>,
dirmgr: Arc<dyn tor_dirmgr::DirProvider>,
/// Location on disk where we store persistent data.
statemgr: FsStateMgr,
/// Client address configuration
@ -469,13 +469,6 @@ impl<R: Runtime> TorClient<R> {
self.dirmgr.bootstrap().await?;
self.circmgr.update_network_parameters(
self.dirmgr
.latest_netdir()
.ok_or(ErrorDetail::DirMgr(tor_dirmgr::Error::DirectoryNotPresent))?
.params(),
);
// Since we succeeded, disarm the unlock guard.
unlock_guard.disarm();
@ -788,7 +781,7 @@ impl<R: Runtime> TorClient<R> {
/// This function is unstable. It is only enabled if the crate was
/// built with the `experimental-api` feature.
#[cfg(feature = "experimental-api")]
pub fn dirmgr(&self) -> &Arc<dyn tor_dirmgr::DirProvider + Send + Sync> {
pub fn dirmgr(&self) -> &Arc<dyn tor_dirmgr::DirProvider> {
&self.dirmgr
}

View File

@ -230,7 +230,7 @@ impl<R: Runtime> CircMgr<R> {
state_mgr: FsStateMgr,
) -> Result<Vec<TaskHandle>>
where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
D: NetDirProvider + 'static + ?Sized,
{
let mut ret = vec![];
@ -275,6 +275,11 @@ impl<R: Runtime> CircMgr<R> {
))
.map_err(|e| Error::from_spawn("preemptive circuit launcher", e))?;
self.mgr
.peek_builder()
.guardmgr()
.install_netdir_provider(&dir_provider.clone().upcast_arc())?;
Ok(ret)
}
@ -353,7 +358,11 @@ impl<R: Runtime> CircMgr<R> {
/// Reconfigure this circuit manager using the latest set of
/// network parameters.
///
/// (NOTE: for now, this only affects circuit timeout estimation.)
/// This is deprecated as a public function: `launch_background_tasks` now
/// ensures that this happens as needed.
#[deprecated(
note = "There is no need to call this function if you have used launch_background_tasks"
)]
pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
self.mgr.update_network_parameters(p);
self.mgr.peek_builder().update_network_parameters(p);
@ -373,9 +382,10 @@ impl<R: Runtime> CircMgr<R> {
/// Reconfigure this circuit manager using the latest network directory.
///
/// This should be called on _any_ change to the network, as opposed to
/// [`CircMgr::update_network_parameters`], which should only be
/// called when the parameters change.
/// This function is deprecated: `launch_background_tasks` now makes sure that this happens as needed.
#[deprecated(
note = "There is no need to call this function if you have used launch_background_tasks"
)]
pub fn update_network(&self, netdir: &NetDir) {
self.mgr.peek_builder().guardmgr().update_network(netdir);
}
@ -546,38 +556,20 @@ impl<R: Runtime> CircMgr<R> {
circmgr: Weak<Self>,
dirmgr: Weak<D>,
) where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
D: NetDirProvider + 'static + ?Sized,
{
use DirEvent::*;
while let Some(event) = events.next().await {
match event {
NewConsensus => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr))
{
let netdir = dm
.latest_netdir()
.expect("got new consensus event, without a netdir?");
cm.update_network_parameters(netdir.params());
cm.update_network(&netdir);
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
NewDescriptors => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr))
{
let netdir = dm
.latest_netdir()
.expect("got new descriptors event, without a netdir?");
cm.update_network(&netdir);
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
_ => {
// Nothing we recognize.
if matches!(event, NewConsensus) {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
let netdir = dm
.latest_netdir()
.expect("got new consensus event, without a netdir?");
#[allow(deprecated)]
cm.update_network_parameters(netdir.params());
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
}
@ -594,7 +586,7 @@ impl<R: Runtime> CircMgr<R> {
circmgr: Weak<Self>,
dirmgr: Weak<D>,
) where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
D: NetDirProvider + 'static + ?Sized,
{
while sched.next().await.is_some() {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
@ -694,7 +686,7 @@ impl<R: Runtime> CircMgr<R> {
circmgr: Weak<Self>,
dirmgr: Weak<D>,
) where
D: NetDirProvider + Send + Sync + 'static + ?Sized,
D: NetDirProvider + 'static + ?Sized,
{
let base_delay = Duration::from_secs(10);
let mut retry = RetryDelay::from_duration(base_delay);

View File

@ -87,3 +87,33 @@ pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
runtime.sleep(delay).await;
}
}
/// Background task to keep a guard manager up-to-date with a given network
/// directory provider.
pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
runtime: RT,
inner: Weak<Mutex<GuardMgrInner>>,
netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
) {
use tor_netdir::DirEvent;
let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
Some(s) => s,
None => return,
};
while let Some(event) = event_stream.next().await {
match event {
DirEvent::NewConsensus | DirEvent::NewDescriptors => {
if let (Some(inner), Some(provider)) = (inner.upgrade(), netdir_provider.upgrade())
{
let mut inner = inner.lock().expect("Poisoned lock");
if let Some(netdir) = provider.latest_netdir() {
inner.update(runtime.wallclock(), Some(&netdir));
}
}
}
_ => {}
}
}
}

View File

@ -139,8 +139,9 @@ use futures::task::SpawnExt;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant, SystemTime};
use tor_netdir::NetDirProvider;
use tor_proto::ClockSkew;
use tracing::{debug, info, trace, warn};
@ -257,6 +258,13 @@ struct GuardMgrInner {
/// A receiver object to hand out to observers who want to know about
/// changes in our estimated clock skew.
recv_skew: events::ClockSkewEvents,
/// A netdir provider that we can use for adding new guards when
/// insufficient guards are available.
///
/// This has to be an Option so it can be initialized from None: at the
/// time a GuardMgr is created, there is no NetDirProvider for it to use.
netdir_provider: Option<Weak<dyn NetDirProvider>>,
}
/// Persistent state for a guard manager, as serialized to disk.
@ -317,6 +325,7 @@ impl<R: Runtime> GuardMgr<R> {
storage,
send_skew,
recv_skew,
netdir_provider: None,
}));
{
let weak_inner = Arc::downgrade(&inner);
@ -335,6 +344,35 @@ impl<R: Runtime> GuardMgr<R> {
Ok(GuardMgr { runtime, inner })
}
/// Install a [`NetDirProvider`] for use by this guard manager.
///
/// It will be used to keep the guards up-to-date with changes from the
/// network directory, and to find new guards when no NetDir is provided to
/// select_guard().
///
/// TODO: we should eventually return some kind of a task handle from this
/// task, even though it is not strictly speaking periodic.
pub fn install_netdir_provider(
&self,
provider: &Arc<dyn NetDirProvider>,
) -> Result<(), GuardMgrError> {
let weak_provider = Arc::downgrade(provider);
{
let mut inner = self.inner.lock().expect("Poisoned lock");
inner.netdir_provider = Some(weak_provider.clone());
}
let weak_inner = Arc::downgrade(&self.inner);
let rt_clone = self.runtime.clone();
self.runtime
.spawn(daemon::keep_netdir_updated(
rt_clone,
weak_inner,
weak_provider,
))
.map_err(|e| GuardMgrError::from_spawn("periodic guard netdir updater", e))?;
Ok(())
}
/// Flush our current guard state to the state manager, if there
/// is any unsaved state.
pub fn store_persistent_state(&self) -> Result<(), GuardMgrError> {
@ -389,11 +427,12 @@ impl<R: Runtime> GuardMgr<R> {
/// Update the state of this [`GuardMgr`] based on a new or modified
/// [`NetDir`] object.
///
/// This method can add new guards, or notice that existing guards
/// have become unusable. It needs a `NetDir` so it can identify
/// potential candidate guards.
/// This method can add new guards, or notice that existing guards have
/// become unusable. It needs a `NetDir` so it can identify potential
/// candidate guards.
///
/// Call this method whenever the `NetDir` changes.
/// Call this method whenever the `NetDir` changes, unless you have used
/// `install_netdir_provider`.
pub fn update_network(&self, netdir: &NetDir) {
trace!("Updating guard state from network directory");
let now = self.runtime.wallclock();
@ -474,7 +513,8 @@ impl<R: Runtime> GuardMgr<R> {
/// # Limitations
///
/// This function will never return a guard that isn't listed in
/// the [`NetDir`] most recently passed to [`GuardMgr::update_network`].
/// the most recent [`NetDir`].
///
/// That's _usually_ what you'd want, but when we're trying to
/// bootstrap we might want to use _all_ guards as possible
/// directory caches. That's not implemented yet. (See ticket
@ -650,12 +690,48 @@ impl GuardSets {
}
impl GuardMgrInner {
/// Update the status of all guards in the active set, based on
/// the passage of time and (optionally) a network directory.
/// Look up the latest [`NetDir`] (if there is one) from our
/// [`NetDirProvider`] (if we have one).
fn latest_netdir(&self) -> Option<Arc<NetDir>> {
self.netdir_provider
.as_ref()
.and_then(Weak::upgrade)
.and_then(|np| np.latest_netdir())
}
/// Run a function that takes `&mut self` and an optional NetDir.
///
/// We can expire guards based on the time alone; we can only
/// add guards or change their status with a NetDir.
/// If a NetDir is provided, use that. Otherwise, try to use the netdir
/// from our [`NetDirProvider`] (if we have one).
//
// This function exists to handle the lifetime mess where sometimes the
// resulting NetDir will borrow from `netdir`, and sometimes it will borrow
// from an Arc returned by `self.latest_netdir()`.
fn with_opt_netdir<F, T>(&mut self, netdir: Option<&NetDir>, func: F) -> T
where
F: FnOnce(&mut Self, Option<&NetDir>) -> T,
{
if let Some(nd) = netdir {
func(self, Some(nd))
} else if let Some(nd) = self.latest_netdir() {
func(self, Some(nd.as_ref()))
} else {
func(self, None)
}
}
/// Update the status of all guards in the active set, based on the passage
/// of time and (optionally) a network directory. If no directory is
/// provided, we try to find one from the installed provider.
///
/// We can expire guards based on the time alone; we can only add guards or
/// change their status with a NetDir.
fn update(&mut self, now: SystemTime, netdir: Option<&NetDir>) {
self.with_opt_netdir(netdir, |this, netdir| this.update_internal(now, netdir));
}
/// As `update`, but do not try to look up a [`NetDir`] if none is given.
fn update_internal(&mut self, now: SystemTime, netdir: Option<&NetDir>) {
// Set the parameters.
if let Some(netdir) = netdir {
match GuardParams::try_from(netdir.params()) {
@ -1015,24 +1091,29 @@ impl GuardMgrInner {
};
// That didn't work. If we have a netdir, expand the sample and try again.
if let Some(dir) = netdir {
let res = self.with_opt_netdir(netdir, |this, dir| {
let dir = dir?;
trace!("No guards available, trying to extend the sample.");
self.update(wallclock, Some(dir));
if self
this.update_internal(wallclock, Some(dir));
if this
.guards
.active_guards_mut()
.extend_sample_as_needed(wallclock, &self.params, dir)
.extend_sample_as_needed(wallclock, &this.params, dir)
{
self.guards
this.guards
.active_guards_mut()
.select_primary_guards(&self.params);
match self.select_guard_once(usage, now) {
Ok(res) => return Ok(res),
.select_primary_guards(&this.params);
match this.select_guard_once(usage, now) {
Ok(res) => return Some(res),
Err(e) => {
trace!("Couldn't select guard after expanding sample: {}", e);
}
}
}
None
});
if let Some(res) = res {
return Ok(res);
}
// Okay, that didn't work either. If we were asked for a directory

View File

@ -0,0 +1,2 @@
BREAKING: every NetDirProvider must be Send and Sync.

View File

@ -279,7 +279,7 @@ pub enum DirEvent {
/// An object that can provide [`NetDir`]s, as well as inform consumers when
/// they might have changed.
pub trait NetDirProvider {
pub trait NetDirProvider: UpcastArcNetDirProvider + Send + Sync {
/// Return a handle to our latest directory, if we have one.
fn latest_netdir(&self) -> Option<Arc<NetDir>>;
@ -305,6 +305,32 @@ where
}
}
/// Helper trait: allows any `Arc<X>` to be upcast to a `Arc<dyn
/// NetDirProvider>` if X is an implementation or supertrait of NetDirProvider.
///
/// This trait exists to work around a limitation in rust: when trait upcasting
/// coercion is stable, this will be unnecessary.
///
/// The Rust tracking issue is <https://github.com/rust-lang/rust/issues/65991>.
pub trait UpcastArcNetDirProvider {
/// Return a view of this object as an `Arc<dyn NetDirProvider>`
fn upcast_arc<'a>(self: Arc<Self>) -> Arc<dyn NetDirProvider + 'a>
where
Self: 'a;
}
impl<T> UpcastArcNetDirProvider for T
where
T: NetDirProvider + Sized,
{
fn upcast_arc<'a>(self: Arc<Self>) -> Arc<dyn NetDirProvider + 'a>
where
Self: 'a,
{
self
}
}
/// A partially build NetDir -- it can't be unwrapped until it has
/// enough information to build safe paths.
#[derive(Debug, Clone)]