guardmgr: Use a better persistent data format
Previously we stored only one guard sample, in a state file called "default_guards". That's not future-proof, since we want to have multiple samples in the future. (`guard-spec.txt` specifies separate samples for highly restrictive filters, and for bridge usage.) This patch changes our behavior so that we can store multiple samples in a new "guards" file. I had thought about automatically migrating from the previous file format and location, but I don't think that's necessary given our current (lack of) stability guarantees. Closes #176.
This commit is contained in:
parent
e735b3b208
commit
70a2e2e751
|
@ -186,16 +186,11 @@ struct GuardMgrInner {
|
|||
/// these attempts.
|
||||
last_primary_retry_time: Instant,
|
||||
|
||||
/// The currently active [`GuardSet`] object.
|
||||
/// Persistent guard manager state.
|
||||
///
|
||||
/// This object remembers a persistent set of guards that we can use, along
|
||||
/// with their relative priorities and statuses.
|
||||
///
|
||||
/// Right now, there's only one `GuardSet` per `GuardMgr`, but we
|
||||
/// expect that to change: our algorithm specifies that there can
|
||||
/// be multiple named guard sets, and we can swap between them
|
||||
/// depending on the user's selected [`GuardFilter`].
|
||||
active_guards: GuardSet,
|
||||
/// This object remembers one or more persistent set of guards that we can
|
||||
/// use, along with their relative priorities and statuses.
|
||||
guards: GuardSets,
|
||||
|
||||
/// Configuration values derived from the consensus parameters.
|
||||
///
|
||||
|
@ -233,11 +228,32 @@ struct GuardMgrInner {
|
|||
waiting: Vec<PendingRequest>,
|
||||
|
||||
/// Location in which to store persistent state.
|
||||
///
|
||||
/// (This is only the state for the default set of guards.)
|
||||
default_storage: DynStorageHandle<GuardSet>,
|
||||
storage: DynStorageHandle<GuardSets>,
|
||||
}
|
||||
|
||||
/// Persistent state for a guard manager, as serialized to disk.
|
||||
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
|
||||
struct GuardSets {
|
||||
/// The default set of guards to use.
|
||||
///
|
||||
/// Right now, this is the _only_ `GuardSet` for each `GuardMgr`, but we
|
||||
/// expect that to change: our algorithm specifies that there can
|
||||
/// be multiple named guard sets, and we can swap between them
|
||||
/// depending on the user's selected [`GuardFilter`].
|
||||
default: GuardSet,
|
||||
|
||||
/// Unrecognized fields, including (possibly) other guard sets.
|
||||
#[serde(flatten)]
|
||||
remaining: HashMap<String, tor_persist::JsonValue>,
|
||||
}
|
||||
|
||||
/// The key (filename) we use for storing our persistent guard state in the
|
||||
/// `StateMgr`.
|
||||
///
|
||||
/// We used to store this in a different format in a filename called
|
||||
/// "default_guards" (before Arti 0.1.0).
|
||||
const STORAGE_KEY: &str = "guards";
|
||||
|
||||
impl<R: Runtime> GuardMgr<R> {
|
||||
/// Create a new "empty" guard manager and launch its background tasks.
|
||||
///
|
||||
|
@ -248,16 +264,20 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
S: StateMgr + Send + Sync + 'static,
|
||||
{
|
||||
let (ctrl, rcv) = mpsc::unbounded();
|
||||
let default_storage = state_mgr.create_handle("default_guards");
|
||||
let active_guards = default_storage.load()?.unwrap_or_else(GuardSet::new);
|
||||
let storage: DynStorageHandle<GuardSets> = state_mgr.create_handle(STORAGE_KEY);
|
||||
// TODO(nickm): We should do something about the old state in
|
||||
// `default_guards`. Probably it would be best to delete it. We could
|
||||
// try to migrate it instead, but that's beyond the stability guarantee
|
||||
// that we're getting at this stage of our (pre-0.1) development.
|
||||
let state = storage.load()?.unwrap_or_default();
|
||||
let inner = Arc::new(Mutex::new(GuardMgrInner {
|
||||
active_guards,
|
||||
guards: state,
|
||||
last_primary_retry_time: runtime.now(),
|
||||
params: GuardParams::default(),
|
||||
ctrl,
|
||||
pending: HashMap::new(),
|
||||
waiting: Vec::new(),
|
||||
default_storage,
|
||||
storage,
|
||||
}));
|
||||
{
|
||||
let weak_inner = Arc::downgrade(&inner);
|
||||
|
@ -277,7 +297,7 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
pub fn store_persistent_state(&self) -> Result<(), GuardMgrError> {
|
||||
let inner = self.inner.lock().expect("Poisoned lock");
|
||||
trace!("Flushing guard state to disk.");
|
||||
inner.default_storage.store(&inner.active_guards)?;
|
||||
inner.storage.store(&inner.guards)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -287,7 +307,7 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
/// files. If we have the lock, we only want to save.
|
||||
pub fn reload_persistent_state(&self) -> Result<(), GuardMgrError> {
|
||||
let mut inner = self.inner.lock().expect("Poisoned lock");
|
||||
if let Some(new_guards) = inner.default_storage.load()? {
|
||||
if let Some(new_guards) = inner.storage.load()? {
|
||||
let now = self.runtime.wallclock();
|
||||
inner.replace_guards_with(new_guards, now);
|
||||
}
|
||||
|
@ -299,8 +319,8 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
/// Requires that we hold the lock on the state files.
|
||||
pub fn upgrade_to_owned_persistent_state(&self) -> Result<(), GuardMgrError> {
|
||||
let mut inner = self.inner.lock().expect("Poisoned lock");
|
||||
debug_assert!(inner.default_storage.can_store());
|
||||
let new_guards = inner.default_storage.load()?.unwrap_or_else(GuardSet::new);
|
||||
debug_assert!(inner.storage.can_store());
|
||||
let new_guards = inner.storage.load()?.unwrap_or_default();
|
||||
let now = self.runtime.wallclock();
|
||||
inner.replace_guards_with(new_guards, now);
|
||||
Ok(())
|
||||
|
@ -309,7 +329,11 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
/// Return true if `netdir` has enough information to safely become our new netdir.
|
||||
pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
|
||||
let mut inner = self.inner.lock().expect("Poisoned lock");
|
||||
inner.active_guards.missing_primary_microdescriptors(netdir) == 0
|
||||
inner
|
||||
.guards
|
||||
.active_guards_mut()
|
||||
.missing_primary_microdescriptors(netdir)
|
||||
== 0
|
||||
}
|
||||
|
||||
/// Update the state of this [`GuardMgr`] based on a new or modified
|
||||
|
@ -368,7 +392,10 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
"Guard filter replaced."
|
||||
);
|
||||
|
||||
inner.active_guards.set_filter(filter, restrictive_filter);
|
||||
inner
|
||||
.guards
|
||||
.active_guards_mut()
|
||||
.set_filter(filter, restrictive_filter);
|
||||
inner.update(now, Some(netdir));
|
||||
}
|
||||
|
||||
|
@ -409,11 +436,12 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
|
||||
// (I am not 100% sure that we need to consider_all_retries here, but
|
||||
// it should _probably_ not hurt.)
|
||||
inner.active_guards.consider_all_retries(now);
|
||||
inner.guards.active_guards_mut().consider_all_retries(now);
|
||||
|
||||
let (origin, guard_id) = inner.select_guard_with_retries(&usage, netdir, wallclock)?;
|
||||
let guard = inner
|
||||
.active_guards
|
||||
.guards
|
||||
.active_guards()
|
||||
.get(&guard_id)
|
||||
.expect("Selected guard that wasn't in our sample!?")
|
||||
.get_external_rep();
|
||||
|
@ -435,14 +463,20 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
// don't want to acknowledge the net as down before that point, since
|
||||
// we don't mark all the primary guards as retriable unless
|
||||
// we've been forced to non-primary guards.
|
||||
let net_has_been_down = inner.active_guards.all_primary_guards_are_unreachable()
|
||||
let net_has_been_down = inner
|
||||
.guards
|
||||
.active_guards_mut()
|
||||
.all_primary_guards_are_unreachable()
|
||||
&& tor_proto::time_since_last_incoming_traffic() >= inner.params.internet_down_timeout;
|
||||
|
||||
let pending_request =
|
||||
pending::PendingRequest::new(guard_id.clone(), usage, usable_sender, net_has_been_down);
|
||||
inner.pending.insert(request_id, pending_request);
|
||||
|
||||
inner.active_guards.record_attempt(&guard_id, now);
|
||||
inner
|
||||
.guards
|
||||
.active_guards_mut()
|
||||
.record_attempt(&guard_id, now);
|
||||
|
||||
Ok((guard, monitor, usable))
|
||||
}
|
||||
|
@ -464,6 +498,29 @@ impl<R: Runtime> GuardMgr<R> {
|
|||
}
|
||||
}
|
||||
|
||||
impl GuardSets {
|
||||
/// Return a reference to the currently active set of guards.
|
||||
///
|
||||
/// (That's easy enough for now, since there is never more than one set of
|
||||
/// guards. But eventually that will change, as we add support for more
|
||||
/// complex filter types, and for bridge relays. Those will use separate
|
||||
/// `GuardSet` instances, and this accessor will choose the right one.)
|
||||
fn active_guards(&self) -> &GuardSet {
|
||||
&self.default
|
||||
}
|
||||
|
||||
/// Return a mutable reference to the currently active set of guards.
|
||||
fn active_guards_mut(&mut self) -> &mut GuardSet {
|
||||
&mut self.default
|
||||
}
|
||||
|
||||
/// Update all non-persistent state for the guards in this object with the
|
||||
/// state in `other`.
|
||||
fn copy_status_from(&mut self, other: &GuardSets) {
|
||||
self.default.copy_status_from(&other.default);
|
||||
}
|
||||
}
|
||||
|
||||
impl GuardMgrInner {
|
||||
/// Update the status of all guards in the active set, based on
|
||||
/// the passage of time and (optionally) a network directory.
|
||||
|
@ -480,33 +537,46 @@ impl GuardMgrInner {
|
|||
}
|
||||
|
||||
// Then expire guards. Do that early, in case we need more.
|
||||
self.active_guards.expire_old_guards(&self.params, now);
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.expire_old_guards(&self.params, now);
|
||||
|
||||
if let Some(netdir) = netdir {
|
||||
if self.active_guards.missing_primary_microdescriptors(netdir) > 0 {
|
||||
if self
|
||||
.guards
|
||||
.active_guards_mut()
|
||||
.missing_primary_microdescriptors(netdir)
|
||||
> 0
|
||||
{
|
||||
// We are missing primary guard descriptors, so we shouldn't update our guard
|
||||
// status.
|
||||
return;
|
||||
}
|
||||
self.active_guards.update_status_from_netdir(netdir);
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.update_status_from_netdir(netdir);
|
||||
loop {
|
||||
let added_any =
|
||||
self.active_guards
|
||||
.extend_sample_as_needed(now, &self.params, netdir);
|
||||
let added_any = self.guards.active_guards_mut().extend_sample_as_needed(
|
||||
now,
|
||||
&self.params,
|
||||
netdir,
|
||||
);
|
||||
if !added_any {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.active_guards.select_primary_guards(&self.params);
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.select_primary_guards(&self.params);
|
||||
}
|
||||
|
||||
/// Replace the active guard set with `new_guards`, preserving
|
||||
/// Replace the active guard state with `new_state`, preserving
|
||||
/// non-persistent state for any guards that are retained.
|
||||
fn replace_guards_with(&mut self, mut new_guards: GuardSet, now: SystemTime) {
|
||||
new_guards.copy_status_from(&self.active_guards);
|
||||
self.active_guards = new_guards;
|
||||
fn replace_guards_with(&mut self, mut new_guards: GuardSets, now: SystemTime) {
|
||||
new_guards.copy_status_from(&self.guards);
|
||||
self.guards = new_guards;
|
||||
self.update(now, None);
|
||||
}
|
||||
|
||||
|
@ -524,7 +594,9 @@ impl GuardMgrInner {
|
|||
let interval = self.params.internet_down_timeout;
|
||||
if self.last_primary_retry_time + interval <= now {
|
||||
debug!("Successfully reached a guard after a while off the internet; marking all primary guards retriable.");
|
||||
self.active_guards.mark_primary_guards_retriable();
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.mark_primary_guards_retriable();
|
||||
self.last_primary_retry_time = now;
|
||||
}
|
||||
}
|
||||
|
@ -555,8 +627,11 @@ impl GuardMgrInner {
|
|||
}
|
||||
|
||||
// The guard succeeded. Tell the GuardSet.
|
||||
self.active_guards
|
||||
.record_success(guard_id, &self.params, runtime.wallclock());
|
||||
self.guards.active_guards_mut().record_success(
|
||||
guard_id,
|
||||
&self.params,
|
||||
runtime.wallclock(),
|
||||
);
|
||||
// Either tell the request whether the guard is
|
||||
// usable, or schedule it as a "waiting" request.
|
||||
if let Some(usable) = self.guard_usability_status(&pending, runtime.now()) {
|
||||
|
@ -571,15 +646,21 @@ impl GuardMgrInner {
|
|||
}
|
||||
}
|
||||
GuardStatus::Failure => {
|
||||
self.active_guards.record_failure(guard_id, runtime.now());
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.record_failure(guard_id, runtime.now());
|
||||
pending.reply(false);
|
||||
}
|
||||
GuardStatus::AttemptAbandoned => {
|
||||
self.active_guards.record_attempt_abandoned(guard_id);
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.record_attempt_abandoned(guard_id);
|
||||
pending.reply(false);
|
||||
}
|
||||
GuardStatus::Indeterminate => {
|
||||
self.active_guards.record_indeterminate_result(guard_id);
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.record_indeterminate_result(guard_id);
|
||||
pending.reply(false);
|
||||
}
|
||||
};
|
||||
|
@ -592,7 +673,9 @@ impl GuardMgrInner {
|
|||
|
||||
// We might need to update the primary guards based on changes in the
|
||||
// status of guards above.
|
||||
self.active_guards.select_primary_guards(&self.params);
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.select_primary_guards(&self.params);
|
||||
|
||||
// Some waiting request may just have become ready (usable or
|
||||
// not); we need to give them the information they're waiting
|
||||
|
@ -607,7 +690,7 @@ impl GuardMgrInner {
|
|||
/// Return None if we can't yet give an answer about whether such
|
||||
/// a circuit is usable.
|
||||
fn guard_usability_status(&self, pending: &PendingRequest, now: Instant) -> Option<bool> {
|
||||
self.active_guards.circ_usability_status(
|
||||
self.guards.active_guards().circ_usability_status(
|
||||
pending.guard_id(),
|
||||
pending.usage(),
|
||||
&self.params,
|
||||
|
@ -679,7 +762,7 @@ impl GuardMgrInner {
|
|||
now: SystemTime,
|
||||
) -> Result<(sample::ListKind, GuardId), PickGuardError> {
|
||||
// Try to find a guard.
|
||||
if let Ok(s) = self.active_guards.pick_guard(usage, &self.params) {
|
||||
if let Ok(s) = self.guards.active_guards().pick_guard(usage, &self.params) {
|
||||
return Ok(s);
|
||||
}
|
||||
|
||||
|
@ -688,11 +771,14 @@ impl GuardMgrInner {
|
|||
trace!("No guards available, trying to extend the sample.");
|
||||
self.update(now, Some(dir));
|
||||
if self
|
||||
.active_guards
|
||||
.guards
|
||||
.active_guards_mut()
|
||||
.extend_sample_as_needed(now, &self.params, dir)
|
||||
{
|
||||
self.active_guards.select_primary_guards(&self.params);
|
||||
if let Ok(s) = self.active_guards.pick_guard(usage, &self.params) {
|
||||
self.guards
|
||||
.active_guards_mut()
|
||||
.select_primary_guards(&self.params);
|
||||
if let Ok(s) = self.guards.active_guards().pick_guard(usage, &self.params) {
|
||||
return Ok(s);
|
||||
}
|
||||
}
|
||||
|
@ -700,8 +786,8 @@ impl GuardMgrInner {
|
|||
|
||||
// That didn't work either. Mark everybody as potentially retriable.
|
||||
info!("All guards seem down. Marking them retriable and trying again.");
|
||||
self.active_guards.mark_all_guards_retriable();
|
||||
self.active_guards.pick_guard(usage, &self.params)
|
||||
self.guards.active_guards_mut().mark_all_guards_retriable();
|
||||
self.guards.active_guards().pick_guard(usage, &self.params)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -100,6 +100,7 @@ impl ListKind {
|
|||
|
||||
impl GuardSet {
|
||||
/// Return a new empty guard set.
|
||||
#[allow(unused)] // only used for tests right now.
|
||||
pub(crate) fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue