Notify guard manager on network change and state flush.

This commit is contained in:
Nick Mathewson 2021-10-10 12:38:50 -04:00
parent c41dd01a14
commit 33ba697b5d
6 changed files with 67 additions and 8 deletions

View File

@ -254,7 +254,6 @@ pub struct CircuitBuilder<R: Runtime> {
storage: crate::TimeoutStateHandle,
/// Guard manager to tell us which guards nodes to use for the circuits
/// we build.
#[allow(dead_code)]
guardmgr: tor_guardmgr::GuardMgr<R>,
}
@ -339,6 +338,11 @@ impl<R: Runtime> CircuitBuilder<R> {
pub(crate) fn learning_timeouts(&self) -> bool {
self.builder.timeouts.learning_timeouts()
}
/// Return a reference to this builder's `GuardMgr`.
pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
&self.guardmgr
}
}
/// Helper function: spawn a future as a background task, and run it with

View File

@ -203,7 +203,12 @@ impl<R: Runtime> CircMgr<R> {
/// Flush state to the state manager, if there is any unsaved state.
pub fn update_persistent_state(&self) -> Result<()> {
self.mgr.peek_builder().save_state()
self.mgr.peek_builder().save_state()?;
self.mgr
.peek_builder()
.guardmgr()
.update_persistent_state()?;
Ok(())
}
/// Reconfigure this circuit manager using the latest set of
@ -215,6 +220,15 @@ impl<R: Runtime> CircMgr<R> {
self.mgr.peek_builder().update_network_parameters(p);
}
/// Reconfigure this circuit manager using the latest network directory.
///
/// This should be called on _any_ change to the network, as opposed to
/// [`CircMgr::update_network_parameters`], which should only be
/// called when the parameters change.
pub fn update_network(&self, netdir: &NetDir) {
self.mgr.peek_builder().guardmgr().update_network(netdir);
}
/// Return a circuit suitable for sending one-hop BEGINDIR streams,
/// launching it if necessary.
pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {

View File

@ -321,13 +321,28 @@ async fn keep_circmgr_params_updated<R: Runtime>(
circmgr: Weak<tor_circmgr::CircMgr<R>>,
dirmgr: Weak<tor_dirmgr::DirMgr<R>>,
) {
use DirEvent::*;
while let Some(event) = events.next().await {
if let DirEvent::NewConsensus = event {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
cm.update_network_parameters(dm.netdir().params());
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
match event {
NewConsensus => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
cm.update_network_parameters(dm.netdir().params());
cm.update_network(&dm.netdir());
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
NewDescriptors => {
if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
cm.update_network(&dm.netdir());
} else {
debug!("Circmgr or dirmgr has disappeared; task exiting.");
break;
}
}
_ => {
// Nothing we recognize.
}
}
}

View File

@ -12,6 +12,9 @@ pub enum DirEvent {
/// to be used.
NewConsensus,
/// New descriptors have been received for the latest consensus.
NewDescriptors,
/// A dummy event that's only used when we're testing.
#[cfg(test)]
Dummy,

View File

@ -127,6 +127,10 @@ pub struct DirMgr<R: Runtime> {
/// changed.
netdir_consensus_changed: AtomicBool,
/// A flag that gets set whenever the _descriptors_ part of `netdir` has
/// changed without adding a new consensus.
netdir_descriptors_changed: AtomicBool,
/// A publisher handle, used to inform others about changes in the
/// status of this directory handle.
publisher: event::Publisher,
@ -382,12 +386,14 @@ impl<R: Runtime> DirMgr<R> {
let store = Mutex::new(config.open_sqlite_store(readonly)?);
let netdir = SharedMutArc::new();
let netdir_consensus_changed = AtomicBool::new(false);
let netdir_descriptors_changed = AtomicBool::new(false);
let publisher = event::Publisher::new();
Ok(DirMgr {
config,
store,
netdir,
netdir_consensus_changed,
netdir_descriptors_changed,
publisher,
circmgr,
runtime,
@ -478,6 +484,12 @@ impl<R: Runtime> DirMgr<R> {
if self.netdir_consensus_changed.swap(false, Ordering::SeqCst) {
self.publisher.send(DirEvent::NewConsensus).await;
}
if self
.netdir_descriptors_changed
.swap(false, Ordering::SeqCst)
{
self.publisher.send(DirEvent::NewDescriptors).await;
}
}
/// Load all the documents for a single DocumentQuery from the store.

View File

@ -63,6 +63,10 @@ pub(crate) trait WriteNetDir: 'static + Sync + Send {
/// Called to note that the consensus stored in [`Self::netdir()`] has been
/// changed.
fn netdir_consensus_changed(&self);
/// Called to note that the descriptors stored in
/// [`Self::netdir()`] have been changed.
fn netdir_descriptors_changed(&self);
}
impl<R: Runtime> WriteNetDir for crate::DirMgr<R> {
@ -76,6 +80,11 @@ impl<R: Runtime> WriteNetDir for crate::DirMgr<R> {
use std::sync::atomic::Ordering;
self.netdir_consensus_changed.store(true, Ordering::SeqCst);
}
fn netdir_descriptors_changed(&self) {
use std::sync::atomic::Ordering;
self.netdir_descriptors_changed
.store(true, Ordering::SeqCst);
}
}
/// Initial state: fetching or loading a consensus directory.
@ -500,6 +509,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
for md in mds {
netdir.add_microdesc(md);
}
wd.netdir_descriptors_changed();
Ok(())
});
}
@ -517,6 +527,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
wd.netdir().replace(netdir);
wd.netdir_consensus_changed();
wd.netdir_descriptors_changed();
return true;
}
}