diff --git a/Cargo.lock b/Cargo.lock index 17db6505d..8b9b38809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3468,6 +3468,7 @@ dependencies = [ "humantime-serde", "itertools", "pin-project", + "postage", "rand 0.8.5", "retain_mut", "serde", diff --git a/crates/tor-guardmgr/Cargo.toml b/crates/tor-guardmgr/Cargo.toml index 0a64101e2..46b718e58 100644 --- a/crates/tor-guardmgr/Cargo.toml +++ b/crates/tor-guardmgr/Cargo.toml @@ -37,6 +37,7 @@ humantime = "2" humantime-serde = "1.1.1" itertools = "0.10.1" pin-project = "1" +postage = { version = "0.4", default-features = false, features = ["futures-traits"] } rand = "0.8" serde = { version = "1.0.103", features = ["derive"] } retain_mut = "0.1.3" diff --git a/crates/tor-guardmgr/src/events.rs b/crates/tor-guardmgr/src/events.rs new file mode 100644 index 000000000..cff6658f7 --- /dev/null +++ b/crates/tor-guardmgr/src/events.rs @@ -0,0 +1,42 @@ +//! Code to remotely notify other crates about changes in the status of the +//! `GuardMgr`. + +use std::{pin::Pin, task::Poll}; + +use crate::skew::SkewEstimate; +use educe::Educe; +use futures::{Stream, StreamExt}; +use tor_basic_utils::skip_fmt; + +/// A stream of [`SkewEstimate`] events. +/// +/// Note that this stream can be lossy: if multiple events trigger before you +/// read from it, you will only get the most recent estimate. +#[derive(Clone, Educe)] +#[educe(Debug)] +pub struct ClockSkewEvents { + /// The `postage::watch::Receiver` that we're wrapping. + /// + /// We wrap this type so that we don't expose its entire API, and so that we + /// can migrate to some other implementation in the future if we want. + #[educe(Debug(method = "skip_fmt"))] + pub(crate) inner: postage::watch::Receiver>, +} + +impl Stream for ClockSkewEvents { + type Item = Option; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + self.inner.poll_next_unpin(cx) + } +} +impl ClockSkewEvents { + /// Return our best estimate of our current clock skew, based on reports from the + /// guards and fallbacks we have contacted. + pub fn get(&self) -> Option { + self.inner.borrow().clone() + } +} diff --git a/crates/tor-guardmgr/src/lib.rs b/crates/tor-guardmgr/src/lib.rs index dddd26579..7bec8af83 100644 --- a/crates/tor-guardmgr/src/lib.rs +++ b/crates/tor-guardmgr/src/lib.rs @@ -150,6 +150,7 @@ use tor_rtcompat::Runtime; mod daemon; mod dirstatus; mod err; +mod events; pub mod fallback; mod filter; mod guard; @@ -160,6 +161,7 @@ mod skew; mod util; pub use err::{GuardMgrError, PickGuardError}; +pub use events::ClockSkewEvents; pub use filter::GuardFilter; pub use ids::FirstHopId; pub use pending::{GuardMonitor, GuardStatus, GuardUsable}; @@ -244,6 +246,13 @@ struct GuardMgrInner { /// Location in which to store persistent state. storage: DynStorageHandle, + + /// A sender object to publish changes in our estimated clock skew. + send_skew: postage::watch::Sender>, + + /// A receiver object to hand out to observers who want to know about + /// changes in our estimated clock skew. + recv_skew: events::ClockSkewEvents, } /// Persistent state for a guard manager, as serialized to disk. @@ -289,6 +298,10 @@ impl GuardMgr { // try to migrate it instead, but that's beyond the stability guarantee // that we're getting at this stage of our (pre-0.1) development. let state = storage.load()?.unwrap_or_default(); + + let (send_skew, recv_skew) = postage::watch::channel(); + let recv_skew = ClockSkewEvents { inner: recv_skew }; + let inner = Arc::new(Mutex::new(GuardMgrInner { guards: state, last_primary_retry_time: runtime.now(), @@ -298,6 +311,8 @@ impl GuardMgr { waiting: Vec::new(), fallbacks: fallbacks.into(), storage, + send_skew, + recv_skew, })); { let weak_inner = Arc::downgrade(&inner); @@ -569,12 +584,16 @@ impl GuardMgr { ); } - /// Return our best estimate of our current clock skew, based on reports from the - /// guards and fallbacks we have contacted. - pub fn skew_estimate(&self) -> Option { + /// Return a stream of events about our estimated clock skew; these events + /// are `None` when we don't have enough information to make an estimate, + /// and `Some(`[`SkewEstiamte`]`)` otherwise. + /// + /// Note that this stream can be lossy: if the estimate changes more than + /// one before you read from the stream, you might only get the most recent + /// update. + pub fn skew_events(&self) -> ClockSkewEvents { let inner = self.inner.lock().expect("Poisoned lock"); - let now = self.runtime.now(); - SkewEstimate::estimate_skew(inner.skew_observations(), now) + inner.recv_skew.clone() } /// Ensure that the message queue is flushed before proceeding to @@ -725,10 +744,8 @@ impl GuardMgrInner { // First, handle the skew report (if any) if let Some(skew) = skew { - let observation = skew::SkewObservation { - skew, - when: runtime.now(), - }; + let now = runtime.now(); + let observation = skew::SkewObservation { skew, when: now }; match &guard_id.0 { FirstHopIdInner::Guard(id) => { @@ -738,6 +755,14 @@ impl GuardMgrInner { self.fallbacks.note_skew(id, observation); } } + // TODO: We call this whenever we receive an observed clock + // skew. That's not the perfect timing for two reasons. First + // off, it might be too frequent: it does an O(n) calculation, + // which isn't ideal. Second, it might be too infrequent: after + // an hour has passed, a given observation won't be up-to-date + // any more, and we might want to recalculate the skew + // accordingly. + self.update_skew(now); } match (status, &guard_id.0) { @@ -852,6 +877,15 @@ impl GuardMgrInner { .chain(self.guards.active_guards().skew_observations()) } + /// Recalculate our estimated clock skew, and publish it to anybody who + /// cares. + fn update_skew(&mut self, now: Instant) { + let estimate = skew::SkewEstimate::estimate_skew(self.skew_observations(), now); + // TODO: we might want to do this only conditionally, when the skew + // estimate changes. + *self.send_skew.borrow_mut() = estimate; + } + /// If the circuit built because of a given [`PendingRequest`] may /// now be used (or discarded), return `Some(true)` or /// `Some(false)` respectively.