Finish the timeout-inference side of shared state.

This commit is contained in:
Nick Mathewson 2021-10-20 13:13:15 -04:00
parent db0921fd7e
commit c8cfbda339
4 changed files with 113 additions and 32 deletions

View File

@ -1,7 +1,7 @@
//! Facilities to build circuits directly, instead of via a circuit manager.
use crate::path::{OwnedPath, TorPath};
use crate::timeouts::{self, pareto::ParetoTimeoutEstimator, Action};
use crate::timeouts::{self, Action};
use crate::{Error, Result};
use async_trait::async_trait;
use futures::channel::oneshot;
@ -19,7 +19,6 @@ use tor_guardmgr::GuardStatus;
use tor_linkspec::{ChanTarget, OwnedChanTarget, OwnedCircTarget};
use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::warn;
mod guardstatus;
@ -283,15 +282,7 @@ impl<R: Runtime> CircuitBuilder<R> {
storage: crate::TimeoutStateHandle,
guardmgr: tor_guardmgr::GuardMgr<R>,
) -> Self {
let timeouts = match storage.load() {
Ok(Some(v)) => ParetoTimeoutEstimator::from_state(v),
Ok(None) => ParetoTimeoutEstimator::default(),
Err(e) => {
warn!("Unable to load timeout state: {}", e);
ParetoTimeoutEstimator::default()
}
};
let timeouts = timeouts::Estimator::new(timeouts);
let timeouts = timeouts::Estimator::from_storage(&storage);
CircuitBuilder {
builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
@ -302,10 +293,32 @@ impl<R: Runtime> CircuitBuilder<R> {
}
/// Flush state to the state manager.
pub fn save_state(&self) -> Result<()> {
pub(crate) fn save_state(&self) -> Result<()> {
// TODO: someday we'll want to only do this if there is something
// changed.
self.builder.timeouts.save_state(&self.storage)
self.builder.timeouts.save_state(&self.storage)?;
self.guardmgr.store_persistent_state()?;
Ok(())
}
/// Replace our state with a new owning state, assuming we have
/// storage permission.
pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
self.builder
.timeouts
.upgrade_to_owning_storage(&self.storage);
self.guardmgr.upgrade_to_owned_persistent_state()?;
Ok(())
}
/// Reload persistent state from disk, if we don't have storage permission.
pub(crate) fn reload_state(&self) -> Result<()> {
if !self.storage.can_store() {
self.builder
.timeouts
.reload_readonly_from_storage(&self.storage);
}
self.guardmgr.reload_persistent_state()?;
Ok(())
}
/// Reconfigure this builder using the latest set of network parameters.

View File

@ -153,9 +153,6 @@ impl<'a> DirInfo<'a> {
pub struct CircMgr<R: Runtime> {
/// The underlying circuit manager object that implements our behavior.
mgr: Arc<mgr::AbstractCircMgr<build::CircuitBuilder<R>, R>>,
/// A handle to the state manager for recording timeout history.
storage: TimeoutStateHandle,
}
impl<R: Runtime> CircMgr<R> {
@ -177,21 +174,18 @@ impl<R: Runtime> CircMgr<R> {
let guardmgr = tor_guardmgr::GuardMgr::new(runtime.clone(), storage.clone())?;
let storage = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
let builder = build::CircuitBuilder::new(
runtime.clone(),
chanmgr,
path_config,
Arc::clone(&storage),
storage_handle,
guardmgr,
);
let mgr =
mgr::AbstractCircMgr::new(builder, runtime.clone(), request_timing, circuit_timing);
let circmgr = Arc::new(CircMgr {
mgr: Arc::new(mgr),
storage,
});
let circmgr = Arc::new(CircMgr { mgr: Arc::new(mgr) });
runtime.spawn(continually_expire_circuits(
runtime.clone(),
@ -206,7 +200,7 @@ impl<R: Runtime> CircMgr<R> {
/// We only call this method if we _don't_ have the lock on the state
/// files. If we have the lock, we only want to save.
pub fn reload_persistent_state(&self) -> Result<()> {
warn!("reload_persistent_state isn't implemented.");
self.mgr.peek_builder().reload_state()?;
Ok(())
}
@ -214,7 +208,7 @@ impl<R: Runtime> CircMgr<R> {
///
/// Requires that we hold the lock on the state files.
pub fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
warn!("upgrade_to_owned_persistent_state isn't implemented.");
self.mgr.peek_builder().upgrade_to_owned_state()?;
Ok(())
}
@ -222,10 +216,6 @@ impl<R: Runtime> CircMgr<R> {
/// we have the lock.
pub fn store_persistent_state(&self) -> Result<()> {
self.mgr.peek_builder().save_state()?;
self.mgr
.peek_builder()
.guardmgr()
.update_persistent_state()?;
Ok(())
}

View File

@ -1,9 +1,15 @@
//! Declarations for a [`TimeoutEstimator`] type that can change implementation.
use crate::timeouts::{Action, TimeoutEstimator};
use crate::timeouts::{
pareto::{ParetoTimeoutEstimator, ParetoTimeoutState},
readonly::ReadonlyTimeoutEstimator,
Action, TimeoutEstimator,
};
use crate::TimeoutStateHandle;
use std::sync::Mutex;
use std::time::Duration;
use tor_netdir::params::NetParameters;
use tracing::{debug, warn};
/// A timeout estimator that can change its inner implementation and share its
/// implementation among multiple threads.
@ -20,6 +26,37 @@ impl Estimator {
}
}
/// Create this estimator based on the values stored in `storage`, and whether
/// this storage is read-only.
pub(crate) fn from_storage(storage: &TimeoutStateHandle) -> Self {
let (_, est) = estimator_from_storage(storage);
Self {
inner: Mutex::new(est),
}
}
/// Assuming that we can read and write to `storage`, replace our state with
/// a new state that estimates timeouts.
pub(crate) fn upgrade_to_owning_storage(&self, storage: &TimeoutStateHandle) {
let (readonly, est) = estimator_from_storage(storage);
if readonly {
warn!("Unable to upgrade to owned persistent storage.");
return;
}
*self.inner.lock().expect("Timeout estimator lock poisoned") = est;
}
/// Replace the contents of this estimator with a read-only state estimator
/// based on the contents of `storage`.
pub(crate) fn reload_readonly_from_storage(&self, storage: &TimeoutStateHandle) {
if let Ok(Some(v)) = storage.load() {
let est = ReadonlyTimeoutEstimator::from_state(&v);
*self.inner.lock().expect("Timeout estimator lock poisoned") = Box::new(est);
} else {
debug!("Unable to reload timeout state.")
}
}
/// Record that a given circuit hop has completed.
///
/// The `hop` number is a zero-indexed value for which hop just completed.
@ -76,7 +113,7 @@ impl Estimator {
}
/// Store any state associated with this timeout esimator into `storage`.
pub(crate) fn save_state(&self, storage: &crate::TimeoutStateHandle) -> crate::Result<()> {
pub(crate) fn save_state(&self, storage: &TimeoutStateHandle) -> crate::Result<()> {
let state = {
let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
inner.build_state()
@ -87,3 +124,27 @@ impl Estimator {
Ok(())
}
}
/// Try to construct a new boxed TimeoutEstimator based on the contents of
/// storage, and whether it is read-only.
///
/// Returns true on a read-only state.
fn estimator_from_storage(
storage: &TimeoutStateHandle,
) -> (bool, Box<dyn TimeoutEstimator + Send + 'static>) {
let state = match storage.load() {
Ok(Some(v)) => v,
Ok(None) => ParetoTimeoutState::default(),
Err(e) => {
warn!("Unable to load timeout state: {}", e);
return (true, Box::new(ReadonlyTimeoutEstimator::new()));
}
};
if storage.can_store() {
// We own the lock, so we're going to use a full estimator.
(false, Box::new(ParetoTimeoutEstimator::from_state(state)))
} else {
(true, Box::new(ReadonlyTimeoutEstimator::from_state(&state)))
}
}

View File

@ -276,13 +276,30 @@ impl<R: Runtime> GuardMgr<R> {
/// Flush our current guard state to the state manager, if there
/// is any unsaved state.
pub fn update_persistent_state(&self) -> Result<(), GuardMgrError> {
pub fn store_persistent_state(&self) -> Result<(), GuardMgrError> {
let inner = self.inner.lock().expect("Poisoned lock");
trace!("Flushing guard state to disk.");
inner.default_storage.store(&inner.active_guards)?;
Ok(())
}
/// Reload state from the state manager.
///
/// We only call this method if we _don't_ have the lock on the state
/// files. If we have the lock, we only want to save.
pub fn reload_persistent_state(&self) -> Result<(), GuardMgrError> {
warn!("Not yet implemented");
Ok(())
}
/// Switch from having an unowned persistent state to having an owned one.
///
/// Requires that we hold the lock on the state files.
pub fn upgrade_to_owned_persistent_state(&self) -> Result<(), GuardMgrError> {
warn!("Not yet implemented");
Ok(())
}
/// Update the state of this [`GuardMgr`] based on a new or modified
/// [`NetDir`] object.
///
@ -937,7 +954,7 @@ mod test {
// Save the state...
guardmgr.flush_msg_queue().await;
guardmgr.update_persistent_state().unwrap();
guardmgr.store_persistent_state().unwrap();
drop(guardmgr);
// Try reloading from the state...