Merge branch 'download-schedule' into 'main'

DownloadSchedule: Introduce Builder

See merge request tpo/core/arti!473
This commit is contained in:
Nick Mathewson 2022-04-26 18:47:08 +00:00
commit 5484bcc21f
8 changed files with 176 additions and 148 deletions

View File

@ -341,8 +341,8 @@ impl TryInto<dir::DirMgrConfig> for &TorClientConfig {
#[rustfmt::skip]
fn try_into(self) -> Result<dir::DirMgrConfig, ConfigBuildError> {
Ok(dir::DirMgrConfig {
network_config: self.tor_network .clone(),
schedule_config: self.download_schedule .clone(),
network: self.tor_network .clone(),
schedule: self.download_schedule .clone(),
cache_path: self.storage.expand_cache_dir()?,
override_net_params: self.override_net_params.clone(),
extensions: Default::default(),
@ -383,7 +383,6 @@ mod test {
#[test]
fn builder() {
use tor_dirmgr::DownloadSchedule;
let sec = std::time::Duration::from_secs(1);
let auth = dir::Authority::builder()
@ -402,9 +401,14 @@ mod test {
bld.storage()
.cache_dir(CfgPath::new("/var/tmp/foo".to_owned()))
.state_dir(CfgPath::new("/var/tmp/bar".to_owned()));
bld.download_schedule().retry_certs().attempts(10);
bld.download_schedule().retry_certs().initial_delay(sec);
bld.download_schedule().retry_certs().parallelism(3);
bld.download_schedule().retry_microdescs().attempts(30);
bld.download_schedule()
.retry_certs(DownloadSchedule::new(10, sec, 3))
.retry_microdescs(DownloadSchedule::new(30, 10 * sec, 9));
.retry_microdescs()
.initial_delay(10 * sec);
bld.download_schedule().retry_microdescs().parallelism(9);
bld.override_net_params()
.insert("wombats-per-quokka".to_owned(), 7);
bld.path_rules()

View File

@ -83,16 +83,16 @@ state_dir = "${ARTI_LOCAL_DATA}"
[download_schedule]
# How to retry our initial bootstrapping when we're trying to start up.
retry_bootstrap = { num_retries = 128, initial_delay = "1 sec" }
retry_bootstrap = { attempts = 128, initial_delay = "1 sec" }
# How to retry a single consensus download.
retry_consensus = { num_retries = 3, initial_delay = "1 sec" }
retry_consensus = { attempts = 3, initial_delay = "1 sec" }
# How to retry a set of authority certificate downloads.
retry_certs = { num_retries = 3, initial_delay = "1 sec" }
retry_certs = { attempts = 3, initial_delay = "1 sec" }
# How to retry a set of microdescriptor downloads.
retry_microdescs = { num_retries = 3, initial_delay = "1 sec", parallelism = 4 }
retry_microdescs = { attempts = 3, initial_delay = "1 sec", parallelism = 4 }
# Tells the circuit manager rules for constructing circuit paths
[path_rules]

View File

@ -219,7 +219,6 @@ mod test {
#[test]
fn builder() {
use arti_client::config::dir::DownloadSchedule;
use tor_config::CfgPath;
let sec = std::time::Duration::from_secs(1);
@ -245,10 +244,24 @@ mod test {
.storage()
.cache_dir(CfgPath::new("/var/tmp/foo".to_owned()))
.state_dir(CfgPath::new("/var/tmp/bar".to_owned()));
bld.tor().download_schedule().retry_certs().attempts(10);
bld.tor()
.download_schedule()
.retry_certs(DownloadSchedule::new(10, sec, 3))
.retry_microdescs(DownloadSchedule::new(30, 10 * sec, 9));
.retry_certs()
.initial_delay(sec);
bld.tor().download_schedule().retry_certs().parallelism(3);
bld.tor()
.download_schedule()
.retry_microdescs()
.attempts(30);
bld.tor()
.download_schedule()
.retry_microdescs()
.initial_delay(10 * sec);
bld.tor()
.download_schedule()
.retry_microdescs()
.parallelism(9);
bld.tor()
.override_net_params()
.insert("wombats-per-quokka".to_owned(), 7);

View File

@ -8,10 +8,10 @@
//! The types in this module are re-exported from `arti-client`: any changes
//! here must be reflected in the version of `arti-client`.
use crate::authority::AuthorityList;
use crate::retry::DownloadSchedule;
use crate::authority::{Authority, AuthorityList};
use crate::retry::{DownloadSchedule, DownloadScheduleBuilder};
use crate::storage::DynStore;
use crate::{Authority, AuthorityListBuilder, Result};
use crate::{AuthorityListBuilder, Result};
use tor_config::ConfigBuildError;
use tor_guardmgr::fallback::FallbackListBuilder;
use tor_netdoc::doc::netstatus;
@ -110,34 +110,30 @@ impl NetworkConfigBuilder {
#[builder(derive(Deserialize))]
pub struct DownloadScheduleConfig {
/// Top-level configuration for how to retry our initial bootstrap attempt.
#[serde(default = "default_retry_bootstrap")]
#[builder(default = "default_retry_bootstrap()")]
retry_bootstrap: DownloadSchedule,
#[builder(
sub_builder,
field(build = "self.retry_bootstrap.build_retry_bootstrap()?")
)]
#[builder_field_attr(serde(default))]
pub(crate) retry_bootstrap: DownloadSchedule,
/// Configuration for how to retry a consensus download.
#[serde(default)]
#[builder(default)]
retry_consensus: DownloadSchedule,
#[builder(sub_builder)]
#[builder_field_attr(serde(default))]
pub(crate) retry_consensus: DownloadSchedule,
/// Configuration for how to retry an authority cert download.
#[serde(default)]
#[builder(default)]
retry_certs: DownloadSchedule,
#[builder(sub_builder)]
#[builder_field_attr(serde(default))]
pub(crate) retry_certs: DownloadSchedule,
/// Configuration for how to retry a microdescriptor download.
#[serde(default = "default_microdesc_schedule")]
#[builder(default = "default_microdesc_schedule()")]
retry_microdescs: DownloadSchedule,
}
/// Default value for retry_bootstrap in DownloadScheduleConfig.
fn default_retry_bootstrap() -> DownloadSchedule {
DownloadSchedule::new(128, std::time::Duration::new(1, 0), 1)
}
/// Default value for microdesc_bootstrap in DownloadScheduleConfig.
fn default_microdesc_schedule() -> DownloadSchedule {
DownloadSchedule::new(3, std::time::Duration::new(1, 0), 4)
#[builder(
sub_builder,
field(build = "self.retry_microdescs.build_retry_microdescs()?")
)]
#[builder_field_attr(serde(default))]
pub(crate) retry_microdescs: DownloadSchedule,
}
impl Default for DownloadScheduleConfig {
@ -186,7 +182,7 @@ pub struct DirMgrConfig {
pub cache_path: PathBuf,
/// Configuration information about the network.
pub network_config: NetworkConfig,
pub network: NetworkConfig,
/// Configuration information about when we download things.
///
@ -198,7 +194,7 @@ pub struct DirMgrConfig {
/// on in-progress attempts as well, at least at the top level. Users
/// should _not_ assume that the effect of changing this option will always
/// be delayed.)
pub schedule_config: DownloadScheduleConfig,
pub schedule: DownloadScheduleConfig,
/// A map of network parameters that we're overriding from their settings in
/// the consensus.
@ -230,30 +226,14 @@ impl DirMgrConfig {
)?))
}
/// Return the configured cache path.
pub fn cache_path(&self) -> &std::path::Path {
self.cache_path.as_ref()
}
/// Return a slice of the configured authorities
pub fn authorities(&self) -> &[Authority] {
&self.network_config.authorities
&self.network.authorities
}
/// Return the configured set of fallback directories
pub fn fallbacks(&self) -> &tor_guardmgr::fallback::FallbackList {
&self.network_config.fallbacks
}
/// Return set of configured networkstatus parameter overrides.
pub fn override_net_params(&self) -> &netstatus::NetParams<i32> {
&self.override_net_params
}
/// Return the schedule configuration we should use to decide when to
/// attempt and retry downloads.
pub fn schedule(&self) -> &DownloadScheduleConfig {
&self.schedule_config
&self.network.fallbacks
}
/// Construct a new configuration object where all replaceable fields in
@ -263,11 +243,11 @@ impl DirMgrConfig {
pub(crate) fn update_from_config(&self, new_config: &DirMgrConfig) -> DirMgrConfig {
DirMgrConfig {
cache_path: self.cache_path.clone(),
network_config: NetworkConfig {
fallbacks: new_config.network_config.fallbacks.clone(),
authorities: self.network_config.authorities.clone(),
network: NetworkConfig {
fallbacks: new_config.network.fallbacks.clone(),
authorities: self.network.authorities.clone(),
},
schedule_config: new_config.schedule_config.clone(),
schedule: new_config.schedule.clone(),
override_net_params: new_config.override_net_params.clone(),
extensions: new_config.extensions.clone(),
}
@ -292,29 +272,6 @@ pub struct DirMgrExtensions {
pub filter: crate::filter::FilterConfig,
}
impl DownloadScheduleConfig {
/// Return configuration for retrying our entire bootstrap
/// operation at startup.
pub(crate) fn retry_bootstrap(&self) -> &DownloadSchedule {
&self.retry_bootstrap
}
/// Return configuration for retrying a consensus download.
pub(crate) fn retry_consensus(&self) -> &DownloadSchedule {
&self.retry_consensus
}
/// Return configuration for retrying an authority certificate download
pub(crate) fn retry_certs(&self) -> &DownloadSchedule {
&self.retry_certs
}
/// Return configuration for retrying an authority certificate download
pub(crate) fn retry_microdescs(&self) -> &DownloadSchedule {
&self.retry_microdescs
}
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
@ -384,21 +341,30 @@ mod test {
let mut bld = DownloadScheduleConfig::builder();
let cfg = bld.build().unwrap();
assert_eq!(cfg.retry_microdescs().parallelism(), 4);
assert_eq!(cfg.retry_microdescs().n_attempts(), 3);
assert_eq!(cfg.retry_bootstrap().n_attempts(), 128);
assert_eq!(cfg.retry_microdescs.parallelism(), 4);
assert_eq!(cfg.retry_microdescs.n_attempts(), 3);
assert_eq!(cfg.retry_bootstrap.n_attempts(), 128);
bld.retry_consensus(DownloadSchedule::new(7, Duration::new(86400, 0), 1))
.retry_bootstrap(DownloadSchedule::new(4, Duration::new(3600, 0), 1))
.retry_certs(DownloadSchedule::new(5, Duration::new(3600, 0), 1))
.retry_microdescs(DownloadSchedule::new(6, Duration::new(3600, 0), 0));
bld.retry_consensus().attempts(7);
bld.retry_consensus().initial_delay(Duration::new(86400, 0));
bld.retry_consensus().parallelism(1);
bld.retry_bootstrap().attempts(4);
bld.retry_bootstrap().initial_delay(Duration::new(3600, 0));
bld.retry_bootstrap().parallelism(1);
bld.retry_certs().attempts(5);
bld.retry_certs().initial_delay(Duration::new(3600, 0));
bld.retry_certs().parallelism(1);
bld.retry_microdescs().attempts(6);
bld.retry_microdescs().initial_delay(Duration::new(3600, 0));
bld.retry_microdescs().parallelism(1);
let cfg = bld.build().unwrap();
assert_eq!(cfg.retry_microdescs().parallelism(), 1); // gets clamped
assert_eq!(cfg.retry_microdescs().n_attempts(), 6);
assert_eq!(cfg.retry_bootstrap().n_attempts(), 4);
assert_eq!(cfg.retry_consensus().n_attempts(), 7);
assert_eq!(cfg.retry_certs().n_attempts(), 5);
assert_eq!(cfg.retry_microdescs.parallelism(), 1);
assert_eq!(cfg.retry_microdescs.n_attempts(), 6);
assert_eq!(cfg.retry_bootstrap.n_attempts(), 4);
assert_eq!(cfg.retry_consensus.n_attempts(), 7);
assert_eq!(cfg.retry_certs.n_attempts(), 5);
Ok(())
}
@ -411,7 +377,7 @@ mod test {
bld.override_net_params.set("circwindow".into(), 999);
bld.cache_path = tmp.path().into();
assert_eq!(bld.override_net_params().get("circwindow").unwrap(), &999);
assert_eq!(bld.override_net_params.get("circwindow").unwrap(), &999);
Ok(())
}

View File

@ -534,7 +534,7 @@ impl<R: Runtime> DirMgr<R> {
// TODO(nickm): instead of getting this every time we loop, it
// might be a good idea to refresh it with each attempt, at
// least at the point of checking the number of attempts.
*dirmgr.config.get().schedule().retry_bootstrap()
dirmgr.config.get().schedule.retry_bootstrap
};
let mut retry_delay = retry_config.schedule();
@ -607,7 +607,7 @@ impl<R: Runtime> DirMgr<R> {
// We don't support changing these: doing so basically would require us
// to abort all our in-progress downloads, since they might be based on
// no-longer-viable information.
if new_config.cache_path() != config.cache_path() {
if new_config.cache_path != config.cache_path {
how.cannot_change("storage.cache_path")?;
}
if new_config.authorities() != config.authorities() {
@ -618,14 +618,14 @@ impl<R: Runtime> DirMgr<R> {
return Ok(());
}
let params_changed = new_config.override_net_params() != config.override_net_params();
let params_changed = new_config.override_net_params != config.override_net_params;
self.config
.map_and_replace(|cfg| cfg.update_from_config(new_config));
if params_changed {
let _ignore_err = self.netdir.mutate(|netdir| {
netdir.replace_overridden_parameters(new_config.override_net_params());
netdir.replace_overridden_parameters(&new_config.override_net_params);
Ok(())
});
// (It's okay to ignore the error, since it just means that there

View File

@ -6,34 +6,93 @@
use std::num::{NonZeroU32, NonZeroU8};
use std::time::Duration;
use derive_builder::Builder;
use serde::Deserialize;
use tor_basic_utils::retry::RetryDelay;
use tor_config::ConfigBuildError;
/// Configuration for how many times to retry a download, with what
/// frequency.
#[derive(Debug, Copy, Clone, Deserialize, Eq, PartialEq)]
#[derive(Debug, Builder, Copy, Clone, Deserialize, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
#[builder(build_fn(error = "ConfigBuildError"))]
#[builder(derive(Deserialize))]
pub struct DownloadSchedule {
/// How many times to retry before giving up?
num_retries: NonZeroU32,
/// How many attempts to make before giving up?
#[builder(
setter(strip_option),
field(
type = "Option<u32>",
build = r#"build_nonzero(self.attempts, 3, "attempts")?"#
)
)]
attempts: NonZeroU32,
/// The amount of time to delay after the first failure, and a
/// lower-bound for future delays.
#[serde(with = "humantime_serde")]
#[builder(default = "Duration::from_millis(1000)")]
#[builder_field_attr(serde(with = "humantime_serde::option"))]
initial_delay: Duration,
/// When we want to download a bunch of these at a time, how many
/// attempts should we try to launch at once?
#[serde(default = "default_parallelism")]
#[builder(
setter(strip_option),
field(
type = "Option<u8>",
build = r#"build_nonzero(self.parallelism, 1, "parallelism")?"#
)
)]
parallelism: NonZeroU8,
}
impl DownloadScheduleBuilder {
/// Default value for retry_bootstrap in DownloadScheduleConfig.
pub(crate) fn build_retry_bootstrap(&self) -> Result<DownloadSchedule, ConfigBuildError> {
let mut bld = self.clone();
bld.attempts.get_or_insert(128);
bld.initial_delay.get_or_insert_with(|| Duration::new(1, 0));
bld.parallelism.get_or_insert(1);
bld.build()
}
/// Default values for retry_microdescs in DownloadScheduleConfig.
pub(crate) fn build_retry_microdescs(&self) -> Result<DownloadSchedule, ConfigBuildError> {
let mut bld = self.clone();
bld.attempts.get_or_insert(3);
bld.initial_delay
.get_or_insert_with(|| (Duration::new(1, 0)));
bld.parallelism.get_or_insert(4);
bld.build()
}
}
impl Default for DownloadSchedule {
fn default() -> Self {
DownloadSchedule::new(3, Duration::from_millis(1000), 1)
DownloadSchedule::builder()
.build()
.expect("build default DownloadSchedule")
}
}
/// Helper for building a NonZero* field
fn build_nonzero<NZ, I>(
spec: Option<I>,
default: I,
field: &'static str,
) -> Result<NZ, ConfigBuildError>
where
I: TryInto<NZ>,
{
spec.unwrap_or(default).try_into().map_err(|_| {
let field = field.into();
let problem = "zero specified, but not permitted".to_string();
ConfigBuildError::Invalid { field, problem }
})
}
/// Return the default parallelism for DownloadSchedule.
fn default_parallelism() -> NonZeroU8 {
#![allow(clippy::unwrap_used)]
@ -41,39 +100,21 @@ fn default_parallelism() -> NonZeroU8 {
}
impl DownloadSchedule {
/// Create a new DownloadSchedule to control our logic for retrying
/// a given download.
///
/// The resulting configuration will always make at least one
/// attempt, and at most `attempts`. After a failure, it will
/// wait at least `initial_delay` before trying again.
#[allow(clippy::missing_panics_doc)] // can't really panic.
pub fn new(attempts: u32, initial_delay: Duration, parallelism: u8) -> Self {
// If unwrapping `1.try_into()` is not safe there are bigger problems
#![allow(clippy::unwrap_used)]
let num_retries = attempts
.try_into()
.unwrap_or_else(|_| 1.try_into().unwrap());
let parallelism = parallelism
.try_into()
.unwrap_or_else(|_| 1.try_into().unwrap());
DownloadSchedule {
num_retries,
initial_delay,
parallelism,
}
/// Return a new [`DownloadScheduleBuilder`]
pub fn builder() -> DownloadScheduleBuilder {
DownloadScheduleBuilder::default()
}
/// Return an iterator to use over all the supported attempts for
/// this configuration.
pub fn attempts(&self) -> impl Iterator<Item = u32> {
0..(self.num_retries.into())
0..(self.attempts.into())
}
/// Return the number of times that we're supposed to retry, according
/// to this DownloadSchedule.
pub fn n_attempts(&self) -> u32 {
self.num_retries.into()
self.attempts.into()
}
/// Return the number of parallel attempts that we're supposed to launch,
@ -99,7 +140,6 @@ mod test {
// default configuration is 3 tries, 1000 msec initial delay
let cfg = DownloadSchedule::default();
let one_sec = Duration::from_secs(1);
let zero_sec = Duration::from_secs(0);
let mut rng = rand::thread_rng();
assert_eq!(cfg.n_attempts(), 3);
@ -110,15 +150,14 @@ mod test {
let mut sched = cfg.schedule();
assert_eq!(sched.next_delay(&mut rng), one_sec);
// Try a zero-attempt schedule, and have it get remapped to 1,1
let cfg = DownloadSchedule::new(0, zero_sec, 0);
assert_eq!(cfg.n_attempts(), 1);
assert_eq!(cfg.parallelism(), 1);
let v: Vec<_> = cfg.attempts().collect();
assert_eq!(&v[..], &[0]);
assert_eq!(cfg.initial_delay, zero_sec);
let mut sched = cfg.schedule();
assert_eq!(sched.next_delay(&mut rng), one_sec);
// Try schedules with zeroes and show that they fail
DownloadSchedule::builder()
.attempts(0)
.build()
.expect_err("built with 0 retries");
DownloadSchedule::builder()
.parallelism(0)
.build()
.expect_err("built with 0 parallelism");
}
}

View File

@ -217,7 +217,7 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(*wd.config().schedule().retry_consensus())
Ok(wd.config().schedule.retry_consensus)
} else {
Err(Error::ManagerDropped)
}
@ -398,7 +398,7 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(*wd.config().schedule().retry_certs())
Ok(wd.config().schedule.retry_certs)
} else {
Err(Error::ManagerDropped)
}
@ -627,7 +627,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
let partial_dir = match Weak::upgrade(&writedir) {
Some(wd) => {
let config = wd.config();
let params = config.override_net_params();
let params = &config.override_net_params;
let mut dir = PartialNetDir::new(consensus, Some(params));
if let Some(old_dir) = wd.netdir().get() {
dir.fill_from_previous_netdir(&old_dir);
@ -696,7 +696,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
self.reset_time = pick_download_time(netdir.lifetime());
// We re-set the parameters here, in case they have been
// reconfigured.
netdir.replace_overridden_parameters(wd.config().override_net_params());
netdir.replace_overridden_parameters(&wd.config().override_net_params);
wd.netdir().replace(netdir);
wd.netdir_consensus_changed();
wd.netdir_descriptors_changed();
@ -791,7 +791,7 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(*wd.config().schedule().retry_microdescs())
Ok(wd.config().schedule.retry_microdescs)
} else {
Err(Error::ManagerDropped)
}
@ -1017,7 +1017,7 @@ mod test {
}
let cfg = DirMgrConfig {
cache_path: "/we_will_never_use_this/".into(),
network_config: netcfg.build().unwrap(),
network: netcfg.build().unwrap(),
..Default::default()
};
let cfg = Arc::new(cfg);
@ -1138,7 +1138,7 @@ mod test {
// Download configuration is simple: only 1 request can be done in
// parallel. It uses a consensus retry schedule.
let retry = state.dl_config().unwrap();
assert_eq!(&retry, DownloadScheduleConfig::default().retry_consensus());
assert_eq!(retry, DownloadScheduleConfig::default().retry_consensus);
// Do we know what we want?
let docs = state.missing_docs();
@ -1239,7 +1239,7 @@ mod test {
let consensus_expires = datetime!(2020-08-07 12:43:20 UTC).into();
assert_eq!(state.reset_time(), Some(consensus_expires));
let retry = state.dl_config().unwrap();
assert_eq!(&retry, DownloadScheduleConfig::default().retry_certs());
assert_eq!(retry, DownloadScheduleConfig::default().retry_certs);
// Bootstrap status okay?
assert_eq!(
@ -1365,7 +1365,7 @@ mod test {
assert!(reset_time <= valid_until);
}
let retry = state.dl_config().unwrap();
assert_eq!(&retry, DownloadScheduleConfig::default().retry_microdescs());
assert_eq!(retry, DownloadScheduleConfig::default().retry_microdescs);
assert_eq!(
state.bootstrap_status().to_string(),
"fetching microdescriptors (0/4)"

View File

@ -27,6 +27,9 @@ BREAKING: Requiring Rust 1.56, edition 2021.
BREAKING: Configuration of fallback directories overhauled; now uses FallbackDirBuilder more.
BREAKING: Configuration of directory authorities overhauled; now uses AuthorityListBuilder.
BREAKING: Configuration of preemptive ports overhauled; now uses PredictedPortsListBuilder.
BREAKING: Configuration of download schedules overhauled; now uses builders
BREAKING: download schedules: "num_retries" configuration field renamed to (accurate) "attempts"
BREAKING: download schedules: Setting zero values for attempts or parallelism is now rejected
### arti
@ -51,6 +54,9 @@ BREAKING: Made internal scheduled entry points non-public.
### tor-dirmgr
BREAKING: AuthorityBuilder::build now throws ConfigBuildError, not a custom error type
BREAKING: DownloadSchedule::new() replaced with DownloadScheduleBuilder
BREAKING: DownloadScheduleConfigBuilder now has accessors for the schedules, not setters
BREAKING: DirMgrCfg: schedule and network fields rename (`_config` removed)
### tor-guardmgr