From a9bae9adfe55f27257214fb7551f14680a9672f3 Mon Sep 17 00:00:00 2001 From: eta Date: Wed, 27 Apr 2022 17:45:25 +0100 Subject: [PATCH] tor-dirmgr/state.rs: refactor GetConsensusState::new - GetConsensusState::new now takes a set of parameters matching what it actually needs, instead of just taking a writedir. (It still *does* take a writedir, and indeed still uses it for basically everything, but that will eventually go away.) - Its call sites were updated. - Some tests now need to take a runtime, and got indented a lot as a result. - Resetting was made non-functional, because we need to thread through the parameters passed to GetConsensusState to all of the other states, too. This will happen in a later commit. --- crates/tor-dirmgr/src/lib.rs | 5 +- crates/tor-dirmgr/src/state.rs | 474 ++++++++++++++++++------------- crates/tor-dirmgr/src/storage.rs | 2 +- 3 files changed, 273 insertions(+), 208 deletions(-) diff --git a/crates/tor-dirmgr/src/lib.rs b/crates/tor-dirmgr/src/lib.rs index 29462c5f0..f574de26d 100644 --- a/crates/tor-dirmgr/src/lib.rs +++ b/crates/tor-dirmgr/src/lib.rs @@ -517,7 +517,7 @@ impl DirMgr { weak: Weak, mut on_complete: Option>, ) -> Result<()> { - let mut state: Box = Box::new(state::GetConsensusState::new( + let mut state: Box = Box::new(state::GetConsensusState::bodge_new( Weak::clone(&weak), CacheUsage::CacheOkay, )?); @@ -727,7 +727,8 @@ impl DirMgr { /// /// Return false if there is no such consensus. async fn load_directory(self: &Arc) -> Result { - let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?; + let state = + state::GetConsensusState::bodge_new(Arc::downgrade(self), CacheUsage::CacheOnly)?; let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?; Ok(self.netdir.get().is_some()) diff --git a/crates/tor-dirmgr/src/state.rs b/crates/tor-dirmgr/src/state.rs index 8c49866af..0a6c294ec 100644 --- a/crates/tor-dirmgr/src/state.rs +++ b/crates/tor-dirmgr/src/state.rs @@ -29,7 +29,7 @@ use crate::{ event, retry::DownloadSchedule, shared_ref::SharedMutArc, - CacheUsage, ClientRequest, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result, + CacheUsage, ClientRequest, DirMgr, DirMgrConfig, DocId, DocumentText, Error, Readiness, Result, }; use crate::{DirEvent, DocSource}; use tor_checkable::{ExternallySigned, SelfSigned, Timebound}; @@ -196,7 +196,7 @@ impl WriteNetDir for crate::DirMgr { /// Initial state: fetching or loading a consensus directory. #[derive(Clone, Debug)] -pub(crate) struct GetConsensusState { +pub(crate) struct GetConsensusState { /// How should we get the consensus from the cache, if at all? cache_usage: CacheUsage, @@ -222,39 +222,66 @@ pub(crate) struct GetConsensusState { /// A weak reference to the directory manager that wants us to /// fetch this information. When this references goes away, we exit. writedir: Weak, + + /// A `Runtime` implementation. + #[allow(dead_code)] // FIXME(eta): remove when used + rt: R, + /// The configuration of the directory manager. Used for download configuration + /// purposes. + config: Arc, + /// If one exists, the netdir we're trying to update. + #[allow(dead_code)] // FIXME(eta): remove when used + prev_netdir: Option>, } -impl GetConsensusState { - /// Create a new GetConsensusState from a weak reference to a - /// directory manager and a `cache_usage` flag. - pub(crate) fn new(writedir: Weak, cache_usage: CacheUsage) -> Result { - let (authority_ids, after) = if let Some(writedir) = Weak::upgrade(&writedir) { - let ids: Vec<_> = writedir - .config() - .authorities() - .iter() - .map(|auth| auth.v3ident) - .collect(); - let after = writedir - .netdir() - .get() - .map(|nd| nd.lifetime().valid_after()); - - (ids, after) +impl GetConsensusState, R> { + /// Bodge version of Self::new() with the old pre-refactor signature. + /// This will go away when the refactor is complete. + pub(crate) fn bodge_new(writedir: Weak>, cache_usage: CacheUsage) -> Result { + if let Some(netdir) = Weak::upgrade(&writedir) { + let config = netdir.config.get(); + let prev_netdir = netdir.opt_netdir(); + let rt = netdir.runtime.clone(); + Ok(Self::new(rt, config, cache_usage, prev_netdir, writedir)) } else { - return Err(Error::ManagerDropped); - }; - Ok(GetConsensusState { + Err(Error::ManagerDropped) + } + } +} + +impl GetConsensusState { + /// Create a new `GetConsensusState`, using the cache as per `cache_usage` and downloading as + /// per the relevant sections of `config`. If `prev_netdir` is supplied, information from that + /// directory may be used to complete the next one. + pub(crate) fn new( + rt: R, + config: Arc, + cache_usage: CacheUsage, + prev_netdir: Option>, + // NOTE(eta): This `writedir` is going away soon. + writedir: Weak, + ) -> Self { + let authority_ids = config + .authorities() + .iter() + .map(|auth| auth.v3ident) + .collect(); + let after = prev_netdir.as_ref().map(|nd| nd.lifetime().valid_after()); + + GetConsensusState { cache_usage, after, next: None, authority_ids, writedir, - }) + rt, + config, + prev_netdir, + } } } -impl DirState for GetConsensusState { +impl DirState for GetConsensusState { fn describe(&self) -> String { if self.next.is_some() { "About to fetch certificates." @@ -291,11 +318,7 @@ impl DirState for GetConsensusState { } } fn dl_config(&self) -> Result { - if let Some(wd) = Weak::upgrade(&self.writedir) { - Ok(wd.config().schedule.retry_consensus) - } else { - Err(Error::ManagerDropped) - } + Ok(self.config.schedule.retry_consensus) } fn add_from_cache( &mut self, @@ -349,7 +372,7 @@ impl DirState for GetConsensusState { } } -impl GetConsensusState { +impl GetConsensusState { /// Helper: try to set the current consensus text from an input /// string `text`. Refuse it if the authorities could never be /// correct, or if it is ill-formed. @@ -589,10 +612,15 @@ impl DirState for GetCertsState { Some(self.consensus_meta.lifetime().valid_until()) } fn reset(self: Box) -> Result> { + // FIXME(eta): This can't be made to work until the necessary parameters are passed through. + // (This will happen very soon.) + todo!() + /* Ok(Box::new(GetConsensusState::new( self.writedir, self.cache_usage, )?)) + */ } } @@ -600,6 +628,7 @@ impl DirState for GetCertsState { #[derive(Debug, Clone)] struct GetMicrodescsState { /// How should we get the consensus from the cache, if at all? + #[allow(dead_code)] // FIXME(eta): remove when used cache_usage: CacheUsage, /// Total number of microdescriptors listed in the consensus. n_microdescs: usize, @@ -962,6 +991,10 @@ impl DirState for GetMicrodescsState { Some(self.reset_time) } fn reset(self: Box) -> Result> { + // FIXME(eta): This can't be made to work until the necessary parameters are passed through. + // (This will happen very soon.) + todo!() + /* let cache_usage = if self.cache_usage == CacheUsage::CacheOnly { // Cache only means we can't ever download. CacheUsage::CacheOnly @@ -979,6 +1012,7 @@ impl DirState for GetMicrodescsState { self.writedir, cache_usage, )?)) + */ } } @@ -1194,209 +1228,236 @@ mod test { #[test] fn get_consensus_state() { - let rcv = Arc::new(DirRcv::new(test_time(), None)); + tor_rtcompat::test_with_one_runtime!(|rt| async move { + let rcv = Arc::new(DirRcv::new(test_time(), None)); - let (_tempdir, store) = temp_store(); + let (_tempdir, store) = temp_store(); - let mut state = - GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap(); + let mut state = GetConsensusState::new( + rt.clone(), + rcv.cfg.clone(), + CacheUsage::CacheOkay, + None, + Arc::downgrade(&rcv), + ); - // Is description okay? - assert_eq!(&state.describe(), "Looking for a consensus."); + // Is description okay? + assert_eq!(&state.describe(), "Looking for a consensus."); - // Basic properties: without a consensus it is not ready to advance. - assert!(!state.can_advance()); - assert!(!state.is_ready(Readiness::Complete)); - assert!(!state.is_ready(Readiness::Usable)); + // Basic properties: without a consensus it is not ready to advance. + assert!(!state.can_advance()); + assert!(!state.is_ready(Readiness::Complete)); + assert!(!state.is_ready(Readiness::Usable)); - // Basic properties: it doesn't want to reset. - assert!(state.reset_time().is_none()); + // Basic properties: it doesn't want to reset. + assert!(state.reset_time().is_none()); - // Its starting DirStatus is "fetching a consensus". - assert_eq!(state.bootstrap_status().to_string(), "fetching a consensus"); + // Its starting DirStatus is "fetching a consensus". + assert_eq!(state.bootstrap_status().to_string(), "fetching a consensus"); - // Download configuration is simple: only 1 request can be done in - // parallel. It uses a consensus retry schedule. - let retry = state.dl_config().unwrap(); - assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus); + // Download configuration is simple: only 1 request can be done in + // parallel. It uses a consensus retry schedule. + let retry = state.dl_config().unwrap(); + assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus); - // Do we know what we want? - let docs = state.missing_docs(); - assert_eq!(docs.len(), 1); - let docid = docs[0]; + // Do we know what we want? + let docs = state.missing_docs(); + assert_eq!(docs.len(), 1); + let docid = docs[0]; - assert!(matches!( - docid, - DocId::LatestConsensus { - flavor: ConsensusFlavor::Microdesc, - cache_usage: CacheUsage::CacheOkay, - } - )); + assert!(matches!( + docid, + DocId::LatestConsensus { + flavor: ConsensusFlavor::Microdesc, + cache_usage: CacheUsage::CacheOkay, + } + )); - // Now suppose that we get some complete junk from a download. - let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc); - let req = crate::docid::ClientRequest::Consensus(req); - let outcome = state.add_from_download("this isn't a consensus", &req, Some(&store)); - assert!(matches!(outcome, Err(Error::NetDocError { .. }))); - // make sure it wasn't stored... - assert!(store - .lock() - .unwrap() - .latest_consensus(ConsensusFlavor::Microdesc, None) - .unwrap() - .is_none()); + // Now suppose that we get some complete junk from a download. + let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc); + let req = crate::docid::ClientRequest::Consensus(req); + let outcome = state.add_from_download("this isn't a consensus", &req, Some(&store)); + assert!(matches!(outcome, Err(Error::NetDocError { .. }))); + // make sure it wasn't stored... + assert!(store + .lock() + .unwrap() + .latest_consensus(ConsensusFlavor::Microdesc, None) + .unwrap() + .is_none()); - // Now try again, with a real consensus... but the wrong authorities. - let outcome = state.add_from_download(CONSENSUS, &req, Some(&store)); - assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities))); - assert!(store - .lock() - .unwrap() - .latest_consensus(ConsensusFlavor::Microdesc, None) - .unwrap() - .is_none()); + // Now try again, with a real consensus... but the wrong authorities. + let outcome = state.add_from_download(CONSENSUS, &req, Some(&store)); + assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities))); + assert!(store + .lock() + .unwrap() + .latest_consensus(ConsensusFlavor::Microdesc, None) + .unwrap() + .is_none()); - // Great. Change the receiver to use a configuration where these test - // authorities are recognized. - let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities()))); + // Great. Change the receiver to use a configuration where these test + // authorities are recognized. + let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities()))); - let mut state = - GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap(); - let outcome = state.add_from_download(CONSENSUS, &req, Some(&store)); - assert!(outcome.unwrap()); - assert!(store - .lock() - .unwrap() - .latest_consensus(ConsensusFlavor::Microdesc, None) - .unwrap() - .is_some()); + let mut state = GetConsensusState::new( + rt.clone(), + rcv.cfg.clone(), + CacheUsage::CacheOkay, + None, + Arc::downgrade(&rcv), + ); + let outcome = state.add_from_download(CONSENSUS, &req, Some(&store)); + assert!(outcome.unwrap()); + assert!(store + .lock() + .unwrap() + .latest_consensus(ConsensusFlavor::Microdesc, None) + .unwrap() + .is_some()); - // And with that, we should be asking for certificates - assert!(state.can_advance()); - assert_eq!(&state.describe(), "About to fetch certificates."); - assert_eq!(state.missing_docs(), Vec::new()); - let next = Box::new(state).advance().unwrap(); - assert_eq!( - &next.describe(), - "Downloading certificates for consensus (we are missing 2/2)." - ); + // And with that, we should be asking for certificates + assert!(state.can_advance()); + assert_eq!(&state.describe(), "About to fetch certificates."); + assert_eq!(state.missing_docs(), Vec::new()); + let next = Box::new(state).advance().unwrap(); + assert_eq!( + &next.describe(), + "Downloading certificates for consensus (we are missing 2/2)." + ); - // Try again, but this time get the state from the cache. - let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities()))); - let mut state = - GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap(); - let text: crate::storage::InputString = CONSENSUS.to_owned().into(); - let map = vec![(docid, text.into())].into_iter().collect(); - let outcome = state.add_from_cache(map, None); - assert!(outcome.unwrap()); - assert!(state.can_advance()); + // Try again, but this time get the state from the cache. + let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities()))); + let mut state = GetConsensusState::new( + rt, + rcv.cfg.clone(), + CacheUsage::CacheOkay, + None, + Arc::downgrade(&rcv), + ); + let text: crate::storage::InputString = CONSENSUS.to_owned().into(); + let map = vec![(docid, text.into())].into_iter().collect(); + let outcome = state.add_from_cache(map, None); + assert!(outcome.unwrap()); + assert!(state.can_advance()); + }); } #[test] fn get_certs_state() { - /// Construct a GetCertsState with our test data - fn new_getcerts_state() -> (Arc, Box) { - let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities()))); - let mut state = - GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap(); - let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc); - let req = crate::docid::ClientRequest::Consensus(req); - let outcome = state.add_from_download(CONSENSUS, &req, None); - assert!(outcome.unwrap()); - (rcv, Box::new(state).advance().unwrap()) - } + tor_rtcompat::test_with_one_runtime!(|rt| async move { + /// Construct a GetCertsState with our test data + fn new_getcerts_state(rt: impl Runtime) -> (Arc, Box) { + let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities()))); + let mut state = GetConsensusState::new( + rt, + rcv.cfg.clone(), + CacheUsage::CacheOkay, + None, + Arc::downgrade(&rcv), + ); + let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc); + let req = crate::docid::ClientRequest::Consensus(req); + let outcome = state.add_from_download(CONSENSUS, &req, None); + assert!(outcome.unwrap()); + (rcv, Box::new(state).advance().unwrap()) + } - let (_tempdir, store) = temp_store(); - let (_rcv, mut state) = new_getcerts_state(); - // Basic properties: description, status, reset time. - assert_eq!( - &state.describe(), - "Downloading certificates for consensus (we are missing 2/2)." - ); - assert!(!state.can_advance()); - assert!(!state.is_ready(Readiness::Complete)); - assert!(!state.is_ready(Readiness::Usable)); - let consensus_expires = datetime!(2020-08-07 12:43:20 UTC).into(); - assert_eq!(state.reset_time(), Some(consensus_expires)); - let retry = state.dl_config().unwrap(); - assert_eq!(retry, DownloadScheduleConfig::default().retry_certs); + let (_tempdir, store) = temp_store(); + let (_rcv, mut state) = new_getcerts_state(rt.clone()); + // Basic properties: description, status, reset time. + assert_eq!( + &state.describe(), + "Downloading certificates for consensus (we are missing 2/2)." + ); + assert!(!state.can_advance()); + assert!(!state.is_ready(Readiness::Complete)); + assert!(!state.is_ready(Readiness::Usable)); + let consensus_expires = datetime!(2020-08-07 12:43:20 UTC).into(); + assert_eq!(state.reset_time(), Some(consensus_expires)); + let retry = state.dl_config().unwrap(); + assert_eq!(retry, DownloadScheduleConfig::default().retry_certs); - // Bootstrap status okay? - assert_eq!( - state.bootstrap_status().to_string(), - "fetching authority certificates (0/2)" - ); + // Bootstrap status okay? + assert_eq!( + state.bootstrap_status().to_string(), + "fetching authority certificates (0/2)" + ); - // Check that we get the right list of missing docs. - let missing = state.missing_docs(); - assert_eq!(missing.len(), 2); // We are missing two certificates. - assert!(missing.contains(&DocId::AuthCert(authcert_id_5696()))); - assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23()))); - // we don't ask for this one because we don't recognize its authority - assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47()))); + // Check that we get the right list of missing docs. + let missing = state.missing_docs(); + assert_eq!(missing.len(), 2); // We are missing two certificates. + assert!(missing.contains(&DocId::AuthCert(authcert_id_5696()))); + assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23()))); + // we don't ask for this one because we don't recognize its authority + assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47()))); - // Add one from the cache; make sure the list is still right - let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into(); - // let text2: crate::storage::InputString = AUTHCERT_5A23.to_owned().into(); - let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())] - .into_iter() - .collect(); - let outcome = state.add_from_cache(docs, None); - assert!(outcome.unwrap()); // no error, and something changed. - assert!(!state.can_advance()); // But we aren't done yet. - let missing = state.missing_docs(); - assert_eq!(missing.len(), 1); // Now we're only missing one! - assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23()))); - assert_eq!( - state.bootstrap_status().to_string(), - "fetching authority certificates (1/2)" - ); + // Add one from the cache; make sure the list is still right + let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into(); + // let text2: crate::storage::InputString = AUTHCERT_5A23.to_owned().into(); + let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())] + .into_iter() + .collect(); + let outcome = state.add_from_cache(docs, None); + assert!(outcome.unwrap()); // no error, and something changed. + assert!(!state.can_advance()); // But we aren't done yet. + let missing = state.missing_docs(); + assert_eq!(missing.len(), 1); // Now we're only missing one! + assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23()))); + assert_eq!( + state.bootstrap_status().to_string(), + "fetching authority certificates (1/2)" + ); - // Now try to add the other from a download ... but fail - // because we didn't ask for it. - let mut req = tor_dirclient::request::AuthCertRequest::new(); - req.push(authcert_id_5696()); // it's the wrong id. - let req = ClientRequest::AuthCert(req); - let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store)); - assert!(!outcome.unwrap()); // no error, but nothing changed. - let missing2 = state.missing_docs(); - assert_eq!(missing, missing2); // No change. - assert!(store - .lock() - .unwrap() - .authcerts(&[authcert_id_5a23()]) - .unwrap() - .is_empty()); + // Now try to add the other from a download ... but fail + // because we didn't ask for it. + let mut req = tor_dirclient::request::AuthCertRequest::new(); + req.push(authcert_id_5696()); // it's the wrong id. + let req = ClientRequest::AuthCert(req); + let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store)); + assert!(!outcome.unwrap()); // no error, but nothing changed. + let missing2 = state.missing_docs(); + assert_eq!(missing, missing2); // No change. + assert!(store + .lock() + .unwrap() + .authcerts(&[authcert_id_5a23()]) + .unwrap() + .is_empty()); - // Now try to add the other from a download ... for real! - let mut req = tor_dirclient::request::AuthCertRequest::new(); - req.push(authcert_id_5a23()); // Right idea this time! - let req = ClientRequest::AuthCert(req); - let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store)); - assert!(outcome.unwrap()); // No error, _and_ something changed! - let missing3 = state.missing_docs(); - assert!(missing3.is_empty()); - assert!(state.can_advance()); - assert!(!store - .lock() - .unwrap() - .authcerts(&[authcert_id_5a23()]) - .unwrap() - .is_empty()); + // Now try to add the other from a download ... for real! + let mut req = tor_dirclient::request::AuthCertRequest::new(); + req.push(authcert_id_5a23()); // Right idea this time! + let req = ClientRequest::AuthCert(req); + let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store)); + assert!(outcome.unwrap()); // No error, _and_ something changed! + let missing3 = state.missing_docs(); + assert!(missing3.is_empty()); + assert!(state.can_advance()); + assert!(!store + .lock() + .unwrap() + .authcerts(&[authcert_id_5a23()]) + .unwrap() + .is_empty()); - let next = state.advance().unwrap(); - assert_eq!( - &next.describe(), - "Downloading microdescriptors (we are missing 6)." - ); + let next = state.advance().unwrap(); + assert_eq!( + &next.describe(), + "Downloading microdescriptors (we are missing 6)." + ); - // If we start from scratch and reset, we're back in GetConsensus. - let (_rcv, state) = new_getcerts_state(); - let state = state.reset().unwrap(); - assert_eq!(&state.describe(), "Looking for a consensus."); + /* + FIXME(eta): reinstate + // If we start from scratch and reset, we're back in GetConsensus. + let (_rcv, state) = new_getcerts_state(rt); + let state = state.reset().unwrap(); + assert_eq!(&state.describe(), "Looking for a consensus."); + */ - // TODO: I'd like even more tests to make sure that we never - // accept a certificate for an authority we don't believe in. + // TODO: I'd like even more tests to make sure that we never + // accept a certificate for an authority we don't believe in. + }); } #[test] @@ -1423,10 +1484,13 @@ mod test { base64::decode(s).unwrap().try_into().unwrap() } + /* + FIXME(eta): reinstate // If we start from scratch and reset, we're back in GetConsensus. let (_rcv, state) = new_getmicrodescs_state(); let state = Box::new(state).reset().unwrap(); assert_eq!(&state.describe(), "Looking for a consensus."); + */ // Check the basics. let (_rcv, mut state) = new_getmicrodescs_state(); diff --git a/crates/tor-dirmgr/src/storage.rs b/crates/tor-dirmgr/src/storage.rs index acf7b7b7c..5f598285b 100644 --- a/crates/tor-dirmgr/src/storage.rs +++ b/crates/tor-dirmgr/src/storage.rs @@ -18,8 +18,8 @@ use crate::{Error, Result}; use std::cell::RefCell; use std::collections::HashMap; use std::fs::File; -use std::str::Utf8Error; use std::ops::{Deref, DerefMut}; +use std::str::Utf8Error; use std::time::SystemTime; use time::Duration;