diff --git a/Cargo.lock b/Cargo.lock index 9e7902500..52ae8eda6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,15 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +[[package]] +name = "approx" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6" +dependencies = [ + "num-traits", +] + [[package]] name = "arrayref" version = "0.3.6" @@ -1801,6 +1810,15 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "matrixmultiply" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add85d4dd35074e6fedc608f8c8f513a3548619a9024b751949ef0e8e45a4d84" +dependencies = [ + "rawpointer", +] + [[package]] name = "memchr" version = "2.5.0" @@ -1898,6 +1916,35 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "nalgebra" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "462fffe4002f4f2e1f6a9dcf12cc1a6fc0e15989014efc02a941d3e0f5dc2120" +dependencies = [ + "approx", + "matrixmultiply", + "nalgebra-macros", + "num-complex", + "num-rational", + "num-traits", + "rand 0.8.5", + "rand_distr", + "simba", + "typenum", +] + +[[package]] +name = "nalgebra-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01fcc0b8149b4632adc89ac3b7b31a12fb6099a0317a4eb2ebff574ef7de7218" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "native-tls" version = "0.2.10" @@ -1995,6 +2042,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-complex" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"97fbc387afefefd5e9e39493299f3069e14a140dd34dc19b4c1c1a8fddb6a790" +dependencies = [ + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -2016,6 +2072,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-rational" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a" +dependencies = [ + "autocfg 1.1.0", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -2478,6 +2545,16 @@ dependencies = [ "getrandom 0.2.6", ] +[[package]] +name = "rand_distr" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "964d548f8e7d12e102ef183a0de7e98180c9f8729f555897a857b96e48122d2f" +dependencies = [ + "num-traits", + "rand 0.8.5", +] + [[package]] name = "rand_hc" version = "0.2.0" @@ -2487,6 +2564,12 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rawpointer" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" + [[package]] name = "redox_syscall" version = "0.2.13" @@ -2925,6 +3008,18 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f054c6c1a6e95179d6f23ed974060dcefb2d9388bb7256900badad682c499de4" +[[package]] +name = "simba" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e82063457853d00243beda9952e910b82593e4b07ae9f721b9278a99a0d3d5c" +dependencies = [ + "approx", + "num-complex", + "num-traits", + "paste", +] + [[package]] name = "simple_asn1" version = "0.6.1" @@ -2986,6 +3081,19 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "statrs" +version = "0.15.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "05bdbb8e4e78216a85785a85d3ec3183144f98d0097b9281802c019bb07a6f05" +dependencies = [ + "approx", + "lazy_static", + "nalgebra", + "num-traits", + "rand 0.8.5", +] + [[package]] name = "strsim" version = "0.8.0" @@ -3723,8 +3831,11 @@ dependencies = [ "hex-literal", "hkdf", "hmac", + "itertools", + "pin-project", "rand 0.8.5", "rand_core 0.6.3", + "statrs", "subtle", "thiserror", "tokio", @@ -3739,6 +3850,7 @@ dependencies = [ "tor-llcrypto", "tor-protover", "tor-rtcompat", + "tor-rtmock", "tracing", "typenum", "zeroize", diff --git a/crates/tor-chanmgr/Cargo.toml b/crates/tor-chanmgr/Cargo.toml index ab946fb58..039c58bfe 100644 --- a/crates/tor-chanmgr/Cargo.toml +++ b/crates/tor-chanmgr/Cargo.toml @@ -35,3 +35,4 @@ futures-await-test = "0.3.0" hex-literal = "0.3" tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0", features = ["tokio", "native-tls"] } tor-rtmock = { path = "../tor-rtmock", version = "0.4.0" } + diff --git a/crates/tor-proto/Cargo.toml b/crates/tor-proto/Cargo.toml index a1fe54930..7cd50b40d 100644 --- a/crates/tor-proto/Cargo.toml +++ b/crates/tor-proto/Cargo.toml @@ -29,6 +29,7 @@ futures = "0.3.14" generic-array = "0.14.3" hkdf = "0.12.0" hmac = "0.12.0" +pin-project = "1" rand = "0.8" rand_core = "0.6.2" subtle = "2" @@ -45,6 +46,7 @@ tor-linkspec = { path = "../tor-linkspec", version = "0.3.0" } tor-llcrypto = { path = "../tor-llcrypto", version = "0.3.0" } tor-protover = { path = "../tor-protover", version = "0.3.0" } tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0" } +tor-rtmock = { path = "../tor-rtmock", version = "0.4.0" } tracing = "0.1.18" typenum = "1.12" zeroize = "1" @@ -52,4 +54,7 @@ zeroize = "1" [dev-dependencies] hex = "0.4" hex-literal = "0.3" +itertools = "0.10.1" +statrs = "0.15.0" +tokio-crate = { package = "tokio", version = "1.7", features = ["full"] } tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0", features = ["tokio", 
"native-tls"] } diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index f740088e9..7e8e16953 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -59,6 +59,7 @@ pub const CHANNEL_BUFFER_SIZE: usize = 128; mod circmap; mod codec; mod handshake; +mod padding; mod reactor; mod unique_id; @@ -278,6 +279,19 @@ impl Channel { details: Arc::clone(&details), }; + let mut padding_timer = Box::pin(padding::Timer::new_disabled( + sleep_prov, + padding::Parameters { + // From padding-spec.txt s2.2 + // TODO support reduced padding + low_ms: 1500, + high_ms: 9500, + }, + )); + if std::env::var("ARTI_EXPERIMENTAL_CHANNEL_PADDING").unwrap_or_default() != "" { + padding_timer.as_mut().enable(); + } + let reactor = Reactor { control: control_rx, cells: cell_rx, @@ -287,7 +301,7 @@ impl Channel { circ_unique_id_ctx: CircUniqIdContext::new(), link_protocol, details, - sleep_prov, + padding_timer, }; (channel, reactor) diff --git a/crates/tor-proto/src/channel/padding.rs b/crates/tor-proto/src/channel/padding.rs new file mode 100644 index 000000000..bccab467e --- /dev/null +++ b/crates/tor-proto/src/channel/padding.rs @@ -0,0 +1,555 @@ +//! Channel padding +//! +//! Tor spec `padding-spec.txt` section 2. + +use std::pin::Pin; +// TODO, coarsetime maybe? But see arti#496 and also we want to use the mockable SleepProvider +use std::time::{Duration, Instant}; + +use educe::Educe; +use futures::future::{self, FusedFuture}; +use futures::FutureExt; +use pin_project::pin_project; +use rand::distributions::Distribution; +use tracing::error; + +use tor_cell::chancell::msg::Padding; +use tor_rtcompat::SleepProvider; + +/// Timer that organises wakeups when channel padding should be sent +/// +/// Use [`next()`](Timer::next) to find when to send padding, and +/// [`note_cell_sent()`](Timer::note_cell_sent) to reset the timeout when data flows. +/// +/// A `Timer` can be in "disabled" state, in which case `next()` never completes. 
+/// +/// `Timer` must be pinned before use +/// (this allows us to avoid involving the allocator when we reschedule). +#[pin_project(project = PaddingTimerProj)] +pub(crate) struct Timer { + /// [`SleepProvider`] + sleep_prov: R, + + /// Parameters controlling distribution of padding time intervals + parameters: PreparedParameters, + + /// Gap that we intend to leave between last sent cell, and the padding + /// + /// We only resample this (calculating a new random delay) after the previous + /// timeout actually expired. + /// + /// `None` if the timer is disabled. + /// (This can be done explicitly, but also occurs on time calculation overflow.) + /// + /// Invariants: this field may be `Some` or `None` regardless of the values + /// of other fields. If this field is `None` then the values in `trigger_at` + /// and `waker` are unspecified. + selected_timeout: Option, + + /// Absolute time at which we should send padding + /// + /// `None` if cells were sent more recently than we were polled. + /// That would mean that we are currently moving data out through this channel. + /// The absolute timeout will need to be recalculated when the data flow pauses. + /// + /// `Some` means our `next` has been demanded recently. + /// Then `trigger_at` records the absolute timeout at which we should send padding, + /// which was calculated the first time we were polled (after data). + /// + /// Invariants: the value in this field is meaningful only if `selected_timeout` + /// is `Some`. + /// + /// If `selected_timeout` is `Some`, and `trigger_at` is therefore valid, + /// it is (obviously) no later than `selected_timeout` from now. + /// + /// See also `waker`. + trigger_at: Option, + + /// Actual waker from the `SleepProvider` + /// + /// This is created and updated lazily, because we suspect that with some runtimes + /// setting timeouts may be slow. 
+ /// Lazy updating means that with intermittent data traffic, we do not keep scheduling, + /// descheduling, and adjusting, a wakeup time. + /// + /// Invariants: + /// + /// If `selected_timeout` is `Some`, + /// the time at which this waker will trigger here is never *later* than `trigger_at`, + /// and never *later* than `selected_timeout` from now. + /// + /// The wakeup time here may well be earlier than `trigger_at`, + /// and sooner than `selected_timeout` from now. It may even be in the past. + /// When we wake up and discover this situation, we reschedule a new waker. + /// + /// If `selected_timeout` is `None`, the value is unspecified. + /// We may retain a `Some` in this case so that if `SleepProvider` is enhanced to + /// support rescheduling, we can do that without making a new `SleepFuture` + /// (and without completely reorganising the `Timer` state structure.) + #[pin] + waker: Option, +} + +/// Timing parameters, as described in `padding-spec.txt` +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub(crate) struct Parameters { + /// Low end of the distribution of `X` + pub(crate) low_ms: u32, + /// High end of the distribution of `X` (inclusive) + pub(crate) high_ms: u32, +} + +/// Timing parameters, "compiled" into a form which can be sampled more efficiently +/// +/// According to the docs for [`rand::Rng::gen_range`], +/// it is better to construct a distribution, +/// than to call `gen_range` repeatedly on the same range. 
+#[derive(Debug, Clone)] +struct PreparedParameters { + /// The distribution of `X` (not of the ultimate delay, which is `max(X1,X2)`) + x_distribution_ms: rand::distributions::Uniform, +} + +/// Return value from `prepare_to_sleep`: instructions for what caller ought to do +#[derive(Educe)] +#[educe(Debug)] +enum SleepInstructions<'f, R: SleepProvider> { + /// Caller should send padding immediately + Immediate { + /// The current `Instant`, returned so that the caller need not call `now` again + now: Instant, + }, + /// Caller should wait forever + Forever, + /// Caller should `await` this + Waker(#[educe(Debug(ignore))] Pin<&'f mut R::SleepFuture>), +} + +impl Timer { + /// Create a new `Timer` + #[allow(dead_code)] + pub(crate) fn new(sleep_prov: R, parameters: Parameters) -> Self { + let mut self_ = Self::new_disabled(sleep_prov, parameters); + // We would like to call select_fresh_timeout but we don't have + // (and can't have) Pin<&mut self> + self_.selected_timeout = Some(self_.parameters.select_timeout()); + self_ + } + + /// Create a new `Timer` which starts out disabled + pub(crate) fn new_disabled(sleep_prov: R, parameters: Parameters) -> Self { + Timer { + sleep_prov, + parameters: parameters.prepare(), + selected_timeout: None, + trigger_at: None, + waker: None, + } + } + + /// Disable this `Timer` + /// + /// Idempotent. + pub(crate) fn disable(self: &mut Pin<&mut Self>) { + *self.as_mut().project().selected_timeout = None; + } + + /// Enable this `Timer` + /// + /// (If the timer was disabled, the timeout will only start to run when `next()` + /// is next polled.) + /// + /// Idempotent. 
+ pub(crate) fn enable(self: &mut Pin<&mut Self>) { + if !self.is_enabled() { + self.as_mut().select_fresh_timeout(); + } + } + + /// Enquire whether this `Timer` is currently enabled + pub(crate) fn is_enabled(&self) -> bool { + self.selected_timeout.is_some() + } + + /// Select a fresh timeout (and enable) + fn select_fresh_timeout(self: Pin<&mut Self>) -> Duration { + let mut self_ = self.project(); + let timeout = self_.parameters.select_timeout(); + *self_.selected_timeout = Some(timeout); + // This is no longer valid; recalculate it on next poll + *self_.trigger_at = None; + // Timeout might be earlier, so we will need a new waker too. + // (Technically this is not possible in a bad way right now, since any stale waker + // must be older, and so earlier, albeit from a previous random timeout. + // However in the future we may want to be able to adjust the parameters at runtime + // and then a stale waker might be harmfully too late.) + self_.waker.set(None); + timeout + } + + /// Note that data has been sent (ie, reset the timeout, delaying the next padding) + pub(crate) fn note_cell_sent(self: &mut Pin<&mut Self>) { + // Fast path, does not need to do anything but clear the absolute expiry time + let self_ = self.as_mut().project(); + *self_.trigger_at = None; + } + + /// Calculate when to send padding, and return a suitable waker + /// + /// In the usual case returns [`SleepInstructions::Waker`]. + fn prepare_to_sleep(mut self: Pin<&mut Self>, now: Option) -> SleepInstructions { + let mut self_ = self.as_mut().project(); + + let timeout = match self_.selected_timeout { + None => return SleepInstructions::Forever, + Some(t) => *t, + }; + + if self_.waker.is_some() { + // We need to do this with is_some and expect because we need to consume self + // to get a return value with the right lifetimes. 
+ let waker = self + .project() + .waker + .as_pin_mut() + .expect("None but we just checked"); + return SleepInstructions::Waker(waker); + } + + let now = now.unwrap_or_else(|| self_.sleep_prov.now()); + + let trigger_at = match self_.trigger_at { + Some(t) => t, + None => self_.trigger_at.insert(match now.checked_add(timeout) { + None => { + error!("timeout overflowed computing next channel padding"); + self.disable(); + return SleepInstructions::Forever; + } + Some(r) => r, + }), + }; + + let remaining = trigger_at.checked_duration_since(now).unwrap_or_default(); + if remaining.is_zero() { + return SleepInstructions::Immediate { now }; + } + + //dbg!(timeout, remaining, now, trigger_at); + + // There is no Option::get_pin_mut_or_set_with + if self_.waker.is_none() { + self_.waker.set(Some(self_.sleep_prov.sleep(remaining))); + } + let waker = self + .project() + .waker + .as_pin_mut() + .expect("None but we just inserted!"); + SleepInstructions::Waker(waker) + } + + /// Wait until we should next send padding, and then return the padding message + /// + /// Should be used as a low-priority branch within `select_biased!`. + /// + /// (`next()` has to be selected on, along with other possible events, in the + /// main loop, so that the padding timer runs concurrently with other processing; + /// and it should be in a low-priority branch of `select_biased!` as an optimisation: + /// that avoids calculating timeouts etc. until necessary, + /// i.e. it calculates them only when the main loop would otherwise block.) + /// + /// The returned future is async-cancel-safe, + /// but once it yields, the padding must actually be sent. + pub(crate) fn next(self: Pin<&mut Self>) -> impl FusedFuture + '_ { + self.next_inner().fuse() + } + + /// Wait until we should next send padding (not `FusedFuture`) + /// + /// Callers want a [`FusedFuture`] because `select!` needs one. 
+ async fn next_inner(mut self: Pin<&mut Self>) -> Padding { + let now = loop { + match self.as_mut().prepare_to_sleep(None) { + SleepInstructions::Forever => future::pending().await, + SleepInstructions::Immediate { now } => break now, + SleepInstructions::Waker(waker) => waker.await, + } + + // This timer has fired and has therefore been used up. + // When we go round again we will make a new one. + // + // TODO: have SleepProviders provide a reschedule function, and use it. + // That is likely to be faster where supported. + self.as_mut().project().waker.set(None); + }; + + // It's time to send padding. + + // Firstly, calculate the new timeout for the *next* padding, + // so that we leave the `Timer` properly programmed. + self.as_mut().select_fresh_timeout(); + + // Bet that we will be going to sleep again, and set up the new trigger time + // and waker now. This will save us a future call to Instant::now. + self.as_mut().prepare_to_sleep(Some(now)); + + Padding::new() + } +} + +impl Parameters { + /// "Compile" the parameters into a form which can be quickly sampled + fn prepare(self) -> PreparedParameters { + PreparedParameters { + x_distribution_ms: rand::distributions::Uniform::new_inclusive( + self.low_ms, + self.high_ms, + ), + } + } +} + +impl PreparedParameters { + /// Randomly select a timeout (as per `padding-spec.txt`) + fn select_timeout(&self) -> Duration { + let mut rng = rand::thread_rng(); + let ms = std::cmp::max( + self.x_distribution_ms.sample(&mut rng), + self.x_distribution_ms.sample(&mut rng), + ); + Duration::from_millis(ms.into()) + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +#[allow(clippy::bool_assert_comparison)] +mod test { + use super::*; + use futures::future::ready; + use futures::select_biased; + use itertools::{izip, Itertools}; + use statrs::distribution::ContinuousCDF; + use tokio::pin; + use tokio_crate as tokio; + use tor_rtcompat::*; + + async fn assert_not_ready(timer: &mut Pin<&mut Timer>) { + select_biased! 
{ + _ = timer.as_mut().next() => panic!("unexpectedly ready"), + _ = ready(()) => { }, + }; + } + + async fn assert_is_ready(timer: &mut Pin<&mut Timer>) { + let _: Padding = select_biased! { + p = timer.as_mut().next() => p, + _ = ready(()) => panic!("pad timer failed to yield"), + }; + } + + #[test] + fn timer_impl() { + let runtime = tor_rtcompat::tokio::TokioNativeTlsRuntime::create().unwrap(); + let runtime = tor_rtmock::MockSleepRuntime::new(runtime); + + let parameters = Parameters { + low_ms: 1000, + high_ms: 1000, + }; + + let () = runtime.block_on(async { + let timer = Timer::new(runtime.clone(), parameters); + pin!(timer); + assert_eq! { true, timer.is_enabled() } + + // expiry time not yet calculated + assert_eq! { timer.as_mut().trigger_at, None }; + + // ---------- timeout value ---------- + + // Just created, not ready yet + assert_not_ready(&mut timer).await; + + runtime.advance(Duration::from_millis(999)).await; + // Not quite ready + assert_not_ready(&mut timer).await; + + runtime.advance(Duration::from_millis(1)).await; + // Should go off precisely now + assert_is_ready(&mut timer).await; + + assert_not_ready(&mut timer).await; + runtime.advance(Duration::from_millis(1001)).await; + // Should go off 1ms ago, fine + assert_is_ready(&mut timer).await; + + // ---------- various resets ---------- + + runtime.advance(Duration::from_millis(500)).await; + timer.note_cell_sent(); + assert_eq! { timer.as_mut().trigger_at, None }; + + // This ought not to cause us to actually calculate the expiry time + let () = select_biased! { + _ = ready(()) => { }, + _ = timer.as_mut().next() => panic!(), + }; + assert_eq! { timer.as_mut().trigger_at, None }; + + // ---------- disable/enable ---------- + + timer.disable(); + runtime.advance(Duration::from_millis(2000)).await; + assert_eq! { timer.as_mut().selected_timeout, None }; + assert_eq! 
{ false, timer.is_enabled() } + assert_not_ready(&mut timer).await; + + timer.enable(); + runtime.advance(Duration::from_millis(3000)).await; + assert_eq! { true, timer.is_enabled() } + // Shouldn't be already ready, since we haven't polled yet + assert_not_ready(&mut timer).await; + + runtime.advance(Duration::from_millis(1000)).await; + // *Now* + assert_is_ready(&mut timer).await; + }); + + let () = runtime.block_on(async { + let timer = Timer::new(runtime.clone(), parameters); + pin!(timer); + + assert! { timer.as_mut().selected_timeout.is_some() }; + assert! { timer.as_mut().trigger_at.is_none() }; + // Force an overflow by guddling + *timer.as_mut().project().selected_timeout = Some(Duration::MAX); + + assert_not_ready(&mut timer).await; + dbg!(timer.as_mut().project().trigger_at); + assert_eq! { false, timer.is_enabled() } + }); + } + + #[test] + fn timeout_distribution() { + // Test that the distribution of padding intervals is as we expect. This is not so + // straightforward. We need to deal with true randomness (since we can't plumb a + // testing RNG into the padding timer, and perhaps don't even *want* to make that a + // mockable interface). Measuring a distribution of random variables involves some + // statistics. + + // The overall approach is: + // Use a fixed (but nontrivial) low to high range + // Sample N times into n equal sized buckets + // Calculate the expected number of samples in each bucket + // Do a chi^2 test. If it doesn't spot a potential difference, declare OK. + // If the chi^2 test does definitely declare a difference, declare failure. + // Otherwise increase N and go round again. + // + // This allows most runs to be fast without having an appreciable possibility of a + // false test failure and while being able to detect even quite small deviations. 
+ + // Notation from + // https://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test#Calculating_the_test-statistic + // I haven't done a formal power calculation but empirically + // this detects the following most of the time: + // deviation of the CDF power from B^2 to B^1.98 + // wrong minimum value by 25ms out of 12s, low_ms = min + 25 + // wrong maximum value by 10ms out of 12s, high_ms = max - 1 - 10 + + #[allow(non_snake_case)] + let mut N = 100_0000; + + #[allow(non_upper_case_globals)] + const n: usize = 100; + + const P_GOOD: f64 = 0.05; // Investigate further 5% of times (if all is actually well) + const P_BAD: f64 = 1e-12; + + loop { + eprintln!("padding distribution test, n={} N={}", n, N); + + let min = 5000; + let max = 17000; // Exclusive + assert_eq!(0, (max - min) % (n as u32)); // buckets must match up to integer boundaries + + let cdf = (0..=n) + .into_iter() + .map(|bi| { + let b = (bi as f64) / (n as f64); + // expected distribution: + // with B = bi / n + // P(X < B) = B + // P(max(X1, X2) < B) = B^2 + b.powi(2) + }) + .collect_vec(); + + let pdf = cdf + .iter() + .cloned() + .tuple_windows() + .map(|(p, q)| q - p) + .collect_vec(); + let exp = pdf.iter().cloned().map(|p| p * f64::from(N)).collect_vec(); + + // chi-squared test only valid if every cell expects at least 5 + assert!(exp.iter().cloned().all(|ei| ei >= 5.)); + + let mut obs = [0_u32; n]; + + let params = Parameters { + low_ms: min, + high_ms: max - 1, // convert exclusive to inclusive + } + .prepare(); + + for _ in 0..N { + let xx = params.select_timeout(); + let ms = xx.as_millis(); + let ms = u32::try_from(ms).unwrap(); + assert!(ms >= min); + assert!(ms < max); + // Integer arithmetic ensures that we classify exactly + let bi = ((ms - min) * (n as u32)) / (max - min); + obs[bi as usize] += 1; + } + + let chi2 = izip!(&obs, &exp) + .map(|(&oi, &ei)| (f64::from(oi) - ei).powi(2) / ei) + .sum::(); + + // n degrees of freedom, one-tailed test + // (since distro parameters are all 
fixed, not estimated from the sample) + let chi2_distr = statrs::distribution::ChiSquared::new(n as _).unwrap(); + + // probability of good code generating a result at least this bad + let p = 1. - chi2_distr.cdf(chi2); + + eprintln!( + "padding distribution test, n={} N={} chi2={} p={}", + n, N, chi2, p + ); + + if p >= P_GOOD { + break; + } + + for (i, (&oi, &ei)) in izip!(&obs, &exp).enumerate() { + eprintln!("bi={:4} OI={:4} EI={}", i, oi, ei); + } + + if p < P_BAD { + panic!("distribution is wrong (p < {:e})", P_BAD); + } + + // This is statistically rather cheaty: we keep trying until we get a definite + // answer! But we radically increase the power of the test each time. + // If the distribution is really wrong, this test ought to find it soon enough, + // especially since we run this repeatedly in CI. + N *= 10; + } + } +} diff --git a/crates/tor-proto/src/channel/reactor.rs b/crates/tor-proto/src/channel/reactor.rs index 208154c78..966fe7378 100644 --- a/crates/tor-proto/src/channel/reactor.rs +++ b/crates/tor-proto/src/channel/reactor.rs @@ -17,19 +17,19 @@ use tor_rtcompat::SleepProvider; use futures::channel::{mpsc, oneshot}; -use futures::select; use futures::sink::SinkExt; use futures::stream::Stream; use futures::Sink; use futures::StreamExt as _; +use futures::{select, select_biased}; use tor_error::internal; use std::fmt; -//use std::pin::Pin; +use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::Arc; -use crate::channel::{codec::CodecError, unique_id, ChannelDetails}; +use crate::channel::{codec::CodecError, padding, unique_id, ChannelDetails}; use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse}; use tracing::{debug, trace}; @@ -90,6 +90,8 @@ pub struct Reactor { /// /// This should also be backed by a TLS connection if you want it to be secure. 
pub(super) output: BoxedChannelSink, + /// Timer tracking when to generate channel padding + pub(super) padding_timer: Pin>>, /// A map from circuit ID to Sinks on which we can deliver cells. pub(super) circs: CircMap, /// Information shared with the frontend @@ -99,9 +101,6 @@ pub struct Reactor { /// What link protocol is the channel using? #[allow(dead_code)] // We don't support protocols where this would matter pub(super) link_protocol: u16, - /// Sleep Provider (dummy for now, this is going to be in the padding timer) - #[allow(dead_code)] - pub(super) sleep_prov: S, } /// Allows us to just say debug!("{}: Reactor did a thing", &self, ...) @@ -144,10 +143,32 @@ impl Reactor { // See if the output sink can have cells written to it yet. // If so, see if we have to-be-transmitted cells. - ret = self.output.prepare_send_from( + ret = self.output.prepare_send_from(async { // This runs if we will be able to write, so do the read: - self.cells.next() - ) => { + select_biased! { + n = self.cells.next() => { + // Note transmission on *input* to the reactor, not ultimate + // transmission. Ideally we would tap into the TCP stream at the far + // end of our TLS or perhaps during encoding on entry to the TLS, but + // both of those would involve quite some plumbing. Doing it here in + // the reactor avoids additional inter-task communication, mutexes, + // etc. (And there is no real difference between doing it here on + // input, to just below, on enqueueing into the `sendable`.) + // + // Padding is sent when the output channel is idle, and the effect of + // buffering is just that we might send it a little early because we + // measure idleness when we last put something into the output layers. + // + // We can revisit this if measurement shows it to be bad in practice. + // + // (We in any case need padding that we generate when idle to make it + // through to the output promptly, or it will be late and ineffective.) 
+ self.padding_timer.as_mut().note_cell_sent(); + n + }, + p = self.padding_timer.as_mut().next() => Some(p.into()), + } + }) => { let (msg, sendable) = ret.map_err(codec_err_to_chan)?; let msg = msg.ok_or(ReactorError::Shutdown)?; sendable.send(msg).map_err(codec_err_to_chan)?;