guardmgr: take sampled guards from Universe.

This is a trickier case, since we have to deal with weights.
This commit is contained in:
Nick Mathewson 2022-10-26 13:32:23 -04:00
parent 7733146c8c
commit 82e9a57012
3 changed files with 139 additions and 82 deletions

View File

@ -1,7 +1,6 @@
//! Code to represent its single guard node and track its status.
use tor_basic_utils::retry::RetryDelay;
use tor_netdir::{NetDir, RelayWeight};
use educe::Educe;
use serde::{Deserialize, Serialize};
@ -716,14 +715,6 @@ impl Guard {
}
}
/// Return the weight of this guard (if any) according to `dir`.
///
/// We use this information to decide whether we are about to sample
/// too much of the network as guards.
pub(crate) fn get_weight(&self, dir: &NetDir) -> Option<RelayWeight> {
dir.weight_by_rsa_id(self.id.0.rsa_identity()?, tor_netdir::WeightRole::Guard)
}
/// Return a [`FirstHop`](crate::FirstHop) object to represent this guard.
pub(crate) fn get_external_rep(&self, selection: GuardSetSelector) -> crate::FirstHop {
crate::FirstHop {
@ -1101,14 +1092,10 @@ mod test {
assert!(Some(guard22.added_at) <= Some(now));
// Can we still get the relay back?
let id = FirstHopId::in_sample(GuardSetSelector::Default, guard22.id.clone());
let id = FirstHopId::in_sample(GuardSetSelector::Default, guard22.id);
let r = id.get_relay(&netdir).unwrap();
assert!(r.same_relay_ids(&relay22));
// Can we check on the guard's weight?
let w = guard22.get_weight(&netdir).unwrap();
assert_eq!(w, 3000.into());
// Now try a guard that isn't in the netdir.
let guard255 = Guard::new(
GuardId::new([255; 32].into(), [255; 20].into()),
@ -1116,9 +1103,8 @@ mod test {
None,
now,
);
let id = FirstHopId::in_sample(GuardSetSelector::Default, guard255.id.clone());
let id = FirstHopId::in_sample(GuardSetSelector::Default, guard255.id);
assert!(id.get_relay(&netdir).is_none());
assert!(guard255.get_weight(&netdir).is_none());
}
#[test]

View File

@ -15,8 +15,7 @@ use crate::{
};
use crate::{FirstHop, GuardSetSelector};
use tor_basic_utils::iter::{FilterCount, IteratorExt as _};
use tor_linkspec::{ByRelayIds, HasRelayIds};
use tor_netdir::{NetDir, Relay};
use tor_linkspec::{ByRelayIds, ChanTarget, HasRelayIds};
use itertools::Itertools;
use rand::seq::SliceRandom;
@ -340,12 +339,6 @@ impl GuardSet {
guard_set
}
/// Return false if `relay` (or some other relay that shares an ID with it)
/// is a member if this set.
fn can_add_relay(&self, relay: &Relay<'_>) -> bool {
self.guards.all_overlapping(relay).is_empty()
}
/// Return `Ok(true)` if `id` is definitely a member of this set, and
/// `Ok(false)` if it is definitely not a member.
///
@ -375,11 +368,11 @@ impl GuardSet {
/// Guards always start out un-confirmed.
///
/// Return true if any guards were added.
pub(crate) fn extend_sample_as_needed(
pub(crate) fn extend_sample_as_needed<U: Universe>(
&mut self,
now: SystemTime,
params: &GuardParams,
dir: &NetDir,
dir: &U,
) -> bool {
let mut any_added = false;
while self.extend_sample_inner(now, params, dir) {
@ -397,7 +390,12 @@ impl GuardSet {
/// this function will add fewer filter-permitted guards than we had wanted.
/// Because of that, this is a separate function, and
/// extend_sample_as_needed runs it in a loop until it returns false.
fn extend_sample_inner(&mut self, now: SystemTime, params: &GuardParams, dir: &NetDir) -> bool {
fn extend_sample_inner<U: Universe>(
&mut self,
now: SystemTime,
params: &GuardParams,
dir: &U,
) -> bool {
self.assert_consistency();
let n_filtered_usable = self
.guards
@ -420,62 +418,33 @@ impl GuardSet {
let want_to_add = params.min_filtered_sample_size - n_filtered_usable;
let n_to_add = std::cmp::min(max_to_add, want_to_add);
// What's the most weight we're willing to have in the sample?
let target_weight = {
let total_weight = dir.total_weight(tor_netdir::WeightRole::Guard, |r| {
r.is_flagged_guard() && r.is_dir_cache()
});
total_weight
.ratio(params.max_sample_bw_fraction)
.unwrap_or(total_weight)
};
let mut current_weight: tor_netdir::RelayWeight = self
.guards
.values()
.filter_map(|guard| guard.get_weight(dir))
.sum();
if current_weight >= target_weight {
return false; // Can't add any more weight.
}
let candidate::WeightThreshold {
mut current_weight,
maximum_weight,
} = dir.weight_threshold(&self.guards, params);
// Ask the netdir for a set of guards we could use.
let n_candidates = if self.filter_is_restrictive || self.active_filter.is_unfiltered() {
n_to_add
} else {
// The filter will probably reject a bunch of guards, but we sample
// before filtering, so we make this larger on an ad-hoc basis.
n_to_add * 3
};
let candidates = dir.pick_n_relays(
&mut rand::thread_rng(),
n_candidates,
tor_netdir::WeightRole::Guard,
|relay| {
let filter_ok = if self.filter_is_restrictive {
// If we have a very restrictive filter, we only add
// relays permitted by that filter.
self.active_filter.permits(relay)
} else {
// Otherwise we add any relay to the sample.
true
};
filter_ok
&& relay.is_flagged_guard()
&& relay.is_dir_cache()
&& self.can_add_relay(relay)
},
);
let no_filter = GuardFilter::unfiltered();
let (n_candidates, pre_filter) =
if self.filter_is_restrictive || self.active_filter.is_unfiltered() {
(n_to_add, &self.active_filter)
} else {
// The filter will probably reject a bunch of guards, but we sample
// before filtering, so we make this larger on an ad-hoc basis.
(n_to_add * 3, &no_filter)
};
// Add those candidates to the sample, up to our maximum weight.
let candidates = dir.sample(&self.guards, pre_filter, n_candidates);
// Add those candidates to the sample.
let mut any_added = false;
let mut n_filtered_usable = n_filtered_usable;
for candidate in candidates {
if current_weight >= target_weight
for (candidate, weight) in candidates {
// Don't add any more if we have met the minimal sample size, and we
// have added too much weight.
if current_weight >= maximum_weight
&& self.guards.len() >= params.min_filtered_sample_size
{
// Can't add any more weight. (We only enforce target_weight
// if we have at least 'min_filtered_sample_size' in
// our total sample.)
break;
}
if self.guards.len() >= params.max_sample_size {
@ -486,15 +455,13 @@ impl GuardSet {
// We've reached our target; no need to add more.
break;
}
let candidate_weight = dir.relay_weight(&candidate, tor_netdir::WeightRole::Guard);
if self.active_filter.permits(&candidate) {
n_filtered_usable += 1;
}
current_weight += candidate_weight;
current_weight += weight;
self.add_guard(&candidate, now, params);
any_added = true;
}
self.assert_consistency();
any_added
}
@ -502,7 +469,7 @@ impl GuardSet {
/// Add `relay` as a new guard.
///
/// Does nothing if it is already a guard.
fn add_guard(&mut self, relay: &Relay<'_>, now: SystemTime, params: &GuardParams) {
fn add_guard<T: ChanTarget>(&mut self, relay: &T, now: SystemTime, params: &GuardParams) {
let id = GuardId::from_relay_ids(relay);
if self.guards.by_all_ids(&id).is_some() {
return;
@ -1012,6 +979,7 @@ impl<'a> From<GuardSample<'a>> for GuardSet {
mod test {
#![allow(clippy::unwrap_used)]
use tor_linkspec::{HasRelayIds, RelayIdType};
use tor_netdir::{NetDir, Relay};
use tor_netdoc::doc::netstatus::{RelayFlags, RelayWeight};
use super::*;

View File

@ -3,8 +3,10 @@
use std::time::SystemTime;
use tor_linkspec::{HasRelayIds, OwnedChanTarget};
use tor_netdir::NetDir;
use tor_linkspec::{ByRelayIds, HasRelayIds, OwnedChanTarget};
use tor_netdir::{NetDir, Relay, RelayWeight};
use crate::{GuardFilter, GuardParams};
/// A "Universe" is a source from which guard candidates are drawn, and from
/// which guards are updated.
@ -23,6 +25,25 @@ pub(crate) trait Universe {
/// Return the time at which this Universe last changed. This can be
/// approximate.
fn timestamp(&self) -> SystemTime;
/// Return information about how much of this universe has been added to
/// `sample`, and how much we're willing to add according to `params`.
fn weight_threshold<T>(&self, sample: &ByRelayIds<T>, params: &GuardParams) -> WeightThreshold
where
T: HasRelayIds;
/// Return up to `n` of new candidate guards from this Universe.
///
/// Only return elements that have no conflicts with identities in
/// `pre_existing`, and which obey `filter`.
fn sample<T>(
&self,
pre_existing: &ByRelayIds<T>,
filter: &GuardFilter,
n: usize,
) -> Vec<(OwnedChanTarget, RelayWeight)>
where
T: HasRelayIds;
}
/// Information about a single guard candidate, as returned by
@ -46,6 +67,22 @@ pub(crate) enum CandidateStatus {
Uncertain,
}
/// Information about how much of the universe we are using in a guard sample,
/// and how much we are allowed to use.
///
/// We use this to avoid adding the whole network to our guard sample.
#[derive(Debug, Clone)]
pub(crate) struct WeightThreshold {
/// The amount of the universe that we are using, in [`RelayWeight`].
pub(crate) current_weight: RelayWeight,
/// The greatest amount that we are willing to use, in [`RelayWeight`].
///
/// We can violate this maximum if it's necessary in order to meet our
/// minimum number of guards; otherwise, were're willing to add a _single_
/// guard that exceeds this threshold, but no more.
pub(crate) maximum_weight: RelayWeight,
}
impl Universe for NetDir {
fn timestamp(&self) -> SystemTime {
NetDir::lifetime(self).valid_after()
@ -69,4 +106,70 @@ impl Universe for NetDir {
},
}
}
fn weight_threshold<T>(&self, sample: &ByRelayIds<T>, params: &GuardParams) -> WeightThreshold
where
T: HasRelayIds,
{
// When adding from a netdir, we impose total limit on the fraction of
// the universe we're willing to add.
let maximum_weight = {
let total_weight = self.total_weight(tor_netdir::WeightRole::Guard, |r| {
r.is_flagged_guard() && r.is_dir_cache()
});
total_weight
.ratio(params.max_sample_bw_fraction)
.unwrap_or(total_weight)
};
let current_weight: tor_netdir::RelayWeight = sample
.values()
.filter_map(|guard| {
self.weight_by_rsa_id(guard.rsa_identity()?, tor_netdir::WeightRole::Guard)
})
.sum();
WeightThreshold {
current_weight,
maximum_weight,
}
}
fn sample<T>(
&self,
pre_existing: &ByRelayIds<T>,
filter: &GuardFilter,
n: usize,
) -> Vec<(OwnedChanTarget, RelayWeight)>
where
T: HasRelayIds,
{
/// Return the weight for this relay, if we can find it.
///
/// (We should always be able to find it as netdirs are constructed
/// today.)
fn weight(dir: &NetDir, relay: &Relay<'_>) -> Option<RelayWeight> {
dir.weight_by_rsa_id(relay.rsa_identity()?, tor_netdir::WeightRole::Guard)
}
self.pick_n_relays(
&mut rand::thread_rng(),
n,
tor_netdir::WeightRole::Guard,
|relay| {
filter.permits(relay)
&& relay.is_flagged_guard()
&& relay.is_dir_cache()
&& pre_existing.all_overlapping(relay).is_empty()
},
)
.iter()
.map(|relay| {
(
OwnedChanTarget::from_chan_target(relay),
weight(self, relay).unwrap_or_else(|| RelayWeight::from(0)),
)
})
.collect()
}
}