tor-dirmgr/state.rs: feed through additional parameters, use them

- The additional parameters passed to GetConsensusState are now passed
  through all the states, and used as well.
- WriteNetDir doesn't have a now() or config() method any more, since
  the states now get this from the runtime or the config parameters.
- This required modifying the tests to make a mocked runtime and custom
  config directly, instead of using DirRcv for this purpose.
- Additionally, because we don't have to upgrade a weak reference for
  DirState::dl_config(), that function no longer wraps its return value
  in Result.
- (A bunch of the FIXMEs from the previous commit that introduced the
  additional parameters have now been rectified as a result.)
This commit is contained in:
eta 2022-05-04 16:48:44 +01:00
parent f0739e46aa
commit cad815e31d
4 changed files with 220 additions and 238 deletions

1
Cargo.lock generated
View File

@ -3523,6 +3523,7 @@ dependencies = [
"tor-netdir",
"tor-netdoc",
"tor-rtcompat",
"tor-rtmock",
"tracing",
]

View File

@ -69,4 +69,5 @@ futures-await-test = "0.3.0"
hex-literal = "0.3"
tempfile = "3"
tor-rtcompat = { path = "../tor-rtcompat", version = "0.3.0", features = ["tokio", "native-tls"] }
tor-rtmock = { path = "../tor-rtmock", version = "0.3.0" }
float_eq = "0.7"

View File

