GuardMgr: record clock skew information.
(It is not yet actually used.)
This commit is contained in:
parent
c3c43b088e
commit
99146da2c2
|
@ -41,11 +41,11 @@ pub(crate) async fn report_status_events(
|
|||
) {
|
||||
loop {
|
||||
match events.next().await {
|
||||
Some(Msg::Status(id, status, _skew)) => {
|
||||
Some(Msg::Status(id, status, skew)) => {
|
||||
// We've got a report about a guard status.
|
||||
if let Some(inner) = inner.upgrade() {
|
||||
let mut inner = inner.lock().expect("Poisoned lock");
|
||||
inner.handle_msg(id, status, &runtime);
|
||||
inner.handle_msg(id, status, skew, &runtime);
|
||||
} else {
|
||||
// The guard manager has gone away.
|
||||
return;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
//! Declare the [`FallbackState`] type, which is used to store a set of FallbackDir.
|
||||
|
||||
use crate::skew::SkewObservation;
|
||||
use rand::seq::IteratorRandom;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
|
@ -55,6 +56,17 @@ pub(crate) struct FallbackState {
|
|||
fallbacks: Vec<Entry>,
|
||||
}
|
||||
|
||||
/// The status information we store about a single fallback directory.
|
||||
#[derive(Debug, Clone)]
|
||||
struct FallbackStatus {
|
||||
/// Whether the directory is currently usable, and if not, when we can retry
|
||||
/// it.
|
||||
status: DirStatus,
|
||||
/// The latest clock skew observation we have from this fallback directory
|
||||
/// (if any).
|
||||
clock_skew: Option<SkewObservation>,
|
||||
}
|
||||
|
||||
/// Wrapper type for FallbackDir converted into crate::Guard, and Status.
|
||||
///
|
||||
/// Defines a sort order to ensure that we can look up fallback directories
|
||||
|
@ -65,9 +77,9 @@ pub(super) struct Entry {
|
|||
///
|
||||
/// (TODO: We represent this as a `FirstHop`, which could technically hold a
|
||||
/// guard as well. Ought to fix that.)
|
||||
pub(super) fallback: crate::FirstHop,
|
||||
fallback: crate::FirstHop,
|
||||
/// The status for the fallback directory.
|
||||
pub(super) status: DirStatus,
|
||||
status: FallbackStatus,
|
||||
}
|
||||
|
||||
/// Least amount of time we'll wait before retrying a fallback cache.
|
||||
|
@ -79,7 +91,13 @@ impl From<FallbackDir> for Entry {
|
|||
fn from(fallback: FallbackDir) -> Self {
|
||||
let fallback = fallback.as_guard();
|
||||
let status = DirStatus::new(FALLBACK_RETRY_FLOOR);
|
||||
Entry { fallback, status }
|
||||
Entry {
|
||||
fallback,
|
||||
status: FallbackStatus {
|
||||
status,
|
||||
clock_skew: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,7 +134,7 @@ impl FallbackState {
|
|||
|
||||
self.fallbacks
|
||||
.iter()
|
||||
.filter(|ent| ent.status.usable_at(now))
|
||||
.filter(|ent| ent.status.status.usable_at(now))
|
||||
.choose(rng)
|
||||
.map(|ent| &ent.fallback)
|
||||
.ok_or_else(|| PickGuardError::AllFallbacksDown {
|
||||
|
@ -130,7 +148,7 @@ impl FallbackState {
|
|||
fn next_retry(&self) -> Option<Instant> {
|
||||
self.fallbacks
|
||||
.iter()
|
||||
.filter_map(|ent| ent.status.next_retriable())
|
||||
.filter_map(|ent| ent.status.status.next_retriable())
|
||||
.min()
|
||||
}
|
||||
|
||||
|
@ -162,7 +180,7 @@ impl FallbackState {
|
|||
/// operation as a success: a circuit success is not enough.
|
||||
pub(crate) fn note_success(&mut self, id: &FallbackId) {
|
||||
if let Some(entry) = self.get_mut(id) {
|
||||
entry.status.note_success();
|
||||
entry.status.status.note_success();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -170,7 +188,7 @@ impl FallbackState {
|
|||
/// identity.
|
||||
pub(crate) fn note_failure(&mut self, id: &FallbackId, now: Instant) {
|
||||
if let Some(entry) = self.get_mut(id) {
|
||||
entry.status.note_failure(now);
|
||||
entry.status.status.note_failure(now);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -190,6 +208,13 @@ impl FallbackState {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Record that a given fallback has told us about clock skew.
|
||||
pub(crate) fn note_skew(&mut self, id: &FallbackId, observation: SkewObservation) {
|
||||
if let Some(entry) = self.get_mut(id) {
|
||||
entry.status.clock_skew = Some(observation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -332,28 +357,31 @@ mod test {
|
|||
|
||||
// Mark somebody down; try accessors.
|
||||
set.note_failure(&ids[3], now);
|
||||
assert!(set.fallbacks[3].status.next_retriable().unwrap() > now);
|
||||
assert!(!set.fallbacks[3].status.usable_at(now));
|
||||
assert_eq!(set.next_retry(), set.fallbacks[3].status.next_retriable());
|
||||
assert!(set.fallbacks[3].status.status.next_retriable().unwrap() > now);
|
||||
assert!(!set.fallbacks[3].status.status.usable_at(now));
|
||||
assert_eq!(
|
||||
set.next_retry(),
|
||||
set.fallbacks[3].status.status.next_retriable()
|
||||
);
|
||||
|
||||
// Mark somebody else down; try accessors.
|
||||
set.note_failure(&ids[0], now);
|
||||
assert!(set.fallbacks[0].status.next_retriable().unwrap() > now);
|
||||
assert!(!set.fallbacks[0].status.usable_at(now));
|
||||
assert!(set.fallbacks[0].status.status.next_retriable().unwrap() > now);
|
||||
assert!(!set.fallbacks[0].status.status.usable_at(now));
|
||||
assert_eq!(
|
||||
set.next_retry().unwrap(),
|
||||
std::cmp::min(
|
||||
set.fallbacks[0].status.next_retriable().unwrap(),
|
||||
set.fallbacks[3].status.next_retriable().unwrap()
|
||||
set.fallbacks[0].status.status.next_retriable().unwrap(),
|
||||
set.fallbacks[3].status.status.next_retriable().unwrap()
|
||||
)
|
||||
);
|
||||
|
||||
// Mark somebody as running; try accessors.
|
||||
set.note_success(&ids[0]);
|
||||
assert!(set.fallbacks[0].status.next_retriable().is_none());
|
||||
assert!(set.fallbacks[0].status.usable_at(now));
|
||||
assert!(set.fallbacks[0].status.status.next_retriable().is_none());
|
||||
assert!(set.fallbacks[0].status.status.usable_at(now));
|
||||
|
||||
assert!(set.get_mut(&ids[0]).unwrap().status.usable_at(now));
|
||||
assert!(set.get_mut(&ids[0]).unwrap().status.status.usable_at(now));
|
||||
|
||||
for id in ids.iter() {
|
||||
dbg!(id, set.get_mut(id).map(|e| e.id()));
|
||||
|
@ -374,10 +402,10 @@ mod test {
|
|||
assert_eq!(set2.fallbacks.len(), 6); // Started with 4, added 3, removed 1.
|
||||
|
||||
// Make sure that the status entries are correctly copied.
|
||||
assert!(set2.get_mut(&ids[0]).unwrap().status.usable_at(now));
|
||||
assert!(set2.get_mut(&ids[1]).unwrap().status.usable_at(now));
|
||||
assert!(set2.get_mut(&ids[0]).unwrap().status.status.usable_at(now));
|
||||
assert!(set2.get_mut(&ids[1]).unwrap().status.status.usable_at(now));
|
||||
assert!(set2.get_mut(&ids[2]).is_none());
|
||||
assert!(!set2.get_mut(&ids[3]).unwrap().status.usable_at(now));
|
||||
assert!(!set2.get_mut(&ids[3]).unwrap().status.status.usable_at(now));
|
||||
|
||||
// Make sure that the new fbs are there.
|
||||
for new_fb in fbs_new {
|
||||
|
@ -385,6 +413,7 @@ mod test {
|
|||
.get_mut(&FallbackId::from_chan_target(&new_fb))
|
||||
.unwrap()
|
||||
.status
|
||||
.status
|
||||
.usable_at(now));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ use std::time::{Duration, Instant, SystemTime};
|
|||
use tracing::{trace, warn};
|
||||
|
||||
use crate::dirstatus::DirStatus;
|
||||
use crate::skew::SkewObservation;
|
||||
use crate::util::randomize_time;
|
||||
use crate::{ids::GuardId, GuardParams, GuardRestriction, GuardUsage};
|
||||
use crate::{ExternalActivity, FirstHopId, GuardUsageKind};
|
||||
|
@ -177,6 +178,10 @@ pub(crate) struct Guard {
|
|||
#[serde(skip)]
|
||||
suspicious_behavior_warned: bool,
|
||||
|
||||
/// Latest clock skew (if any) we have observed from this guard.
|
||||
#[serde(skip)]
|
||||
clock_skew: Option<SkewObservation>,
|
||||
|
||||
/// Fields from the state file that was used to make this `Guard` that
|
||||
/// this version of Arti doesn't understand.
|
||||
#[serde(flatten)]
|
||||
|
@ -241,6 +246,7 @@ impl Guard {
|
|||
exploratory_circ_pending: false,
|
||||
circ_history: CircHistory::default(),
|
||||
suspicious_behavior_warned: false,
|
||||
clock_skew: None,
|
||||
unknown_fields: Default::default(),
|
||||
}
|
||||
}
|
||||
|
@ -319,6 +325,7 @@ impl Guard {
|
|||
circ_history: other.circ_history,
|
||||
suspicious_behavior_warned: other.suspicious_behavior_warned,
|
||||
dir_status: other.dir_status,
|
||||
clock_skew: other.clock_skew,
|
||||
// Note that we _could_ remove either of the above blocks and add
|
||||
// `..self` or `..other`, but that would be risky: it would increase
|
||||
// the odds that we would forget to add some persistent or
|
||||
|
@ -661,6 +668,11 @@ impl Guard {
|
|||
}
|
||||
}
|
||||
|
||||
/// Record that a given fallback has told us about clock skew.
|
||||
pub(crate) fn note_skew(&mut self, observation: SkewObservation) {
|
||||
self.clock_skew = Some(observation);
|
||||
}
|
||||
|
||||
/// Testing only: Return true if this guard was ever contacted successfully.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn confirmed(&self) -> bool {
|
||||
|
|
|
@ -139,6 +139,7 @@ use std::convert::{TryFrom, TryInto};
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use tor_proto::ClockSkew;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
use tor_llcrypto::pk;
|
||||
|
@ -155,6 +156,7 @@ mod guard;
|
|||
mod ids;
|
||||
mod pending;
|
||||
mod sample;
|
||||
mod skew;
|
||||
mod util;
|
||||
|
||||
pub use err::{GuardMgrError, PickGuardError};
|
||||
|
@ -704,6 +706,7 @@ impl GuardMgrInner {
|
|||
&mut self,
|
||||
request_id: RequestId,
|
||||
status: GuardStatus,
|
||||
skew: Option<ClockSkew>,
|
||||
runtime: &impl tor_rtcompat::SleepProvider,
|
||||
) {
|
||||
if let Some(mut pending) = self.pending.remove(&request_id) {
|
||||
|
@ -711,6 +714,23 @@ impl GuardMgrInner {
|
|||
let guard_id = pending.guard_id();
|
||||
trace!(?guard_id, ?status, "Received report of guard status");
|
||||
|
||||
// First, handle the skew report (if any)
|
||||
if let Some(skew) = skew {
|
||||
let observation = skew::SkewObservation {
|
||||
skew,
|
||||
when: runtime.now(),
|
||||
};
|
||||
|
||||
match &guard_id.0 {
|
||||
FirstHopIdInner::Guard(id) => {
|
||||
self.guards.active_guards_mut().record_skew(id, observation);
|
||||
}
|
||||
FirstHopIdInner::Fallback(id) => {
|
||||
self.fallbacks.note_skew(id, observation);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match (status, &guard_id.0) {
|
||||
(GuardStatus::Failure, FirstHopIdInner::Fallback(id)) => {
|
||||
// We used a fallback, and we weren't able to build a circuit through it.
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
use crate::filter::GuardFilter;
|
||||
use crate::guard::{Guard, NewlyConfirmed, Reachable};
|
||||
use crate::skew::SkewObservation;
|
||||
use crate::{
|
||||
ids::GuardId, ExternalActivity, GuardParams, GuardUsage, GuardUsageKind, PickGuardError,
|
||||
};
|
||||
|
@ -661,6 +662,13 @@ impl GuardSet {
|
|||
}
|
||||
}
|
||||
|
||||
/// Record that a given guard has told us about clock skew.
|
||||
pub(crate) fn record_skew(&mut self, guard_id: &GuardId, observation: SkewObservation) {
|
||||
if let Some(guard) = self.guards.get_mut(guard_id) {
|
||||
guard.note_skew(observation);
|
||||
}
|
||||
}
|
||||
|
||||
/// Return whether the circuit manager can be allowed to use a
|
||||
/// circuit with the `guard_id`.
|
||||
///
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
//! Code for creating and manipulating observations about clock skew.
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use tor_proto::ClockSkew;
|
||||
|
||||
/// A single observation related to reported clock skew.
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(dead_code)] //XXXX Nothing reads these yet.
|
||||
pub(crate) struct SkewObservation {
|
||||
/// The reported clock skew
|
||||
pub(crate) skew: ClockSkew,
|
||||
/// The time when we added this observation.
|
||||
pub(crate) when: Instant,
|
||||
}
|
Loading…
Reference in New Issue