Merge branch 'main' into 'accel-features'

# Conflicts:
#   crates/arti-client/Cargo.toml
This commit is contained in:
Nick Mathewson 2022-06-16 13:24:46 +00:00
commit c4a9c8a1c4
30 changed files with 272 additions and 158 deletions

1
Cargo.lock generated
View File

@ -3630,6 +3630,7 @@ dependencies = [
"rand 0.8.5",
"retry-error",
"rusqlite",
"scopeguard",
"serde",
"signature",
"tempfile",

View File

@ -18,16 +18,8 @@ default = ["tokio", "native-tls"]
# * Features that are testing-only
# * Features which are select a particular implementation or build flag and
# which therefore are not strictly additive.
full = [
"tokio",
"async-std",
"native-tls",
"rustls",
"tor-rtcompat/full",
"tor-proto/full",
"tor-netdoc/full",
"tor-dirmgr/full",
]
# * Features which may introduce unnecessary licensing restrictions.
full = ["tokio", "async-std", "native-tls", "tor-rtcompat/full", "tor-proto/full", "tor-netdoc/full", "tor-dirmgr/full"]
async-std = ["tor-rtcompat/async-std"]
tokio = ["tor-rtcompat/tokio", "tor-proto/tokio"]

View File

@ -384,6 +384,7 @@ impl<R: Runtime> TorClient<R> {
let mut periodic_task_handles = circmgr
.launch_background_tasks(&runtime, &dirmgr, statemgr.clone())
.map_err(ErrorDetail::CircMgrSetup)?;
periodic_task_handles.extend(dirmgr.download_task_handle());
periodic_task_handles.extend(
chanmgr

View File

@ -172,8 +172,6 @@
//! [native-tls](https://github.com/sfackler/rust-native-tls) crate for TLS
//! support
//! * `async-std` -- build with [async-std](https://async.rs/) support
//! * `rustls` -- build with the [rustls](https://github.com/rustls/rustls)
//! crate for TLS support
//!
//! * `full` -- Build with all features above, along with all stable additive
//! features from other arti crates. (This does not include experimental
@ -181,6 +179,11 @@
//! implementation to the exclusion of another, or those that set a build
//! flag.)
//!
//! * `rustls` -- build with the [rustls](https://github.com/rustls/rustls)
//! crate for TLS support. This is not included in `full`, since it uses the
//! `ring` crate, which uses the old (3BSD/SSLEay) OpenSSL license, which may
//! introduce licensing compatibility issues.
//!
//! Note that flags `tokio`, `native-tls`, `async-std`, `rustls` and `static`
//! will enable the flags of the same name on the [`tor_rtcompat`] crate.
//!

View File

@ -14,7 +14,7 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
[features]
default = ["tokio", "native-tls"]
full = ["async-std", "tokio", "native-tls", "rustls", "journald", "arti-client/full"]
full = ["async-std", "tokio", "native-tls", "journald", "arti-client/full"]
async-std = ["arti-client/async-std", "tor-rtcompat/async-std", "async-ctrlc", "once_cell"]
tokio = ["tokio-crate", "arti-client/tokio", "tor-rtcompat/tokio"]

1
crates/arti/semver.md Normal file
View File

@ -0,0 +1 @@
BREAKING: LoggingConfig.journald now takes `Option<Into<String>>`

View File

@ -50,7 +50,6 @@
//! disable tokio.
//! * `native-tls` -- Build with support for the `native_tls` TLS backend.
//! (default)
//! * `rustls` -- Build with support for the `rustls` TLS backend.
//! * `journald` -- Build with support for logging to the `journald` logging
//! backend (available as part of systemd.)
//!
@ -60,6 +59,11 @@
//! implementation to the exclusion of another, or those that set a build
//! flag.)
//!
//! * `rustls` -- build with the [rustls](https://github.com/rustls/rustls)
//! TLS backend. This is not included in `full`, since it uses the
//! `ring` crate, which uses the old (3BSD/SSLEay) OpenSSL license, which may
//! introduce licensing compatibility issues.
//!
//! ## Build-flag related features
//!
//! * `static` -- Link with static versions of your system dependencies,

View File

@ -34,7 +34,7 @@ pub struct LoggingConfig {
///
/// Only takes effect if Arti is built with the `journald` filter.
#[builder(
setter(into, strip_option),
setter(into),
field(build = r#"tor_config::resolve_option(&self.journald, || None)"#)
)]
journald: Option<String>,

View File

@ -2102,10 +2102,7 @@ mod test {
fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
// FIXME(eta): the below is copypasta; would be nice to have a better way of
// constructing ExitPolicy objects for testing maybe
let network = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let network = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
// Nodes with ID 0x0a through 0x13 and 0x1e through 0x27 are
// exits. Odd-numbered ones allow only ports 80 and 443;

View File

@ -93,10 +93,7 @@ mod test {
#[test]
fn dirpath_relay() {
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let mut rng = testing_rng();
let dirinfo = (&netdir).into();
let guards: OptDummyGuardMgr<'_> = None;
@ -175,10 +172,7 @@ mod test {
#[test]
fn dirpath_with_guards() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let mut rng = testing_rng();
let dirinfo = (&netdir).into();
let statemgr = tor_persist::TestingStateMgr::new();

View File

@ -264,10 +264,7 @@ mod test {
#[test]
fn by_ports() {
let mut rng = testing_rng();
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let ports = vec![TargetPort::ipv4(443), TargetPort::ipv4(1119)];
let dirinfo = (&netdir).into();
let config = PathConfig::default();
@ -311,10 +308,7 @@ mod test {
#[test]
fn any_exit() {
let mut rng = testing_rng();
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let dirinfo = (&netdir).into();
let guards: OptDummyGuardMgr<'_> = None;
let now = SystemTime::now();
@ -389,10 +383,7 @@ mod test {
use tor_guardmgr::GuardStatus;
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let mut rng = testing_rng();
let dirinfo = (&netdir).into();
let statemgr = tor_persist::TestingStateMgr::new();

View File

@ -662,10 +662,7 @@ pub(crate) mod test {
fn buildpath() {
use crate::mgr::AbstractSpec;
let mut rng = testing_rng();
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let di = (&netdir).into();
let config = crate::PathConfig::default();
let guards: OptDummyGuardMgr<'_> = None;

View File

@ -153,9 +153,14 @@ impl Reconfigure {
///
/// See <https://gitlab.torproject.org/tpo/core/arti/-/issues/488>
///
/// For consistency with other APIs in Arti, when using this,
/// do not pass `setter(strip_option)` to derive_builder.
///
/// # ⚠ Stability Warning ⚠
///
/// We hope to significantly change this so that it is an method in an extension trait.
/// We may also make it able to support settings where the special "no such thing" value is
/// not `T::Default`.
//
// This is an annoying AOI right now because you have to write things like
// #[builder(field(build = r#"tor_config::resolve_option(&self.dns_port, || None)"#))]

View File

@ -48,6 +48,7 @@ postage = { version = "0.5.0", default-features = false, features = ["futures-tr
rand = "0.8"
retry-error = { path = "../retry-error", version = "0.2.0" }
rusqlite = { version = "0.27.0", features = ["time"] }
scopeguard = "1"
serde = { version = "1.0.103", features = ["derive"] }
signature = "1"
thiserror = "1"

View File

@ -0,0 +1,3 @@
MODIFIED: DirProvider now has download_task_handle().
There's a default implementation, so this isn't a breaking change.

View File

@ -21,7 +21,8 @@ use futures::channel::oneshot;
use futures::FutureExt;
use futures::StreamExt;
use tor_dirclient::DirResponse;
use tor_rtcompat::{Runtime, SleepProviderExt};
use tor_rtcompat::scheduler::TaskSchedule;
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};
use crate::storage::Store;
@ -472,6 +473,7 @@ async fn download_attempt<R: Runtime>(
pub(crate) async fn download<R: Runtime>(
dirmgr: Weak<DirMgr<R>>,
state: &mut Box<dyn DirState>,
schedule: &mut TaskSchedule<R>,
on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
@ -515,7 +517,6 @@ pub(crate) async fn download<R: Runtime>(
}
let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());
let mut reset_timeout_future = runtime.sleep_until_wallclock(reset_time).fuse();
let mut retry = retry_config.schedule();
let mut delay = None;
@ -530,14 +531,19 @@ pub(crate) async fn download<R: Runtime>(
let next_delay = retry.next_delay(&mut rand::thread_rng());
if let Some(delay) = delay.replace(next_delay) {
debug!("Waiting {:?} for next download attempt...", delay);
futures::select_biased! {
_ = reset_timeout_future => {
info!("Download attempt timed out completely; resetting download state.");
reset(state);
continue 'next_state;
}
_ = FutureExt::fuse(runtime.sleep(delay)) => {}
let time_until_reset = {
reset_time
.duration_since(now)
.unwrap_or(Duration::from_secs(0))
};
schedule.sleep(delay.min(time_until_reset)).await;
now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
if now >= reset_time {
info!("Download attempt timed out completely; resetting download state.");
reset(state);
continue 'next_state;
}
}
info!("{}: {}", attempt + 1, state.describe());
@ -553,7 +559,7 @@ pub(crate) async fn download<R: Runtime>(
continue 'next_attempt;
}
}
_ = runtime.sleep_until_wallclock(reset_time).fuse() => {
_ = schedule.sleep_until_wallclock(reset_time).fuse() => {
// We need to reset. This can happen if (for
// example) we're downloading the last few
// microdescriptors on a consensus that now
@ -785,7 +791,8 @@ mod test {
// Let's try bootstrapping when everything is in the cache.
tor_rtcompat::test_with_one_runtime!(|rt| async {
let now = rt.wallclock();
let (_tempdir, mgr) = new_mgr(rt);
let (_tempdir, mgr) = new_mgr(rt.clone());
let (mut schedule, _handle) = TaskSchedule::new(rt);
{
let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@ -804,9 +811,14 @@ mod test {
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
let mut on_usable = None;
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
super::download(
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
&mut on_usable,
)
.await
.unwrap();
assert!(state.is_ready(Readiness::Complete));
});
}
@ -817,7 +829,8 @@ mod test {
// phase 2 in cache.
tor_rtcompat::test_with_one_runtime!(|rt| async {
let now = rt.wallclock();
let (_tempdir, mgr) = new_mgr(rt);
let (_tempdir, mgr) = new_mgr(rt.clone());
let (mut schedule, _handle) = TaskSchedule::new(rt);
{
let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
@ -838,9 +851,14 @@ mod test {
let mut on_usable = None;
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
super::download(
Arc::downgrade(&mgr),
&mut state,
&mut schedule,
&mut on_usable,
)
.await
.unwrap();
assert!(state.is_ready(Readiness::Complete));
});
}

View File

@ -81,6 +81,7 @@ pub use crate::shared_ref::SharedMutArc;
use crate::storage::{DynStore, Store};
use postage::watch;
pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
use scopeguard::ScopeGuard;
use tor_circmgr::CircMgr;
use tor_dirclient::SourceInfo;
use tor_error::into_internal;
@ -88,7 +89,8 @@ use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
use async_trait::async_trait;
use futures::{channel::oneshot, stream::BoxStream, task::SpawnExt};
use tor_rtcompat::{Runtime, SleepProviderExt};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tor_rtcompat::Runtime;
use tracing::{debug, info, trace, warn};
use std::ops::Deref;
@ -134,6 +136,11 @@ pub trait DirProvider: NetDirProvider {
/// Note that this stream can be lossy: the caller will not necessarily
/// observe every event on the stream
fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;
/// Return a [`TaskHandle`] that can be used to manage the download process.
fn download_task_handle(&self) -> Option<TaskHandle> {
None
}
}
// NOTE(eta): We can't implement this for Arc<DirMgr<R>> due to trait coherence rules, so instead
@ -165,6 +172,10 @@ impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
Box::pin(DirMgr::bootstrap_events(self))
}
fn download_task_handle(&self) -> Option<TaskHandle> {
Some(self.task_handle.clone())
}
}
/// A directory manager to download, fetch, and cache a Tor directory.
@ -231,29 +242,13 @@ pub struct DirMgr<R: Runtime> {
/// A filter that gets applied to directory objects before we use them.
#[cfg(feature = "dirfilter")]
filter: crate::filter::FilterConfig,
}
/// RAII guard to reset an AtomicBool on drop.
struct BoolResetter<'a> {
/// The bool to reset.
inner: &'a AtomicBool,
/// What value to store.
reset_to: bool,
/// What atomic ordering to use.
ordering: Ordering,
}
/// A task schedule that can be used if we're bootstrapping. If this is
/// None, then there's currently a scheduled task in progress.
task_schedule: Mutex<Option<TaskSchedule<R>>>,
impl<'a> Drop for BoolResetter<'a> {
fn drop(&mut self) {
self.inner.store(self.reset_to, self.ordering);
}
}
impl<'a> BoolResetter<'a> {
/// Disarm the guard, consuming it to make it not reset any more.
fn disarm(self) {
std::mem::forget(self);
}
/// A task handle that we return to anybody who needs to manage our download process.
task_handle: TaskHandle,
}
/// The possible origins of a document.
@ -377,10 +372,19 @@ impl<R: Runtime> DirMgr<R> {
// Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
// completing bootstrap.
let resetter = BoolResetter {
inner: &self.bootstrap_started,
reset_to: false,
ordering: Ordering::SeqCst,
let reset_bootstrap_started = scopeguard::guard(&self.bootstrap_started, |v| {
v.store(false, Ordering::SeqCst);
});
let schedule = {
let sched = self.task_schedule.lock().expect("poisoned lock").take();
match sched {
Some(sched) => sched,
None => {
debug!("Attempted to bootstrap twice; ignoring.");
return Ok(());
}
}
};
// Try to load from the cache.
@ -399,17 +403,30 @@ impl<R: Runtime> DirMgr<R> {
let dirmgr_weak = Arc::downgrade(self);
self.runtime
.spawn(async move {
// NOTE: This is a daemon task. It should eventually get
// treated as one.
// Use an RAII guard to make sure that when this task exits, the
// TaskSchedule object is put back.
//
// TODO(nick): Putting the schedule back isn't actually useful
// if the task exits _after_ we've bootstrapped for the first
// time, because of how bootstrap_started works.
let mut schedule = scopeguard::guard(schedule, |schedule| {
if let Some(dm) = Weak::upgrade(&dirmgr_weak) {
*dm.task_schedule.lock().expect("poisoned lock") = Some(schedule);
}
});
// Don't warn when these are Error::ManagerDropped: that
// means that the DirMgr has been shut down.
if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
if let Err(e) =
Self::reload_until_owner(&dirmgr_weak, &mut schedule, &mut sender).await
{
match e {
Error::ManagerDropped => {}
_ => warn!("Unrecovered error while waiting for bootstrap: {}", e),
}
} else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
} else if let Err(e) =
Self::download_forever(dirmgr_weak.clone(), &mut schedule, sender).await
{
match e {
Error::ManagerDropped => {}
_ => warn!("Unrecovered error while downloading: {}", e),
@ -422,8 +439,8 @@ impl<R: Runtime> DirMgr<R> {
match receiver.await {
Ok(()) => {
info!("We have enough information to build circuits.");
// Disarm the RAII guard, since we succeeded.
resetter.disarm();
// Disarm the RAII guard, since we succeeded. Now bootstrap_started will remain true.
let _ = ScopeGuard::into_inner(reset_bootstrap_started);
}
Err(_) => {
warn!("Bootstrapping task exited before finishing.");
@ -462,14 +479,13 @@ impl<R: Runtime> DirMgr<R> {
/// If we eventually become the owner, return Ok().
async fn reload_until_owner(
weak: &Weak<Self>,
schedule: &mut TaskSchedule<R>,
on_complete: &mut Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut logged = false;
let mut bootstrapped;
let runtime;
{
let dirmgr = upgrade_weak_ref(weak)?;
runtime = dirmgr.runtime.clone();
bootstrapped = dirmgr.netdir.get().is_some();
}
@ -504,7 +520,7 @@ impl<R: Runtime> DirMgr<R> {
} else {
std::time::Duration::new(5, 0)
};
runtime.sleep(pause).await;
schedule.sleep(pause).await;
// TODO: instead of loading the whole thing we should have a
// database entry that says when the last update was, or use
// our state functions.
@ -531,6 +547,7 @@ impl<R: Runtime> DirMgr<R> {
/// message using `on_complete`.
async fn download_forever(
weak: Weak<Self>,
schedule: &mut TaskSchedule<R>,
mut on_complete: Option<oneshot::Sender<()>>,
) -> Result<()> {
let mut state: Box<dyn DirState> = {
@ -548,11 +565,6 @@ impl<R: Runtime> DirMgr<R> {
))
};
let runtime = {
let dirmgr = upgrade_weak_ref(&weak)?;
dirmgr.runtime.clone()
};
loop {
let mut usable = false;
@ -567,7 +579,8 @@ impl<R: Runtime> DirMgr<R> {
'retry_attempt: for _ in retry_config.attempts() {
let outcome =
bootstrap::download(Weak::clone(&weak), &mut state, &mut on_complete).await;
bootstrap::download(Weak::clone(&weak), &mut state, schedule, &mut on_complete)
.await;
if let Err(err) = outcome {
if state.is_ready(Readiness::Usable) {
@ -592,7 +605,7 @@ impl<R: Runtime> DirMgr<R> {
"Unable to download a usable directory: {}. We will restart in {:?}.",
err, delay
);
runtime.sleep(delay).await;
schedule.sleep(delay).await;
state = state.reset();
} else {
info!("Directory is complete.");
@ -617,7 +630,7 @@ impl<R: Runtime> DirMgr<R> {
let reset_at = state.reset_time();
match reset_at {
Some(t) => runtime.sleep_until_wallclock(t).await,
Some(t) => schedule.sleep_until_wallclock(t).await,
None => return Ok(()),
}
state = state.reset();
@ -743,6 +756,10 @@ impl<R: Runtime> DirMgr<R> {
#[cfg(feature = "dirfilter")]
let filter = config.extensions.filter.clone();
// We create these early so the client code can access task_handle before bootstrap() returns.
let (task_schedule, task_handle) = TaskSchedule::new(runtime.clone());
let task_schedule = Mutex::new(Some(task_schedule));
Ok(DirMgr {
config: config.into(),
store,
@ -756,6 +773,8 @@ impl<R: Runtime> DirMgr<R> {
bootstrap_started: AtomicBool::new(false),
#[cfg(feature = "dirfilter")]
filter,
task_schedule,
task_handle,
})
}
@ -1365,31 +1384,4 @@ replacement line
assert!(expanded.is_err());
});
}
#[test]
#[allow(clippy::bool_assert_comparison)]
fn bool_resetter_works() {
let bool = AtomicBool::new(false);
fn example(bool: &AtomicBool) {
bool.store(true, Ordering::SeqCst);
let _resetter = BoolResetter {
inner: bool,
reset_to: false,
ordering: Ordering::SeqCst,
};
}
fn example_disarm(bool: &AtomicBool) {
bool.store(true, Ordering::SeqCst);
let resetter = BoolResetter {
inner: bool,
reset_to: false,
ordering: Ordering::SeqCst,
};
resetter.disarm();
}
example(&bool);
assert_eq!(bool.load(Ordering::SeqCst), false);
example_disarm(&bool);
assert_eq!(bool.load(Ordering::SeqCst), true);
}
}

View File

@ -79,12 +79,12 @@ impl<T> SharedMutArc<T> {
F: FnOnce(&mut T) -> Result<U>,
T: Clone,
{
match self
let mut writeable = self
.dir
.write()
.expect("Poisoned lock for directory reference")
.as_mut()
{
.expect("Poisoned lock for directory reference");
let dir = writeable.as_mut();
match dir {
None => Err(Error::DirectoryNotPresent), // Kinda bogus.
Some(arc) => func(Arc::make_mut(arc)),
}

View File

@ -281,6 +281,9 @@ impl Store for SqliteStore {
}
fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
let tx = self.conn.transaction()?;
// This works around a false positive; see
// https://github.com/rust-lang/rust-clippy/issues/8114
#[allow(clippy::let_and_return)]
let expired_blobs: Vec<String> = {
let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
let names = stmt

View File

@ -441,7 +441,8 @@ pub enum ErrorKind {
/// An resolve operation finished with an error.
///
/// Contrary to [`RemoteHostNotFound`], this can't mean "this is not a hostname".
/// Contrary to [`RemoteHostNotFound`](ErrorKind::RemoteHostNotFound),
/// this can't mean "this is not a hostname".
/// This error should be retried.
#[display(fmt = "remote hostname lookup failure")]
RemoteHostResolutionFailed,

View File

@ -1002,10 +1002,7 @@ mod test {
#[test]
fn netdir_integration() {
use tor_netdir::testnet;
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
let params = GuardParams::default();
let now = SystemTime::now();
@ -1039,10 +1036,7 @@ mod test {
#[test]
fn update_from_netdir() {
use tor_netdir::testnet;
let netdir = testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap();
let netdir = testnet::construct_netdir().unwrap_if_sufficient().unwrap();
// Same as above but omit [22]
let netdir2 = testnet::construct_custom_netdir(|idx, mut node| {
if idx == 22 {

View File

@ -591,8 +591,8 @@ impl<R: Runtime> GuardMgr<R> {
) {
let now = self.runtime.now();
let mut inner = self.inner.lock().expect("Poisoned lock");
for id in inner.lookup_ids(ed_identity, rsa_identity) {
let ids = inner.lookup_ids(ed_identity, rsa_identity);
for id in ids {
match &id.0 {
FirstHopIdInner::Guard(id) => {
inner.guards.active_guards_mut().record_failure(

View File

@ -850,10 +850,7 @@ mod test {
fn netdir() -> NetDir {
use tor_netdir::testnet;
testnet::construct_netdir()
.unwrap()
.unwrap_if_sufficient()
.unwrap()
testnet::construct_netdir().unwrap_if_sufficient().unwrap()
}
#[test]

View File

@ -1302,7 +1302,7 @@ mod test {
// This is mostly a copy of test_pick, except that it uses
// pick_n_relays to pick several relays at once.
let dir = construct_netdir().unwrap().unwrap_if_sufficient().unwrap();
let dir = construct_netdir().unwrap_if_sufficient().unwrap();
let (mut rng, total, tolerance) = testing_rng_with_tolerances();
@ -1489,7 +1489,7 @@ mod test {
#[cfg(feature = "experimental-api")]
#[test]
fn test_accessors() {
let netdir = construct_netdir().unwrap().unwrap_if_sufficient().unwrap();
let netdir = construct_netdir().unwrap_if_sufficient().unwrap();
let r4 = netdir.by_id(&[4; 32].into()).unwrap();
let r16 = netdir.by_id(&[16; 32].into()).unwrap();
@ -1572,7 +1572,7 @@ mod test {
#[test]
fn weight_accessors() {
// Make a netdir that omits the microdescriptor for 0xDDDDDD...
let netdir = construct_netdir().unwrap().unwrap_if_sufficient().unwrap();
let netdir = construct_netdir().unwrap_if_sufficient().unwrap();
let g_total = netdir.total_weight(WeightRole::Guard, |r| r.is_flagged_guard());
// This is just the total guard weight, since all our Wxy = 1.

View File

@ -48,8 +48,8 @@ pub struct NodeBuilders {
fn simple_net_func(_idx: usize, _nb: &mut NodeBuilders) {}
/// As [`construct_network()`], but return a [`PartialNetDir`].
pub fn construct_netdir() -> BuildResult<PartialNetDir> {
construct_custom_netdir(simple_net_func)
pub fn construct_netdir() -> PartialNetDir {
construct_custom_netdir(simple_net_func).expect("failed to build default testing netdir")
}
/// As [`construct_custom_network()`], but return a [`PartialNetDir`].

View File

@ -24,6 +24,7 @@
/// assert_eq!(Location::from_str("start"), Location::START);
/// assert_eq!(Location::from_str("stfff"), Location::UNRECOGNIZED);
/// ```
#[allow(unknown_lints)] // We can remove once MSRV >= 1.63
#[allow(unused_macro_rules)]
macro_rules! decl_keyword {
{ $(#[$meta:meta])* $v:vis

View File

@ -96,7 +96,8 @@ impl StateMgr for TestingStateMgr {
{
let inner = self.inner.lock().expect("Lock poisoned.");
let storage = inner.storage.lock().expect("Lock poisoned.");
match storage.entries.get(key) {
let content = storage.entries.get(key);
match content {
Some(value) => Ok(Some(serde_json::from_str(value).map_err(load_error)?)),
None => Ok(None),
}

View File

@ -433,6 +433,7 @@ mod test {
}
#[test]
#[allow(clippy::print_stderr)]
fn timeout_distribution() {
// Test that the distribution of padding intervals is as we expect. This is not so
// straightforward. We need to deal with true randomness (since we can't plumb a

View File

@ -7,7 +7,7 @@ use futures::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime};
use pin_project::pin_project;
@ -20,6 +20,12 @@ enum SchedulerCommand {
FireAt(Instant),
/// Cancel a pending execution, if there is one.
Cancel,
/// Pause execution without cancelling any running timers. (Those timers
/// will fire after we resume execution.)
Suspend,
/// Resume execution. If there is a pending timer, start waiting for it again;
/// otherwise, fire immediately.
Resume,
}
/// A remotely-controllable trigger for recurring tasks.
@ -39,6 +45,9 @@ pub struct TaskSchedule<R: SleepProvider> {
/// This is used to avoid having to create a `SleepFuture` with zero duration,
/// which is potentially a bit wasteful.
instant_fire: bool,
/// Whether we are currently "suspended". If we are suspended, we won't
/// start executing again till we're explicitly "resumed".
suspended: bool,
}
/// A handle used to control a [`TaskSchedule`].
@ -59,6 +68,7 @@ impl<R: SleepProvider> TaskSchedule<R> {
rt,
// Start off ready.
instant_fire: true,
suspended: false,
},
TaskHandle { tx },
)
@ -75,6 +85,39 @@ impl<R: SleepProvider> TaskSchedule<R> {
self.instant_fire = true;
self.sleep = None;
}
/// Wait until `Dur` has elapsed.
///
/// This call is equivalent to [`SleepProvider::sleep`], except that the
/// resulting future will respect calls to the functions on this schedule's
/// associated [`TaskHandle`].
///
/// Alternatively, you can view this function as equivalent to
/// `self.fire_in(dur); self.next().await;`, only with the intent made more
/// explicit.
///
/// If the associated [`TaskHandle`] for this schedule is suspended, then
/// this method will not return until the schedule is unsuspended _and_ the
/// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
/// method will not return at all, until the schedule is re-activated by
/// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
pub async fn sleep(&mut self, dur: Duration) {
self.fire_in(dur);
self.next().await;
}
/// As
/// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
/// but respect messages from this schedule's associated [`TaskHandle`].
pub async fn sleep_until_wallclock(&mut self, when: SystemTime) {
loop {
let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
self.sleep(delay).await;
if finished {
return;
}
}
}
}
impl TaskHandle {
@ -98,6 +141,31 @@ impl TaskHandle {
pub fn cancel(&self) -> bool {
self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
}
/// Suspend execution of the corresponding schedule.
///
/// If the schedule is ready now, it will become pending; it won't become
/// ready again until `resume()` is called. If the schedule is waiting for a
/// timer, the timer will keep counting, but the schedule won't become ready
/// until the timer has elapsed _and_ `resume()` has been called.
///
/// Returns `true` if the schedule still exists, and `false` otherwise.
pub fn suspend(&self) -> bool {
self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
}
/// Resume execution of the corresponding schedule.
///
/// This method undoes the effect of a call to `suspend()`: the schedule
/// will fire again if it is ready (or when it becomes ready).
///
/// This method won't cause the schedule to fire if it was already
/// cancelled. For that, use the `fire()` or fire_at()` methods.
///
/// Returns `true` if the schedule still exists, and `false` otherwise.
pub fn resume(&self) -> bool {
self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
}
}
// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
@ -120,6 +188,12 @@ impl<R: SleepProvider> TaskScheduleP<'_, R> {
*self.instant_fire = false;
*self.sleep = None;
}
SchedulerCommand::Suspend => {
*self.suspended = true;
}
SchedulerCommand::Resume => {
*self.suspended = false;
}
}
}
}
@ -138,6 +212,9 @@ impl<R: SleepProvider> Stream for TaskSchedule<R> {
}
}
}
if *this.suspended {
return Poll::Pending;
}
if *this.instant_fire {
*this.instant_fire = false;
return Poll::Ready(Some(()));
@ -283,4 +360,43 @@ mod test {
assert!(sch.next().now_or_never().is_none());
});
}
#[test]
fn suspend_and_resume_with_fire() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
hdl.fire();
hdl.suspend();
assert!(sch.next().now_or_never().is_none());
hdl.resume();
assert!(sch.next().now_or_never().is_some());
});
}
#[test]
fn suspend_and_resume_with_sleep() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
sch.fire_in(Duration::from_micros(100));
hdl.suspend();
assert!(sch.next().now_or_never().is_none());
hdl.resume();
assert!(sch.next().now_or_never().is_none());
assert!(sch.next().await.is_some());
});
}
#[test]
fn suspend_and_resume_with_nothing() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
assert!(sch.next().now_or_never().is_some());
hdl.suspend();
assert!(sch.next().now_or_never().is_none());
hdl.resume();
});
}
}

View File

@ -180,7 +180,7 @@ const MAX_SLEEP: Duration = Duration::from_secs(600);
/// expect this to be the final delay.
///
/// (This is a separate function for testing.)
fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
pub(crate) fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
let remainder = when
.duration_since(now)
.unwrap_or_else(|_| Duration::from_secs(0));