@ -11,7 +11,6 @@ use std::{
use crate::state::DirState;
use crate::{
docid::{self, ClientRequest},
state::WriteNetDir,
upgrade_weak_ref, DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
};
@ -413,7 +412,7 @@ pub(crate) async fn download<R: Runtime>(
let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
'next_state: loop {
let retry_config = state.dl_config()?;
let retry_config = state.dl_config();
let parallelism = retry_config.parallelism();
// In theory this could be inside the loop below maybe? If we
@ -422,7 +421,7 @@ pub(crate) async fn download<R: Runtime>(
let mut now = {
let dirmgr = upgrade_weak_ref(&dirmgr)?;
load_once(&dirmgr, &mut state).await?;
dirmgr.now()
dirmgr.runtime.wallclock()
};
// Skip the downloads if we can...
@ -486,7 +485,7 @@ pub(crate) async fn download<R: Runtime>(
continue 'next_state;
},
};
dirmgr.now()
dirmgr.runtime.wallclock()
};
// Exit if there is nothing more to download.
@ -665,8 +664,8 @@ mod test {
}
Ok(changed)
}
fn dl_config(&self) -> Result<DownloadSchedule> {
Ok(DownloadSchedule::default())
fn dl_config(&self) -> DownloadSchedule {
DownloadSchedule::default()
}
fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
if self.can_advance() {

View File

@ -113,7 +113,7 @@ pub(crate) trait DirState: Send {
fn bootstrap_status(&self) -> event::DirStatus;
/// Return a configuration for attempting downloads.
fn dl_config(&self) -> Result<DownloadSchedule>;
fn dl_config(&self) -> DownloadSchedule;
/// If possible, advance to the next state.
fn advance(self: Box<Self>) -> Result<Box<dyn DirState>>;
/// Return a time (if any) when downloaders should stop attempting to
@ -130,10 +130,6 @@ pub(crate) trait DirState: Send {
/// in this module can _only_ interact with the DirMgr through
/// modifying the NetDir and looking at the configuration.
pub(crate) trait WriteNetDir: 'static + Sync + Send {
/// Return a DirMgrConfig to use when asked how to retry downloads,
/// or when we need to find a list of descriptors.
fn config(&self) -> Arc<DirMgrConfig>;
/// Return a reference where we can write or modify a NetDir.
fn netdir(&self) -> &SharedMutArc<NetDir>;
@ -153,22 +149,12 @@ pub(crate) trait WriteNetDir: 'static + Sync + Send {
true
}
/// Called to find the current time.
///
/// This is just `SystemTime::now()` in production, but for
/// testing it is helpful to be able to mock our our current view
/// of the time.
fn now(&self) -> SystemTime;
/// Return the currently configured DynFilter for this state.
#[cfg(feature = "dirfilter")]
fn filter(&self) -> &dyn crate::filter::DirFilter;
}
impl<R: Runtime> WriteNetDir for crate::DirMgr<R> {
fn config(&self) -> Arc<DirMgrConfig> {
self.config.get()
}
fn netdir(&self) -> &SharedMutArc<NetDir> {
&self.netdir
}
@ -184,9 +170,6 @@ impl<R: Runtime> WriteNetDir for crate::DirMgr<R> {
None => true, // no circmgr? then we can use anything.
}
}
fn now(&self) -> SystemTime {
self.runtime.wallclock()
}
#[cfg(feature = "dirfilter")]
fn filter(&self) -> &dyn crate::filter::DirFilter {
@ -211,7 +194,7 @@ pub(crate) struct GetConsensusState<DM: WriteNetDir, R: Runtime> {
/// If present, our next state.
///
/// (This is present once we have a consensus.)
next: Option<GetCertsState<DM>>,
next: Option<GetCertsState<DM, R>>,
/// A list of RsaIdentity for the authorities that we believe in.
///
@ -224,13 +207,11 @@ pub(crate) struct GetConsensusState<DM: WriteNetDir, R: Runtime> {
writedir: Weak<DM>,
/// A `Runtime` implementation.
#[allow(dead_code)] // FIXME(eta): remove when used
rt: R,
/// The configuration of the directory manager. Used for download configuration
/// purposes.
config: Arc<DirMgrConfig>,
/// If one exists, the netdir we're trying to update.
#[allow(dead_code)] // FIXME(eta): remove when used
prev_netdir: Option<Arc<NetDir>>,
}
@ -317,8 +298,8 @@ impl<DM: WriteNetDir, R: Runtime> DirState for GetConsensusState<DM, R> {
DirStatusInner::NoConsensus { after: self.after }.into()
}
}
fn dl_config(&self) -> Result<DownloadSchedule> {
Ok(self.config.schedule.retry_consensus)
fn dl_config(&self) -> DownloadSchedule {
self.config.schedule.retry_consensus
}
fn add_from_cache(
&mut self,
@ -391,7 +372,7 @@ impl<DM: WriteNetDir, R: Runtime> GetConsensusState<DM, R> {
} else {
parsed
};
let now = current_time(&self.writedir)?;
let now = self.rt.wallclock();
let timely = parsed.check_valid_at(&now)?;
let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
(meta, timely)
@ -423,6 +404,9 @@ impl<DM: WriteNetDir, R: Runtime> GetConsensusState<DM, R> {
missing_certs: desired_certs,
certs: Vec::new(),
writedir: Weak::clone(&self.writedir),
rt: self.rt.clone(),
config: self.config.clone(),
prev_netdir: self.prev_netdir.take(),
});
// Unwrap should be safe because `next` was just assigned
@ -445,7 +429,7 @@ impl<DM: WriteNetDir, R: Runtime> GetConsensusState<DM, R> {
/// we are given a bad consensus signed with fictional certificates
/// that we can never find.
#[derive(Clone, Debug)]
struct GetCertsState<DM: WriteNetDir> {
struct GetCertsState<DM: WriteNetDir, R: Runtime> {
/// The cache usage we had in mind when we began. Used to reset.
cache_usage: CacheUsage,
/// Where did we get our consensus?
@ -461,9 +445,17 @@ struct GetCertsState<DM: WriteNetDir> {
certs: Vec<AuthCert>,
/// Reference to our directory manager.
writedir: Weak<DM>,
/// A `Runtime` implementation.
rt: R,
/// The configuration of the directory manager. Used for download configuration
/// purposes.
config: Arc<DirMgrConfig>,
/// If one exists, the netdir we're trying to update.
prev_netdir: Option<Arc<NetDir>>,
}
impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
impl<DM: WriteNetDir, R: Runtime> DirState for GetCertsState<DM, R> {
fn describe(&self) -> String {
let total = self.certs.len() + self.missing_certs.len();
format!(
@ -494,12 +486,8 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
}
.into()
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(wd.config().schedule.retry_certs)
} else {
Err(Error::ManagerDropped)
}
fn dl_config(&self) -> DownloadSchedule {
self.config.schedule.retry_certs
}
fn add_from_cache(
&mut self,
@ -515,7 +503,7 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
let parsed = AuthCert::parse(text)
.map_err(|e| Error::from_netdoc(DocSource::LocalCache, e))?
.check_signature()?;
let now = current_time(&self.writedir)?;
let now = self.rt.wallclock();
let cert = parsed.check_valid_at(&now)?;
self.missing_certs.remove(cert.key_ids());
self.certs.push(cert);
@ -542,7 +530,7 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
.within(text)
.expect("Certificate was not in input as expected");
if let Ok(wellsigned) = parsed.check_signature() {
let now = current_time(&self.writedir)?;
let now = self.rt.wallclock();
let timely = wellsigned.check_valid_at(&now)?;
newcerts.push((timely, s));
} else {
@ -603,7 +591,10 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
validated,
self.consensus_meta,
self.writedir,
)?))
self.rt,
self.config,
self.prev_netdir,
)))
} else {
Ok(self)
}
@ -612,23 +603,20 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
Some(self.consensus_meta.lifetime().valid_until())
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
// FIXME(eta): This can't be made to work until the necessary parameters are passed through.
// (This will happen very soon.)
todo!()
/*
Ok(Box::new(GetConsensusState::new(
self.writedir,
self.rt,
self.config,
self.cache_usage,
)?))
*/
self.prev_netdir,
self.writedir,
)))
}
}
/// Final state: we're fetching or loading microdescriptors
#[derive(Debug, Clone)]
struct GetMicrodescsState<DM: WriteNetDir> {
struct GetMicrodescsState<DM: WriteNetDir, R: Runtime> {
/// How should we get the consensus from the cache, if at all?
#[allow(dead_code)] // FIXME(eta): remove when used
cache_usage: CacheUsage,
/// Total number of microdescriptors listed in the consensus.
n_microdescs: usize,
@ -651,6 +639,14 @@ struct GetMicrodescsState<DM: WriteNetDir> {
///
/// Only cleared for testing.
expire_when_complete: bool,
/// A `Runtime` implementation.
rt: R,
/// The configuration of the directory manager. Used for download configuration
/// purposes.
config: Arc<DirMgrConfig>,
/// If one exists, the netdir we're trying to update.
prev_netdir: Option<Arc<NetDir>>,
}
/// A network directory that is not yet ready to become _the_ current network directory.
@ -716,7 +712,7 @@ impl PendingNetDir {
}
}
impl<DM: WriteNetDir> GetMicrodescsState<DM> {
impl<DM: WriteNetDir, R: Runtime> GetMicrodescsState<DM, R> {
/// Create a new [`GetMicrodescsState`] from a provided
/// microdescriptor consensus.
fn new(
@ -724,22 +720,18 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
consensus: MdConsensus,
meta: ConsensusMeta,
writedir: Weak<DM>,
) -> Result<Self> {
rt: R,
config: Arc<DirMgrConfig>,
prev_netdir: Option<Arc<NetDir>>,
) -> Self {
let reset_time = consensus.lifetime().valid_until();
let n_microdescs = consensus.relays().len();
let partial_dir = match Weak::upgrade(&writedir) {
Some(wd) => {
let config = wd.config();
let params = &config.override_net_params;
let mut dir = PartialNetDir::new(consensus, Some(params));
if let Some(old_dir) = wd.netdir().get() {
dir.fill_from_previous_netdir(&old_dir);
let mut partial_dir = PartialNetDir::new(consensus, Some(params));
if let Some(old_dir) = prev_netdir.as_ref() {
partial_dir.fill_from_previous_netdir(old_dir);
}
dir
}
None => return Err(Error::ManagerDropped),
};
let mut result = GetMicrodescsState {
cache_usage,
@ -750,10 +742,13 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
newly_listed: Vec::new(),
reset_time,
expire_when_complete: true,
rt,
config,
prev_netdir,
};
result.consider_upgrade();
Ok(result)
result
}
/// Add a bunch of microdescriptors to the in-progress netdir.
@ -800,7 +795,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
self.reset_time = pick_download_time(netdir.lifetime());
// We re-set the parameters here, in case they have been
// reconfigured.
netdir.replace_overridden_parameters(&wd.config().override_net_params);
netdir.replace_overridden_parameters(&self.config.override_net_params);
wd.netdir().replace(netdir);
wd.netdir_consensus_changed();
wd.netdir_descriptors_changed();
@ -832,7 +827,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
}
}
impl<DM: WriteNetDir> GetMicrodescsState<DM> {
impl<DM: WriteNetDir, R: Runtime> GetMicrodescsState<DM, R> {
/// Try to obtain info from an inner `MdReceiver`
///
/// Either finds an inner `MdReceiver` and calls `f` on it, or returns `default()`.
@ -861,7 +856,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
}
}
impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
impl<DM: WriteNetDir, R: Runtime> DirState for GetMicrodescsState<DM, R> {
fn describe(&self) -> String {
format!(
"Downloading microdescriptors (we are missing {}).",
@ -893,12 +888,8 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
}
.into()
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(wd.config().schedule.retry_microdescs)
} else {
Err(Error::ManagerDropped)
}
fn dl_config(&self) -> DownloadSchedule {
self.config.schedule.retry_microdescs
}
fn add_from_cache(
&mut self,
@ -991,10 +982,6 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
Some(self.reset_time)
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
// FIXME(eta): This can't be made to work until the necessary parameters are passed through.
// (This will happen very soon.)
todo!()
/*
let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
// Cache only means we can't ever download.
CacheUsage::CacheOnly
@ -1009,10 +996,12 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
CacheUsage::CacheOkay
};
Ok(Box::new(GetConsensusState::new(
self.writedir,
self.rt,
self.config,
cache_usage,
)?))
*/
self.prev_netdir,
self.writedir,
)))
}
}
@ -1054,15 +1043,6 @@ fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
(valid_after + lowbound, uncertainty)
}
/// Helper: call `now` on a Weak<WriteNetDir>.
fn current_time<DM: WriteNetDir>(writedir: &Weak<DM>) -> Result<SystemTime> {
if let Some(writedir) = Weak::upgrade(writedir) {
Ok(writedir.now())
} else {
Err(Error::ManagerDropped)
}
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
@ -1076,6 +1056,8 @@ mod test {
use tempfile::TempDir;
use time::macros::datetime;
use tor_netdoc::doc::authcert::AuthCertKeyIds;
use tor_rtcompat::CompoundRuntime;
use tor_rtmock::time::MockSleepProvider;
#[test]
fn download_schedule() {
@ -1114,16 +1096,12 @@ mod test {
(tempdir, Mutex::new(Box::new(store)))
}
struct DirRcv {
cfg: Arc<DirMgrConfig>,
netdir: SharedMutArc<NetDir>,
consensus_changed: AtomicBool,
descriptors_changed: AtomicBool,
now: SystemTime,
fn make_time_shifted_runtime(now: SystemTime, rt: impl Runtime) -> impl Runtime {
let msp = MockSleepProvider::new(now);
CompoundRuntime::new(rt.clone(), msp, rt.clone(), rt.clone(), rt)
}
impl DirRcv {
fn new(now: SystemTime, authorities: Option<Vec<AuthorityBuilder>>) -> Self {
fn make_dirmgr_config(authorities: Option<Vec<AuthorityBuilder>>) -> Arc<DirMgrConfig> {
let mut netcfg = crate::NetworkConfig::builder();
netcfg.set_fallback_caches(vec![]);
if let Some(a) = authorities {
@ -1134,10 +1112,18 @@ mod test {
network: netcfg.build().unwrap(),
..Default::default()
};
let cfg = Arc::new(cfg);
Arc::new(cfg)
}
struct DirRcv {
netdir: SharedMutArc<NetDir>,
consensus_changed: AtomicBool,
descriptors_changed: AtomicBool,
}
impl DirRcv {
fn new() -> Self {
DirRcv {
now,
cfg,
netdir: Default::default(),
consensus_changed: false.into(),
descriptors_changed: false.into(),
@ -1146,9 +1132,6 @@ mod test {
}
impl WriteNetDir for DirRcv {
fn config(&self) -> Arc<DirMgrConfig> {
Arc::clone(&self.cfg)
}
fn netdir(&self) -> &SharedMutArc<NetDir> {
&self.netdir
}
@ -1159,9 +1142,6 @@ mod test {
self.descriptors_changed
.store(true, atomic::Ordering::SeqCst);
}
fn now(&self) -> SystemTime {
self.now
}
#[cfg(feature = "dirfilter")]
fn filter(&self) -> &dyn crate::filter::DirFilter {
&crate::filter::NilFilter
@ -1229,13 +1209,15 @@ mod test {
#[test]
fn get_consensus_state() {
tor_rtcompat::test_with_one_runtime!(|rt| async move {
let rcv = Arc::new(DirRcv::new(test_time(), None));
let rcv = Arc::new(DirRcv::new());
let rt = make_time_shifted_runtime(test_time(), rt);
let cfg = make_dirmgr_config(None);
let (_tempdir, store) = temp_store();
let mut state = GetConsensusState::new(
rt.clone(),
rcv.cfg.clone(),
cfg,
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
@ -1257,7 +1239,7 @@ mod test {
// Download configuration is simple: only 1 request can be done in
// parallel. It uses a consensus retry schedule.
let retry = state.dl_config().unwrap();
let retry = state.dl_config();
assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus);
// Do we know what we want?
@ -1298,11 +1280,11 @@ mod test {
// Great. Change the receiver to use a configuration where these test
// authorities are recognized.
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let cfg = make_dirmgr_config(Some(test_authorities()));
let mut state = GetConsensusState::new(
rt.clone(),
rcv.cfg.clone(),
cfg,
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
@ -1327,14 +1309,9 @@ mod test {
);
// Try again, but this time get the state from the cache.
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let mut state = GetConsensusState::new(
rt,
rcv.cfg.clone(),
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
);
let cfg = make_dirmgr_config(Some(test_authorities()));
let mut state =
GetConsensusState::new(rt, cfg, CacheUsage::CacheOkay, None, Arc::downgrade(&rcv));
let text: crate::storage::InputString = CONSENSUS.to_owned().into();
let map = vec![(docid, text.into())].into_iter().collect();
let outcome = state.add_from_cache(map, None);
@ -1348,10 +1325,12 @@ mod test {
tor_rtcompat::test_with_one_runtime!(|rt| async move {
/// Construct a GetCertsState with our test data
fn new_getcerts_state(rt: impl Runtime) -> (Arc<DirRcv>, Box<dyn DirState>) {
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let rcv = Arc::new(DirRcv::new());
let rt = make_time_shifted_runtime(test_time(), rt);
let cfg = make_dirmgr_config(Some(test_authorities()));
let mut state = GetConsensusState::new(
rt,
rcv.cfg.clone(),
cfg,
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
@ -1375,7 +1354,7 @@ mod test {
assert!(!state.is_ready(Readiness::Usable));
let consensus_expires = datetime!(2020-08-07 12:43:20 UTC).into();
assert_eq!(state.reset_time(), Some(consensus_expires));
let retry = state.dl_config().unwrap();
let retry = state.dl_config();
assert_eq!(retry, DownloadScheduleConfig::default().retry_certs);
// Bootstrap status okay?
@ -1447,13 +1426,10 @@ mod test {
"Downloading microdescriptors (we are missing 6)."
);
/*
FIXME(eta): reinstate
// If we start from scratch and reset, we're back in GetConsensus.
let (_rcv, state) = new_getcerts_state(rt);
let state = state.reset().unwrap();
assert_eq!(&state.describe(), "Looking for a consensus.");
*/
// TODO: I'd like even more tests to make sure that we never
// accept a certificate for an authority we don't believe in.
@ -1462,9 +1438,14 @@ mod test {
#[test]
fn get_microdescs_state() {
tor_rtcompat::test_with_one_runtime!(|rt| async move {
/// Construct a GetCertsState with our test data
fn new_getmicrodescs_state() -> (Arc<DirRcv>, GetMicrodescsState<DirRcv>) {
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
fn new_getmicrodescs_state(
rt: impl Runtime,
) -> (Arc<DirRcv>, GetMicrodescsState<DirRcv, impl Runtime>) {
let rcv = Arc::new(DirRcv::new());
let rt = make_time_shifted_runtime(test_time(), rt);
let cfg = make_dirmgr_config(Some(test_authorities()));
let (signed, rest, consensus) = MdConsensus::parse(CONSENSUS2).unwrap();
let consensus = consensus
.dangerously_assume_timely()
@ -1475,8 +1456,10 @@ mod test {
consensus,
meta,
Arc::downgrade(&rcv),
)
.unwrap();
rt,
cfg,
None,
);
(rcv, state)
}
@ -1484,16 +1467,13 @@ mod test {
base64::decode(s).unwrap().try_into().unwrap()
}
/*
FIXME(eta): reinstate
// If we start from scratch and reset, we're back in GetConsensus.
let (_rcv, state) = new_getmicrodescs_state();
let (_rcv, state) = new_getmicrodescs_state(rt.clone());
let state = Box::new(state).reset().unwrap();
assert_eq!(&state.describe(), "Looking for a consensus.");
*/
// Check the basics.
let (_rcv, mut state) = new_getmicrodescs_state();
let (_rcv, mut state) = new_getmicrodescs_state(rt.clone());
assert_eq!(
&state.describe(),
"Downloading microdescriptors (we are missing 4)."
@ -1508,7 +1488,7 @@ mod test {
assert!(reset_time >= fresh_until);
assert!(reset_time <= valid_until);
}
let retry = state.dl_config().unwrap();
let retry = state.dl_config();
assert_eq!(retry, DownloadScheduleConfig::default().retry_microdescs);
assert_eq!(
state.bootstrap_status().to_string(),
@ -1577,5 +1557,6 @@ mod test {
let missing = state.missing_docs();
assert!(missing.is_empty());
});
}
}