tor-dirmgr/state.rs: refactor GetConsensusState::new

- GetConsensusState::new now takes a set of parameters matching what it
  actually needs, instead of just taking a writedir. (It still *does*
  take a writedir, and indeed still uses it for basically everything,
  but that will eventually go away.)
- Its call sites were updated.
  - Some tests now need to take a runtime, and got indented a lot as a
    result.
  - Resetting was made non-functional, because we need to thread through
    the parameters passed to GetConsensusState to all of the other
    states, too. This will happen in a later commit.
This commit is contained in:
eta 2022-04-27 17:45:25 +01:00
parent 62ece0ea21
commit a9bae9adfe
3 changed files with 273 additions and 208 deletions

View File

@ -517,7 +517,7 @@ impl<R: Runtime> DirMgr<R> {
weak: Weak<Self>,
mut on_complete: Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut state: Box<dyn DirState> = Box::new(state::GetConsensusState::new(
let mut state: Box<dyn DirState> = Box::new(state::GetConsensusState::bodge_new(
Weak::clone(&weak),
CacheUsage::CacheOkay,
)?);
@ -727,7 +727,8 @@ impl<R: Runtime> DirMgr<R> {
///
/// Return false if there is no such consensus.
async fn load_directory(self: &Arc<Self>) -> Result<bool> {
let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
let state =
state::GetConsensusState::bodge_new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;
Ok(self.netdir.get().is_some())

View File

@ -29,7 +29,7 @@ use crate::{
event,
retry::DownloadSchedule,
shared_ref::SharedMutArc,
CacheUsage, ClientRequest, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result,
CacheUsage, ClientRequest, DirMgr, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result,
};
use crate::{DirEvent, DocSource};
use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
@ -196,7 +196,7 @@ impl<R: Runtime> WriteNetDir for crate::DirMgr<R> {
/// Initial state: fetching or loading a consensus directory.
#[derive(Clone, Debug)]
pub(crate) struct GetConsensusState<DM: WriteNetDir> {
pub(crate) struct GetConsensusState<DM: WriteNetDir, R: Runtime> {
/// How should we get the consensus from the cache, if at all?
cache_usage: CacheUsage,
@ -222,39 +222,66 @@ pub(crate) struct GetConsensusState<DM: WriteNetDir> {
/// A weak reference to the directory manager that wants us to
/// fetch this information. When this references goes away, we exit.
writedir: Weak<DM>,
/// A `Runtime` implementation.
#[allow(dead_code)] // FIXME(eta): remove when used
rt: R,
/// The configuration of the directory manager. Used for download configuration
/// purposes.
config: Arc<DirMgrConfig>,
/// If one exists, the netdir we're trying to update.
#[allow(dead_code)] // FIXME(eta): remove when used
prev_netdir: Option<Arc<NetDir>>,
}
impl<DM: WriteNetDir> GetConsensusState<DM> {
/// Create a new GetConsensusState from a weak reference to a
/// directory manager and a `cache_usage` flag.
pub(crate) fn new(writedir: Weak<DM>, cache_usage: CacheUsage) -> Result<Self> {
let (authority_ids, after) = if let Some(writedir) = Weak::upgrade(&writedir) {
let ids: Vec<_> = writedir
.config()
.authorities()
.iter()
.map(|auth| auth.v3ident)
.collect();
let after = writedir
.netdir()
.get()
.map(|nd| nd.lifetime().valid_after());
(ids, after)
impl<R: Runtime> GetConsensusState<DirMgr<R>, R> {
/// Bodge version of Self::new() with the old pre-refactor signature.
/// This will go away when the refactor is complete.
pub(crate) fn bodge_new(writedir: Weak<DirMgr<R>>, cache_usage: CacheUsage) -> Result<Self> {
if let Some(netdir) = Weak::upgrade(&writedir) {
let config = netdir.config.get();
let prev_netdir = netdir.opt_netdir();
let rt = netdir.runtime.clone();
Ok(Self::new(rt, config, cache_usage, prev_netdir, writedir))
} else {
return Err(Error::ManagerDropped);
};
Ok(GetConsensusState {
Err(Error::ManagerDropped)
}
}
}
impl<DM: WriteNetDir, R: Runtime> GetConsensusState<DM, R> {
/// Create a new `GetConsensusState`, using the cache as per `cache_usage` and downloading as
/// per the relevant sections of `config`. If `prev_netdir` is supplied, information from that
/// directory may be used to complete the next one.
pub(crate) fn new(
rt: R,
config: Arc<DirMgrConfig>,
cache_usage: CacheUsage,
prev_netdir: Option<Arc<NetDir>>,
// NOTE(eta): This `writedir` is going away soon.
writedir: Weak<DM>,
) -> Self {
let authority_ids = config
.authorities()
.iter()
.map(|auth| auth.v3ident)
.collect();
let after = prev_netdir.as_ref().map(|nd| nd.lifetime().valid_after());
GetConsensusState {
cache_usage,
after,
next: None,
authority_ids,
writedir,
})
rt,
config,
prev_netdir,
}
}
}
impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
impl<DM: WriteNetDir, R: Runtime> DirState for GetConsensusState<DM, R> {
fn describe(&self) -> String {
if self.next.is_some() {
"About to fetch certificates."
@ -291,11 +318,7 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
}
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(wd.config().schedule.retry_consensus)
} else {
Err(Error::ManagerDropped)
}
Ok(self.config.schedule.retry_consensus)
}
fn add_from_cache(
&mut self,
@ -349,7 +372,7 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
}
}
impl<DM: WriteNetDir> GetConsensusState<DM> {
impl<DM: WriteNetDir, R: Runtime> GetConsensusState<DM, R> {
/// Helper: try to set the current consensus text from an input
/// string `text`. Refuse it if the authorities could never be
/// correct, or if it is ill-formed.
@ -589,10 +612,15 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
Some(self.consensus_meta.lifetime().valid_until())
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
// FIXME(eta): This can't be made to work until the necessary parameters are passed through.
// (This will happen very soon.)
todo!()
/*
Ok(Box::new(GetConsensusState::new(
self.writedir,
self.cache_usage,
)?))
*/
}
}
@ -600,6 +628,7 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
#[derive(Debug, Clone)]
struct GetMicrodescsState<DM: WriteNetDir> {
/// How should we get the consensus from the cache, if at all?
#[allow(dead_code)] // FIXME(eta): remove when used
cache_usage: CacheUsage,
/// Total number of microdescriptors listed in the consensus.
n_microdescs: usize,
@ -962,6 +991,10 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
Some(self.reset_time)
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
// FIXME(eta): This can't be made to work until the necessary parameters are passed through.
// (This will happen very soon.)
todo!()
/*
let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
// Cache only means we can't ever download.
CacheUsage::CacheOnly
@ -979,6 +1012,7 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
self.writedir,
cache_usage,
)?))
*/
}
}
@ -1194,209 +1228,236 @@ mod test {
#[test]
fn get_consensus_state() {
let rcv = Arc::new(DirRcv::new(test_time(), None));
tor_rtcompat::test_with_one_runtime!(|rt| async move {
let rcv = Arc::new(DirRcv::new(test_time(), None));
let (_tempdir, store) = temp_store();
let (_tempdir, store) = temp_store();
let mut state =
GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();
let mut state = GetConsensusState::new(
rt.clone(),
rcv.cfg.clone(),
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
);
// Is description okay?
assert_eq!(&state.describe(), "Looking for a consensus.");
// Is description okay?
assert_eq!(&state.describe(), "Looking for a consensus.");
// Basic properties: without a consensus it is not ready to advance.
assert!(!state.can_advance());
assert!(!state.is_ready(Readiness::Complete));
assert!(!state.is_ready(Readiness::Usable));
// Basic properties: without a consensus it is not ready to advance.
assert!(!state.can_advance());
assert!(!state.is_ready(Readiness::Complete));
assert!(!state.is_ready(Readiness::Usable));
// Basic properties: it doesn't want to reset.
assert!(state.reset_time().is_none());
// Basic properties: it doesn't want to reset.
assert!(state.reset_time().is_none());
// Its starting DirStatus is "fetching a consensus".
assert_eq!(state.bootstrap_status().to_string(), "fetching a consensus");
// Its starting DirStatus is "fetching a consensus".
assert_eq!(state.bootstrap_status().to_string(), "fetching a consensus");
// Download configuration is simple: only 1 request can be done in
// parallel. It uses a consensus retry schedule.
let retry = state.dl_config().unwrap();
assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus);
// Download configuration is simple: only 1 request can be done in
// parallel. It uses a consensus retry schedule.
let retry = state.dl_config().unwrap();
assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus);
// Do we know what we want?
let docs = state.missing_docs();
assert_eq!(docs.len(), 1);
let docid = docs[0];
// Do we know what we want?
let docs = state.missing_docs();
assert_eq!(docs.len(), 1);
let docid = docs[0];
assert!(matches!(
docid,
DocId::LatestConsensus {
flavor: ConsensusFlavor::Microdesc,
cache_usage: CacheUsage::CacheOkay,
}
));
assert!(matches!(
docid,
DocId::LatestConsensus {
flavor: ConsensusFlavor::Microdesc,
cache_usage: CacheUsage::CacheOkay,
}
));
// Now suppose that we get some complete junk from a download.
let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
let req = crate::docid::ClientRequest::Consensus(req);
let outcome = state.add_from_download("this isn't a consensus", &req, Some(&store));
assert!(matches!(outcome, Err(Error::NetDocError { .. })));
// make sure it wasn't stored...
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_none());
// Now suppose that we get some complete junk from a download.
let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
let req = crate::docid::ClientRequest::Consensus(req);
let outcome = state.add_from_download("this isn't a consensus", &req, Some(&store));
assert!(matches!(outcome, Err(Error::NetDocError { .. })));
// make sure it wasn't stored...
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_none());
// Now try again, with a real consensus... but the wrong authorities.
let outcome = state.add_from_download(CONSENSUS, &req, Some(&store));
assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_none());
// Now try again, with a real consensus... but the wrong authorities.
let outcome = state.add_from_download(CONSENSUS, &req, Some(&store));
assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_none());
// Great. Change the receiver to use a configuration where these test
// authorities are recognized.
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
// Great. Change the receiver to use a configuration where these test
// authorities are recognized.
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let mut state =
GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();
let outcome = state.add_from_download(CONSENSUS, &req, Some(&store));
assert!(outcome.unwrap());
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_some());
let mut state = GetConsensusState::new(
rt.clone(),
rcv.cfg.clone(),
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
);
let outcome = state.add_from_download(CONSENSUS, &req, Some(&store));
assert!(outcome.unwrap());
assert!(store
.lock()
.unwrap()
.latest_consensus(ConsensusFlavor::Microdesc, None)
.unwrap()
.is_some());
// And with that, we should be asking for certificates
assert!(state.can_advance());
assert_eq!(&state.describe(), "About to fetch certificates.");
assert_eq!(state.missing_docs(), Vec::new());
let next = Box::new(state).advance().unwrap();
assert_eq!(
&next.describe(),
"Downloading certificates for consensus (we are missing 2/2)."
);
// And with that, we should be asking for certificates
assert!(state.can_advance());
assert_eq!(&state.describe(), "About to fetch certificates.");
assert_eq!(state.missing_docs(), Vec::new());
let next = Box::new(state).advance().unwrap();
assert_eq!(
&next.describe(),
"Downloading certificates for consensus (we are missing 2/2)."
);
// Try again, but this time get the state from the cache.
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let mut state =
GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();
let text: crate::storage::InputString = CONSENSUS.to_owned().into();
let map = vec![(docid, text.into())].into_iter().collect();
let outcome = state.add_from_cache(map, None);
assert!(outcome.unwrap());
assert!(state.can_advance());
// Try again, but this time get the state from the cache.
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let mut state = GetConsensusState::new(
rt,
rcv.cfg.clone(),
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
);
let text: crate::storage::InputString = CONSENSUS.to_owned().into();
let map = vec![(docid, text.into())].into_iter().collect();
let outcome = state.add_from_cache(map, None);
assert!(outcome.unwrap());
assert!(state.can_advance());
});
}
#[test]
fn get_certs_state() {
/// Construct a GetCertsState with our test data
fn new_getcerts_state() -> (Arc<DirRcv>, Box<dyn DirState>) {
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let mut state =
GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();
let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
let req = crate::docid::ClientRequest::Consensus(req);
let outcome = state.add_from_download(CONSENSUS, &req, None);
assert!(outcome.unwrap());
(rcv, Box::new(state).advance().unwrap())
}
tor_rtcompat::test_with_one_runtime!(|rt| async move {
/// Construct a GetCertsState with our test data
fn new_getcerts_state(rt: impl Runtime) -> (Arc<DirRcv>, Box<dyn DirState>) {
let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
let mut state = GetConsensusState::new(
rt,
rcv.cfg.clone(),
CacheUsage::CacheOkay,
None,
Arc::downgrade(&rcv),
);
let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
let req = crate::docid::ClientRequest::Consensus(req);
let outcome = state.add_from_download(CONSENSUS, &req, None);
assert!(outcome.unwrap());
(rcv, Box::new(state).advance().unwrap())
}
let (_tempdir, store) = temp_store();
let (_rcv, mut state) = new_getcerts_state();
// Basic properties: description, status, reset time.
assert_eq!(
&state.describe(),
"Downloading certificates for consensus (we are missing 2/2)."
);
assert!(!state.can_advance());
assert!(!state.is_ready(Readiness::Complete));
assert!(!state.is_ready(Readiness::Usable));
let consensus_expires = datetime!(2020-08-07 12:43:20 UTC).into();
assert_eq!(state.reset_time(), Some(consensus_expires));
let retry = state.dl_config().unwrap();
assert_eq!(retry, DownloadScheduleConfig::default().retry_certs);
let (_tempdir, store) = temp_store();
let (_rcv, mut state) = new_getcerts_state(rt.clone());
// Basic properties: description, status, reset time.
assert_eq!(
&state.describe(),
"Downloading certificates for consensus (we are missing 2/2)."
);
assert!(!state.can_advance());
assert!(!state.is_ready(Readiness::Complete));
assert!(!state.is_ready(Readiness::Usable));
let consensus_expires = datetime!(2020-08-07 12:43:20 UTC).into();
assert_eq!(state.reset_time(), Some(consensus_expires));
let retry = state.dl_config().unwrap();
assert_eq!(retry, DownloadScheduleConfig::default().retry_certs);
// Bootstrap status okay?
assert_eq!(
state.bootstrap_status().to_string(),
"fetching authority certificates (0/2)"
);
// Bootstrap status okay?
assert_eq!(
state.bootstrap_status().to_string(),
"fetching authority certificates (0/2)"
);
// Check that we get the right list of missing docs.
let missing = state.missing_docs();
assert_eq!(missing.len(), 2); // We are missing two certificates.
assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
// we don't ask for this one because we don't recognize its authority
assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));
// Check that we get the right list of missing docs.
let missing = state.missing_docs();
assert_eq!(missing.len(), 2); // We are missing two certificates.
assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
// we don't ask for this one because we don't recognize its authority
assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));
// Add one from the cache; make sure the list is still right
let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
// let text2: crate::storage::InputString = AUTHCERT_5A23.to_owned().into();
let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
.into_iter()
.collect();
let outcome = state.add_from_cache(docs, None);
assert!(outcome.unwrap()); // no error, and something changed.
assert!(!state.can_advance()); // But we aren't done yet.
let missing = state.missing_docs();
assert_eq!(missing.len(), 1); // Now we're only missing one!
assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
assert_eq!(
state.bootstrap_status().to_string(),
"fetching authority certificates (1/2)"
);
// Add one from the cache; make sure the list is still right
let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
// let text2: crate::storage::InputString = AUTHCERT_5A23.to_owned().into();
let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
.into_iter()
.collect();
let outcome = state.add_from_cache(docs, None);
assert!(outcome.unwrap()); // no error, and something changed.
assert!(!state.can_advance()); // But we aren't done yet.
let missing = state.missing_docs();
assert_eq!(missing.len(), 1); // Now we're only missing one!
assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
assert_eq!(
state.bootstrap_status().to_string(),
"fetching authority certificates (1/2)"
);
// Now try to add the other from a download ... but fail
// because we didn't ask for it.
let mut req = tor_dirclient::request::AuthCertRequest::new();
req.push(authcert_id_5696()); // it's the wrong id.
let req = ClientRequest::AuthCert(req);
let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store));
assert!(!outcome.unwrap()); // no error, but nothing changed.
let missing2 = state.missing_docs();
assert_eq!(missing, missing2); // No change.
assert!(store
.lock()
.unwrap()
.authcerts(&[authcert_id_5a23()])
.unwrap()
.is_empty());
// Now try to add the other from a download ... but fail
// because we didn't ask for it.
let mut req = tor_dirclient::request::AuthCertRequest::new();
req.push(authcert_id_5696()); // it's the wrong id.
let req = ClientRequest::AuthCert(req);
let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store));
assert!(!outcome.unwrap()); // no error, but nothing changed.
let missing2 = state.missing_docs();
assert_eq!(missing, missing2); // No change.
assert!(store
.lock()
.unwrap()
.authcerts(&[authcert_id_5a23()])
.unwrap()
.is_empty());
// Now try to add the other from a download ... for real!
let mut req = tor_dirclient::request::AuthCertRequest::new();
req.push(authcert_id_5a23()); // Right idea this time!
let req = ClientRequest::AuthCert(req);
let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store));
assert!(outcome.unwrap()); // No error, _and_ something changed!
let missing3 = state.missing_docs();
assert!(missing3.is_empty());
assert!(state.can_advance());
assert!(!store
.lock()
.unwrap()
.authcerts(&[authcert_id_5a23()])
.unwrap()
.is_empty());
// Now try to add the other from a download ... for real!
let mut req = tor_dirclient::request::AuthCertRequest::new();
req.push(authcert_id_5a23()); // Right idea this time!
let req = ClientRequest::AuthCert(req);
let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store));
assert!(outcome.unwrap()); // No error, _and_ something changed!
let missing3 = state.missing_docs();
assert!(missing3.is_empty());
assert!(state.can_advance());
assert!(!store
.lock()
.unwrap()
.authcerts(&[authcert_id_5a23()])
.unwrap()
.is_empty());
let next = state.advance().unwrap();
assert_eq!(
&next.describe(),
"Downloading microdescriptors (we are missing 6)."
);
let next = state.advance().unwrap();
assert_eq!(
&next.describe(),
"Downloading microdescriptors (we are missing 6)."
);
// If we start from scratch and reset, we're back in GetConsensus.
let (_rcv, state) = new_getcerts_state();
let state = state.reset().unwrap();
assert_eq!(&state.describe(), "Looking for a consensus.");
/*
FIXME(eta): reinstate
// If we start from scratch and reset, we're back in GetConsensus.
let (_rcv, state) = new_getcerts_state(rt);
let state = state.reset().unwrap();
assert_eq!(&state.describe(), "Looking for a consensus.");
*/
// TODO: I'd like even more tests to make sure that we never
// accept a certificate for an authority we don't believe in.
// TODO: I'd like even more tests to make sure that we never
// accept a certificate for an authority we don't believe in.
});
}
#[test]
@ -1423,10 +1484,13 @@ mod test {
base64::decode(s).unwrap().try_into().unwrap()
}
/*
FIXME(eta): reinstate
// If we start from scratch and reset, we're back in GetConsensus.
let (_rcv, state) = new_getmicrodescs_state();
let state = Box::new(state).reset().unwrap();
assert_eq!(&state.describe(), "Looking for a consensus.");
*/
// Check the basics.
let (_rcv, mut state) = new_getmicrodescs_state();

View File

@ -18,8 +18,8 @@ use crate::{Error, Result};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fs::File;
use std::str::Utf8Error;
use std::ops::{Deref, DerefMut};
use std::str::Utf8Error;
use std::time::SystemTime;
use time::Duration;