diff --git a/Cargo.lock b/Cargo.lock index e18392f40..54a6d470b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3672,6 +3672,7 @@ dependencies = [ name = "tor-dirmgr" version = "0.8.0" dependencies = [ + "anyhow", "async-trait", "base64", "derive_builder_fork_arti", @@ -3701,6 +3702,7 @@ dependencies = [ "tempfile", "thiserror", "time", + "tokio", "tor-basic-utils", "tor-checkable", "tor-circmgr", @@ -3709,6 +3711,7 @@ dependencies = [ "tor-dirclient", "tor-error", "tor-guardmgr", + "tor-linkspec", "tor-llcrypto", "tor-netdir", "tor-netdoc", diff --git a/crates/tor-dirmgr/Cargo.toml b/crates/tor-dirmgr/Cargo.toml index b52a6c88a..86eda36e9 100644 --- a/crates/tor-dirmgr/Cargo.toml +++ b/crates/tor-dirmgr/Cargo.toml @@ -72,9 +72,12 @@ tor-rtcompat = { path = "../tor-rtcompat", version = "0.7.0" } tracing = "0.1.18" [dev-dependencies] +anyhow = "1.0.23" float_eq = "1.0.0" hex-literal = "0.3" tempfile = "3" +tokio = { version = "1.7", features = ["full"] } +tor-linkspec = { path = "../tor-linkspec", version = "0.5.1" } tor-rtcompat = { path = "../tor-rtcompat", version = "0.7.0", features = ["tokio", "native-tls"] } tor-rtmock = { path = "../tor-rtmock", version = "0.6.0" } [package.metadata.docs.rs] diff --git a/crates/tor-dirmgr/src/bridgedesc.rs b/crates/tor-dirmgr/src/bridgedesc.rs index 0a842d0e1..fc91c28ba 100644 --- a/crates/tor-dirmgr/src/bridgedesc.rs +++ b/crates/tor-dirmgr/src/bridgedesc.rs @@ -32,6 +32,9 @@ use tor_rtcompat::Runtime; use crate::event::FlagPublisher; +#[cfg(test)] +mod bdtest; + /// The key we use in all our data structures /// /// This type saves typing and would make it easier to change the bridge descriptor manager @@ -1055,6 +1058,11 @@ pub enum Error { /// There was a programming error somewhere in our code, or the calling code. #[error("Programming error")] Bug(#[from] tor_error::Bug), + + /// Error used for testing + #[cfg(test)] + #[error("Error for testing, {0:?}, retry at {1:?}")] + TestError(&'static str, RetryTime), } impl HasKind for Error { @@ -1073,6 +1081,8 @@ impl HasKind for Error { E::ExtremeValidityTime => bridge_protocol_violation, E::BadValidityTime(..) => EK::ClockSkew, E::Bug(e) => e.kind(), + #[cfg(test)] + E::TestError(..) => EK::Internal, } } } @@ -1099,6 +1109,9 @@ impl HasRetryTime for Error { // Probably, things are broken here, rather than remotely. E::Bug(..) => R::Never, + + #[cfg(test)] + E::TestError(_, retry) => *retry, } } } diff --git a/crates/tor-dirmgr/src/bridgedesc/bdtest.rs b/crates/tor-dirmgr/src/bridgedesc/bdtest.rs new file mode 100644 index 000000000..0eafbe365 --- /dev/null +++ b/crates/tor-dirmgr/src/bridgedesc/bdtest.rs @@ -0,0 +1,254 @@ +//! Tests for bridge descriptor downloading + +// @@ begin test lint list maintained by maint/add_warning @@ +#![allow(clippy::bool_assert_comparison)] +#![allow(clippy::clone_on_copy)] +#![allow(clippy::dbg_macro)] +#![allow(clippy::print_stderr)] +#![allow(clippy::print_stdout)] +#![allow(clippy::single_char_pattern)] +#![allow(clippy::unwrap_used)] +//! + +#![allow(unused_variables)] // XXX +#![allow(dead_code)] // XXX + +use std::future::Future; +use std::iter; +use std::ops::Bound; + +use futures::select_biased; +use futures::stream::FusedStream; +use futures::Stream; +use itertools::{chain, Itertools}; + +use tor_linkspec::HasAddrs; +use tor_rtcompat::SleepProvider; +use tor_rtmock::time::MockSleepProvider; +use tor_rtmock::MockSleepRuntime; + +use super::*; + +const EXAMPLE_DESCRIPTOR: &str = include_str!("../../testdata/routerdesc1.txt"); +const EXAMPLE_PORT: u16 = 9001; + +fn example_validity() -> (SystemTime, SystemTime) { + let (_, (t, u)) = RouterDesc::parse(EXAMPLE_DESCRIPTOR) + .unwrap() + .dangerously_assume_wellsigned() + .dangerously_into_parts(); + let ret = |tb| match tb { + Bound::Included(t) | Bound::Excluded(t) => t, + _ => panic!(), + }; + (ret(t), ret(u)) +} +fn example_wallclock() -> SystemTime { + example_validity().0 + Duration::from_secs(10) +} + +type RealRuntime = tor_rtcompat::tokio::TokioNativeTlsRuntime; +type R = MockSleepRuntime; +type M = Mock; +type Bdm = BridgeDescManager; +type RT = RetryTime; +use Error::TestError as TE; + +#[derive(Debug, Clone)] +struct Mock { + sleep: MockSleepProvider, + + // Using an async mutex lets us block a call to `download` + // so we can see what the state is mid-download. + mstate: Arc>, +} + +struct MockState { + docs: HashMap>, + + download_calls: usize, +} + +impl Mockable for Mock {} + +#[async_trait] +impl mockable::MockableAPI for Mock { + type CircMgr = (); + + async fn download( + self, + _runtime: &R, + _circmgr: &Self::CircMgr, + bridge: &BridgeConfig, + ) -> Result { + eprint!("download ..."); + let mut mstate = self.mstate.lock().await; + mstate.download_calls += 1; + eprintln!("#{} {:?}", mstate.download_calls, bridge); + let addr = bridge + .addrs() + .get(0) + .ok_or(TE("bridge has no error", RT::Never))?; + let doc = mstate + .docs + .get(&addr.port()) + .ok_or(TE("no document", RT::AfterWaiting))?; + doc.clone() + } +} + +impl Mock { + async fn expect_download_calls(&self, expected: usize) { + let mut mstate = self.mstate.lock().await; + assert_eq!(mstate.download_calls, expected); + mstate.download_calls = 0; + } +} + +fn setup() -> (Bdm, R, M, BridgeKey) { + let runtime = RealRuntime::current().unwrap(); + let runtime = MockSleepRuntime::new(runtime); + let sleep = runtime.mock_sleep().clone(); + + sleep.jump_to(example_wallclock()); + + let mut docs = HashMap::new(); + docs.insert(EXAMPLE_PORT, Ok(EXAMPLE_DESCRIPTOR.into())); + + let mstate = Arc::new(futures::lock::Mutex::new(MockState { + docs, + download_calls: 0, + })); + + let mock = Mock { sleep, mstate }; + + let bdm = BridgeDescManager::::with_mockable( + runtime.clone(), + (), + Default::default(), + mock.clone(), + ) + .unwrap(); + + let bridge = "51.68.172.83:9001 EB6EFB27F29AC9511A4246D7ABE1AFABFB416FF1" + .parse() + .unwrap(); + let bridge = Arc::new(bridge); + + (bdm, runtime, mock, bridge) +} + +async fn stream_drain_ready(s: &mut S) { + while select_biased! { + _ = s.next() => true, + () = future::ready(()) => false, + } { + tor_rtcompat::task::yield_now().await; + } +} + +async fn stream_drain_until(attempts: usize, s: &mut S, mut f: F) -> Y +where + S: Stream + Unpin + FusedStream, + S::Item: Debug, + F: FnMut() -> FF, + FF: Future>, +{ + for _ in 0..attempts { + let event = s.next().await; + eprintln!("stream_drain_until, got {:?}", event); + + if let Some(y) = f().await { + return y; + } + } + panic!("untilness didn't occur"); +} + +#[tokio::test] +async fn success() -> Result<(), anyhow::Error> { + let (bdm, runtime, mock, bridge) = setup(); + + bdm.check_consistency(Some([])); + + let mut events = bdm.events().fuse(); + + eprintln!("----- test downloading one descriptor -----"); + + stream_drain_ready(&mut events).await; + + let hold = mock.mstate.lock().await; + + bdm.set_bridges(&[bridge.clone()]); + bdm.check_consistency(Some([&bridge])); + + drop(hold); + + let got = stream_drain_until(3, &mut events, || async { + bdm.bridges().get(&bridge).cloned() + }) + .await; + + dbg!(runtime.wallclock(), example_validity(),); + + eprintln!("got: {:?}", got.unwrap()); + + bdm.check_consistency(Some([&bridge])); + mock.expect_download_calls(1).await; + + eprintln!("----- add a number of failing descriptors -----"); + + const NFAIL: usize = 6; + + let bad = (1..=NFAIL) + .map(|i| { + let bad = format!("192.126.0.1:{} EB6EFB27F29AC9511A4246D7ABE1AFABFB416FF1", i); + let bad: BridgeConfig = bad.parse().unwrap(); + Arc::new(bad) + }) + .collect_vec(); + + let bridges = chain!(iter::once(bridge.clone()), bad.iter().cloned(),).collect_vec(); + + let hold = mock.mstate.lock().await; + + bdm.set_bridges(&bridges); + bdm.check_consistency(Some(&bridges)); + + drop(hold); + + let () = stream_drain_until(13, &mut events, || async { + bdm.check_consistency(Some(&bridges)); + bridges + .iter() + .all(|b| bdm.bridges().contains_key(b)) + .then(|| ()) + }) + .await; + + for b in &bad { + bdm.bridges().get(b).unwrap().as_ref().unwrap_err(); + } + + bdm.check_consistency(Some(&bridges)); + mock.expect_download_calls(NFAIL).await; + + eprintln!("----- move the clock forward to do some retries ----------"); + + mock.sleep.advance(Duration::from_secs(50000)).await; + + bdm.check_consistency(Some(&bridges)); + + let () = stream_drain_until(13, &mut events, || async { + bdm.check_consistency(Some(&bridges)); + (mock.mstate.lock().await.download_calls == NFAIL).then(|| ()) + }) + .await; + + stream_drain_ready(&mut events).await; + + bdm.check_consistency(Some(&bridges)); + mock.expect_download_calls(NFAIL).await; + + Ok(()) +}