Merge branch 'share_state'

Nick Mathewson 2021-10-21 13:34:38 -04:00
commit f3dc66d964
20 changed files with 728 additions and 340 deletions

View File

@ -1,7 +1,7 @@
//! Facilities to build circuits directly, instead of via a circuit manager.
use crate::path::{OwnedPath, TorPath};
use crate::timeouts::{pareto::ParetoTimeoutEstimator, Action, TimeoutEstimator};
use crate::timeouts::{self, Action};
use crate::{Error, Result};
use async_trait::async_trait;
use futures::channel::oneshot;
@ -19,7 +19,6 @@ use tor_guardmgr::GuardStatus;
use tor_linkspec::{ChanTarget, OwnedChanTarget, OwnedCircTarget};
use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::warn;
mod guardstatus;
@ -130,30 +129,21 @@ impl Buildable for Arc<ClientCirc> {
///
/// In general, you should not need to construct or use this object yourself,
/// unless you are choosing your own paths.
struct Builder<
R: Runtime,
C: Buildable + Sync + Send + 'static,
T: TimeoutEstimator + Send + Sync + 'static,
> {
struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
/// The runtime used by this circuit builder.
runtime: R,
/// A channel manager that this circuit builder uses to make channels.
chanmgr: Arc<ChanMgr<R>>,
/// An estimator to determine the correct timeouts for circuit building.
timeouts: T,
timeouts: timeouts::Estimator,
/// We don't actually hold any clientcircs, so we need to put this
/// type here so the compiler won't freak out.
_phantom: std::marker::PhantomData<C>,
}
impl<
R: Runtime,
C: Buildable + Sync + Send + 'static,
T: TimeoutEstimator + Send + Sync + 'static,
> Builder<R, C, T>
{
impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
/// Construct a new [`Builder`].
fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: T) -> Self {
fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
Builder {
runtime,
chanmgr,
@ -271,7 +261,7 @@ impl<
/// unless you are choosing your own paths.
pub struct CircuitBuilder<R: Runtime> {
/// The underlying [`Builder`] object
builder: Arc<Builder<R, Arc<ClientCirc>, ParetoTimeoutEstimator>>,
builder: Arc<Builder<R, Arc<ClientCirc>>>,
/// Configuration for how to choose paths for circuits.
path_config: crate::PathConfig,
/// State-manager object to use in storing current state.
@ -292,14 +282,7 @@ impl<R: Runtime> CircuitBuilder<R> {
storage: crate::TimeoutStateHandle,
guardmgr: tor_guardmgr::GuardMgr<R>,
) -> Self {
let timeouts = match storage.load() {
Ok(Some(v)) => ParetoTimeoutEstimator::from_state(v),
Ok(None) => ParetoTimeoutEstimator::default(),
Err(e) => {
warn!("Unable to load timeout state: {}", e);
ParetoTimeoutEstimator::default()
}
};
let timeouts = timeouts::Estimator::from_storage(&storage);
CircuitBuilder {
builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
@ -310,12 +293,31 @@ impl<R: Runtime> CircuitBuilder<R> {
}
/// Flush state to the state manager.
pub fn save_state(&self) -> Result<()> {
pub(crate) fn save_state(&self) -> Result<()> {
// TODO: someday we'll want to only do this if something has changed.
let _ignore = self.storage.try_lock()?; // XXXX don't ignore.
let state = self.builder.timeouts.build_state();
self.storage.store(&state)?;
self.builder.timeouts.save_state(&self.storage)?;
self.guardmgr.store_persistent_state()?;
Ok(())
}
/// Replace our state with a new owning state, assuming we have
/// storage permission.
pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
self.builder
.timeouts
.upgrade_to_owning_storage(&self.storage);
self.guardmgr.upgrade_to_owned_persistent_state()?;
Ok(())
}
/// Reload persistent state from disk, if we don't have storage permission.
pub(crate) fn reload_state(&self) -> Result<()> {
if !self.storage.can_store() {
self.builder
.timeouts
.reload_readonly_from_storage(&self.storage);
}
self.guardmgr.reload_persistent_state()?;
Ok(())
}
@ -323,7 +325,7 @@ impl<R: Runtime> CircuitBuilder<R> {
///
/// (NOTE: for now, this only affects circuit timeout estimation.)
pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
self.builder.timeouts.update_params(p.into());
self.builder.timeouts.update_params(p);
}
/// DOCDOC
@ -422,6 +424,7 @@ where
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::timeouts::TimeoutEstimator;
use futures::channel::oneshot;
use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
use std::sync::Mutex;
@ -603,24 +606,28 @@ mod test {
}
}
impl TimeoutEstimator for Arc<Mutex<TimeoutRecorder>> {
fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool) {
fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
if !is_last {
return;
}
let mut h = self.lock().unwrap();
h.hist.push((true, hop, delay));
let mut this = self.lock().unwrap();
this.hist.push((true, hop, delay));
}
fn note_circ_timeout(&self, hop: u8, delay: Duration) {
let mut h = self.lock().unwrap();
h.hist.push((false, hop, delay));
fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
let mut this = self.lock().unwrap();
this.hist.push((false, hop, delay));
}
fn timeouts(&self, _action: &Action) -> (Duration, Duration) {
fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
(Duration::from_secs(3), Duration::from_secs(100))
}
fn learning_timeouts(&self) -> bool {
false
}
fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
None
}
}
/// Testing only: create a bogus circuit target
@ -650,8 +657,11 @@ mod test {
]);
let chanmgr = Arc::new(ChanMgr::new(rt.clone()));
let timeouts = Arc::new(Mutex::new(TimeoutRecorder::new()));
let builder: Builder<_, Mutex<FakeCirc>, _> =
Builder::new(rt.clone(), chanmgr, Arc::clone(&timeouts));
let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
rt.clone(),
chanmgr,
timeouts::Estimator::new(Arc::clone(&timeouts)),
);
let builder = Arc::new(builder);
let rng =
StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
@ -678,15 +688,14 @@ mod test {
);
{
let mut h = timeouts.lock().unwrap();
assert_eq!(h.hist.len(), 2);
assert!(h.hist[0].0); // completed
assert_eq!(h.hist[0].1, 0); // last hop completed
// TODO: test time elapsed, once wait_for is more reliable.
assert!(h.hist[1].0); // completed
assert_eq!(h.hist[1].1, 2); // last hop completed
// TODO: test time elapsed, once wait_for is more reliable.
h.hist.clear();
let timeouts = timeouts.lock().unwrap();
assert_eq!(timeouts.hist.len(), 2);
assert!(timeouts.hist[0].0); // completed
assert_eq!(timeouts.hist[0].1, 0); // last hop completed
// TODO: test time elapsed, once wait_for is more reliable.
assert!(timeouts.hist[1].0); // completed
assert_eq!(timeouts.hist[1].1, 2); // last hop completed
// TODO: test time elapsed, once wait_for is more reliable.
}
// Try a very long timeout.
@ -701,11 +710,10 @@ mod test {
assert!(outcome.is_err());
{
let mut h = timeouts.lock().unwrap();
assert_eq!(h.hist.len(), 1);
assert!(!h.hist[0].0);
assert_eq!(h.hist[0].1, 2);
h.hist.clear();
let timeouts = timeouts.lock().unwrap();
assert_eq!(timeouts.hist.len(), 3);
assert!(!timeouts.hist[2].0);
assert_eq!(timeouts.hist[2].1, 2);
}
// Now try a recordable timeout.
@ -722,26 +730,27 @@ mod test {
rt.advance(Duration::from_millis(100)).await;
}
{
let h = timeouts.lock().unwrap();
dbg!(&h.hist);
let timeouts = timeouts.lock().unwrap();
dbg!(&timeouts.hist);
assert!(timeouts.hist.len() >= 4);
// First we notice a circuit timeout after 2 hops
assert!(!h.hist[0].0);
assert_eq!(h.hist[0].1, 2);
assert!(!timeouts.hist[3].0);
assert_eq!(timeouts.hist[3].1, 2);
// TODO: check timeout more closely.
assert!(h.hist[0].2 < Duration::from_secs(100));
assert!(h.hist[0].2 >= Duration::from_secs(3));
assert!(timeouts.hist[3].2 < Duration::from_secs(100));
assert!(timeouts.hist[3].2 >= Duration::from_secs(3));
// This test is not reliable under test coverage; see arti#149.
#[cfg(not(tarpaulin))]
{
assert_eq!(h.hist.len(), 2);
assert_eq!(timeouts.hist.len(), 5);
// Then we notice a circuit completing at its third hop.
assert!(h.hist[1].0);
assert_eq!(h.hist[1].1, 2);
assert!(timeouts.hist[4].0);
assert_eq!(timeouts.hist[4].1, 2);
// TODO: check timeout more closely.
assert!(h.hist[1].2 < Duration::from_secs(100));
assert!(h.hist[1].2 >= Duration::from_secs(5));
assert!(h.hist[0].2 < h.hist[1].2);
assert!(timeouts.hist[4].2 < Duration::from_secs(100));
assert!(timeouts.hist[4].2 >= Duration::from_secs(5));
assert!(timeouts.hist[3].2 < timeouts.hist[4].2);
}
}
HOP3_DELAY.store(300, SeqCst); // undo previous run.

View File

@ -60,7 +60,7 @@ pub enum Error {
/// Problem creating or updating a guard manager.
#[error("Problem creating or updating guards list: {0}")]
GuardMgr(#[from] tor_guardmgr::GuardMgrError),
GuardMgr(#[source] tor_guardmgr::GuardMgrError),
/// Problem selecting a guard relay.
#[error("Unable to select a guard relay: {0}")]
@ -84,3 +84,12 @@ impl From<tor_rtcompat::TimeoutError> for Error {
Error::CircTimeout
}
}
impl From<tor_guardmgr::GuardMgrError> for Error {
fn from(err: tor_guardmgr::GuardMgrError) -> Error {
match err {
tor_guardmgr::GuardMgrError::State(e) => Error::State(e),
_ => Error::GuardMgr(err),
}
}
}

View File

@ -174,10 +174,15 @@ impl<R: Runtime> CircMgr<R> {
let guardmgr = tor_guardmgr::GuardMgr::new(runtime.clone(), storage.clone())?;
let storage = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
let builder =
build::CircuitBuilder::new(runtime.clone(), chanmgr, path_config, storage, guardmgr);
let builder = build::CircuitBuilder::new(
runtime.clone(),
chanmgr,
path_config,
storage_handle,
guardmgr,
);
let mgr =
mgr::AbstractCircMgr::new(builder, runtime.clone(), request_timing, circuit_timing);
let circmgr = Arc::new(CircMgr { mgr: Arc::new(mgr) });
@ -190,13 +195,27 @@ impl<R: Runtime> CircMgr<R> {
Ok(circmgr)
}
/// Flush state to the state manager, if there is any unsaved state.
pub fn update_persistent_state(&self) -> Result<()> {
/// Reload state from the state manager.
///
/// We only call this method if we _don't_ have the lock on the state
/// files. If we have the lock, we only want to save.
pub fn reload_persistent_state(&self) -> Result<()> {
self.mgr.peek_builder().reload_state()?;
Ok(())
}
/// Switch from having an unowned persistent state to having an owned one.
///
/// Requires that we hold the lock on the state files.
pub fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
self.mgr.peek_builder().upgrade_to_owned_state()?;
Ok(())
}
/// Flush state to the state manager, if there is any unsaved state and
/// we have the lock.
pub fn store_persistent_state(&self) -> Result<()> {
self.mgr.peek_builder().save_state()?;
self.mgr
.peek_builder()
.guardmgr()
.update_persistent_state()?;
Ok(())
}
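Taken together, these three entry points (`reload_persistent_state`, `upgrade_to_owned_persistent_state`, `store_persistent_state`) are meant to be driven by whichever task probes the state lock. Below is a minimal sketch of that driver logic, assuming the `LockStatus` type that `tor-persist` introduces later in this diff; the real loop is the `update_persistent_state` task added to `arti-client` further down.

```rust
use tor_persist::{LockStatus, StateMgr};
use tor_rtcompat::Runtime;

/// Sketch: pick the right CircMgr persistence call for the current lock state.
fn sync_circmgr_state<R: Runtime>(
    circmgr: &tor_circmgr::CircMgr<R>,
    statemgr: &impl StateMgr,
) -> Result<(), Box<dyn std::error::Error>> {
    match statemgr.try_lock()? {
        // We just became the owner: take over the on-disk state.
        LockStatus::NewlyAcquired => circmgr.upgrade_to_owned_persistent_state()?,
        // We already own the state files: flush anything unsaved.
        LockStatus::AlreadyHeld => circmgr.store_persistent_state()?,
        // Another process owns the files: refresh our read-only view.
        LockStatus::NoLock => circmgr.reload_persistent_state()?,
    }
    Ok(())
}
```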

View File

@ -10,7 +10,11 @@
use std::time::Duration;
pub(crate) mod estimator;
pub(crate) mod pareto;
pub(crate) mod readonly;
pub(crate) use estimator::Estimator;
/// An object that calculates circuit timeout thresholds from the history
/// of circuit build times.
@ -23,7 +27,7 @@ pub(crate) trait TimeoutEstimator {
/// circuit.
///
/// If this is the last hop of the circuit, then `is_last` is true.
fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool);
fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool);
/// Record that a circuit failed to complete because it took too long.
///
@ -32,7 +36,7 @@ pub(crate) trait TimeoutEstimator {
///
/// The `delay` number is the amount of time after we first launched the
/// circuit.
fn note_circ_timeout(&self, hop: u8, delay: Duration);
fn note_circ_timeout(&mut self, hop: u8, delay: Duration);
/// Return the current estimation for how long we should wait for a given
/// [`Action`] to complete.
@ -43,11 +47,21 @@ pub(crate) trait TimeoutEstimator {
/// building it in order to see how long it takes. After `abandon`
/// has elapsed since circuit launch, the circuit should be
/// abandoned completely.
fn timeouts(&self, action: &Action) -> (Duration, Duration);
fn timeouts(&mut self, action: &Action) -> (Duration, Duration);
/// Return true if we're currently trying to learn more timeouts
/// by launching testing circuits.
fn learning_timeouts(&self) -> bool;
/// Replace the network parameters used by this estimator (if any)
/// with ones derived from `params`.
fn update_params(&mut self, params: &tor_netdir::params::NetParameters);
/// Construct a new ParetoTimeoutState to represent the current state
/// of this estimator, if it is possible to store the state to disk.
///
/// TODO: change the type used for the state.
fn build_state(&mut self) -> Option<pareto::ParetoTimeoutState>;
}
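For orientation, here is a minimal sketch of what an implementation of this trait shape looks like. The trait is `pub(crate)`, so the sketch substitutes invented stand-in types for `Action`, `NetParameters`, and `ParetoTimeoutState` rather than using the real ones.

```rust
use std::time::Duration;

// Stand-ins for the crate-private Action, NetParameters, and
// ParetoTimeoutState types; invented here purely for illustration.
struct FakeAction;
struct FakeNetParams;
struct FakeState;

// A simplified copy of the trait shape above.
trait TimeoutEstimator {
    fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool);
    fn note_circ_timeout(&mut self, hop: u8, delay: Duration);
    fn timeouts(&mut self, action: &FakeAction) -> (Duration, Duration);
    fn learning_timeouts(&self) -> bool;
    fn update_params(&mut self, params: &FakeNetParams);
    fn build_state(&mut self) -> Option<FakeState>;
}

/// An estimator that always answers with fixed thresholds and never learns.
struct FixedTimeouts {
    timeout: Duration,
    abandon: Duration,
}

impl TimeoutEstimator for FixedTimeouts {
    fn note_hop_completed(&mut self, _hop: u8, _delay: Duration, _is_last: bool) {}
    fn note_circ_timeout(&mut self, _hop: u8, _delay: Duration) {}
    fn timeouts(&mut self, _action: &FakeAction) -> (Duration, Duration) {
        // `timeout`: stop using the circuit; `abandon`: stop building it at all.
        (self.timeout, self.abandon)
    }
    fn learning_timeouts(&self) -> bool {
        false
    }
    fn update_params(&mut self, _params: &FakeNetParams) {}
    fn build_state(&mut self) -> Option<FakeState> {
        None // nothing worth persisting
    }
}
```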
/// A possible action for which we can try to estimate a timeout.

View File

@ -0,0 +1,150 @@
//! Declarations for a [`TimeoutEstimator`] type that can change implementation.
use crate::timeouts::{
pareto::{ParetoTimeoutEstimator, ParetoTimeoutState},
readonly::ReadonlyTimeoutEstimator,
Action, TimeoutEstimator,
};
use crate::TimeoutStateHandle;
use std::sync::Mutex;
use std::time::Duration;
use tor_netdir::params::NetParameters;
use tracing::{debug, warn};
/// A timeout estimator that can change its inner implementation and share its
/// implementation among multiple threads.
pub(crate) struct Estimator {
/// The estimator we're currently using.
inner: Mutex<Box<dyn TimeoutEstimator + Send + 'static>>,
}
impl Estimator {
/// Construct a new estimator from some variant.
pub(crate) fn new(est: impl TimeoutEstimator + Send + 'static) -> Self {
Self {
inner: Mutex::new(Box::new(est)),
}
}
/// Create this estimator based on the values stored in `storage`, and whether
/// this storage is read-only.
pub(crate) fn from_storage(storage: &TimeoutStateHandle) -> Self {
let (_, est) = estimator_from_storage(storage);
Self {
inner: Mutex::new(est),
}
}
/// Assuming that we can read and write to `storage`, replace our state with
/// a new state that estimates timeouts.
pub(crate) fn upgrade_to_owning_storage(&self, storage: &TimeoutStateHandle) {
let (readonly, est) = estimator_from_storage(storage);
if readonly {
warn!("Unable to upgrade to owned persistent storage.");
return;
}
*self.inner.lock().expect("Timeout estimator lock poisoned") = est;
}
/// Replace the contents of this estimator with a read-only state estimator
/// based on the contents of `storage`.
pub(crate) fn reload_readonly_from_storage(&self, storage: &TimeoutStateHandle) {
if let Ok(Some(v)) = storage.load() {
let est = ReadonlyTimeoutEstimator::from_state(&v);
*self.inner.lock().expect("Timeout estimator lock poisoned") = Box::new(est);
} else {
debug!("Unable to reload timeout state.")
}
}
/// Record that a given circuit hop has completed.
///
/// The `hop` number is a zero-indexed value for which hop just completed.
///
/// The `delay` value is the amount of time after we first launched the
/// circuit.
///
/// If this is the last hop of the circuit, then `is_last` is true.
pub(crate) fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool) {
let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
inner.note_hop_completed(hop, delay, is_last);
}
/// Record that a circuit failed to complete because it took too long.
///
/// The `hop` number is the number of hops that were successfully
/// completed.
///
/// The `delay` number is the amount of time after we first launched the
/// circuit.
pub(crate) fn note_circ_timeout(&self, hop: u8, delay: Duration) {
let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
inner.note_circ_timeout(hop, delay);
}
/// Return the current estimation for how long we should wait for a given
/// [`Action`] to complete.
///
/// This function should return a 2-tuple of `(timeout, abandon)`
/// durations. After `timeout` has elapsed since circuit launch,
/// the circuit should no longer be used, but we should still keep
/// building it in order to see how long it takes. After `abandon`
/// has elapsed since circuit launch, the circuit should be
/// abandoned completely.
pub(crate) fn timeouts(&self, action: &Action) -> (Duration, Duration) {
let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
inner.timeouts(action)
}
/// Return true if we're currently trying to learn more timeouts
/// by launching testing circuits.
pub(crate) fn learning_timeouts(&self) -> bool {
let inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
inner.learning_timeouts()
}
/// Replace the network parameters used by this estimator (if any)
/// with ones derived from `params`.
pub(crate) fn update_params(&self, params: &NetParameters) {
let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
inner.update_params(params);
}
/// Store any state associated with this timeout esimator into `storage`.
pub(crate) fn save_state(&self, storage: &TimeoutStateHandle) -> crate::Result<()> {
let state = {
let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
inner.build_state()
};
if let Some(state) = state {
storage.store(&state)?;
}
Ok(())
}
}
/// Try to construct a new boxed TimeoutEstimator based on the contents of
/// storage, and whether it is read-only.
///
/// The boolean in the returned tuple is true if the storage is read-only.
fn estimator_from_storage(
storage: &TimeoutStateHandle,
) -> (bool, Box<dyn TimeoutEstimator + Send + 'static>) {
let state = match storage.load() {
Ok(Some(v)) => v,
Ok(None) => ParetoTimeoutState::default(),
Err(e) => {
warn!("Unable to load timeout state: {}", e);
return (true, Box::new(ReadonlyTimeoutEstimator::new()));
}
};
if storage.can_store() {
// We own the lock, so we're going to use a full estimator.
(false, Box::new(ParetoTimeoutEstimator::from_state(state)))
} else {
(true, Box::new(ReadonlyTimeoutEstimator::from_state(&state)))
}
}
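Stripped of the Tor-specific types, the pattern `Estimator` relies on is a `Box<dyn Trait>` behind a `Mutex`: `&self` methods can forward to `&mut self` trait methods through the lock, and the whole implementation can be swapped at runtime (for example, downgrading to a read-only estimator). A self-contained sketch with invented names, not the real crate API:

```rust
use std::sync::Mutex;

trait Strategy: Send {
    fn answer(&mut self) -> u32; // note: &mut self, like TimeoutEstimator
}

struct Learning(u32);
impl Strategy for Learning {
    fn answer(&mut self) -> u32 {
        self.0 += 1; // mutates internal state, hence &mut self
        self.0
    }
}

struct Fixed(u32);
impl Strategy for Fixed {
    fn answer(&mut self) -> u32 {
        self.0
    }
}

/// Shareable wrapper: &self methods, interior mutability, swappable impl.
struct Swappable {
    inner: Mutex<Box<dyn Strategy>>,
}

impl Swappable {
    fn new(s: impl Strategy + 'static) -> Self {
        Self { inner: Mutex::new(Box::new(s)) }
    }
    fn answer(&self) -> u32 {
        self.inner.lock().expect("lock poisoned").answer()
    }
    fn replace(&self, s: impl Strategy + 'static) {
        *self.inner.lock().expect("lock poisoned") = Box::new(s);
    }
}

fn main() {
    let est = Swappable::new(Learning(0));
    assert_eq!(est.answer(), 1);
    est.replace(Fixed(42)); // e.g. downgrade to a read-only estimator
    assert_eq!(est.answer(), 42);
}
```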

View File

@ -16,8 +16,8 @@ use bounded_vec_deque::BoundedVecDeque;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Mutex;
use std::time::Duration;
use tor_netdir::params::NetParameters;
use super::Action;
@ -402,8 +402,8 @@ impl Default for Params {
}
}
impl From<&tor_netdir::params::NetParameters> for Params {
fn from(p: &tor_netdir::params::NetParameters) -> Params {
impl From<&NetParameters> for Params {
fn from(p: &NetParameters) -> Params {
// Because of the underlying bounds, the "unwrap_or_else"
// conversions here should be impossible, and the "as"
// conversions should always be in-range.
@ -431,10 +431,15 @@ impl From<&tor_netdir::params::NetParameters> for Params {
}
}
/// Implementation type for [`ParetoTimeoutEstimator`]
/// Tor's default circuit build timeout estimator.
///
/// (This type hides behind a mutex to allow concurrent modification.)
struct ParetoEstimatorInner {
/// This object records a set of observed circuit build times, and
/// uses it to determine good values for how long we should allow
/// circuits to build.
///
/// For full details of the algorithms used, see
/// [`path-spec.txt`](https://gitlab.torproject.org/tpo/core/torspec/-/blob/master/path-spec.txt).
pub(crate) struct ParetoTimeoutEstimator {
/// Our observations for circuit build times and success/failure
/// history.
history: History,
@ -457,19 +462,6 @@ struct ParetoEstimatorInner {
p: Params,
}
/// Tor's default circuit build timeout estimator.
///
/// This object records a set of observed circuit build times, and
/// uses it to determine good values for how long we should allow
/// circuits to build.
///
/// For full details of the algorithms used, see
/// [`path-spec.txt`](https://gitlab.torproject.org/tpo/core/torspec/-/blob/master/path-spec.txt).
pub(crate) struct ParetoTimeoutEstimator {
/// The actual data inside this estimator.
est: Mutex<ParetoEstimatorInner>,
}
impl Default for ParetoTimeoutEstimator {
fn default() -> Self {
Self::from_history(History::new_empty())
@ -490,19 +482,24 @@ pub(crate) struct ParetoTimeoutState {
// XXXX Do we need a HashMap to represent additional fields? I think we may.
}
impl ParetoTimeoutState {
/// Return the latest base timeout estimate, as recorded in this state.
pub(crate) fn latest_estimate(&self) -> Option<Duration> {
self.current_timeout
.map(|m| Duration::from_millis(m.0.into()))
}
}
impl ParetoTimeoutEstimator {
/// Construct a new ParetoTimeoutEstimator from the provided history
/// object.
fn from_history(history: History) -> Self {
let p = Params::default();
let inner = ParetoEstimatorInner {
ParetoTimeoutEstimator {
history,
timeouts: None,
fallback_timeouts: p.default_thresholds,
p,
};
ParetoTimeoutEstimator {
est: Mutex::new(inner),
}
}
@ -513,114 +510,6 @@ impl ParetoTimeoutEstimator {
Self::from_history(history)
}
/// Construct a new ParetoTimeoutState to represent the current state
/// of this estimator.
pub(crate) fn build_state(&self) -> ParetoTimeoutState {
let mut this = self
.est
.lock()
.expect("Poisoned lock for ParetoTimeoutEstimator");
let cur_timeout = MsecDuration::new_saturating(&this.base_timeouts().0);
ParetoTimeoutState {
version: 1,
histogram: this.history.sparse_histogram().collect(),
current_timeout: Some(cur_timeout),
}
}
/// Change the parameters used for this estimator.
pub(crate) fn update_params(&self, parameters: Params) {
let mut this = self
.est
.lock()
.expect("Poisoned lock for ParetoTimeoutEstimator");
this.p = parameters;
let new_success_len = this.p.success_history_len;
this.history.set_success_history_len(new_success_len);
}
}
impl super::TimeoutEstimator for ParetoTimeoutEstimator {
fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool) {
let mut this = self
.est
.lock()
.expect("Poisoned lock for ParetoTimeoutEstimator");
if hop == this.p.significant_hop {
let time = MsecDuration::new_saturating(&delay);
this.history.add_time(time);
this.timeouts.take();
}
if is_last {
this.history.add_success(true);
}
}
fn note_circ_timeout(&self, hop: u8, _delay: Duration) {
// XXXXX This only counts if we have recent-enough
// activity. See circuit_build_times_network_check_live.
if hop > 0 {
let mut this = self
.est
.lock()
.expect("Poisoned lock for ParetoTimeoutEstimator");
this.history.add_success(false);
if this.history.n_recent_timeouts() > this.p.reset_after_timeouts {
let base_timeouts = this.base_timeouts();
this.history.clear();
this.timeouts.take();
// If we already had a timeout that was at least the
// length of our fallback timeouts, we should double
// those fallback timeouts.
if base_timeouts.0 >= this.fallback_timeouts.0 {
this.fallback_timeouts.0 *= 2;
this.fallback_timeouts.1 *= 2;
}
}
}
}
fn timeouts(&self, action: &Action) -> (Duration, Duration) {
let mut this = self
.est
.lock()
.expect("Poisoned lock for ParetoTimeoutEstimator");
let (base_t, base_a) = if this.p.use_estimates {
this.base_timeouts()
} else {
// If we aren't using this estimator, then just return the
// default thresholds from our parameters.
return this.p.default_thresholds;
};
let reference_action = Action::BuildCircuit {
length: this.p.significant_hop as usize + 1,
};
debug_assert!(reference_action.timeout_scale() > 0);
let multiplier =
(action.timeout_scale() as f64) / (reference_action.timeout_scale() as f64);
// TODO-SPEC The spec define any of this. Tor doesn't multiply the
// abandon timeout.
// XXXX `mul_f64()` can panic if we overflow Duration.
(base_t.mul_f64(multiplier), base_a.mul_f64(multiplier))
}
fn learning_timeouts(&self) -> bool {
let this = self
.est
.lock()
.expect("Poisoned lock for ParetoTimeoutEstimator");
this.p.use_estimates && this.history.n_times() < this.p.min_observations.into()
}
}
impl ParetoEstimatorInner {
/// Compute an unscaled basic pair of timeouts for a circuit of
/// the "normal" length.
///
@ -660,6 +549,82 @@ impl ParetoEstimatorInner {
}
}
impl super::TimeoutEstimator for ParetoTimeoutEstimator {
fn update_params(&mut self, p: &NetParameters) {
let parameters = p.into();
self.p = parameters;
let new_success_len = self.p.success_history_len;
self.history.set_success_history_len(new_success_len);
}
fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
if hop == self.p.significant_hop {
let time = MsecDuration::new_saturating(&delay);
self.history.add_time(time);
self.timeouts.take();
}
if is_last {
self.history.add_success(true);
}
}
fn note_circ_timeout(&mut self, hop: u8, _delay: Duration) {
// XXXXX This only counts if we have recent-enough
// activity. See circuit_build_times_network_check_live.
if hop > 0 {
self.history.add_success(false);
if self.history.n_recent_timeouts() > self.p.reset_after_timeouts {
let base_timeouts = self.base_timeouts();
self.history.clear();
self.timeouts.take();
// If we already had a timeout that was at least the
// length of our fallback timeouts, we should double
// those fallback timeouts.
if base_timeouts.0 >= self.fallback_timeouts.0 {
self.fallback_timeouts.0 *= 2;
self.fallback_timeouts.1 *= 2;
}
}
}
}
fn timeouts(&mut self, action: &Action) -> (Duration, Duration) {
let (base_t, base_a) = if self.p.use_estimates {
self.base_timeouts()
} else {
// If we aren't using this estimator, then just return the
// default thresholds from our parameters.
return self.p.default_thresholds;
};
let reference_action = Action::BuildCircuit {
length: self.p.significant_hop as usize + 1,
};
debug_assert!(reference_action.timeout_scale() > 0);
let multiplier =
(action.timeout_scale() as f64) / (reference_action.timeout_scale() as f64);
// TODO-SPEC The spec doesn't define any of this. Tor doesn't multiply the
// abandon timeout.
// XXXX `mul_f64()` can panic if we overflow Duration.
(base_t.mul_f64(multiplier), base_a.mul_f64(multiplier))
}
fn learning_timeouts(&self) -> bool {
self.p.use_estimates && self.history.n_times() < self.p.min_observations.into()
}
fn build_state(&mut self) -> Option<ParetoTimeoutState> {
let cur_timeout = MsecDuration::new_saturating(&self.base_timeouts().0);
Some(ParetoTimeoutState {
version: 1,
histogram: self.history.sparse_histogram().collect(),
current_timeout: Some(cur_timeout),
})
}
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
@ -847,19 +812,16 @@ mod test {
#[test]
fn pareto_estimate_timeout() {
let est = ParetoTimeoutEstimator::default();
let mut est = ParetoTimeoutEstimator::default();
assert_eq!(
est.timeouts(&b3()),
(Duration::from_secs(60), Duration::from_secs(60))
);
{
// Set the parameters up to mimic the situation in
// `pareto_estimate` above.
let mut inner = est.est.lock().unwrap();
inner.p.min_observations = 0;
inner.p.n_modes_for_xm = 2;
}
// Set the parameters up to mimic the situation in
// `pareto_estimate` above.
est.p.min_observations = 0;
est.p.n_modes_for_xm = 2;
assert_eq!(
est.timeouts(&b3()),
(Duration::from_secs(60), Duration::from_secs(60))
@ -884,16 +846,12 @@ mod test {
#[test]
fn pareto_estimate_clear() {
let est = ParetoTimeoutEstimator::default();
let mut est = ParetoTimeoutEstimator::default();
// Set the parameters up to mimic the situation in
// `pareto_estimate` above.
let params = Params {
min_observations: 1,
n_modes_for_xm: 2,
..Params::default()
};
est.update_params(params);
let params = NetParameters::from_map(&"cbtmincircs=1 cbtnummodes=2".parse().unwrap());
est.update_params(&params);
assert_eq!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
assert!(est.learning_timeouts());
@ -904,10 +862,7 @@ mod test {
}
assert_ne!(est.timeouts(&b3()).0.as_micros(), 60_000_000);
assert!(!est.learning_timeouts());
{
let inner = est.est.lock().unwrap();
assert_eq!(inner.history.n_recent_timeouts(), 0);
}
assert_eq!(est.history.n_recent_timeouts(), 0);
// 17 timeouts happen and we're still getting real numbers...
for _ in 0..18 {
@ -941,18 +896,18 @@ mod test {
// that the histogram conversion happens.
use rand::Rng;
let est = ParetoTimeoutEstimator::default();
let mut est = ParetoTimeoutEstimator::default();
let mut rng = rand::thread_rng();
for _ in 0..1000 {
let d = Duration::from_millis(rng.gen_range(10..3_000));
est.note_hop_completed(2, d, true);
}
let state = est.build_state();
let state = est.build_state().unwrap();
assert_eq!(state.version, 1);
assert!(state.current_timeout.is_some());
let est2 = ParetoTimeoutEstimator::from_state(state);
let mut est2 = ParetoTimeoutEstimator::from_state(state);
let act = Action::BuildCircuit { length: 3 };
// This isn't going to be exact, since we're recording histogram bins
// instead of exact timeouts.

View File

@ -0,0 +1,84 @@
//! Implement a timeout estimator that just uses another process's estimates.
use crate::timeouts::{pareto::ParetoTimeoutState, Action, TimeoutEstimator};
use std::convert::TryInto;
use std::time::Duration;
/// A timeout estimator based on reading timeouts that another timeout estimator
/// is computing, in another process.
pub(crate) struct ReadonlyTimeoutEstimator {
/// Are we using the timeouts?
using_estimates: bool,
/// Latest estimate from the persistent state.
latest_timeout: Option<Duration>,
/// Timeout to use if we don't have a computed timeout.
default_timeout: Duration,
}
impl ReadonlyTimeoutEstimator {
/// Create a new ReadonlyTimeoutEstimator with default settings.
pub(crate) fn new() -> Self {
ReadonlyTimeoutEstimator {
using_estimates: true,
latest_timeout: None,
default_timeout: Duration::from_secs(60),
}
}
/// Create a new ReadonlyTimeoutEstimator, based on persistent state
pub(crate) fn from_state(s: &ParetoTimeoutState) -> Self {
let mut est = Self::new();
est.update_from_state(s);
est
}
/// Update this estimator based on a newly read state.
pub(crate) fn update_from_state(&mut self, s: &ParetoTimeoutState) {
self.latest_timeout = s.latest_estimate();
}
}
impl TimeoutEstimator for ReadonlyTimeoutEstimator {
fn note_hop_completed(&mut self, _hop: u8, _delay: Duration, _is_last: bool) {
// We don't record any timeouts with this estimator.
}
fn note_circ_timeout(&mut self, _hop: u8, _delay: Duration) {
// as above
}
fn timeouts(&mut self, action: &Action) -> (Duration, Duration) {
let base = match (self.using_estimates, self.latest_timeout) {
(true, Some(d)) => d,
(_, _) => self.default_timeout,
};
let reference_action = Action::BuildCircuit { length: 3 };
debug_assert!(reference_action.timeout_scale() > 0);
let multiplier =
(action.timeout_scale() as f64) / (reference_action.timeout_scale() as f64);
// XXXX `mul_f64()` can panic if we overflow Duration.
let timeout = base.mul_f64(multiplier);
// We use the same value for both thresholds, since this estimator
// doesn't track separate abandon and timeout thresholds.
(timeout, timeout)
}
fn learning_timeouts(&self) -> bool {
false
}
fn update_params(&mut self, params: &tor_netdir::params::NetParameters) {
self.using_estimates = !bool::from(params.cbt_learning_disabled);
self.default_timeout = params
.cbt_initial_timeout
.try_into()
.unwrap_or_else(|_| Duration::from_secs(60));
}
fn build_state(&mut self) -> Option<ParetoTimeoutState> {
None
}
}
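The scaling in `timeouts()` is just a ratio of `Action::timeout_scale()` values applied to the stored base `Duration`. A standalone sketch of that step with made-up scale numbers (the real scales are defined on `Action`, which this diff doesn't show):

```rust
use std::time::Duration;

/// Sketch: scale a persisted base timeout by action_scale / reference_scale.
fn scale_timeout(base: Duration, action_scale: u32, reference_scale: u32) -> Duration {
    debug_assert!(reference_scale > 0);
    // Like the code above, this can panic on overflow; see the XXXX note.
    base.mul_f64(f64::from(action_scale) / f64::from(reference_scale))
}

fn main() {
    let base = Duration::from_secs(60); // e.g. the estimate loaded from disk
    // Hypothetical scales: an action weighted twice the 3-hop reference.
    assert_eq!(scale_timeout(base, 20, 10), Duration::from_secs(120));
}
```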

View File

@ -5,9 +5,11 @@
//! connections ("streams") over the Tor network using
//! `TorClient::connect()`.
use crate::address::IntoTorAddr;
use crate::config::{ClientAddrConfig, TorClientConfig};
use tor_circmgr::{IsolationToken, TargetPort};
use tor_dirmgr::DirEvent;
use tor_persist::{FsStateMgr, StateMgr};
use tor_proto::circuit::{ClientCirc, IpVersionPreference};
use tor_proto::stream::DataStream;
use tor_rtcompat::{Runtime, SleepProviderExt};
@ -148,10 +150,17 @@ impl<R: Runtime> TorClient<R> {
addr_cfg,
}: TorClientConfig,
) -> Result<TorClient<R>> {
let statemgr = tor_persist::FsStateMgr::from_path(state_cfg)?;
let statemgr = FsStateMgr::from_path(state_cfg)?;
if statemgr.try_lock()?.held() {
debug!("It appears we have the lock on our state files.");
} else {
info!(
"Another process has the lock on our state files. We'll proceed in read-only mode."
);
}
let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(runtime.clone()));
let circmgr =
tor_circmgr::CircMgr::new(circ_cfg, statemgr, &runtime, Arc::clone(&chanmgr))?;
tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))?;
let dirmgr = tor_dirmgr::DirMgr::bootstrap_from_config(
dir_cfg,
runtime.clone(),
@ -169,9 +178,10 @@ impl<R: Runtime> TorClient<R> {
Arc::downgrade(&dirmgr),
))?;
runtime.spawn(flush_state_to_disk(
runtime.spawn(update_persistent_state(
runtime.clone(),
Arc::downgrade(&circmgr),
statemgr,
))?;
runtime.spawn(continually_launch_timeout_testing_circuits(
@ -302,12 +312,6 @@ impl<R: Runtime> TorClient<R> {
Ok(circ)
}
/// Try to flush persistent state into storage.
fn update_persistent_state(&self) -> Result<()> {
self.circmgr.update_persistent_state()?;
Ok(())
}
}
/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
@ -355,19 +359,46 @@ async fn keep_circmgr_params_updated<R: Runtime>(
/// Exit when we notice that `circmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn flush_state_to_disk<R: Runtime>(runtime: R, circmgr: Weak<tor_circmgr::CircMgr<R>>) {
// TODO: Consider moving this into tor-circmgr after we have more
async fn update_persistent_state<R: Runtime>(
runtime: R,
circmgr: Weak<tor_circmgr::CircMgr<R>>,
statemgr: FsStateMgr,
) {
// TODO: Consider moving this function into tor-circmgr after we have more
// experience with the state system.
loop {
if let Some(circmgr) = Weak::upgrade(&circmgr) {
if let Err(e) = circmgr.update_persistent_state() {
error!("Unable to flush circmgr state: {}", e);
break;
use tor_persist::LockStatus::*;
match statemgr.try_lock() {
Err(e) => {
error!("Problem with state lock file: {}", e);
break;
}
Ok(NewlyAcquired) => {
info!("We now own the lock on our state files.");
if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
error!("Unable to upgrade to owneed state files: {}", e);
break;
}
}
Ok(AlreadyHeld) => {
if let Err(e) = circmgr.store_persistent_state() {
error!("Unable to flush circmgr state: {}", e);
break;
}
}
Ok(NoLock) => {
if let Err(e) = circmgr.reload_persistent_state() {
error!("Unable to reload circmgr state: {}", e);
break;
}
}
}
} else {
debug!("Circmgr has disappeared; task exiting.");
break;
return;
}
// XXXX This delay is probably too small.
//
@ -377,6 +408,8 @@ async fn flush_state_to_disk<R: Runtime>(runtime: R, circmgr: Weak<tor_circmgr::
// changes.
runtime.sleep(Duration::from_secs(60)).await;
}
error!("State update task is exiting prematurely.");
}
/// Run indefinitely, launching circuits as needed to get a good
@ -420,9 +453,12 @@ impl<R: Runtime> Drop for TorClient<R> {
// TODO: Consider moving this into tor-circmgr after we have more
// experience with the state system.
fn drop(&mut self) {
info!("Flushing persistent state at exit.");
if let Err(e) = self.update_persistent_state() {
error!("Unable to flush state on client exit: {}", e);
match self.circmgr.store_persistent_state() {
Ok(()) => info!("Flushed persistent state at exit."),
Err(tor_circmgr::Error::State(tor_persist::Error::NoLock)) => {
debug!("Lock not held; no state to flush.")
}
Err(e) => error!("Unable to flush state on client exit: {}", e),
}
}
}

View File

@ -19,8 +19,8 @@ pub enum Error {
#[error("Protocol error while launching a stream: {0}")]
Proto(#[from] tor_proto::Error),
/// A protocol error while launching a stream
#[error("Persist error while launching a stream: {0}")]
/// An error while interfacing with the persistent data layer.
#[error("Error from state manager: {0}")]
Persist(#[from] tor_persist::Error),
/// The directory cache took too long to reply to us.

View File

@ -20,13 +20,13 @@ use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::{info, trace, warn};
/// Try to read a set of documents from `dirmgr` by ID.
async fn load_all<R: Runtime>(
fn load_all<R: Runtime>(
dirmgr: &DirMgr<R>,
missing: Vec<DocId>,
) -> Result<HashMap<DocId, DocumentText>> {
let mut loaded = HashMap::new();
for query in docid::partition_by_type(missing.into_iter()).values() {
dirmgr.load_documents_into(query, &mut loaded).await?;
dirmgr.load_documents_into(query, &mut loaded)?;
}
Ok(loaded)
}
@ -60,7 +60,7 @@ async fn fetch_multiple<R: Runtime>(
) -> Result<Vec<(ClientRequest, DirResponse)>> {
let mut requests = Vec::new();
for (_type, query) in docid::partition_by_type(missing.into_iter()) {
requests.extend(dirmgr.query_into_requests(query).await?);
requests.extend(dirmgr.query_into_requests(query)?);
}
// TODO: instead of waiting for all the queries to finish, we
@ -98,8 +98,8 @@ async fn load_once<R: Runtime>(
"Found {} missing documents; trying to load them",
missing.len()
);
let documents = load_all(dirmgr, missing).await?;
state.add_from_cache(documents)
let documents = load_all(dirmgr, missing)?;
state.add_from_cache(documents, dirmgr.store_if_rw())
};
dirmgr.notify().await;
outcome
@ -154,11 +154,9 @@ async fn download_attempt<R: Runtime>(
let fetched = fetch_multiple(Arc::clone(dirmgr), missing, parallelism).await?;
for (client_req, dir_response) in fetched {
let text = String::from_utf8(dir_response.into_output())?;
match dirmgr.expand_response_text(&client_req, text).await {
match dirmgr.expand_response_text(&client_req, text) {
Ok(text) => {
let outcome = state
.add_from_download(&text, &client_req, Some(&dirmgr.store))
.await;
let outcome = state.add_from_download(&text, &client_req, Some(&dirmgr.store));
dirmgr.notify().await;
match outcome {
Ok(b) => changed |= b,

View File

@ -70,13 +70,12 @@ use tor_circmgr::CircMgr;
use tor_netdir::NetDir;
use tor_netdoc::doc::netstatus::ConsensusFlavor;
use async_trait::async_trait;
use futures::{channel::oneshot, lock::Mutex, task::SpawnExt};
use futures::{channel::oneshot, task::SpawnExt};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::{info, trace, warn};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, sync::Weak};
use std::{fmt::Debug, time::SystemTime};
@ -114,10 +113,6 @@ pub struct DirMgr<R: Runtime> {
/// Handle to our sqlite cache.
// XXXX I'd like to use an rwlock, but that's not feasible, since
// rusqlite::Connection isn't Sync.
// TODO: Does this have to be a futures::Mutex? I would rather have
// a rule that we never hold the guard for this mutex across an async
// suspend point. But that will be hard to enforce until the
// `must_not_suspend` lint is in stable.
store: Mutex<SqliteStore>,
/// Our latest sufficiently bootstrapped directory, if we have one.
///
@ -261,7 +256,7 @@ impl<R: Runtime> DirMgr<R> {
{
let dirmgr = upgrade_weak_ref(weak)?;
trace!("Trying to take ownership of the directory cache lock");
if dirmgr.try_upgrade_to_readwrite().await? {
if dirmgr.try_upgrade_to_readwrite()? {
// We now own the lock! (Maybe we owned it before; the
// upgrade_to_readwrite() function is idempotent.) We can
// do our own bootstrapping.
@ -397,8 +392,26 @@ impl<R: Runtime> DirMgr<R> {
/// Return true if we got the lock, or if we already had it.
///
/// Return false if another process has the lock
async fn try_upgrade_to_readwrite(&self) -> Result<bool> {
self.store.lock().await.upgrade_to_readwrite()
fn try_upgrade_to_readwrite(&self) -> Result<bool> {
self.store
.lock()
.expect("Directory storage lock poisoned")
.upgrade_to_readwrite()
}
/// Return a reference to the store, if it is currently read-write.
fn store_if_rw(&self) -> Option<&Mutex<SqliteStore>> {
let rw = !self
.store
.lock()
.expect("Directory storage lock poisoned")
.is_readonly();
// A race-condition is possible here, but I believe it's harmless.
if rw {
Some(&self.store)
} else {
None
}
}
/// Construct a DirMgr from a DirMgrConfig.
@ -430,8 +443,6 @@ impl<R: Runtime> DirMgr<R> {
///
/// Return false if there is no such consensus.
async fn load_directory(self: &Arc<Self>) -> Result<bool> {
//let store = &self.store;
let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;
@ -464,11 +475,11 @@ impl<R: Runtime> DirMgr<R> {
/// Try to load the text of a single document described by `doc` from
/// storage.
pub async fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
use itertools::Itertools;
let mut result = HashMap::new();
let query = (*doc).into();
self.load_documents_into(&query, &mut result).await?;
self.load_documents_into(&query, &mut result)?;
let item = result.into_iter().at_most_one().map_err(|_| {
Error::CacheCorruption("Found more than one entry in storage for given docid")
})?;
@ -488,14 +499,14 @@ impl<R: Runtime> DirMgr<R> {
///
/// If many of the documents have the same type, this can be more
/// efficient than calling [`text`](Self::text).
pub async fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
where
T: IntoIterator<Item = DocId>,
{
let partitioned = docid::partition_by_type(docs);
let mut result = HashMap::new();
for (_, query) in partitioned.into_iter() {
self.load_documents_into(&query, &mut result).await?
self.load_documents_into(&query, &mut result)?;
}
Ok(result)
}
@ -518,13 +529,13 @@ impl<R: Runtime> DirMgr<R> {
}
/// Load all the documents for a single DocumentQuery from the store.
async fn load_documents_into(
fn load_documents_into(
&self,
query: &DocQuery,
result: &mut HashMap<DocId, DocumentText>,
) -> Result<()> {
use DocQuery::*;
let store = self.store.lock().await;
let store = self.store.lock().expect("Directory storage lock poisoned");
match query {
LatestConsensus {
flavor,
@ -573,12 +584,12 @@ impl<R: Runtime> DirMgr<R> {
///
/// This conversion has to be a function of the dirmgr, since it may
/// require knowledge about our current state.
async fn query_into_requests(&self, q: DocQuery) -> Result<Vec<ClientRequest>> {
fn query_into_requests(&self, q: DocQuery) -> Result<Vec<ClientRequest>> {
let mut res = Vec::new();
for q in q.split_for_download() {
match q {
DocQuery::LatestConsensus { flavor, .. } => {
res.push(self.make_consensus_request(flavor).await?);
res.push(self.make_consensus_request(flavor)?);
}
DocQuery::AuthCert(ids) => {
res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
@ -596,10 +607,11 @@ impl<R: Runtime> DirMgr<R> {
/// Construct an appropriate ClientRequest to download a consensus
/// of the given flavor.
async fn make_consensus_request(&self, flavor: ConsensusFlavor) -> Result<ClientRequest> {
fn make_consensus_request(&self, flavor: ConsensusFlavor) -> Result<ClientRequest> {
#![allow(clippy::unnecessary_wraps)]
let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);
let r = self.store.lock().await;
let r = self.store.lock().expect("Directory storage lock poisoned");
match r.latest_consensus_meta(flavor) {
Ok(Some(meta)) => {
request.set_last_consensus_date(meta.lifetime().valid_after());
@ -621,12 +633,12 @@ impl<R: Runtime> DirMgr<R> {
/// Currently, this handles expanding consensus diffs, and nothing
/// else. We do it at this stage of our downloading operation
/// because it requires access to the store.
async fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
if let ClientRequest::Consensus(req) = req {
if tor_consdiff::looks_like_diff(&text) {
if let Some(old_d) = req.old_consensus_digests().next() {
let db_val = {
let s = self.store.lock().await;
let s = self.store.lock().expect("Directory storage lock poisoned");
s.consensus_by_sha3_digest_of_signed_part(old_d)?
};
if let Some((old_consensus, meta)) = db_val {
@ -672,7 +684,6 @@ enum Readiness {
/// Resetting happens when this state needs to go back to an initial
/// state in order to start over -- either because of an error or
/// because the information it has downloaded is no longer timely.
#[async_trait]
trait DirState: Send {
/// Return a human-readable description of this state.
fn describe(&self) -> String;
@ -692,7 +703,14 @@ trait DirState: Send {
fn can_advance(&self) -> bool;
/// Add one or more documents from our cache; returns 'true' if there
/// was any change in this state.
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool>;
///
/// If `storage` is provided, then we should write any state changes into
/// it.
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool>;
/// Add information that we have just downloaded to this state; returns
/// 'true' if there as any change in this state.
@ -707,7 +725,7 @@ trait DirState: Send {
// TODO: It would be better to not have this function be async,
// once the `must_not_suspend` lint is stable.
// TODO: this should take a "DirSource" too.
async fn add_from_download(
fn add_from_download(
&mut self,
text: &str,
request: &ClientRequest,

View File

@ -10,13 +10,11 @@
//! [`bootstrap`](crate::bootstrap) module for functions that actually
//! load or download directory information.
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::lock::Mutex;
use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Weak;
use std::sync::{Mutex, Weak};
use std::time::{Duration, SystemTime};
use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
use tor_netdoc::doc::netstatus::Lifetime;
@ -132,7 +130,6 @@ impl<DM: WriteNetDir> GetConsensusState<DM> {
}
}
#[async_trait]
impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
fn describe(&self) -> String {
if self.next.is_some() {
@ -169,7 +166,11 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
Err(Error::ManagerDropped)
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
_storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let text = match docs.into_iter().next() {
None => return Ok(false),
Some((
@ -185,7 +186,7 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
self.add_consensus_text(true, text.as_str()?)
.map(|meta| meta.is_some())
}
async fn add_from_download(
fn add_from_download(
&mut self,
text: &str,
_request: &ClientRequest,
@ -193,7 +194,7 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
) -> Result<bool> {
if let Some(meta) = self.add_consensus_text(false, text)? {
if let Some(store) = storage {
let mut w = store.lock().await;
let mut w = store.lock().expect("Directory storage lock poisoned");
w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
}
Ok(true)
@ -302,7 +303,6 @@ struct GetCertsState<DM: WriteNetDir> {
writedir: Weak<DM>,
}
#[async_trait]
impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
fn describe(&self) -> String {
let total = self.certs.len() + self.missing_certs.len();
@ -331,7 +331,11 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
Err(Error::ManagerDropped)
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
_storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let mut changed = false;
// Here we iterate over the documents we want, taking them from
// our input and remembering them.
@ -349,7 +353,7 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
}
Ok(changed)
}
async fn add_from_download(
fn add_from_download(
&mut self,
text: &str,
request: &ClientRequest,
@ -398,7 +402,7 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
.iter()
.map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
.collect();
let mut w = store.lock().await;
let mut w = store.lock().expect("Directory storage lock poisoned");
w.store_authcerts(&v[..])?;
}
@ -539,7 +543,6 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
}
}
#[async_trait]
impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
fn describe(&self) -> String {
format!(
@ -569,7 +572,11 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
Err(Error::ManagerDropped)
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let mut microdescs = Vec::new();
for (id, text) in docs {
if let DocId::Microdesc(digest) = id {
@ -588,11 +595,19 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
}
let changed = !microdescs.is_empty();
self.register_microdescs(microdescs);
if self.register_microdescs(microdescs) {
if let Some(store) = storage {
let mut store = store.lock().expect("Directory storage lock poisoned");
info!("Marked consensus usable.");
store.mark_consensus_usable(&self.meta)?;
// DOCDOC: explain why we're doing this here.
store.expire_all()?;
}
}
Ok(changed)
}
async fn add_from_download(
fn add_from_download(
&mut self,
text: &str,
request: &ClientRequest,
@ -622,7 +637,7 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
let mark_listed = self.meta.lifetime().valid_after();
if let Some(store) = storage {
let mut s = store.lock().await;
let mut s = store.lock().expect("Directory storage lock poisoned");
if !self.newly_listed.is_empty() {
s.update_microdescs_listed(self.newly_listed.iter(), mark_listed)?;
self.newly_listed.clear();
@ -637,7 +652,7 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
if self.register_microdescs(new_mds.into_iter().map(|(_, md)| md)) {
// oh hey, this is no longer pending.
if let Some(store) = storage {
let mut store = store.lock().await;
let mut store = store.lock().expect("Directory storage lock poisoned");
info!("Marked consensus usable.");
store.mark_consensus_usable(&self.meta)?;
// DOCDOC: explain why we're doing this here.

View File

@ -201,6 +201,20 @@ impl Guard {
self.reachable
}
/// Copy all _non-persistent_ status from `other` to self.
///
/// Requires that the two `Guard`s have the same ID.
pub(crate) fn copy_status_from(&mut self, other: &Guard) {
debug_assert_eq!(self.id, other.id);
self.last_tried_to_connect_at = other.last_tried_to_connect_at;
self.retry_at = other.retry_at;
self.reachable = other.reachable;
self.failing_since = other.failing_since;
self.is_dir_cache = other.is_dir_cache;
self.exploratory_circ_pending = other.exploratory_circ_pending;
}
/// Change the reachability status for this guard.
fn set_reachable(&mut self, r: Reachable) {
if self.reachable != r {

View File

@ -276,18 +276,36 @@ impl<R: Runtime> GuardMgr<R> {
/// Flush our current guard state to the state manager, if there
/// is any unsaved state.
///
/// Return true if we were able to save, and false if we couldn't
/// get the lock.
pub fn update_persistent_state(&self) -> Result<bool, GuardMgrError> {
pub fn store_persistent_state(&self) -> Result<(), GuardMgrError> {
let inner = self.inner.lock().expect("Poisoned lock");
if inner.default_storage.try_lock()? {
trace!("Flushing guard state to disk.");
inner.default_storage.store(&inner.active_guards)?;
Ok(true)
} else {
Ok(false)
trace!("Flushing guard state to disk.");
inner.default_storage.store(&inner.active_guards)?;
Ok(())
}
/// Reload state from the state manager.
///
/// We only call this method if we _don't_ have the lock on the state
/// files. If we have the lock, we only want to save.
pub fn reload_persistent_state(&self) -> Result<(), GuardMgrError> {
let mut inner = self.inner.lock().expect("Poisoned lock");
if let Some(new_guards) = inner.default_storage.load()? {
let now = self.runtime.wallclock();
inner.replace_guards_with(new_guards, now);
}
Ok(())
}
/// Switch from having an unowned persistent state to having an owned one.
///
/// Requires that we hold the lock on the state files.
pub fn upgrade_to_owned_persistent_state(&self) -> Result<(), GuardMgrError> {
let mut inner = self.inner.lock().expect("Poisoned lock");
debug_assert!(inner.default_storage.can_store());
let new_guards = inner.default_storage.load()?.unwrap_or_else(GuardSet::new);
let now = self.runtime.wallclock();
inner.replace_guards_with(new_guards, now);
Ok(())
}
/// Update the state of this [`GuardMgr`] based on a new or modified
@ -482,6 +500,14 @@ impl GuardMgrInner {
self.active_guards.select_primary_guards(&self.params);
}
/// Replace the active guard set with `new_guards`, preserving
/// non-persistent state for any guards that are retained.
fn replace_guards_with(&mut self, mut new_guards: GuardSet, now: SystemTime) {
new_guards.copy_status_from(&self.active_guards);
self.active_guards = new_guards;
self.update(now, None);
}
/// Called when the circuit manager reports (via [`GuardMonitor`]) that
/// a guard succeeded or failed.
///
@ -910,6 +936,8 @@ mod test {
fn init<R: Runtime>(rt: R) -> (GuardMgr<R>, TestingStateMgr, NetDir) {
use tor_netdir::{testnet, MdReceiver, PartialNetDir};
let statemgr = TestingStateMgr::new();
let have_lock = statemgr.try_lock().unwrap();
assert!(have_lock.held());
let guardmgr = GuardMgr::new(rt, statemgr.clone()).unwrap();
let (con, mds) = testnet::construct_network().unwrap();
let override_p = "guard-min-filtered-sample-size=5 guard-n-primary-guards=2"
@ -943,7 +971,7 @@ mod test {
// Save the state...
guardmgr.flush_msg_queue().await;
guardmgr.update_persistent_state().unwrap();
guardmgr.store_persistent_state().unwrap();
drop(guardmgr);
// Try reloading from the state...

View File

@ -163,6 +163,15 @@ impl GuardSet {
self.primary_guards_invalidated = true;
}
/// Copy non-persistent status from every guard shared with `other`.
pub(crate) fn copy_status_from(&mut self, other: &GuardSet) {
for (id, guard) in self.guards.iter_mut() {
if let Some(other_guard) = other.get(id) {
guard.copy_status_from(other_guard);
}
}
}
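The merge rule here is: persisted fields come from the freshly loaded `GuardSet`, while runtime-only status survives from the old in-memory set, matched by guard ID. The same idea with plain maps and simplified, purely illustrative field names:

```rust
use std::collections::HashMap;

/// Simplified stand-in for a guard entry: one persisted field, two runtime-only ones.
struct Entry {
    confirmed: bool,     // persisted: comes from the newly loaded set
    reachable: bool,     // runtime-only: carried over from the old set
    retry_pending: bool, // runtime-only: carried over from the old set
}

/// Overwrite the runtime-only status in `new` with values from `old`,
/// for every guard ID the two sets share.
fn copy_status_from(new: &mut HashMap<String, Entry>, old: &HashMap<String, Entry>) {
    for (id, entry) in new.iter_mut() {
        if let Some(prev) = old.get(id) {
            entry.reachable = prev.reachable;
            entry.retry_pending = prev.retry_pending;
        }
    }
}
```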
/// Return a serializable state object that can be stored to disk
/// to capture the current state of this GuardSet.
fn get_state(&self) -> GuardSample<'_> {

View File

@ -293,6 +293,15 @@ impl Default for NetParameters {
}
impl NetParameters {
/// Construct a new NetParameters from a given list of key=value parameters.
///
/// Unrecognized parameters are ignored.
pub fn from_map(p: &tor_netdoc::doc::netstatus::NetParams<i32>) -> Self {
let mut params = NetParameters::default();
let _ = params.saturating_update(p.iter());
params
}
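Usage matches the updated `tor-circmgr` test earlier in this diff: parse a consensus-style `key=value` list into `NetParams<i32>` and hand it over; unknown keys are dropped, and out-of-range values are clamped by `saturating_update`. A small sketch:

```rust
use tor_netdir::params::NetParameters;

/// Sketch: build NetParameters from a consensus-style parameter string.
fn params_from_line(line: &str) -> NetParameters {
    // The same call pattern as the pareto_estimate_clear test in this commit.
    NetParameters::from_map(&line.parse().expect("malformed key=value list"))
}

// e.g. params_from_line("cbtmincircs=1 cbtnummodes=2 not_a_real_param=7")
```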
/// Replace a list of parameters, using the logic of
/// `set_saturating`.
///

View File

@ -1,6 +1,6 @@
//! Filesystem + JSON implementation of StateMgr.
use crate::{Error, Result, StateMgr};
use crate::{Error, LockStatus, Result, StateMgr};
use serde::{de::DeserializeOwned, Serialize};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
@ -93,16 +93,18 @@ impl StateMgr for FsStateMgr {
.expect("Poisoned lock on state lockfile");
lockfile.owns_lock()
}
fn try_lock(&self) -> Result<bool> {
fn try_lock(&self) -> Result<LockStatus> {
let mut lockfile = self
.inner
.lockfile
.lock()
.expect("Poisoned lock on state lockfile");
if lockfile.owns_lock() {
Ok(true)
Ok(LockStatus::AlreadyHeld)
} else if lockfile.try_lock()? {
Ok(LockStatus::NewlyAcquired)
} else {
Ok(lockfile.try_lock()?)
Ok(LockStatus::NoLock)
}
}
fn load<D>(&self, key: &str) -> Result<Option<D>>
@ -156,7 +158,7 @@ mod test {
let dir = tempfile::TempDir::new().unwrap();
let store = FsStateMgr::from_path(dir.path())?;
assert!(store.try_lock()?);
assert_eq!(store.try_lock()?, LockStatus::NewlyAcquired);
let stuff: HashMap<_, _> = vec![("hello".to_string(), "world".to_string())]
.into_iter()
.collect();
@ -179,7 +181,7 @@ mod test {
assert!(matches!(store.store("xyz", &stuff4), Err(Error::NoLock)));
assert!(store.try_lock()?);
assert_eq!(store.try_lock()?, LockStatus::NewlyAcquired);
store.store("xyz", &stuff4)?;
let stuff5: Option<HashMap<String, String>> = store.load("xyz")?;

View File

@ -22,9 +22,6 @@ pub trait StorageHandle<T: Serialize + DeserializeOwned> {
/// Return true if we have the lock; see [`StateMgr::can_store`].
fn can_store(&self) -> bool;
/// Try to acquire the lock; see [`StateMgr::can_store`].
fn try_lock(&self) -> Result<bool>;
}
/// Type wrapper for a reference-counted `dyn` [`StorageHandle`].
@ -69,9 +66,6 @@ where
fn can_store(&self) -> bool {
self.mgr.can_store()
}
fn try_lock(&self) -> Result<bool> {
self.mgr.try_lock()
}
}
impl<M, T> StorageHandleImpl<M, T>

View File

@ -87,9 +87,11 @@ pub trait StateMgr: Clone {
/// Try to become a read-write state manager if possible, without
/// blocking.
///
/// This function will return `Ok(true)` if we now hold the lock,
/// and `Ok(false)` if some other process holds the lock.
fn try_lock(&self) -> Result<bool>;
/// This function will return an error only if something really
/// unexpected went wrong. It may return `Ok(_)` even if we don't
/// acquire the lock: check the return value or call
/// [`StateMgr::can_store()`] to see if the lock is held.
fn try_lock(&self) -> Result<LockStatus>;
/// Make a new [`StorageHandle`] to store values of particular type
/// at a particular key.
@ -102,6 +104,25 @@ pub trait StateMgr: Clone {
}
}
/// A possible outcome from calling [`StateMgr::try_lock()`]
#[allow(clippy::exhaustive_enums)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum LockStatus {
/// We didn't have the lock and were unable to acquire it.
NoLock,
/// We already held the lock, and didn't have anything to do.
AlreadyHeld,
/// We successfully acquired the lock for the first time.
NewlyAcquired,
}
impl LockStatus {
/// Return true if this status indicates that we hold the lock.
pub fn held(&self) -> bool {
!matches!(self, LockStatus::NoLock)
}
}
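A small usage sketch: callers generally match on the three variants right after `try_lock()` (as the arti-client daemon task above does), while `held()` collapses the answer when only ownership matters.

```rust
use tor_persist::StateMgr;

/// Sketch: probe the lock and describe the outcome. `try_lock()` returns
/// `NewlyAcquired`, `AlreadyHeld`, or `NoLock`; `held()` reduces that to
/// "do we own the state files?".
fn lock_summary<M: StateMgr>(mgr: &M) -> Result<String, tor_persist::Error> {
    let status = mgr.try_lock()?;
    Ok(format!("lock status: {:?} (held: {})", status, status.held()))
}
```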
/// An error type returned from a persistent state manager.
#[derive(thiserror::Error, Debug, Clone)]
#[non_exhaustive]

View File

@ -1,6 +1,6 @@
//! Testing-only StateMgr that stores values in a hash table.
use crate::{Error, Result, StateMgr};
use crate::{Error, LockStatus, Result, StateMgr};
use serde::{de::DeserializeOwned, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
@ -92,13 +92,15 @@ impl StateMgr for TestingStateMgr {
inner.lock_held
}
fn try_lock(&self) -> Result<bool> {
fn try_lock(&self) -> Result<LockStatus> {
let mut inner = self.inner.lock().expect("Lock poisoned.");
if inner.lock_blocked {
Ok(false)
Ok(LockStatus::NoLock)
} else if inner.lock_held {
Ok(LockStatus::AlreadyHeld)
} else {
inner.lock_held = true;
Ok(true)
Ok(LockStatus::NewlyAcquired)
}
}
}
@ -133,7 +135,7 @@ mod test {
assert!(matches!(mgr.store("item1", &v1), Err(Error::NoLock)));
assert!(!mgr.can_store());
assert!(mgr.try_lock().unwrap());
assert_eq!(mgr.try_lock().unwrap(), LockStatus::NewlyAcquired);
assert!(mgr.can_store());
assert!(mgr.store("item1", &v1).is_ok());
@ -156,13 +158,15 @@ mod test {
assert!(!mgr.can_store());
mgr.block_lock_attempts();
assert!(!mgr.try_lock().unwrap()); // can't get the lock.
assert_eq!(mgr.try_lock().unwrap(), LockStatus::NoLock);
assert!(!mgr.can_store()); // can't store.
mgr.unblock_lock_attempts();
assert!(!mgr.can_store()); // can't store.
assert!(mgr.try_lock().unwrap()); // *can* get the lock.
assert!(mgr.can_store()); // can't store.
assert_eq!(mgr.try_lock().unwrap(), LockStatus::NewlyAcquired);
assert!(mgr.can_store()); // can store.
assert_eq!(mgr.try_lock().unwrap(), LockStatus::AlreadyHeld);
}
#[test]
@ -172,7 +176,7 @@ mod test {
let h1: DynStorageHandle<Ex1> = mgr.clone().create_handle("foo");
let h2: DynStorageHandle<Ex2> = mgr.clone().create_handle("bar");
let h3: DynStorageHandle<Ex2> = mgr.create_handle("baz");
let h3: DynStorageHandle<Ex2> = mgr.clone().create_handle("baz");
let v1 = Ex1 { v1: 1, v2: 2 };
let s1 = Ex2 {
@ -185,7 +189,7 @@ mod test {
};
assert!(matches!(h1.store(&v1), Err(Error::NoLock)));
assert!(h1.try_lock().unwrap());
assert!(mgr.try_lock().unwrap().held());
assert!(h1.can_store());
assert!(h1.store(&v1).is_ok());