GuardMgr: publish skew estimates.

Instead of just having a function that recalculates the latest clock
skew, instead recalculate the clock skew when it may have changed,
and notify other processes via a postage::watch.
This commit is contained in:
Nick Mathewson 2022-04-11 15:13:48 -04:00
parent 61080dcaec
commit 5f5cbdc08e
4 changed files with 87 additions and 9 deletions

1
Cargo.lock generated
View File

@ -3468,6 +3468,7 @@ dependencies = [
"humantime-serde", "humantime-serde",
"itertools", "itertools",
"pin-project", "pin-project",
"postage",
"rand 0.8.5", "rand 0.8.5",
"retain_mut", "retain_mut",
"serde", "serde",

View File

@ -37,6 +37,7 @@ humantime = "2"
humantime-serde = "1.1.1" humantime-serde = "1.1.1"
itertools = "0.10.1" itertools = "0.10.1"
pin-project = "1" pin-project = "1"
postage = { version = "0.4", default-features = false, features = ["futures-traits"] }
rand = "0.8" rand = "0.8"
serde = { version = "1.0.103", features = ["derive"] } serde = { version = "1.0.103", features = ["derive"] }
retain_mut = "0.1.3" retain_mut = "0.1.3"

View File

@ -0,0 +1,42 @@
//! Code to remotely notify other crates about changes in the status of the
//! `GuardMgr`.
use std::{pin::Pin, task::Poll};
use crate::skew::SkewEstimate;
use educe::Educe;
use futures::{Stream, StreamExt};
use tor_basic_utils::skip_fmt;
/// A stream of [`SkewEstimate`] events.
///
/// Note that this stream can be lossy: if multiple events trigger before you
/// read from it, you will only get the most recent estimate.
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct ClockSkewEvents {
/// The `postage::watch::Receiver` that we're wrapping.
///
/// We wrap this type so that we don't expose its entire API, and so that we
/// can migrate to some other implementation in the future if we want.
#[educe(Debug(method = "skip_fmt"))]
pub(crate) inner: postage::watch::Receiver<Option<SkewEstimate>>,
}
impl Stream for ClockSkewEvents {
type Item = Option<SkewEstimate>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
impl ClockSkewEvents {
/// Return our best estimate of our current clock skew, based on reports from the
/// guards and fallbacks we have contacted.
pub fn get(&self) -> Option<SkewEstimate> {
self.inner.borrow().clone()
}
}

View File

@ -150,6 +150,7 @@ use tor_rtcompat::Runtime;
mod daemon; mod daemon;
mod dirstatus; mod dirstatus;
mod err; mod err;
mod events;
pub mod fallback; pub mod fallback;
mod filter; mod filter;
mod guard; mod guard;
@ -160,6 +161,7 @@ mod skew;
mod util; mod util;
pub use err::{GuardMgrError, PickGuardError}; pub use err::{GuardMgrError, PickGuardError};
pub use events::ClockSkewEvents;
pub use filter::GuardFilter; pub use filter::GuardFilter;
pub use ids::FirstHopId; pub use ids::FirstHopId;
pub use pending::{GuardMonitor, GuardStatus, GuardUsable}; pub use pending::{GuardMonitor, GuardStatus, GuardUsable};
@ -244,6 +246,13 @@ struct GuardMgrInner {
/// Location in which to store persistent state. /// Location in which to store persistent state.
storage: DynStorageHandle<GuardSets>, storage: DynStorageHandle<GuardSets>,
/// A sender object to publish changes in our estimated clock skew.
send_skew: postage::watch::Sender<Option<SkewEstimate>>,
/// A receiver object to hand out to observers who want to know about
/// changes in our estimated clock skew.
recv_skew: events::ClockSkewEvents,
} }
/// Persistent state for a guard manager, as serialized to disk. /// Persistent state for a guard manager, as serialized to disk.
@ -289,6 +298,10 @@ impl<R: Runtime> GuardMgr<R> {
// try to migrate it instead, but that's beyond the stability guarantee // try to migrate it instead, but that's beyond the stability guarantee
// that we're getting at this stage of our (pre-0.1) development. // that we're getting at this stage of our (pre-0.1) development.
let state = storage.load()?.unwrap_or_default(); let state = storage.load()?.unwrap_or_default();
let (send_skew, recv_skew) = postage::watch::channel();
let recv_skew = ClockSkewEvents { inner: recv_skew };
let inner = Arc::new(Mutex::new(GuardMgrInner { let inner = Arc::new(Mutex::new(GuardMgrInner {
guards: state, guards: state,
last_primary_retry_time: runtime.now(), last_primary_retry_time: runtime.now(),
@ -298,6 +311,8 @@ impl<R: Runtime> GuardMgr<R> {
waiting: Vec::new(), waiting: Vec::new(),
fallbacks: fallbacks.into(), fallbacks: fallbacks.into(),
storage, storage,
send_skew,
recv_skew,
})); }));
{ {
let weak_inner = Arc::downgrade(&inner); let weak_inner = Arc::downgrade(&inner);
@ -569,12 +584,16 @@ impl<R: Runtime> GuardMgr<R> {
); );
} }
/// Return our best estimate of our current clock skew, based on reports from the /// Return a stream of events about our estimated clock skew; these events
/// guards and fallbacks we have contacted. /// are `None` when we don't have enough information to make an estimate,
pub fn skew_estimate(&self) -> Option<SkewEstimate> { /// and `Some(`[`SkewEstiamte`]`)` otherwise.
///
/// Note that this stream can be lossy: if the estimate changes more than
/// one before you read from the stream, you might only get the most recent
/// update.
pub fn skew_events(&self) -> ClockSkewEvents {
let inner = self.inner.lock().expect("Poisoned lock"); let inner = self.inner.lock().expect("Poisoned lock");
let now = self.runtime.now(); inner.recv_skew.clone()
SkewEstimate::estimate_skew(inner.skew_observations(), now)
} }
/// Ensure that the message queue is flushed before proceeding to /// Ensure that the message queue is flushed before proceeding to
@ -725,10 +744,8 @@ impl GuardMgrInner {
// First, handle the skew report (if any) // First, handle the skew report (if any)
if let Some(skew) = skew { if let Some(skew) = skew {
let observation = skew::SkewObservation { let now = runtime.now();
skew, let observation = skew::SkewObservation { skew, when: now };
when: runtime.now(),
};
match &guard_id.0 { match &guard_id.0 {
FirstHopIdInner::Guard(id) => { FirstHopIdInner::Guard(id) => {
@ -738,6 +755,14 @@ impl GuardMgrInner {
self.fallbacks.note_skew(id, observation); self.fallbacks.note_skew(id, observation);
} }
} }
// TODO: We call this whenever we receive an observed clock
// skew. That's not the perfect timing for two reasons. First
// off, it might be too frequent: it does an O(n) calculation,
// which isn't ideal. Second, it might be too infrequent: after
// an hour has passed, a given observation won't be up-to-date
// any more, and we might want to recalculate the skew
// accordingly.
self.update_skew(now);
} }
match (status, &guard_id.0) { match (status, &guard_id.0) {
@ -852,6 +877,15 @@ impl GuardMgrInner {
.chain(self.guards.active_guards().skew_observations()) .chain(self.guards.active_guards().skew_observations())
} }
/// Recalculate our estimated clock skew, and publish it to anybody who
/// cares.
fn update_skew(&mut self, now: Instant) {
let estimate = skew::SkewEstimate::estimate_skew(self.skew_observations(), now);
// TODO: we might want to do this only conditionally, when the skew
// estimate changes.
*self.send_skew.borrow_mut() = estimate;
}
/// If the circuit built because of a given [`PendingRequest`] may /// If the circuit built because of a given [`PendingRequest`] may
/// now be used (or discarded), return `Some(true)` or /// now be used (or discarded), return `Some(true)` or
/// `Some(false)` respectively. /// `Some(false)` respectively.