From c8cfbda3394b6541deb09e9116cb495587f1318f Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 20 Oct 2021 13:13:15 -0400 Subject: [PATCH] Finish the timeout-inference side of shared state. --- crates/tor-circmgr/src/build.rs | 39 ++++++++---- crates/tor-circmgr/src/lib.rs | 20 ++---- crates/tor-circmgr/src/timeouts/estimator.rs | 65 +++++++++++++++++++- crates/tor-guardmgr/src/lib.rs | 21 ++++++- 4 files changed, 113 insertions(+), 32 deletions(-) diff --git a/crates/tor-circmgr/src/build.rs b/crates/tor-circmgr/src/build.rs index e111723dc..16680af1c 100644 --- a/crates/tor-circmgr/src/build.rs +++ b/crates/tor-circmgr/src/build.rs @@ -1,7 +1,7 @@ //! Facilities to build circuits directly, instead of via a circuit manager. use crate::path::{OwnedPath, TorPath}; -use crate::timeouts::{self, pareto::ParetoTimeoutEstimator, Action}; +use crate::timeouts::{self, Action}; use crate::{Error, Result}; use async_trait::async_trait; use futures::channel::oneshot; @@ -19,7 +19,6 @@ use tor_guardmgr::GuardStatus; use tor_linkspec::{ChanTarget, OwnedChanTarget, OwnedCircTarget}; use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc}; use tor_rtcompat::{Runtime, SleepProviderExt}; -use tracing::warn; mod guardstatus; @@ -283,15 +282,7 @@ impl CircuitBuilder { storage: crate::TimeoutStateHandle, guardmgr: tor_guardmgr::GuardMgr, ) -> Self { - let timeouts = match storage.load() { - Ok(Some(v)) => ParetoTimeoutEstimator::from_state(v), - Ok(None) => ParetoTimeoutEstimator::default(), - Err(e) => { - warn!("Unable to load timeout state: {}", e); - ParetoTimeoutEstimator::default() - } - }; - let timeouts = timeouts::Estimator::new(timeouts); + let timeouts = timeouts::Estimator::from_storage(&storage); CircuitBuilder { builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)), @@ -302,10 +293,32 @@ impl CircuitBuilder { } /// Flush state to the state manager. - pub fn save_state(&self) -> Result<()> { + pub(crate) fn save_state(&self) -> Result<()> { // TODO: someday we'll want to only do this if there is something // changed. - self.builder.timeouts.save_state(&self.storage) + self.builder.timeouts.save_state(&self.storage)?; + self.guardmgr.store_persistent_state()?; + Ok(()) + } + + /// Replace our state with a new owning state, assuming we have + /// storage permission. + pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> { + self.builder + .timeouts + .upgrade_to_owning_storage(&self.storage); + self.guardmgr.upgrade_to_owned_persistent_state()?; + Ok(()) + } + /// Reload persistent state from disk, if we don't have storage permission. + pub(crate) fn reload_state(&self) -> Result<()> { + if !self.storage.can_store() { + self.builder + .timeouts + .reload_readonly_from_storage(&self.storage); + } + self.guardmgr.reload_persistent_state()?; + Ok(()) } /// Reconfigure this builder using the latest set of network parameters. diff --git a/crates/tor-circmgr/src/lib.rs b/crates/tor-circmgr/src/lib.rs index 9f725d742..5f3904f47 100644 --- a/crates/tor-circmgr/src/lib.rs +++ b/crates/tor-circmgr/src/lib.rs @@ -153,9 +153,6 @@ impl<'a> DirInfo<'a> { pub struct CircMgr { /// The underlying circuit manager object that implements our behavior. mgr: Arc, R>>, - - /// A handle to the state manager for recording timeout history. - storage: TimeoutStateHandle, } impl CircMgr { @@ -177,21 +174,18 @@ impl CircMgr { let guardmgr = tor_guardmgr::GuardMgr::new(runtime.clone(), storage.clone())?; - let storage = storage.create_handle(PARETO_TIMEOUT_DATA_KEY); + let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY); let builder = build::CircuitBuilder::new( runtime.clone(), chanmgr, path_config, - Arc::clone(&storage), + storage_handle, guardmgr, ); let mgr = mgr::AbstractCircMgr::new(builder, runtime.clone(), request_timing, circuit_timing); - let circmgr = Arc::new(CircMgr { - mgr: Arc::new(mgr), - storage, - }); + let circmgr = Arc::new(CircMgr { mgr: Arc::new(mgr) }); runtime.spawn(continually_expire_circuits( runtime.clone(), @@ -206,7 +200,7 @@ impl CircMgr { /// We only call this method if we _don't_ have the lock on the state /// files. If we have the lock, we only want to save. pub fn reload_persistent_state(&self) -> Result<()> { - warn!("reload_persistent_state isn't implemented."); + self.mgr.peek_builder().reload_state()?; Ok(()) } @@ -214,7 +208,7 @@ impl CircMgr { /// /// Requires that we hold the lock on the state files. pub fn upgrade_to_owned_persistent_state(&self) -> Result<()> { - warn!("upgrade_to_owned_persistent_state isn't implemented."); + self.mgr.peek_builder().upgrade_to_owned_state()?; Ok(()) } @@ -222,10 +216,6 @@ impl CircMgr { /// we have the lock. pub fn store_persistent_state(&self) -> Result<()> { self.mgr.peek_builder().save_state()?; - self.mgr - .peek_builder() - .guardmgr() - .update_persistent_state()?; Ok(()) } diff --git a/crates/tor-circmgr/src/timeouts/estimator.rs b/crates/tor-circmgr/src/timeouts/estimator.rs index e937d4279..5f60bfca2 100644 --- a/crates/tor-circmgr/src/timeouts/estimator.rs +++ b/crates/tor-circmgr/src/timeouts/estimator.rs @@ -1,9 +1,15 @@ //! Declarations for a [`TimeoutEstimator`] type that can change implementation. -use crate::timeouts::{Action, TimeoutEstimator}; +use crate::timeouts::{ + pareto::{ParetoTimeoutEstimator, ParetoTimeoutState}, + readonly::ReadonlyTimeoutEstimator, + Action, TimeoutEstimator, +}; +use crate::TimeoutStateHandle; use std::sync::Mutex; use std::time::Duration; use tor_netdir::params::NetParameters; +use tracing::{debug, warn}; /// A timeout estimator that can change its inner implementation and share its /// implementation among multiple threads. @@ -20,6 +26,37 @@ impl Estimator { } } + /// Create this estimator based on the values stored in `storage`, and whether + /// this storage is read-only. + pub(crate) fn from_storage(storage: &TimeoutStateHandle) -> Self { + let (_, est) = estimator_from_storage(storage); + Self { + inner: Mutex::new(est), + } + } + + /// Assuming that we can read and write to `storage`, replace our state with + /// a new state that estimates timeouts. + pub(crate) fn upgrade_to_owning_storage(&self, storage: &TimeoutStateHandle) { + let (readonly, est) = estimator_from_storage(storage); + if readonly { + warn!("Unable to upgrade to owned persistent storage."); + return; + } + *self.inner.lock().expect("Timeout estimator lock poisoned") = est; + } + + /// Replace the contents of this estimator with a read-only state estimator + /// based on the contents of `storage`. + pub(crate) fn reload_readonly_from_storage(&self, storage: &TimeoutStateHandle) { + if let Ok(Some(v)) = storage.load() { + let est = ReadonlyTimeoutEstimator::from_state(&v); + *self.inner.lock().expect("Timeout estimator lock poisoned") = Box::new(est); + } else { + debug!("Unable to reload timeout state.") + } + } + /// Record that a given circuit hop has completed. /// /// The `hop` number is a zero-indexed value for which hop just completed. @@ -76,7 +113,7 @@ impl Estimator { } /// Store any state associated with this timeout esimator into `storage`. - pub(crate) fn save_state(&self, storage: &crate::TimeoutStateHandle) -> crate::Result<()> { + pub(crate) fn save_state(&self, storage: &TimeoutStateHandle) -> crate::Result<()> { let state = { let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned."); inner.build_state() @@ -87,3 +124,27 @@ impl Estimator { Ok(()) } } + +/// Try to construct a new boxed TimeoutEstimator based on the contents of +/// storage, and whether it is read-only. +/// +/// Returns true on a read-only state. +fn estimator_from_storage( + storage: &TimeoutStateHandle, +) -> (bool, Box) { + let state = match storage.load() { + Ok(Some(v)) => v, + Ok(None) => ParetoTimeoutState::default(), + Err(e) => { + warn!("Unable to load timeout state: {}", e); + return (true, Box::new(ReadonlyTimeoutEstimator::new())); + } + }; + + if storage.can_store() { + // We own the lock, so we're going to use a full estimator. + (false, Box::new(ParetoTimeoutEstimator::from_state(state))) + } else { + (true, Box::new(ReadonlyTimeoutEstimator::from_state(&state))) + } +} diff --git a/crates/tor-guardmgr/src/lib.rs b/crates/tor-guardmgr/src/lib.rs index 4d2b3bcab..726ee4f80 100644 --- a/crates/tor-guardmgr/src/lib.rs +++ b/crates/tor-guardmgr/src/lib.rs @@ -276,13 +276,30 @@ impl GuardMgr { /// Flush our current guard state to the state manager, if there /// is any unsaved state. - pub fn update_persistent_state(&self) -> Result<(), GuardMgrError> { + pub fn store_persistent_state(&self) -> Result<(), GuardMgrError> { let inner = self.inner.lock().expect("Poisoned lock"); trace!("Flushing guard state to disk."); inner.default_storage.store(&inner.active_guards)?; Ok(()) } + /// Reload state from the state manager. + /// + /// We only call this method if we _don't_ have the lock on the state + /// files. If we have the lock, we only want to save. + pub fn reload_persistent_state(&self) -> Result<(), GuardMgrError> { + warn!("Not yet implemented"); + Ok(()) + } + + /// Switch from having an unowned persistent state to having an owned one. + /// + /// Requires that we hold the lock on the state files. + pub fn upgrade_to_owned_persistent_state(&self) -> Result<(), GuardMgrError> { + warn!("Not yet implemented"); + Ok(()) + } + /// Update the state of this [`GuardMgr`] based on a new or modified /// [`NetDir`] object. /// @@ -937,7 +954,7 @@ mod test { // Save the state... guardmgr.flush_msg_queue().await; - guardmgr.update_persistent_state().unwrap(); + guardmgr.store_persistent_state().unwrap(); drop(guardmgr); // Try reloading from the state...