Merge remote-tracking branch 'origin/mr/110'

This commit is contained in:
Nick Mathewson 2021-10-28 11:40:18 -04:00
commit 6dde31f328
1 changed files with 237 additions and 140 deletions

View File

@ -426,7 +426,6 @@ mod test {
use super::*;
use crate::timeouts::TimeoutEstimator;
use futures::channel::oneshot;
use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
use std::sync::Mutex;
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_rtcompat::{test_with_all_runtimes, SleepProvider};
@ -549,14 +548,41 @@ mod test {
});
}
// Tells FakeCirc how much to delay, in milliseconds.
//
// (These are very foolish globals.)
static HOP1_DELAY: AtomicU64 = AtomicU64::new(100);
static HOP2_DELAY: AtomicU64 = AtomicU64::new(200);
static HOP3_DELAY: AtomicU64 = AtomicU64::new(300);
/// Get a pair of timeouts that we've encoded as an Ed25519 identity.
///
/// In our FakeCircuit code below, the first timeout is the amount of
/// time that we should sleep while building a hop to this key,
/// and the second timeout is the length of time-advance we should allow
/// after the hop is built.
///
/// (This is pretty silly, but it's good enough for testing.)
fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
let mut be = [0; 8];
be[..].copy_from_slice(&id.as_bytes()[0..8]);
let dur = u64::from_be_bytes(be);
be[..].copy_from_slice(&id.as_bytes()[8..16]);
let dur2 = u64::from_be_bytes(be);
(Duration::from_millis(dur), Duration::from_millis(dur2))
}
/// Encode a pair of timeouts as an Ed25519 identity.
///
/// In our FakeCircuit code below, the first timeout is the amount of
/// time that we should sleep while building a hop to this key,
/// and the second timeout is the length of time-advance we should allow
/// after the hop is built.
///
/// (This is pretty silly but it's good enough for testing.)
fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
let mut bytes = [0; 32];
let dur = (d1.as_millis() as u64).to_be_bytes();
bytes[0..8].copy_from_slice(&dur);
let dur = (d2.as_millis() as u64).to_be_bytes();
bytes[8..16].copy_from_slice(&dur);
bytes.into()
}
/// Replacement type for circuit, to implement buildable.
#[derive(Clone)]
struct FakeCirc {
hops: Vec<Ed25519Identity>,
onehop: bool,
@ -570,8 +596,13 @@ mod test {
ct: &OwnedChanTarget,
_: &CircParameters,
) -> Result<Self> {
rt.sleep(Duration::from_millis(HOP1_DELAY.load(SeqCst)))
.await;
let ed_id = ct.ed_identity();
let (d1, d2) = timeouts_from_key(ed_id);
rt.sleep(d1).await;
if !d2.is_zero() {
rt.allow_one_advance(d2);
}
let c = FakeCirc {
hops: vec![*ct.ed_identity()],
onehop: true,
@ -585,8 +616,13 @@ mod test {
ct: &OwnedCircTarget,
_: &CircParameters,
) -> Result<Self> {
rt.sleep(Duration::from_millis(HOP1_DELAY.load(SeqCst)))
.await;
let ed_id = ct.ed_identity();
let (d1, d2) = timeouts_from_key(ed_id);
rt.sleep(d1).await;
if !d2.is_zero() {
rt.allow_one_advance(d2);
}
let c = FakeCirc {
hops: vec![*ct.ed_identity()],
onehop: false,
@ -600,44 +636,75 @@ mod test {
ct: &OwnedCircTarget,
_: &CircParameters,
) -> Result<()> {
let d = {
let c = self.lock().unwrap();
assert!(!c.onehop);
match c.hops.len() {
1 => HOP2_DELAY.load(SeqCst),
2 => HOP3_DELAY.load(SeqCst),
_ => 0,
}
};
rt.sleep(Duration::from_millis(d)).await;
let ed_id = ct.ed_identity();
let (d1, d2) = timeouts_from_key(ed_id);
rt.sleep(d1).await;
if !d2.is_zero() {
rt.allow_one_advance(d2);
}
{
let mut c = self.lock().unwrap();
c.hops.push(*ct.ed_identity());
c.hops.push(*ed_id);
}
Ok(())
}
}
/// Fake implementation of TimeoutEstimator that just records its inputs.
struct TimeoutRecorder {
struct TimeoutRecorder<R> {
runtime: R,
hist: Vec<(bool, u8, Duration)>,
// How much advance to permit after being told of a timeout?
on_timeout: Duration,
// How much advance to permit after being told of a success?
on_success: Duration,
snd_success: Option<oneshot::Sender<()>>,
rcv_success: Option<oneshot::Receiver<()>>,
}
impl TimeoutRecorder {
fn new() -> Self {
Self { hist: Vec::new() }
impl<R> TimeoutRecorder<R> {
fn new(runtime: R) -> Self {
Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
}
fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
let (snd_success, rcv_success) = oneshot::channel();
Self {
runtime,
hist: Vec::new(),
on_timeout,
on_success,
rcv_success: Some(rcv_success),
snd_success: Some(snd_success),
}
}
}
impl TimeoutEstimator for Arc<Mutex<TimeoutRecorder>> {
impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
if !is_last {
return;
}
let mut this = self.lock().unwrap();
this.hist.push((true, hop, delay));
let (rt, advance) = {
let mut this = self.lock().unwrap();
this.hist.push((true, hop, delay));
let _ = this.snd_success.take().unwrap().send(());
(this.runtime.clone(), this.on_success)
};
if !advance.is_zero() {
rt.allow_one_advance(advance);
}
}
fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
let mut this = self.lock().unwrap();
this.hist.push((false, hop, delay));
let (rt, advance) = {
let mut this = self.lock().unwrap();
this.hist.push((false, hop, delay));
(this.runtime.clone(), this.on_timeout)
};
if !advance.is_zero() {
rt.allow_one_advance(advance);
}
}
fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
(Duration::from_secs(3), Duration::from_secs(100))
@ -661,121 +728,151 @@ mod test {
OwnedChanTarget::new(vec![], id, [0x20; 20].into())
}
/// Try successful and failing building cases
// TODO: re-enable this test after arti#149 is fixed. For now, it
// is not reliable enough.
async fn run_builder_test<R: Runtime>(
rt: tor_rtmock::MockSleepRuntime<R>,
advance_initial: Duration,
path: OwnedPath,
advance_on_timeout: Option<(Duration, Duration)>,
) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
let chanmgr = Arc::new(ChanMgr::new(rt.clone()));
// always has 3 second timeout, 100 second abandon.
let timeouts = match advance_on_timeout {
Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
None => TimeoutRecorder::new(rt.clone()),
};
let timeouts = Arc::new(Mutex::new(timeouts));
let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
rt.clone(),
chanmgr,
timeouts::Estimator::new(Arc::clone(&timeouts)),
);
let rng = StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
let params = CircParameters::default();
rt.block_advance("manually controlling advances");
rt.allow_one_advance(advance_initial);
let outcome = rt
.wait_for(Arc::new(builder).build_owned(path, &params, rng, gs()))
.await;
// Now we wait for a success to finally, finally be reported.
if advance_on_timeout.is_some() {
let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
let _ = rt.wait_for(receiver).await;
}
let circ = outcome.map(|m| m.lock().unwrap().clone());
let timeouts = timeouts.lock().unwrap().hist.clone();
(circ, timeouts)
}
#[test]
#[ignore]
fn test_builder() {
fn build_onehop() {
test_with_all_runtimes!(|rt| async move {
HOP3_DELAY.store(300, SeqCst); // undo previous run.
let rt = tor_rtmock::MockSleepRuntime::new(rt);
let id_100ms = key_from_timeouts(Duration::from_millis(100), Duration::from_millis(0));
let path = OwnedPath::ChannelOnly(chan_t(id_100ms));
let p1 = OwnedPath::ChannelOnly(chan_t([0x11; 32].into()));
let p2 = OwnedPath::Normal(vec![
circ_t([0x11; 32].into()),
circ_t([0x22; 32].into()),
circ_t([0x33; 32].into()),
]);
let chanmgr = Arc::new(ChanMgr::new(rt.clone()));
let timeouts = Arc::new(Mutex::new(TimeoutRecorder::new()));
let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
rt.clone(),
chanmgr,
timeouts::Estimator::new(Arc::clone(&timeouts)),
);
let builder = Arc::new(builder);
let rng =
StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
let params = CircParameters::default();
let outcome = rt
.wait_for(builder.build_owned(p1, &params, rng, gs()))
.await;
let circ = outcome.unwrap().into_inner().unwrap();
let (outcome, timeouts) =
run_builder_test(rt, Duration::from_millis(100), path, None).await;
let circ = outcome.unwrap();
assert!(circ.onehop);
assert_eq!(circ.hops, [[0x11; 32].into()]);
assert_eq!(circ.hops, [id_100ms]);
let rng =
StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
let outcome = rt
.wait_for(builder.build_owned(p2.clone(), &params, rng, gs()))
.await;
let circ = outcome.unwrap().into_inner().unwrap();
assert!(!circ.onehop);
assert_eq!(
circ.hops[..],
[[0x11; 32].into(), [0x22; 32].into(), [0x33; 32].into()]
);
{
let timeouts = timeouts.lock().unwrap();
assert_eq!(timeouts.hist.len(), 2);
assert!(timeouts.hist[0].0); // completed
assert_eq!(timeouts.hist[0].1, 0); // last hop completed
// TODO: test time elapsed, once wait_for is more reliable.
assert!(timeouts.hist[1].0); // completed
assert_eq!(timeouts.hist[1].1, 2); // last hop completed
// TODO: test time elapsed, once wait_for is more reliable.
}
// Try a very long timeout.
// (one hour is super long and won't get recorded as a
// circuit: only as a timeout).
HOP3_DELAY.store(3_600_000, SeqCst);
let rng =
StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
let outcome = rt
.wait_for(builder.build_owned(p2.clone(), &params, rng, gs()))
.await;
assert!(outcome.is_err());
{
let timeouts = timeouts.lock().unwrap();
assert_eq!(timeouts.hist.len(), 3);
assert!(!timeouts.hist[2].0);
assert_eq!(timeouts.hist[2].1, 2);
}
// Now try a recordable timeout.
HOP3_DELAY.store(5_000, SeqCst); // five seconds is plausible.
let rng =
StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
let outcome = rt
.wait_for(builder.build_owned(p2.clone(), &params, rng, gs()))
.await;
assert!(outcome.is_err());
// "wait" a while longer to make sure that we eventually
// notice the circuit completing.
for _ in 0..1000_u16 {
rt.advance(Duration::from_millis(100)).await;
}
{
let timeouts = timeouts.lock().unwrap();
dbg!(&timeouts.hist);
assert!(timeouts.hist.len() >= 4);
// First we notice a circuit timeout after 2 hops
assert!(!timeouts.hist[3].0);
assert_eq!(timeouts.hist[3].1, 2);
// TODO: check timeout more closely.
assert!(timeouts.hist[3].2 < Duration::from_secs(100));
assert!(timeouts.hist[3].2 >= Duration::from_secs(3));
// This test is not reliable under test coverage; see arti#149.
#[cfg(not(tarpaulin))]
{
assert_eq!(timeouts.hist.len(), 5);
// Then we notice a circuit completing at its third hop.
assert!(timeouts.hist[4].0);
assert_eq!(timeouts.hist[4].1, 2);
// TODO: check timeout more closely.
assert!(timeouts.hist[4].2 < Duration::from_secs(100));
assert!(timeouts.hist[4].2 >= Duration::from_secs(5));
assert!(timeouts.hist[3].2 < timeouts.hist[4].2);
}
}
HOP3_DELAY.store(300, SeqCst); // undo previous run.
assert_eq!(timeouts.len(), 1);
assert_eq!(timeouts[0].0, true); // success
assert_eq!(timeouts[0].1, 0); // one-hop
assert_eq!(timeouts[0].2, Duration::from_millis(100));
})
}
#[test]
fn build_threehop() {
test_with_all_runtimes!(|rt| async move {
let rt = tor_rtmock::MockSleepRuntime::new(rt);
let id_100ms =
key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
let id_200ms =
key_from_timeouts(Duration::from_millis(200), Duration::from_millis(300));
let id_300ms = key_from_timeouts(Duration::from_millis(300), Duration::from_millis(0));
let path =
OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_300ms)]);
let (outcome, timeouts) =
run_builder_test(rt, Duration::from_millis(100), path, None).await;
let circ = outcome.unwrap();
assert!(!circ.onehop);
assert_eq!(circ.hops, [id_100ms, id_200ms, id_300ms]);
assert_eq!(timeouts.len(), 1);
assert_eq!(timeouts[0].0, true); // success
assert_eq!(timeouts[0].1, 2); // three-hop
assert_eq!(timeouts[0].2, Duration::from_millis(600));
})
}
#[test]
fn build_huge_timeout() {
test_with_all_runtimes!(|rt| async move {
let rt = tor_rtmock::MockSleepRuntime::new(rt);
let id_100ms =
key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
let id_200ms =
key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
let id_hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));
let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)]);
let (outcome, timeouts) =
run_builder_test(rt, Duration::from_millis(100), path, None).await;
assert!(matches!(outcome, Err(Error::CircTimeout)));
assert_eq!(timeouts.len(), 1);
assert_eq!(timeouts[0].0, false); // timeout
// BUG: Sometimes this is 1 and sometimes this is 2.
// assert_eq!(timeouts[0].1, 2); // at third hop.
assert_eq!(timeouts[0].2, Duration::from_millis(3000));
});
}
#[test]
fn build_modest_timeout() {
test_with_all_runtimes!(|rt| async move {
let rt = tor_rtmock::MockSleepRuntime::new(rt);
let id_100ms =
key_from_timeouts(Duration::from_millis(100), Duration::from_millis(200));
let id_200ms =
key_from_timeouts(Duration::from_millis(200), Duration::from_millis(2700));
let id_3sec = key_from_timeouts(Duration::from_millis(3000), Duration::from_millis(0));
let timeout_advance = (Duration::from_millis(4000), Duration::from_secs(0));
let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_3sec)]);
let (outcome, timeouts) = run_builder_test(
rt.clone(),
Duration::from_millis(100),
path,
Some(timeout_advance),
)
.await;
assert!(matches!(outcome, Err(Error::CircTimeout)));
dbg!(&timeouts);
assert_eq!(timeouts.len(), 2);
assert_eq!(timeouts[0].0, false); // timeout
// BUG: Sometimes this is 1 and sometimes this is 2.
//assert_eq!(timeouts[0].1, 2); // at third hop.
assert_eq!(timeouts[0].2, Duration::from_millis(3000));
assert_eq!(timeouts[1].0, true); // success
assert_eq!(timeouts[1].1, 2); // three-hop
// BUG: This timer is not always reliable, due to races.
//assert_eq!(timeouts[1].2, Duration::from_millis(3300));
});
}
}