Major revision on DirMgr logic -- almost a complete rewrite.

The big idea of this revision is to separate the code that knows
about doing downloads from the code that decides what to download.
Later, we can make a similar change for database access.  With these
changes together, we can make our code much more testable, and
eventually enable more download types in parallel.
Nick Mathewson 2021-04-01 10:59:43 -04:00
parent 48bcde631f
commit bec5fa3e9c
10 changed files with 1297 additions and 1138 deletions
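Concretely, the split centers on the `DirState` trait, which the "decision" side implements and the "download" side drives. (The trait itself is not shown in the hunks below; its definition appears to be in the diff that is suppressed as too large. This is a simplified sketch using only methods that appear in the impls later in this commit, not the full definition.)

/// Sketch: the "what to download" side is a pure state machine with no I/O.
trait DirState {
    /// Which documents do we still need in order to make progress?
    fn missing_docs(&self) -> Vec<DocId>;
    /// Feed in documents that the download/cache side has obtained.
    fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool>;
    /// Can we move on to the next state yet?
    fn can_advance(&self) -> bool;
    /// Consume this state and produce the next one.
    fn advance(self: Box<Self>) -> Result<Box<dyn DirState>>;
}
// The "how to download" side (bootstrap.rs, added below) owns all network and
// cache access and simply drives whichever `DirState` it is handed.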

View File

@@ -50,7 +50,7 @@ pub async fn get_resource<CR>(
circ_mgr: Arc<CircMgr>,
) -> anyhow::Result<DirResponse>
where
CR: request::Requestable,
CR: request::Requestable + ?Sized,
{
use tor_rtcompat::timer::timeout;
@@ -96,7 +96,7 @@ pub async fn download<R, S>(
source: Option<SourceInfo>,
) -> Result<DirResponse>
where
R: request::Requestable,
R: request::Requestable + ?Sized,
S: AsyncRead + AsyncWrite + Unpin,
{
let partial_ok = req.partial_docs_ok();
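Relaxing the bound to `?Sized` lets callers pass an unsized trait object rather than a concrete request type; `fetch_single` in the new bootstrap.rs (below) relies on this. A minimal sketch of the call pattern, with the surrounding setup omitted:

// `ClientRequest::as_requestable` (added later in this commit) returns
// `&(dyn Requestable + Send + Sync)`, an unsized type, so `get_resource`
// needs `CR: ?Sized` to accept it.
let req = request.as_requestable();
let response = tor_dirclient::get_resource(req, dirinfo, circmgr).await?;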

278
tor-dirmgr/src/bootstrap.rs Normal file
View File

@@ -0,0 +1,278 @@
//! Functions to download or load directory objects, using the
//! state machines in the `states` module.
use std::{
collections::HashMap,
sync::{Arc, Weak},
time::{Duration, SystemTime},
};
use crate::{
docid::{self, ClientRequest},
upgrade_weak_ref, DirMgr, DirState, DocId, DocumentText, Error, Readiness, Result,
};
use futures::channel::oneshot;
use futures::FutureExt;
use futures::StreamExt;
use log::{info, warn};
use tor_dirclient::DirResponse;
use tor_rtcompat::timer::sleep_until_wallclock;
/// Try to read a set of documents from `dirmgr` by ID.
async fn load_all(dirmgr: &DirMgr, missing: Vec<DocId>) -> Result<HashMap<DocId, DocumentText>> {
let mut loaded = HashMap::new();
for query in docid::partition_by_type(missing.into_iter()).values() {
dirmgr.load_documents_into(query, &mut loaded).await?;
}
Ok(loaded)
}
/// Launch a single client request and get an associated response.
async fn fetch_single(
dirmgr: Arc<DirMgr>,
request: ClientRequest,
) -> Result<(ClientRequest, DirResponse)> {
let circmgr = dirmgr.circmgr()?;
let cur_netdir = dirmgr.opt_netdir();
let dirinfo = match cur_netdir {
Some(ref nd) => nd.as_ref().into(),
None => dirmgr.config.fallbacks().into(),
};
let resource = tor_dirclient::get_resource(request.as_requestable(), dirinfo, circmgr).await?;
Ok((request, resource))
}
/// Launch a set of download requests for a set of missing objects in
/// `missing`, and return each request along with the response it received.
///
/// Don't launch more than `parallelism` requests at once.
async fn fetch_multiple(
dirmgr: Arc<DirMgr>,
missing: Vec<DocId>,
parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
let mut requests = Vec::new();
for (_type, query) in docid::partition_by_type(missing.into_iter()) {
requests.extend(dirmgr.query_into_requests(query).await?);
}
// TODO: instead of waiting for all the queries to finish, we
// could stream the responses back or something.
let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
.map(|query| fetch_single(Arc::clone(&dirmgr), query))
.buffer_unordered(parallelism)
.collect()
.await;
let mut useful_responses = Vec::new();
for r in responses {
match r {
Ok(x) => useful_responses.push(x),
// TODO: in this case we might want to stop using this source.
Err(e) => warn!("error while downloading: {:?}", e),
}
}
Ok(useful_responses)
}
/// Try to update `state` by loading cached information from `dirmgr`.
/// Return true if anything changed.
async fn load_once(dirmgr: &Arc<DirMgr>, state: &mut Box<dyn DirState>) -> Result<bool> {
let missing = state.missing_docs();
if missing.is_empty() {
Ok(false)
} else {
let documents = load_all(&dirmgr, missing).await?;
state.add_from_cache(documents)
}
}
/// Try to load as much state as possible for a provided `state` from the
/// cache in `dirmgr`, advancing the state to the extent possible.
///
/// No downloads are performed; the provided state will not be reset.
pub(crate) async fn load(
dirmgr: Arc<DirMgr>,
mut state: Box<dyn DirState>,
) -> Result<Box<dyn DirState>> {
let mut safety_counter = 0_usize;
loop {
let changed = load_once(&dirmgr, &mut state).await?;
if state.can_advance() {
state = state.advance()?;
safety_counter = 0;
} else {
if !changed {
break;
}
safety_counter += 1;
if safety_counter == 100 {
panic!("Spent 100 iterations in the same state: this is a bug");
}
}
}
Ok(state)
}
/// Helper: Make a set of download attempts for the current directory state,
/// and on success feed their results into the state object.
///
/// This can launch one or more download requests, but will not launch more
/// than `parallelism` requests at a time.
///
/// Return true if the state reports that it changed.
async fn download_attempt(
dirmgr: &Arc<DirMgr>,
state: &mut Box<dyn DirState>,
parallelism: usize,
) -> Result<bool> {
let mut changed = false;
let missing = state.missing_docs();
let fetched = fetch_multiple(Arc::clone(dirmgr), missing, parallelism).await?;
for (client_req, dir_response) in fetched {
let text = dir_response.into_output();
match dirmgr.expand_response_text(&client_req, text).await {
Ok(text) => {
let outcome = state
.add_from_download(&text, &client_req, Some(&dirmgr.store))
.await;
match outcome {
Ok(b) => changed |= b,
// TODO: in this case we might want to stop using this source.
Err(e) => warn!("error while adding directory info: {}", e),
}
}
Err(e) => {
// TODO: in this case we might want to stop using this source.
warn!("Error when expanding directory text: {}", e);
}
}
}
Ok(changed)
}
/// Download information into a DirState state machine until it is
/// ["complete"](Readiness::Complete), or until we hit a
/// non-recoverable error.
///
/// Use `dirmgr` to load from the cache or to launch downloads.
///
/// Keep resetting the state as needed.
///
/// The first time that the state becomes ["usable"](Readiness::Usable),
/// notify the sender in `on_usable`.
///
/// Return Err only on a non-recoverable error. On an error that
/// merits another bootstrap attempt with the same state, return the
/// state and an Error object in an option.
pub(crate) async fn download(
dirmgr: Weak<DirMgr>,
mut state: Box<dyn DirState>,
mut on_usable: Option<oneshot::Sender<()>>,
) -> Result<(Box<dyn DirState>, Option<Error>)> {
'next_state: loop {
let (parallelism, retry_config) = state.dl_config()?;
// In theory this could be inside the loop below maybe? If we
// want to drop the restriction that the missing() members of a
// state must never grow, then we'll need to move it inside.
{
let dirmgr = upgrade_weak_ref(&dirmgr)?;
load_once(&dirmgr, &mut state).await?;
}
// Skip the downloads if we can...
if state.can_advance() {
state = state.advance()?;
continue 'next_state;
}
if state.is_ready(Readiness::Complete) {
return Ok((state, None));
}
let mut retry = retry_config.schedule();
// Make several attempts to fetch whatever we're missing,
// until either we can advance, or we've got a complete
// document, or we run out of tries, or we run out of time.
'next_attempt: for attempt in retry_config.attempts() {
info!("{}: {}", attempt + 1, state.describe());
let reset_time = no_more_than_a_week(state.reset_time());
{
let dirmgr = upgrade_weak_ref(&dirmgr)?;
futures::select_biased! {
outcome = download_attempt(&dirmgr, &mut state, parallelism).fuse() => {
match outcome {
Err(e) => {
warn!("Error while downloading: {}", e);
continue 'next_attempt;
}
Ok(changed) => changed
}
}
_ = sleep_until_wallclock(reset_time).fuse() => {
// We need to reset. This can happen if (for
// example) we're downloading the last few
// microdescriptors on a consensus that now
// we're ready to replace.
state = state.reset()?;
continue 'next_state;
},
};
}
// Exit if there is nothing more to download.
if state.is_ready(Readiness::Complete) {
return Ok((state, None));
}
// Report usable-ness if appropriate.
if on_usable.is_some() && state.is_ready(Readiness::Usable) {
let _ = on_usable.take().unwrap().send(());
}
if state.can_advance() {
// We have enough info to advance to another state.
state = state.advance()?;
continue 'next_state;
} else {
// We should wait a bit, and then retry.
// TODO: we shouldn't wait on the final attempt.
let reset_time = no_more_than_a_week(state.reset_time());
let delay = retry.next_delay(&mut rand::thread_rng());
futures::select_biased! {
_ = sleep_until_wallclock(reset_time).fuse() => {
state = state.reset()?;
continue 'next_state;
}
_ = tor_rtcompat::timer::sleep(delay).fuse() => {}
};
}
}
// We didn't advance the state, after all the retries.
return Ok((state, Some(Error::CantAdvanceState)));
}
}
/// Helper: Clamp `v` so that it is no more than one week from now.
///
/// If `v` is absent, return the time that's one week from now.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
fn no_more_than_a_week(v: Option<SystemTime>) -> SystemTime {
let now = SystemTime::now();
let one_week_later = now + Duration::new(86400 * 7, 0);
match v {
Some(t) => std::cmp::min(t, one_week_later),
None => one_week_later,
}
}
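For orientation, here is a hedged sketch of how the DirMgr side (whose changes are in the diff suppressed further down) might drive these two entry points; the names `dirmgr` and `on_usable` are illustrative, not taken from this commit:

// Illustrative only: `dirmgr` is an Arc<DirMgr>; `on_usable` is a oneshot::Sender<()>.
let start: Box<dyn DirState> = Box::new(GetConsensusState::new(
    Arc::downgrade(&dirmgr),
    CacheUsage::CacheOkay,
)?);
// First, get as far as possible using only the on-disk cache...
let state = load(Arc::clone(&dirmgr), start).await?;
// ...then keep downloading (and resetting when needed) until the directory is
// complete; `on_usable` is signalled the first time the directory becomes usable.
let (state, soft_err) = download(Arc::downgrade(&dirmgr), state, Some(on_usable)).await?;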

View File

@@ -8,20 +8,17 @@ use tor_netdoc::doc::{
authcert::AuthCertKeyIds, microdesc::MdDigest, netstatus::ConsensusFlavor, routerdesc::RdDigest,
};
/// The identity of a single document, in enough detail to load it from
/// storage.
/// The identity of a single document, in enough detail to load it
/// from storage.
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
#[non_exhaustive]
pub enum DocId {
/// A request for the most recent consensus document.
LatestConsensus {
/// The flavor of consensus to request
/// The flavor of consensus to request.
flavor: ConsensusFlavor,
/// If present, a specific pending status to request.
///
/// (A "pending" consensus is one where we don't have all the
/// certificates and/or descriptors yet.)
pending: Option<bool>,
/// Rules for loading this consensus from the cache.
cache_usage: CacheUsage,
},
/// A request for an authority certificate, by the SHA1 digests of
/// its identity key and signing key.
@@ -34,14 +31,18 @@ pub enum DocId {
/// The underlying type of a DocId.
///
/// Documents with the same type can be grouped into the same query.
/// Documents with the same type can be grouped into the same query; others
/// cannot.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
#[non_exhaustive]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) enum DocType {
/// A consensus document
Consensus(ConsensusFlavor),
/// An authority certificate
AuthCert,
/// A microdescriptor
Microdesc,
/// A router descriptor.
Routerdesc,
}
@@ -59,26 +60,78 @@ impl DocId {
}
}
/// A request for a specific kind of directory resource that a DirMgr can
/// request.
#[derive(Clone, Debug)]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) enum ClientRequest {
/// Request for a consensus
Consensus(request::ConsensusRequest),
/// Request for one or more authority certificates
AuthCert(request::AuthCertRequest),
/// Request for one or more microdescriptors
Microdescs(request::MicrodescRequest),
/// Request for one or more router descriptors
Routerdescs(request::RouterDescRequest),
}
impl ClientRequest {
/// Turn a ClientRequest into a Requestable.
pub(crate) fn as_requestable(&self) -> &(dyn request::Requestable + Send + Sync) {
use ClientRequest::*;
match self {
Consensus(a) => a,
AuthCert(a) => a,
Microdescs(a) => a,
Routerdescs(a) => a,
}
}
}
/// Description of how to start out a given bootstrap attempt.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub enum CacheUsage {
/// The bootstrap attempt will only use the cache. Therefore, don't
/// load a pending consensus from the cache, since we won't be able
/// to find enough information to make it usable.
CacheOnly,
/// The bootstrap attempt is willing to download information or to
/// use the cache. Therefore, we want the latest cached
/// consensus, whether it is pending or not.
CacheOkay,
/// The bootstrap attempt is trying to fetch a new consensus. Therefore,
/// we don't want a consensus from the cache.
MustDownload,
}
impl CacheUsage {
/// Turn this CacheUsage into a pending_ok field for use with
/// SqliteStore.
pub(crate) fn pending_ok(&self) -> Option<bool> {
match self {
CacheUsage::CacheOnly => Some(true),
_ => None,
}
}
}
/// A group of DocIds that can be downloaded or loaded from the database
/// together.
///
/// TODO: Perhaps this should be the same as ClientRequest?
#[derive(Clone, Debug)]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) enum DocQuery {
/// A request for the latest consensus
LatestConsensus {
/// A desired flavor of consensus
flavor: ConsensusFlavor,
pending: Option<bool>,
/// Whether we can or must use the cache
cache_usage: CacheUsage,
},
/// A request for authority certificates
AuthCert(Vec<AuthCertKeyIds>),
/// A request for microdescriptors
Microdesc(Vec<MdDigest>),
/// A request for router descriptors
Routerdesc(Vec<RdDigest>),
}
@@ -86,7 +139,13 @@ impl DocQuery {
/// Construct an "empty" docquery from the given DocId
pub fn empty_from_docid(id: &DocId) -> Self {
match *id {
DocId::LatestConsensus { flavor, pending } => Self::LatestConsensus { flavor, pending },
DocId::LatestConsensus {
flavor,
cache_usage,
} => Self::LatestConsensus {
flavor,
cache_usage,
},
DocId::AuthCert(_) => Self::AuthCert(Vec::new()),
DocId::Microdesc(_) => Self::Microdesc(Vec::new()),
DocId::Routerdesc(_) => Self::Routerdesc(Vec::new()),
@@ -152,5 +211,3 @@ where
}
result
}
// TODO: code to read one of these from storage.

View File

@@ -4,6 +4,7 @@ use thiserror::Error;
/// An error originated by the directory manager code
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Error {
/// We received a document we didn't want at all.
#[error("unwanted object: {0}")]
@@ -37,4 +38,14 @@ pub enum Error {
/// Another process has locked the store for writing.
#[error("couldn't get write lock on directory cache")]
CacheIsLocked,
/// A consensus document is signed by an unrecognized authority set.
#[error("authorities on consensus do not match what we expect.")]
UnrecognizedAuthorities,
/// A directory manager has been dropped; background tasks can exit too.
#[error("dirmgr has been dropped; background tasks exiting")]
ManagerDropped,
/// We made a bunch of attempts, but weren't able to advance the
/// state of a download.
#[error("unable to finish bootstrapping a directory")]
CantAdvanceState,
}

File diff suppressed because it is too large

View File

@@ -111,7 +111,7 @@ impl Default for RetryDelay {
}
}
/// Configuration for how many times to retry a download, and with what
/// Configuration for how many times to retry a download, with what
/// frequency.
#[derive(Debug, Copy, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
@@ -154,6 +154,12 @@ impl RetryConfig {
0..(self.num.into())
}
/// Return the number of times that we're supposed to retry, according
/// to this RetryConfig.
pub fn n_attempts(&self) -> u32 {
self.num.into()
}
/// Return a RetryDelay object for this configuration.
///
/// If the initial delay is longer than 32

677
tor-dirmgr/src/state.rs Normal file
View File

@@ -0,0 +1,677 @@
//! Implementation for the primary directory state machine.
//!
//! There are three (active) states that a download can be in: looking
//! for a consensus ([`GetConsensusState`]), looking for certificates
//! to validate that consensus ([`GetCertsState`]), and looking for
//! microdescriptors ([`GetMicrodescsState`]).
//!
//! These states have no contact with the network, and are purely
//! reactive to other code that drives them. See the [`bootstrap`]
//! module for functions that actually load or download directory
//! information.
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::lock::Mutex;
use log::{info, warn};
use rand::Rng;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Weak;
use std::time::{Duration, SystemTime};
use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
use tor_netdoc::doc::netstatus::Lifetime;
use crate::{
docmeta::{AuthCertMeta, ConsensusMeta},
retry::RetryConfig,
shared_ref::SharedMutArc,
storage::sqlite::SqliteStore,
CacheUsage, ClientRequest, DirState, DocId, DocumentText, Error, NetDirConfig, Readiness,
Result,
};
use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
use tor_llcrypto::pk::rsa::RsaIdentity;
use tor_netdoc::doc::{
microdesc::{MdDigest, Microdesc},
netstatus::MdConsensus,
};
use tor_netdoc::{
doc::{
authcert::{AuthCert, AuthCertKeyIds},
microdesc::MicrodescReader,
netstatus::{ConsensusFlavor, UnvalidatedMdConsensus},
},
AllowAnnotations,
};
/// An object where we can put a usable netdir.
///
/// Note that there's only one implementation for this trait: DirMgr.
/// We make this a trait anyway to make sure that the different states
/// in this module can _only_ interact with the DirMgr through
/// modifying the NetDir and looking at the configuration.
pub(crate) trait WriteNetDir: 'static + Sync + Send {
/// Return a NetDirConfig to use when asked how to retry downloads,
/// or when we need to find a list of descriptors.
fn config(&self) -> &NetDirConfig;
/// Return a reference where we can write or modify a NetDir.
fn netdir(&self) -> &SharedMutArc<NetDir>;
}
impl WriteNetDir for crate::DirMgr {
fn config(&self) -> &NetDirConfig {
&self.config
}
fn netdir(&self) -> &SharedMutArc<NetDir> {
&self.netdir
}
}
/// Initial state: fetching or loading a consensus directory.
#[derive(Clone, Debug)]
pub(crate) struct GetConsensusState<DM: WriteNetDir> {
/// How should we get the consensus from the cache, if at all?
cache_usage: CacheUsage,
/// If present, our next state.
///
/// (This is present once we have a consensus.)
next: Option<GetCertsState<DM>>,
/// A list of RsaIdentity for the authorities that we believe in.
///
/// No consensus can be valid unless it purports to be signed by
/// more than half of these authorities.
authority_ids: Vec<RsaIdentity>,
/// A weak reference to the directory manager that wants us to
/// fetch this information. When this reference goes away, we exit.
writedir: Weak<DM>,
}
impl<DM: WriteNetDir> GetConsensusState<DM> {
/// Create a new GetConsensusState from a weak reference to a
/// directory manager and a `cache_usage` flag.
pub(crate) fn new(writedir: Weak<DM>, cache_usage: CacheUsage) -> Result<Self> {
let authority_ids: Vec<_> = if let Some(writedir) = Weak::upgrade(&writedir) {
writedir
.config()
.authorities()
.iter()
.map(|auth| *auth.v3ident())
.collect()
} else {
return Err(Error::ManagerDropped.into());
};
Ok(GetConsensusState {
cache_usage,
next: None,
authority_ids,
writedir,
})
}
}
#[async_trait]
impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
fn describe(&self) -> String {
if self.next.is_some() {
"About to fetch certificates."
} else {
match self.cache_usage {
CacheUsage::CacheOnly => "Looking for a cached consensus.",
CacheUsage::CacheOkay => "Looking for a consensus.",
CacheUsage::MustDownload => "Downloading a consensus.",
}
}
.to_string()
}
fn missing_docs(&self) -> Vec<DocId> {
if self.can_advance() {
return Vec::new();
}
let flavor = ConsensusFlavor::Microdesc;
vec![DocId::LatestConsensus {
flavor,
cache_usage: self.cache_usage,
}]
}
fn is_ready(&self, _ready: Readiness) -> bool {
false
}
fn can_advance(&self) -> bool {
self.next.is_some()
}
fn dl_config(&self) -> Result<(usize, RetryConfig)> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok((1, *wd.config().timing().retry_consensus()))
} else {
Err(Error::ManagerDropped.into())
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
let text = match docs.into_iter().next() {
None => return Ok(false),
Some((
DocId::LatestConsensus {
flavor: ConsensusFlavor::Microdesc,
..
},
text,
)) => text,
_ => return Err(Error::Unwanted("Not an md consensus").into()),
};
self.add_consensus_text(true, text.as_str()?)
.map(|meta| meta.is_some())
}
async fn add_from_download(
&mut self,
text: &str,
_request: &ClientRequest,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
if let Some(meta) = self.add_consensus_text(false, text)? {
if let Some(store) = storage {
let mut w = store.lock().await;
w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
}
Ok(true)
} else {
Ok(false)
}
}
fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
Ok(match self.next {
Some(next) => Box::new(next),
None => self,
})
}
fn reset_time(&self) -> Option<SystemTime> {
None
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
Ok(self)
}
}
impl<DM: WriteNetDir> GetConsensusState<DM> {
/// Helper: try to set the current consensus text from an input
/// string `text`. Refuse it if the authorities could never be
/// correct, or if it is ill-formed.
fn add_consensus_text(
&mut self,
from_cache: bool,
text: &str,
) -> Result<Option<&ConsensusMeta>> {
// Try to parse it and get its metadata.
let (consensus_meta, unvalidated) = {
let (signedval, remainder, parsed) = MdConsensus::parse(text)?;
if let Ok(timely) = parsed.check_valid_now() {
let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
(meta, timely)
} else {
return Ok(None);
}
};
// Check out what authorities we believe in, and see if enough
// of them are purported to have signed this consensus.
let n_authorities = self.authority_ids.len() as u16;
let unvalidated = unvalidated.set_n_authorities(n_authorities);
let id_refs: Vec<_> = self.authority_ids.iter().collect();
if !unvalidated.authorities_are_correct(&id_refs[..]) {
return Err(Error::UnrecognizedAuthorities.into());
}
// Make a set of all the certificates we want -- the subset of
// those listed on the consensus that we would indeed accept as
// authoritative.
let desired_certs = unvalidated
.signing_cert_ids()
.filter(|m| self.recognizes_authority(&m.id_fingerprint))
.collect();
self.next = Some(GetCertsState {
cache_usage: self.cache_usage,
from_cache,
unvalidated,
consensus_meta,
missing_certs: desired_certs,
certs: Vec::new(),
writedir: Weak::clone(&self.writedir),
});
Ok(Some(&self.next.as_ref().unwrap().consensus_meta))
}
/// Return true if `id` is an authority identity we recognize
fn recognizes_authority(&self, id: &RsaIdentity) -> bool {
self.authority_ids.iter().any(|auth| auth == id)
}
}
/// Second state: fetching or loading authority certificates.
///
/// TODO: we should probably do what C tor does, and try to use the
/// same directory that gave us the consensus.
///
/// TODO SECURITY: This needs better handling for the DOS attack where
/// we are given a bad consensus signed with fictional certificates
/// that we can never find.
#[derive(Clone, Debug)]
struct GetCertsState<DM: WriteNetDir> {
/// The cache usage we had in mind when we began. Used to reset.
cache_usage: CacheUsage,
/// True iff we loaded the consensus from our cache.
from_cache: bool,
/// The consensus that we are trying to validate.
unvalidated: UnvalidatedMdConsensus,
/// Metadata for the consensus.
consensus_meta: ConsensusMeta,
/// A set of the key IDs (identity and signing key) for the certificates we don't
/// have yet.
missing_certs: HashSet<AuthCertKeyIds>,
/// A list of the certificates we've been able to load or download.
certs: Vec<AuthCert>,
/// Reference to our directory manager.
writedir: Weak<DM>,
}
#[async_trait]
impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
fn describe(&self) -> String {
let total = self.certs.len() + self.missing_certs.len();
format!(
"Downloading certificates for consensus (we are missing {}/{}).",
self.missing_certs.len(),
total
)
}
fn missing_docs(&self) -> Vec<DocId> {
self.missing_certs
.iter()
.map(|id| DocId::AuthCert(*id))
.collect()
}
fn is_ready(&self, _ready: Readiness) -> bool {
false
}
fn can_advance(&self) -> bool {
self.unvalidated.key_is_correct(&self.certs[..]).is_ok()
}
fn dl_config(&self) -> Result<(usize, RetryConfig)> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok((1, *wd.config().timing().retry_certs()))
} else {
Err(Error::ManagerDropped.into())
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
let mut changed = false;
// Here we iterate over the documents we want, taking them from
// our input and remembering them.
for id in self.missing_docs().iter() {
if let Some(cert) = docs.get(id) {
let parsed = AuthCert::parse(cert.as_str()?)?.check_signature()?;
if let Ok(cert) = parsed.check_valid_now() {
self.missing_certs.remove(cert.key_ids());
self.certs.push(cert);
changed = true;
} else {
warn!("Got a cert from our cache that we couldn't parse");
}
}
}
Ok(changed)
}
async fn add_from_download(
&mut self,
text: &str,
request: &ClientRequest,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let asked_for: HashSet<_> = match request {
ClientRequest::AuthCert(a) => a.keys().collect(),
_ => return Err(Error::BadArgument("Mismatched request").into()),
};
let mut newcerts = Vec::new();
for cert in AuthCert::parse_multiple(&text) {
if let Ok(parsed) = cert {
let s = parsed
.within(&text)
.expect("Certificate was not in input as expected");
if let Ok(wellsigned) = parsed.check_signature() {
if let Ok(timely) = wellsigned.check_valid_now() {
newcerts.push((timely, s));
}
} else {
// TODO: note the source.
warn!("Badly signed certificate received and discarded.");
}
} else {
// TODO: note the source.
warn!("Unparseable certificate received and discared.");
}
}
// Now discard any certs we didn't ask for.
let len_orig = newcerts.len();
newcerts.retain(|(cert, _)| asked_for.contains(cert.key_ids()));
if newcerts.len() != len_orig {
warn!("Discarding certificates that we didn't ask for.");
}
// We want to exit early if we aren't saving any certificates.
if newcerts.is_empty() {
return Ok(false);
}
if let Some(store) = storage {
// Write the certificates to the store.
let v: Vec<_> = newcerts[..]
.iter()
.map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
.collect();
let mut w = store.lock().await;
w.store_authcerts(&v[..])?;
}
// Remember the certificates in this state, and remove them
// from our list of missing certs.
let mut changed = false;
for (cert, _) in newcerts {
let ids = cert.key_ids();
if self.missing_certs.contains(ids) {
self.missing_certs.remove(ids);
self.certs.push(cert);
changed = true;
}
}
Ok(changed)
}
fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
if self.can_advance() {
let validated = self.unvalidated.check_signature(&self.certs[..])?;
Ok(Box::new(GetMicrodescsState::new(
validated,
self.consensus_meta,
self.writedir,
)?))
} else {
Ok(self)
}
}
fn reset_time(&self) -> Option<SystemTime> {
Some(self.consensus_meta.lifetime().valid_until())
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
Ok(Box::new(GetConsensusState::new(
self.writedir,
self.cache_usage,
)?))
}
}
/// Final state: we're fetching or loading microdescriptors
#[derive(Debug, Clone)]
struct GetMicrodescsState<DM: WriteNetDir> {
/// The digests of the microdescriptors we are missing.
missing: HashSet<MdDigest>,
/// The dirmgr to inform about a usable directory.
writedir: Weak<DM>,
/// A NetDir that we are currently building, but which doesn't
/// have enough microdescs yet.
partial: Option<PartialNetDir>,
/// Metadata for the current consensus.
meta: ConsensusMeta,
/// A pending list of microdescriptor digests whose
/// "last-listed-at" times we should update.
newly_listed: Vec<MdDigest>,
/// A time after which we should try to replace this directory and
/// find a new one. Since this is randomized, we only compute it
/// once.
reset_time: SystemTime,
}
impl<DM: WriteNetDir> GetMicrodescsState<DM> {
/// Create a new [`GetMicrodescsState`] from a provided
/// microdescriptor consensus.
fn new(consensus: MdConsensus, meta: ConsensusMeta, writedir: Weak<DM>) -> Result<Self> {
let reset_time = consensus.lifetime().valid_until();
let partial_dir = match Weak::upgrade(&writedir) {
Some(wd) => {
let params = wd.config().override_net_params();
let mut dir = PartialNetDir::new(consensus, Some(params));
if let Some(old_dir) = wd.netdir().get() {
dir.fill_from_previous_netdir(&old_dir);
}
dir
}
None => return Err(Error::ManagerDropped.into()),
};
let missing = partial_dir.missing_microdescs().map(Clone::clone).collect();
let mut result = GetMicrodescsState {
missing,
writedir,
partial: Some(partial_dir),
meta,
newly_listed: Vec::new(),
reset_time,
};
result.consider_upgrade();
Ok(result)
}
/// Add a bunch of microdescriptors to the in-progress netdir.
///
/// Return true if the netdir has just become usable.
fn register_microdescs<I>(&mut self, mds: I) -> bool
where
I: IntoIterator<Item = Microdesc>,
{
if let Some(p) = &mut self.partial {
for md in mds {
self.newly_listed.push(*md.digest());
p.add_microdesc(md);
}
return self.consider_upgrade();
} else if let Some(wd) = Weak::upgrade(&self.writedir) {
let _ = wd.netdir().mutate(|nd| {
for md in mds {
nd.add_microdesc(md);
}
Ok(())
});
}
false
}
/// Check whether this netdir we're building has _just_ become
/// usable when it was not previously usable. If so, tell the
/// dirmgr about it and return true; otherwise return false.
fn consider_upgrade(&mut self) -> bool {
if let Some(p) = self.partial.take() {
match p.unwrap_if_sufficient() {
Ok(netdir) => {
self.reset_time = pick_download_time(netdir.lifetime());
if let Some(wd) = Weak::upgrade(&self.writedir) {
wd.netdir().replace(netdir);
return true;
}
}
Err(partial) => self.partial = Some(partial),
}
}
false
}
}
#[async_trait]
impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
fn describe(&self) -> String {
format!(
"Downloading microdescriptors (we are missing {}).",
self.missing.len()
)
}
fn missing_docs(&self) -> Vec<DocId> {
self.missing.iter().map(|d| DocId::Microdesc(*d)).collect()
}
fn is_ready(&self, ready: Readiness) -> bool {
match ready {
Readiness::Complete => self.missing.is_empty(),
Readiness::Usable => self.partial.is_none(),
}
}
fn can_advance(&self) -> bool {
false
}
fn dl_config(&self) -> Result<(usize, RetryConfig)> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok((
wd.config().timing().microdesc_parallelism(),
*wd.config().timing().retry_microdescs(),
))
} else {
Err(Error::ManagerDropped.into())
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
let mut microdescs = Vec::new();
for (id, text) in docs {
if let DocId::Microdesc(digest) = id {
if !self.missing.remove(&digest) {
// we didn't want this.
continue;
}
if let Ok(md) = Microdesc::parse(text.as_str()?) {
if md.digest() == &digest {
microdescs.push(md);
continue;
}
}
warn!("Found a mismatched microdescriptor in cache; ignoring");
}
}
let changed = !microdescs.is_empty();
self.register_microdescs(microdescs);
Ok(changed)
}
async fn add_from_download(
&mut self,
text: &str,
request: &ClientRequest,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let requested: HashSet<_> = if let ClientRequest::Microdescs(req) = request {
req.digests().collect()
} else {
return Err(Error::BadArgument("Mismatched request").into());
};
let mut new_mds = Vec::new();
for annotated in MicrodescReader::new(text, AllowAnnotations::AnnotationsNotAllowed) {
if let Ok(anno) = annotated {
let txt = anno
.within(&text)
.expect("annotation not from within text as expected");
let md = anno.into_microdesc();
if !requested.contains(md.digest()) {
warn!(
"Received microdescriptor we did not ask for: {:?}",
md.digest()
);
continue;
}
self.missing.remove(md.digest());
new_mds.push((txt, md));
}
}
let mark_listed = self.meta.lifetime().valid_after();
if let Some(store) = storage {
let mut s = store.lock().await;
if !self.newly_listed.is_empty() {
s.update_microdescs_listed(self.newly_listed.iter(), mark_listed)?;
self.newly_listed.clear();
}
if !new_mds.is_empty() {
s.store_microdescs(
new_mds.iter().map(|(txt, md)| (&txt[..], md.digest())),
mark_listed,
)?;
}
}
if self.register_microdescs(new_mds.into_iter().map(|(_, md)| md)) {
// oh hey, this is no longer pending.
if let Some(store) = storage {
let mut store = store.lock().await;
info!("marked consensus usable.");
store.mark_consensus_usable(&self.meta)?;
// DOCDOC: explain why we're doing this here.
store.expire_all()?;
}
}
Ok(true)
}
fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
Ok(self)
}
fn reset_time(&self) -> Option<SystemTime> {
Some(self.reset_time)
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
Ok(Box::new(GetConsensusState::new(
self.writedir,
CacheUsage::MustDownload,
)?))
}
}
/// Choose a random download time to replace a consensus whose lifetime
/// is `lifetime`.
fn pick_download_time(lifetime: &Lifetime) -> SystemTime {
let (lowbound, uncertainty) = client_download_range(lifetime);
let zero = Duration::new(0, 0);
let t = lowbound + rand::thread_rng().gen_range(zero..uncertainty);
info!("The current consensus is fresh until {}, and valid until {}. I've picked {} as the earliest time to replace it.",
DateTime::<Utc>::from(lifetime.fresh_until()),
DateTime::<Utc>::from(lifetime.valid_until()),
DateTime::<Utc>::from(t));
t
}
/// Based on the lifetime for a consensus, return the time range during which
/// clients should fetch the next one.
fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
let valid_after = lt.valid_after();
let fresh_until = lt.fresh_until();
let valid_until = lt.valid_until();
let voting_interval = fresh_until
.duration_since(valid_after)
.expect("valid-after must precede fresh-until");
let whole_lifetime = valid_until
.duration_since(valid_after)
.expect("valid-after must precede valid-until");
// From dir-spec:
// "This time is chosen uniformly at random from the interval
// between the time 3/4 into the first interval after the
// consensus is no longer fresh, and 7/8 of the time remaining
// after that before the consensus is invalid."
let lowbound = voting_interval + (voting_interval * 3) / 4;
let remainder = whole_lifetime - lowbound;
let uncertainty = (remainder * 7) / 8;
(valid_after + lowbound, uncertainty)
}
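To make the dir-spec arithmetic above concrete, here is a small worked example (not part of the commit) assuming a hypothetical consensus with a one-hour voting interval and a three-hour total lifetime:

use std::time::Duration;

// Hypothetical lifetime: valid-after 12:00, fresh-until 13:00, valid-until 15:00.
let voting_interval = Duration::from_secs(60 * 60);         // 1 hour
let whole_lifetime = Duration::from_secs(3 * 60 * 60);      // 3 hours
let lowbound = voting_interval + (voting_interval * 3) / 4; // 1h45m, i.e. 13:45
let remainder = whole_lifetime - lowbound;                  // 1h15m
let uncertainty = (remainder * 7) / 8;                      // about 65.6 minutes
// pick_download_time then draws uniformly from [13:45, ~14:50:37].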

View File

@@ -400,8 +400,11 @@ impl SqliteStore {
}
/// Try to read the consensus corresponding to the provided metadata object.
#[allow(unused)]
pub fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
if let Some((text, _)) = self.consensus_by_sha3_digest(cmeta.sha3_256_of_whole())? {
if let Some((text, _)) =
self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())?
{
Ok(text)
} else {
Err(Error::CacheCorruption("couldn't find a consensus we thought we had.").into())
@@ -409,14 +412,15 @@ impl SqliteStore {
}
/// Try to read the consensus whose SHA3-256 digest is the provided
/// valid, and its metadata.
pub fn consensus_by_sha3_digest(
/// value, and its metadata.
pub fn consensus_by_sha3_digest_of_signed_part(
&self,
d: &[u8; 32],
) -> Result<Option<(InputString, ConsensusMeta)>> {
let d = hex::encode(d);
let digest = format!("sha3-256-{}", d);
let mut stmt = self.conn.prepare(FIND_CONSENSUS_AND_META_BY_DIGEST)?;
let digest = hex::encode(d);
let mut stmt = self
.conn
.prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
let mut rows = stmt.query(params![digest])?;
if let Some(row) = rows.next()? {
let meta = cmeta_from_row(&row)?;
@@ -441,6 +445,7 @@ impl SqliteStore {
}
/// Remove the consensus generated from `cmeta`.
#[allow(unused)]
pub fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
let d = hex::encode(cmeta.sha3_256_of_whole());
let digest = format!("sha3-256-{}", d);
@@ -782,12 +787,12 @@ const FIND_LATEST_CONSENSUS_META: &str = "
LIMIT 1;
";
/// Look up a consensus by its digest string.
const FIND_CONSENSUS_AND_META_BY_DIGEST: &str = "
/// Look up a consensus by its digest-of-signed-part string.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
FROM Consensuses
INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
WHERE Consensuses.digest = ?
WHERE Consensuses.sha3_of_signed_part = ?
LIMIT 1;
";

View File

@@ -1,224 +0,0 @@
//! Code to run as a background task and keep a directory up-to-date.
use crate::retry::RetryDelay;
use crate::{DirMgr, Error, Result};
use tor_netdoc::doc::netstatus::Lifetime;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use chrono::{DateTime, Utc};
use log::{debug, info, warn};
use rand::Rng;
/// A DirectoryUpdater runs in a background task to periodically re-fetch
/// new directory objects as the old ones become outdated.
pub struct DirectoryUpdater {
/// A directory manager to use in picking directory caches, and which can
/// download new directory objects.
dir_mgr: Weak<DirMgr>,
/// A flag to tell the DirectoryUpdater to exit.
stopping: AtomicBool,
}
impl DirectoryUpdater {
/// Make a new DirectoryUpdater. It takes a reference to a directory
/// manager, but stores a weak reference to it. It doesn't
/// start going till you call 'run' on it.
pub(crate) fn new(dir_mgr: Arc<DirMgr>) -> Self {
DirectoryUpdater {
dir_mgr: Arc::downgrade(&dir_mgr),
stopping: AtomicBool::new(false),
}
}
/// Tell the DirectoryUpdater to stop fetching.
///
/// (This won't take effect till after the current set of attempted
/// downloads is done.)
pub fn shutdown(&self) {
self.stopping.store(true, Ordering::SeqCst);
}
/// Run in a loop, and keep the directory manager's directory up-to-date.
pub(crate) async fn run(&self) -> Result<()> {
let readonly = self.is_readonly().await?;
if readonly {
self.run_offline().await?;
}
self.run_online().await
}
/// Run forever, trying to load a new directory from disk, or get the lock
/// to become a read-write dirmgr.
///
/// Returns Ok(()) if we got the lock.
pub(crate) async fn run_offline(&self) -> Result<()> {
loop {
if self.stopping.load(Ordering::SeqCst) {
return Err(Error::UpdaterShutdown.into());
}
// TODO: we should do this in some way that is smarter. Five
// minutes is too short for avoiding CPU usage, and too
// long for chutney or other uses.
let five_minutes = Duration::new(5 * 60, 0);
tor_rtcompat::task::sleep(five_minutes).await;
if let Some(dm) = self.dir_mgr.upgrade() {
if dm.try_upgrade_to_readwrite().await? {
// Hey, it's a read-write dirmgr now! We can take over
// responsibility for bootstrapping!
info!("Lock acquired: it's our responsibility to download directory info.");
break;
}
// No, we should just try loading.
dm.load_directory().await?;
}
}
Ok(())
}
/// Run in a loop, and keep the directory manager's directory
/// up-to-date by downloading directory information.
///
/// Requires that the underlying directory manager has a circuit
/// manager and a read-write store.
pub(crate) async fn run_online(&self) -> Result<()> {
loop {
let download_time = self.pick_download_time().await;
// Updating phase: try to add microdescriptors to the directory.
// Do this until we have all the microdescriptors, or it's time
// to download the next thing.
if let Some(download_time) = download_time {
let mut retry = RetryDelay::from_msec(1000); // XXXX make this configurable?
while SystemTime::now() < download_time {
let again = self.fetch_more_microdescs().await?;
if !again {
debug!("We have all the microdescriptors for our current consensus");
break;
}
let delay = retry.next_delay(&mut rand::thread_rng());
if SystemTime::now() + delay > download_time {
debug!("Out of time to fetch additional microdescriptors.");
break;
}
tor_rtcompat::task::sleep(delay).await;
}
// We're done with the updating phase: we either got all the mds or
// ran out of time.
debug!(
"Waiting till {}, when we download the next directory.",
DateTime::<Utc>::from(download_time)
);
tor_rtcompat::timer::sleep_until_wallclock(download_time).await;
}
// Time to get a new directory!
self.fetch_new_directory().await?;
}
}
/// Keep trying to get a new consensus until we have one, along with any
/// other directory objects we need to use that consensus.
async fn fetch_new_directory(&self) -> Result<()> {
let mut retry = RetryDelay::from_msec(1000); // XXXX make this configurable?
loop {
if self.stopping.load(Ordering::SeqCst) {
return Err(Error::UpdaterShutdown.into());
}
if let Some(dm) = self.dir_mgr.upgrade() {
let result = dm.fetch_new_directory().await;
if let Err(e) = result {
warn!("Directory fetch failed: {}. Will retry in later.", e);
let delay = retry.next_delay(&mut rand::thread_rng());
tor_rtcompat::task::sleep(delay).await;
} else {
return Err(Error::UpdaterShutdown.into());
}
} else {
return Ok(());
}
}
}
/// Perform a _single_ attempt to download any missing microdescriptors for the
/// current NetDir. Return true if we are still missing microdescriptors,
/// and false if we have received them all.
async fn fetch_more_microdescs(&self) -> Result<bool> {
if self.stopping.load(Ordering::SeqCst) {
return Err(Error::UpdaterShutdown.into());
}
if let Some(dm) = self.dir_mgr.upgrade() {
let result = dm.fetch_additional_microdescs().await;
match result {
Ok(n_missing) => Ok(n_missing != 0),
Err(e) => {
warn!("Microdescriptor fetch failed: {}. Will retry later.", e);
Ok(true)
}
}
} else {
Err(Error::UpdaterShutdown.into())
}
}
/// Select a random time to start fetching the next directory, based on the
/// directory we already have.
async fn pick_download_time(&self) -> Option<SystemTime> {
if let Some(dm) = self.dir_mgr.upgrade() {
let netdir = dm.netdir();
let lt = netdir.lifetime();
let (lowbound, uncertainty) = client_download_range(&lt);
let zero = Duration::new(0, 0);
let t = lowbound + rand::thread_rng().gen_range(zero..uncertainty);
info!("Current directory is fresh until {}, valid until {}. I've picked {} as the earliest to download a new one.",
DateTime::<Utc>::from(lt.fresh_until()),
DateTime::<Utc>::from(lt.valid_until()),
DateTime::<Utc>::from(t));
return Some(t);
}
None
}
/// Check whether the underlying directory manager has a read-only store.
async fn is_readonly(&self) -> Result<bool> {
if let Some(dm) = self.dir_mgr.upgrade() {
Ok(dm.store.lock().await.is_readonly())
} else {
Err(Error::UpdaterShutdown.into())
}
}
}
/// Based on the lifetime for a consensus, return the time range during which
/// clients should fetch the next one.
fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
let valid_after = lt.valid_after();
let fresh_until = lt.fresh_until();
let valid_until = lt.valid_until();
let voting_interval = fresh_until
.duration_since(valid_after)
.expect("valid-after must precede fresh-until");
let whole_lifetime = valid_until
.duration_since(valid_after)
.expect("valid-after must precede valid-until");
// From dir-spec:
// "This time is chosen uniformly at random from the interval
// between the time 3/4 into the first interval after the
// consensus is no longer fresh, and 7/8 of the time remaining
// after that before the consensus is invalid."
let lowbound = voting_interval + (voting_interval * 3) / 4;
let remainder = whole_lifetime - lowbound;
let uncertainty = (remainder * 7) / 8;
(valid_after + lowbound, uncertainty)
}

View File

@@ -1347,6 +1347,16 @@ impl<RS> UnvalidatedConsensus<RS> {
}
}
/// Return an iterator of all the certificate IDs that we might use
/// to validate this consensus.
pub fn signing_cert_ids(&self) -> impl Iterator<Item = AuthCertKeyIds> {
match self.key_is_correct(&[]) {
Ok(()) => Vec::new(),
Err(missing) => missing,
}
.into_iter()
}
/// Return the lifetime of this unvalidated consensus
pub fn peek_lifetime(&self) -> &Lifetime {
self.consensus.lifetime()
@@ -1441,6 +1451,9 @@ impl SignatureGroup {
/// authorities we believe in, and that every cert in `certs` belongs
/// to a real authority.
fn validate(&self, n_authorities: u16, certs: &[AuthCert]) -> bool {
// A set of the authorities (by identity) who have signed
// this document. We use a set here in case `certs` has more
// than one certificate for a single authority.
let mut ok: HashSet<RsaIdentity> = HashSet::new();
for sig in self.signatures.iter() {