Merge branch 'md_allocation' into 'main'

Use less space in hashtables for microdescriptors

Closes #386

See merge request tpo/core/arti!415
This commit is contained in:
Nick Mathewson 2022-03-17 16:45:38 +00:00
commit 9b7663b4ba
2 changed files with 113 additions and 123 deletions

View File

@ -518,8 +518,6 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
struct GetMicrodescsState<DM: WriteNetDir> {
/// How should we get the consensus from the cache, if at all?
cache_usage: CacheUsage,
/// The digests of the microdescriptors we are missing.
missing: HashSet<MdDigest>,
/// Total number of microdescriptors listed in the consensus.
n_microdescs: usize,
/// The dirmgr to inform about a usable directory.
@ -555,10 +553,14 @@ enum PendingNetDir {
WaitingForGuards(NetDir),
}
impl PendingNetDir {
/// Add the provided microdescriptor to this pending directory.
///
/// Return true if we indeed wanted (and added) this descriptor.
impl MdReceiver for PendingNetDir {
fn missing_microdescs(&self) -> Box<dyn Iterator<Item = &MdDigest> + '_> {
match self {
PendingNetDir::Partial(partial) => partial.missing_microdescs(),
PendingNetDir::WaitingForGuards(netdir) => netdir.missing_microdescs(),
}
}
fn add_microdesc(&mut self, md: Microdesc) -> bool {
match self {
PendingNetDir::Partial(partial) => partial.add_microdesc(md),
@ -566,6 +568,15 @@ impl PendingNetDir {
}
}
fn n_missing(&self) -> usize {
match self {
PendingNetDir::Partial(partial) => partial.n_missing(),
PendingNetDir::WaitingForGuards(netdir) => netdir.n_missing(),
}
}
}
impl PendingNetDir {
/// Try to move `self` as far as possible towards a complete, netdir with
/// enough directory information (according to `writedir`).
///
@ -618,11 +629,9 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
None => return Err(Error::ManagerDropped),
};
let missing = partial_dir.missing_microdescs().map(Clone::clone).collect();
let mut result = GetMicrodescsState {
cache_usage,
n_microdescs,
missing,
writedir,
partial: Some(PendingNetDir::Partial(partial_dir)),
meta,
@ -703,19 +712,52 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
}
}
impl<DM: WriteNetDir> GetMicrodescsState<DM> {
/// Try to obtain info from an inner `MdReceiver`
///
/// Either finds an inner `MdReceiver` and calls `f` on it, or returns `default()`.
///
/// Used for missing microdescs.
fn with_mdreceiver_for_missing<F, T>(&self, f: F) -> T
where
F: FnOnce(&dyn MdReceiver) -> T,
T: Default,
{
if let Some(partial) = &self.partial {
return f(partial);
} else if let Some(wd) = Weak::upgrade(&self.writedir) {
if let Some(nd) = wd.netdir().get() {
return f(nd.as_ref());
}
}
Default::default()
}
/// Number of missing microdescriptors, or 0
///
/// Can return 0 if we don't have that information.
fn n_missing(&self) -> usize {
self.with_mdreceiver_for_missing(|d| d.n_missing())
}
}
impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
fn describe(&self) -> String {
format!(
"Downloading microdescriptors (we are missing {}).",
self.missing.len()
self.n_missing()
)
}
fn missing_docs(&self) -> Vec<DocId> {
self.missing.iter().map(|d| DocId::Microdesc(*d)).collect()
self.with_mdreceiver_for_missing(|d| {
d.missing_microdescs()
.map(|d| DocId::Microdesc(*d))
.collect()
})
}
fn is_ready(&self, ready: Readiness) -> bool {
match ready {
Readiness::Complete => self.missing.is_empty(),
Readiness::Complete => self.n_missing() == 0,
Readiness::Usable => self.partial.is_none(),
}
}
@ -723,7 +765,7 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
false
}
fn bootstrap_status(&self) -> DirStatus {
let n_present = self.n_microdescs - self.missing.len();
let n_present = self.n_microdescs - self.n_missing();
DirStatusInner::Validated {
lifetime: self.meta.lifetime().clone(),
n_mds: (n_present as u32, self.n_microdescs as u32),
@ -746,10 +788,6 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
let mut microdescs = Vec::new();
for (id, text) in docs {
if let DocId::Microdesc(digest) = id {
if !self.missing.remove(&digest) {
warn!("Bug: loaded a microdesc that we didn't want from the cache.");
continue;
}
if let Ok(md) = Microdesc::parse(text.as_str().map_err(Error::BadUtf8InCache)?) {
if md.digest() == &digest {
microdescs.push(md);
@ -768,9 +806,6 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
// so often. As a compromise we call `shrink_to_fit at most twice
// per consensus: once when we're ready to use, and once when we are
// complete.
self.missing.shrink_to_fit();
} else if self.missing.is_empty() {
self.missing.shrink_to_fit();
}
Ok(changed)
@ -800,7 +835,6 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
);
continue;
}
self.missing.remove(md.digest());
new_mds.push((txt, md));
}

View File

@ -73,7 +73,7 @@ use tor_netdoc::doc::netstatus::{self, MdConsensus, RouterStatus};
use tor_netdoc::types::policy::PortPolicy;
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use tracing::warn;
@ -150,59 +150,6 @@ impl SubnetConfig {
}
}
/// Internal type: either a microdescriptor, or the digest for a
/// microdescriptor that we want.
///
/// This is a separate type so we can use a HashSet instead of
/// HashMap.
#[derive(Clone, Debug)]
enum MdEntry {
/// A microdescriptor that is wanted but not present.
Absent {
/// Index of the routerstatus entry that wants this microdesc.
rs_idx: usize,
/// Content digest of the microdescriptor.
// TODO: I wanted to make this a reference at first, but that's
// nontrival. At this point I'm not 100% sure it would be a good
// idea.
d: MdDigest,
},
/// A microdescriptor that we have.
Present {
/// The microdescriptor itself.
md: Arc<Microdesc>,
},
}
impl std::borrow::Borrow<MdDigest> for MdEntry {
fn borrow(&self) -> &MdDigest {
self.digest()
}
}
impl MdEntry {
/// Return the digest for this entry.
fn digest(&self) -> &MdDigest {
match self {
MdEntry::Absent { d, .. } => d,
MdEntry::Present { md, .. } => md.digest(),
}
}
}
impl PartialEq for MdEntry {
fn eq(&self, rhs: &MdEntry) -> bool {
self.digest() == rhs.digest()
}
}
impl Eq for MdEntry {}
impl std::hash::Hash for MdEntry {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.digest().hash(state);
}
}
/// An opaque type representing the weight with which a relay or set of
/// relays will be selected for a given role.
///
@ -274,9 +221,12 @@ pub struct NetDir {
/// A map from keys to integer values, distributed in the consensus,
/// and clamped to certain defaults.
params: NetParameters,
/// Map from SHA256 digest of microdescriptors to the
/// microdescriptors themselves.
mds: HashSet<MdEntry>,
/// Map from routerstatus index (the position of a routerstatus within the
/// consensus), to that routerstatus's microdescriptor (if we have one.)
mds: Vec<Option<Arc<Microdesc>>>,
/// Map from SHA256 of _missing_ microdescriptors to the position of their
/// corresponding routerstatus indices within `consensus`.
rs_idx_by_missing: HashMap<MdDigest, usize>,
/// Map from ed25519 identity to index of the routerstatus within
/// `self.consensus.relays()`.
///
@ -342,6 +292,8 @@ pub trait MdReceiver {
///
/// Return true if it was indeed wanted.
fn add_microdesc(&mut self, md: Microdesc) -> bool;
/// Return the number of missing microdescriptors.
fn n_missing(&self) -> usize;
}
impl PartialNetDir {
@ -373,14 +325,13 @@ impl PartialNetDir {
// Compute the weights we'll want to use for these relays.
let weights = weight::WeightSet::from_consensus(&consensus, &params);
let mds = consensus
let n_relays = consensus.relays().len();
let rs_idx_by_missing = consensus
.relays()
.iter()
.enumerate()
.map(|(rs_idx, rs)| MdEntry::Absent {
rs_idx,
d: *rs.md_digest(),
})
.map(|(rs_idx, rs)| (*rs.md_digest(), rs_idx))
.collect();
let rs_idx_by_rsa = consensus
@ -393,9 +344,10 @@ impl PartialNetDir {
let netdir = NetDir {
consensus: Arc::new(consensus),
params,
mds,
mds: vec![None; n_relays],
rs_idx_by_missing,
rs_idx_by_rsa: Arc::new(rs_idx_by_rsa),
rs_idx_by_ed: HashMap::new(),
rs_idx_by_ed: HashMap::with_capacity(n_relays),
weights,
};
@ -411,12 +363,9 @@ impl PartialNetDir {
/// netdir, using the microdescriptors from the previous netdir.
pub fn fill_from_previous_netdir<'a>(&mut self, prev: &'a NetDir) -> Vec<&'a MdDigest> {
let mut loaded = Vec::new();
for ent in &prev.mds {
if let MdEntry::Present { md, .. } = ent {
if self.netdir.mds.contains(md.digest()) {
loaded.push(md.digest());
self.netdir.add_arc_microdesc(Arc::clone(md));
}
for md in prev.mds.iter().flatten() {
if self.netdir.add_arc_microdesc(md.clone()) {
loaded.push(md.digest());
}
}
loaded
@ -453,6 +402,9 @@ impl MdReceiver for PartialNetDir {
fn add_microdesc(&mut self, md: Microdesc) -> bool {
self.netdir.add_microdesc(md)
}
fn n_missing(&self) -> usize {
self.netdir.n_missing()
}
}
impl NetDir {
@ -466,23 +418,23 @@ impl NetDir {
/// Return true if we wanted it, and false otherwise.
#[allow(clippy::missing_panics_doc)] // Can't panic on valid object.
fn add_arc_microdesc(&mut self, md: Arc<Microdesc>) -> bool {
if let Some(prev_ent) = self.mds.take(md.digest()) {
if let MdEntry::Absent { rs_idx, .. } = prev_ent {
assert_eq!(self.consensus.relays()[rs_idx].md_digest(), md.digest());
if let Some(rs_idx) = self.rs_idx_by_missing.remove(md.digest()) {
assert_eq!(self.consensus.relays()[rs_idx].md_digest(), md.digest());
// There should never be two approved MDs in the same
// consensus listing the same ID... but if there is,
// we'll let the most recent one win.
self.rs_idx_by_ed.insert(*md.ed25519_id(), rs_idx);
// There should never be two approved MDs in the same
// consensus listing the same ID... but if there is,
// we'll let the most recent one win.
self.rs_idx_by_ed.insert(*md.ed25519_id(), rs_idx);
// Happy path: we did indeed want this one.
self.mds.insert(MdEntry::Present { md });
// Happy path: we did indeed want this one.
self.mds[rs_idx] = Some(md);
return true;
} else {
// We already had this.
self.mds.insert(prev_ent);
// Save some space in the missing-descriptor list.
if self.rs_idx_by_missing.len() < self.rs_idx_by_missing.capacity() / 4 {
self.rs_idx_by_missing.shrink_to_fit();
}
return true;
}
// Either we already had it, or we never wanted it at all.
@ -490,15 +442,21 @@ impl NetDir {
}
/// Construct a (possibly invalid) Relay object from a routerstatus and its
/// microdescriptor (if any).
fn relay_from_rs<'a>(
/// position within the consensus.
fn relay_from_rs_and_idx<'a>(
&'a self,
rs: &'a netstatus::MdConsensusRouterStatus,
rs_idx: usize,
) -> UncheckedRelay<'a> {
let md = match self.mds.get(rs.md_digest()) {
Some(MdEntry::Present { md }) => Some(Arc::as_ref(md)),
_ => None,
};
debug_assert_eq!(
self.consensus.relays()[rs_idx].rsa_identity(),
rs.rsa_identity()
);
let md = self.mds[rs_idx].as_deref();
if let Some(md) = md {
debug_assert_eq!(rs.md_digest(), md.digest());
}
UncheckedRelay { rs, md }
}
@ -526,7 +484,8 @@ impl NetDir {
self.consensus
.relays()
.iter()
.map(move |rs| self.relay_from_rs(rs))
.enumerate()
.map(move |(idx, rs)| self.relay_from_rs_and_idx(rs, idx))
}
/// Return an iterator over all usable Relays.
pub fn relays(&self) -> impl Iterator<Item = Relay<'_>> {
@ -541,10 +500,10 @@ impl NetDir {
/// with this ID, the ID may become usable.
#[allow(clippy::missing_panics_doc)] // Can't panic on valid object.
pub fn by_id(&self, id: &Ed25519Identity) -> Option<Relay<'_>> {
let rs_idx = self.rs_idx_by_ed.get(id)?;
let rs = self.consensus.relays().get(*rs_idx).expect("Corrupt index");
let rs_idx = *self.rs_idx_by_ed.get(id)?;
let rs = self.consensus.relays().get(rs_idx).expect("Corrupt index");
let relay = self.relay_from_rs(rs).into_relay()?;
let relay = self.relay_from_rs_and_idx(rs, rs_idx).into_relay()?;
assert_eq!(id, relay.id());
Some(relay)
}
@ -610,10 +569,10 @@ impl NetDir {
/// Return a (possibly unusable) relay with a given RSA identity.
#[allow(clippy::missing_panics_doc)] // Can't panic on valid object.
pub fn by_rsa_id_unchecked(&self, rsa_id: &RsaIdentity) -> Option<UncheckedRelay<'_>> {
let rs_idx = self.rs_idx_by_rsa.get(rsa_id)?;
let rs = self.consensus.relays().get(*rs_idx).expect("Corrupt index");
let rs_idx = *self.rs_idx_by_rsa.get(rsa_id)?;
let rs = self.consensus.relays().get(rs_idx).expect("Corrupt index");
assert_eq!(rs.rsa_identity(), rsa_id);
Some(self.relay_from_rs(rs))
Some(self.relay_from_rs_and_idx(rs, rs_idx))
}
/// Return the relay with a given RSA identity, if we have one
/// and it is usable.
@ -864,17 +823,14 @@ impl NetDir {
impl MdReceiver for NetDir {
fn missing_microdescs(&self) -> Box<dyn Iterator<Item = &MdDigest> + '_> {
Box::new(self.consensus.relays().iter().filter_map(move |rs| {
let d = rs.md_digest();
match self.mds.get(d) {
Some(MdEntry::Absent { d, .. }) => Some(d),
_ => None,
}
}))
Box::new(self.rs_idx_by_missing.keys())
}
fn add_microdesc(&mut self, md: Microdesc) -> bool {
self.add_arc_microdesc(Arc::new(md))
}
fn n_missing(&self) -> usize {
self.rs_idx_by_missing.len()
}
}
impl<'a> UncheckedRelay<'a> {