Merge branch 'channel' into 'main'

Have channel reactor able to send channel padding

See merge request tpo/core/arti!574
Nick Mathewson 2022-06-10 13:28:51 +00:00
commit 4f6c4f91c9
6 changed files with 718 additions and 10 deletions

Cargo.lock (generated)
View File

@@ -63,6 +63,15 @@ version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
[[package]]
name = "approx"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
dependencies = [
"num-traits",
]
[[package]]
name = "arrayref"
version = "0.3.6"
@@ -1801,6 +1810,15 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "matrixmultiply"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "add85d4dd35074e6fedc608f8c8f513a3548619a9024b751949ef0e8e45a4d84"
dependencies = [
"rawpointer",
]
[[package]]
name = "memchr"
version = "2.5.0"
@@ -1898,6 +1916,35 @@ dependencies = [
"ws2_32-sys",
]
[[package]]
name = "nalgebra"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "462fffe4002f4f2e1f6a9dcf12cc1a6fc0e15989014efc02a941d3e0f5dc2120"
dependencies = [
"approx",
"matrixmultiply",
"nalgebra-macros",
"num-complex",
"num-rational",
"num-traits",
"rand 0.8.5",
"rand_distr",
"simba",
"typenum",
]
[[package]]
name = "nalgebra-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01fcc0b8149b4632adc89ac3b7b31a12fb6099a0317a4eb2ebff574ef7de7218"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "native-tls"
version = "0.2.10"
@@ -1995,6 +2042,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "num-complex"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fbc387afefefd5e9e39493299f3069e14a140dd34dc19b4c1c1a8fddb6a790"
dependencies = [
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -2016,6 +2072,17 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a"
dependencies = [
"autocfg 1.1.0",
"num-integer",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.15"
@@ -2478,6 +2545,16 @@ dependencies = [
"getrandom 0.2.6",
]
[[package]]
name = "rand_distr"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "964d548f8e7d12e102ef183a0de7e98180c9f8729f555897a857b96e48122d2f"
dependencies = [
"num-traits",
"rand 0.8.5",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
@@ -2487,6 +2564,12 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "rawpointer"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3"
[[package]]
name = "redox_syscall"
version = "0.2.13"
@@ -2925,6 +3008,18 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f054c6c1a6e95179d6f23ed974060dcefb2d9388bb7256900badad682c499de4"
[[package]]
name = "simba"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e82063457853d00243beda9952e910b82593e4b07ae9f721b9278a99a0d3d5c"
dependencies = [
"approx",
"num-complex",
"num-traits",
"paste",
]
[[package]]
name = "simple_asn1"
version = "0.6.1"
@@ -2986,6 +3081,19 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "statrs"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05bdbb8e4e78216a85785a85d3ec3183144f98d0097b9281802c019bb07a6f05"
dependencies = [
"approx",
"lazy_static",
"nalgebra",
"num-traits",
"rand 0.8.5",
]
[[package]]
name = "strsim"
version = "0.8.0"
@@ -3723,8 +3831,11 @@ dependencies = [
"hex-literal",
"hkdf",
"hmac",
"itertools",
"pin-project",
"rand 0.8.5",
"rand_core 0.6.3",
"statrs",
"subtle",
"thiserror",
"tokio",
@@ -3739,6 +3850,7 @@ dependencies = [
"tor-llcrypto",
"tor-protover",
"tor-rtcompat",
"tor-rtmock",
"tracing",
"typenum",
"zeroize",

View File

@@ -35,3 +35,4 @@ futures-await-test = "0.3.0"
hex-literal = "0.3"
tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0", features = ["tokio", "native-tls"] }
tor-rtmock = { path = "../tor-rtmock", version = "0.4.0" }

View File

@@ -29,6 +29,7 @@ futures = "0.3.14"
generic-array = "0.14.3"
hkdf = "0.12.0"
hmac = "0.12.0"
pin-project = "1"
rand = "0.8"
rand_core = "0.6.2"
subtle = "2"
@@ -45,6 +46,7 @@ tor-linkspec = { path = "../tor-linkspec", version = "0.3.0" }
tor-llcrypto = { path = "../tor-llcrypto", version = "0.3.0" }
tor-protover = { path = "../tor-protover", version = "0.3.0" }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0" }
tor-rtmock = { path = "../tor-rtmock", version = "0.4.0" }
tracing = "0.1.18"
typenum = "1.12"
zeroize = "1"
@@ -52,4 +54,7 @@ zeroize = "1"
[dev-dependencies]
hex = "0.4"
hex-literal = "0.3"
itertools = "0.10.1"
statrs = "0.15.0"
tokio-crate = { package = "tokio", version = "1.7", features = ["full"] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0", features = ["tokio", "native-tls"] }

View File

@@ -59,6 +59,7 @@ pub const CHANNEL_BUFFER_SIZE: usize = 128;
mod circmap;
mod codec;
mod handshake;
mod padding;
mod reactor;
mod unique_id;
@@ -278,6 +279,19 @@ impl Channel {
details: Arc::clone(&details),
};
let mut padding_timer = Box::pin(padding::Timer::new_disabled(
sleep_prov,
padding::Parameters {
// From padding-spec.txt s2.2
// TODO support reduced padding
low_ms: 1500,
high_ms: 9500,
},
));
if std::env::var("ARTI_EXPERIMENTAL_CHANNEL_PADDING").unwrap_or_default() != "" {
padding_timer.as_mut().enable();
}
let reactor = Reactor {
control: control_rx,
cells: cell_rx,
@@ -287,7 +301,7 @@
circ_unique_id_ctx: CircUniqIdContext::new(),
link_protocol,
details,
sleep_prov,
padding_timer,
};
(channel, reactor)

View File

@@ -0,0 +1,555 @@
//! Channel padding
//!
//! Tor spec `padding-spec.txt` section 2.
use std::pin::Pin;
// TODO: coarsetime, maybe? But see arti#496; also, we want to use the mockable SleepProvider
use std::time::{Duration, Instant};
use educe::Educe;
use futures::future::{self, FusedFuture};
use futures::FutureExt;
use pin_project::pin_project;
use rand::distributions::Distribution;
use tracing::error;
use tor_cell::chancell::msg::Padding;
use tor_rtcompat::SleepProvider;
/// Timer that organises wakeups when channel padding should be sent
///
/// Use [`next()`](Timer::next) to find when to send padding, and
/// [`note_cell_sent()`](Timer::note_cell_sent) to reset the timeout when data flows.
///
/// A `Timer` can be in "disabled" state, in which case `next()` never completes.
///
/// `Timer` must be pinned before use
/// (this allows us to avoid involving the allocator when we reschedule).
#[pin_project(project = PaddingTimerProj)]
pub(crate) struct Timer<R: SleepProvider> {
/// [`SleepProvider`]
sleep_prov: R,
/// Parameters controlling distribution of padding time intervals
parameters: PreparedParameters,
/// Gap that we intend to leave between last sent cell, and the padding
///
/// We only resample this (calculating a new random delay) after the previous
/// timeout actually expired.
///
/// `None` if the timer is disabled.
/// (This can be done explicitly, but also occurs on time calculation overflow.)
///
/// Invariants: this field may be `Some` or `None` regardless of the values
/// of other fields. If this field is `None` then the values in `trigger_at`
/// and `waker` are unspecified.
selected_timeout: Option<Duration>,
/// Absolute time at which we should send padding
///
/// `None` if cells have been sent more recently than we were polled.
/// That would mean that we are currently moving data out through this channel.
/// The absolute timeout will need to be recalculated when the data flow pauses.
///
/// `Some` means our `next` has been demanded recently.
/// Then `trigger_at` records the absolute timeout at which we should send padding,
/// which was calculated the first time we were polled (after data).
///
/// Invariants: the value in this field is meaningful only if `selected_timeout`
/// is `Some`.
///
/// If `selected_timeout` is `Some`, and `trigger_at` is therefore valid,
/// it is (obviously) no later than `selected_timeout` from now.
///
/// See also `waker`.
trigger_at: Option<Instant>,
/// Actual waker from the `SleepProvider`
///
/// This is created and updated lazily, because we suspect that with some runtimes
/// setting timeouts may be slow.
/// Lazy updating means that with intermittent data traffic, we do not keep scheduling,
/// descheduling, and adjusting a wakeup time.
///
/// Invariants:
///
/// If `selected_timeout` is `Some`,
/// the time at which this waker will trigger here is never *later* than `trigger_at`,
/// and never *later* than `selected_timeout` from now.
///
/// The wakeup time here may well be earlier than `trigger_at`,
/// and sooner than `selected_timeout` from now. It may even be in the past.
/// When we wake up and discover this situation, we reschedule a new waker.
///
/// If `selected_timeout` is `None`, the value is unspecified.
/// We may retain a `Some` in this case so that if `SleepProvider` is enhanced to
/// support rescheduling, we can do that without making a new `SleepFuture`
/// (and without completely reorganising the `Timer` state structure).
#[pin]
waker: Option<R::SleepFuture>,
}
/// Timing parameters, as described in `padding-spec.txt`
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct Parameters {
/// Low end of the distribution of `X`
pub(crate) low_ms: u32,
/// High end of the distribution of `X` (inclusive)
pub(crate) high_ms: u32,
}
/// Timing parameters, "compiled" into a form which can be sampled more efficiently
///
/// According to the docs for [`rand::Rng::gen_range`],
/// it is better to construct a distribution,
/// than to call `gen_range` repeatedly on the same range.
#[derive(Debug, Clone)]
struct PreparedParameters {
/// The distribution of `X` (not of the ultimate delay, which is `max(X1,X2)`)
x_distribution_ms: rand::distributions::Uniform<u32>,
}
/// Return value from `prepare_to_sleep`: instructions for what caller ought to do
#[derive(Educe)]
#[educe(Debug)]
enum SleepInstructions<'f, R: SleepProvider> {
/// Caller should send padding immediately
Immediate {
/// The current `Instant`, returned so that the caller need not call `now` again
now: Instant,
},
/// Caller should wait forever
Forever,
/// Caller should `await` this
Waker(#[educe(Debug(ignore))] Pin<&'f mut R::SleepFuture>),
}
impl<R: SleepProvider> Timer<R> {
/// Create a new `Timer`
#[allow(dead_code)]
pub(crate) fn new(sleep_prov: R, parameters: Parameters) -> Self {
let mut self_ = Self::new_disabled(sleep_prov, parameters);
// We would like to call select_fresh_timeout but we don't have
// (and can't have) Pin<&mut self>
self_.selected_timeout = Some(self_.parameters.select_timeout());
self_
}
/// Create a new `Timer` which starts out disabled
pub(crate) fn new_disabled(sleep_prov: R, parameters: Parameters) -> Self {
Timer {
sleep_prov,
parameters: parameters.prepare(),
selected_timeout: None,
trigger_at: None,
waker: None,
}
}
/// Disable this `Timer`
///
/// Idempotent.
pub(crate) fn disable(self: &mut Pin<&mut Self>) {
*self.as_mut().project().selected_timeout = None;
}
/// Enable this `Timer`
///
/// (If the timer was disabled, the timeout will only start to run when `next()`
/// is next polled.)
///
/// Idempotent.
pub(crate) fn enable(self: &mut Pin<&mut Self>) {
if !self.is_enabled() {
self.as_mut().select_fresh_timeout();
}
}
/// Enquire whether this `Timer` is currently enabled
pub(crate) fn is_enabled(&self) -> bool {
self.selected_timeout.is_some()
}
/// Select a fresh timeout (and enable)
fn select_fresh_timeout(self: Pin<&mut Self>) -> Duration {
let mut self_ = self.project();
let timeout = self_.parameters.select_timeout();
*self_.selected_timeout = Some(timeout);
// This is no longer valid; recalculate it on next poll
*self_.trigger_at = None;
// Timeout might be earlier, so we will need a new waker too.
// (Technically this is not possible in a bad way right now, since any stale waker
// must be older, and so earlier, albeit from a previous random timeout.
// However in the future we may want to be able to adjust the parameters at runtime
// and then a stale waker might be harmfully too late.)
self_.waker.set(None);
timeout
}
/// Note that data has been sent (ie, reset the timeout, delaying the next padding)
pub(crate) fn note_cell_sent(self: &mut Pin<&mut Self>) {
// Fast path, does not need to do anything but clear the absolute expiry time
let self_ = self.as_mut().project();
*self_.trigger_at = None;
}
/// Calculate when to send padding, and return a suitable waker
///
/// In the usual case returns [`SleepInstructions::Waker`].
fn prepare_to_sleep(mut self: Pin<&mut Self>, now: Option<Instant>) -> SleepInstructions<R> {
let mut self_ = self.as_mut().project();
let timeout = match self_.selected_timeout {
None => return SleepInstructions::Forever,
Some(t) => *t,
};
if self_.waker.is_some() {
// We need to do this with is_some and expect because we need to consume self
// to get a return value with the right lifetimes.
let waker = self
.project()
.waker
.as_pin_mut()
.expect("None but we just checked");
return SleepInstructions::Waker(waker);
}
let now = now.unwrap_or_else(|| self_.sleep_prov.now());
let trigger_at = match self_.trigger_at {
Some(t) => t,
None => self_.trigger_at.insert(match now.checked_add(timeout) {
None => {
error!("timeout overflowed computing next channel padding");
self.disable();
return SleepInstructions::Forever;
}
Some(r) => r,
}),
};
let remaining = trigger_at.checked_duration_since(now).unwrap_or_default();
if remaining.is_zero() {
return SleepInstructions::Immediate { now };
}
//dbg!(timeout, remaining, now, trigger_at);
// There is no Option::get_pin_mut_or_set_with
if self_.waker.is_none() {
self_.waker.set(Some(self_.sleep_prov.sleep(remaining)));
}
let waker = self
.project()
.waker
.as_pin_mut()
.expect("None but we just inserted!");
SleepInstructions::Waker(waker)
}
/// Wait until we should next send padding, and then return the padding message
///
/// Should be used as a low-priority branch within `select_biased!`.
///
/// (`next()` has to be selected on, along with other possible events, in the
/// main loop, so that the padding timer runs concurrently with other processing;
/// and it should be in a low-priority branch of `select_biased!` as an optimisation:
/// that avoids calculating timeouts etc. until necessary,
/// i.e. it calculates them only when the main loop would otherwise block.)
///
/// The returned future is async-cancel-safe,
/// but once it yields, the padding must actually be sent.
pub(crate) fn next(self: Pin<&mut Self>) -> impl FusedFuture<Output = Padding> + '_ {
self.next_inner().fuse()
}
/// Wait until we should next send padding (not `FusedFuture`)
///
/// Callers want a [`FusedFuture`] because `select!` needs one.
async fn next_inner(mut self: Pin<&mut Self>) -> Padding {
let now = loop {
match self.as_mut().prepare_to_sleep(None) {
SleepInstructions::Forever => future::pending().await,
SleepInstructions::Immediate { now } => break now,
SleepInstructions::Waker(waker) => waker.await,
}
// This timer has fired and has therefore been used up.
// When we go round again we will make a new one.
//
// TODO: have SleepProviders provide a reschedule function, and use it.
// That is likely to be faster where supported.
self.as_mut().project().waker.set(None);
};
// It's time to send padding.
// Firstly, calculate the new timeout for the *next* padding,
// so that we leave the `Timer` properly programmed.
self.as_mut().select_fresh_timeout();
// Bet that we will be going to sleep again, and set up the new trigger time
// and waker now. This will save us a future call to Instant::now.
self.as_mut().prepare_to_sleep(Some(now));
Padding::new()
}
}
impl Parameters {
/// "Compile" the parameters into a form which can be quickly sampled
fn prepare(self) -> PreparedParameters {
PreparedParameters {
x_distribution_ms: rand::distributions::Uniform::new_inclusive(
self.low_ms,
self.high_ms,
),
}
}
}
impl PreparedParameters {
/// Randomly select a timeout (as per `padding-spec.txt`)
fn select_timeout(&self) -> Duration {
let mut rng = rand::thread_rng();
let ms = std::cmp::max(
self.x_distribution_ms.sample(&mut rng),
self.x_distribution_ms.sample(&mut rng),
);
Duration::from_millis(ms.into())
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
#[allow(clippy::bool_assert_comparison)]
mod test {
use super::*;
use futures::future::ready;
use futures::select_biased;
use itertools::{izip, Itertools};
use statrs::distribution::ContinuousCDF;
use tokio::pin;
use tokio_crate as tokio;
use tor_rtcompat::*;
async fn assert_not_ready<R: Runtime>(timer: &mut Pin<&mut Timer<R>>) {
select_biased! {
_ = timer.as_mut().next() => panic!("unexpectedly ready"),
_ = ready(()) => { },
};
}
async fn assert_is_ready<R: Runtime>(timer: &mut Pin<&mut Timer<R>>) {
let _: Padding = select_biased! {
p = timer.as_mut().next() => p,
_ = ready(()) => panic!("pad timer failed to yield"),
};
}
#[test]
fn timer_impl() {
let runtime = tor_rtcompat::tokio::TokioNativeTlsRuntime::create().unwrap();
let runtime = tor_rtmock::MockSleepRuntime::new(runtime);
let parameters = Parameters {
low_ms: 1000,
high_ms: 1000,
};
let () = runtime.block_on(async {
let timer = Timer::new(runtime.clone(), parameters);
pin!(timer);
assert_eq! { true, timer.is_enabled() }
// expiry time not yet calculated
assert_eq! { timer.as_mut().trigger_at, None };
// ---------- timeout value ----------
// Just created, not ready yet
assert_not_ready(&mut timer).await;
runtime.advance(Duration::from_millis(999)).await;
// Not quite ready
assert_not_ready(&mut timer).await;
runtime.advance(Duration::from_millis(1)).await;
// Should go off precisely now
assert_is_ready(&mut timer).await;
assert_not_ready(&mut timer).await;
runtime.advance(Duration::from_millis(1001)).await;
// Should go off 1ms ago, fine
assert_is_ready(&mut timer).await;
// ---------- various resets ----------
runtime.advance(Duration::from_millis(500)).await;
timer.note_cell_sent();
assert_eq! { timer.as_mut().trigger_at, None };
// This ought not to cause us to actually calculate the expiry time
let () = select_biased! {
_ = ready(()) => { },
_ = timer.as_mut().next() => panic!(),
};
assert_eq! { timer.as_mut().trigger_at, None };
// ---------- disable/enable ----------
timer.disable();
runtime.advance(Duration::from_millis(2000)).await;
assert_eq! { timer.as_mut().selected_timeout, None };
assert_eq! { false, timer.is_enabled() }
assert_not_ready(&mut timer).await;
timer.enable();
runtime.advance(Duration::from_millis(3000)).await;
assert_eq! { true, timer.is_enabled() }
// Shouldn't be already ready, since we haven't polled yet
assert_not_ready(&mut timer).await;
runtime.advance(Duration::from_millis(1000)).await;
// *Now*
assert_is_ready(&mut timer).await;
});
let () = runtime.block_on(async {
let timer = Timer::new(runtime.clone(), parameters);
pin!(timer);
assert! { timer.as_mut().selected_timeout.is_some() };
assert! { timer.as_mut().trigger_at.is_none() };
// Force an overflow by guddling
*timer.as_mut().project().selected_timeout = Some(Duration::MAX);
assert_not_ready(&mut timer).await;
dbg!(timer.as_mut().project().trigger_at);
assert_eq! { false, timer.is_enabled() }
});
}
#[test]
fn timeout_distribution() {
// Test that the distribution of padding intervals is as we expect. This is not so
// straightforward. We need to deal with true randomness (since we can't plumb a
// testing RNG into the padding timer, and perhaps don't even *want* to make that a
// mockable interface). Measuring a distribution of random variables involves some
// statistics.
// The overall approach is:
// Use a fixed (but nontrivial) low to high range
// Sample N times into n equal sized buckets
// Calculate the expected number of samples in each bucket
// Do a chi^2 test. If it doesn't spot a potential difference, declare OK.
// If the chi^2 test does definitely declare a difference, declare failure.
// Otherwise increase N and go round again.
//
// This allows most runs to be fast, without an appreciable possibility of a
// false test failure, while still being able to detect even quite small deviations.
// Notation from
// https://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test#Calculating_the_test-statistic
// I haven't done a formal power calculation but empirically
// this detects the following most of the time:
// deviation of the CDF power from B^2 to B^1.98
// wrong minimum value by 25ms out of 12s, low_ms = min + 25
// wrong maximum value by 10ms out of 12s, high_ms = max -1 - 10
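// Concretely: with n equal buckets over [min, max) and N samples, bucket i
// (0-based) has expected count
//     E_i = N * ( ((i+1)/n)^2 - (i/n)^2 )
// using the CDF B^2 derived below, and the statistic computed further down is
//     chi^2 = sum_i (O_i - E_i)^2 / E_i
// where O_i is the observed count in bucket i.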
#[allow(non_snake_case)]
let mut N = 100_0000;
#[allow(non_upper_case_globals)]
const n: usize = 100;
const P_GOOD: f64 = 0.05; // Investigate further 5% of the time (if all is actually well)
const P_BAD: f64 = 1e-12;
loop {
eprintln!("padding distribution test, n={} N={}", n, N);
let min = 5000;
let max = 17000; // Exclusive
assert_eq!(0, (max - min) % (n as u32)); // buckets must match up to integer boundaries
let cdf = (0..=n)
.into_iter()
.map(|bi| {
let b = (bi as f64) / (n as f64);
// expected distribution:
// with B = bi / n
// P(X < B) = B
// P(max(X1,X2) < B) = B^2
b.powi(2)
})
.collect_vec();
let pdf = cdf
.iter()
.cloned()
.tuple_windows()
.map(|(p, q)| q - p)
.collect_vec();
let exp = pdf.iter().cloned().map(|p| p * f64::from(N)).collect_vec();
// chi-squared test only valid if every cell expects at least 5
assert!(exp.iter().cloned().all(|ei| ei >= 5.));
let mut obs = [0_u32; n];
let params = Parameters {
low_ms: min,
high_ms: max - 1, // convert exclusive to inclusive
}
.prepare();
for _ in 0..N {
let xx = params.select_timeout();
let ms = xx.as_millis();
let ms = u32::try_from(ms).unwrap();
assert!(ms >= min);
assert!(ms < max);
// Integer arithmetic ensures that we classify exactly
let bi = ((ms - min) * (n as u32)) / (max - min);
obs[bi as usize] += 1;
}
let chi2 = izip!(&obs, &exp)
.map(|(&oi, &ei)| (f64::from(oi) - ei).powi(2) / ei)
.sum::<f64>();
// n degrees of freedom, one-tailed test
// (since distro parameters are all fixed, not estimated from the sample)
let chi2_distr = statrs::distribution::ChiSquared::new(n as _).unwrap();
// probability of good code generating a result at least this bad
let p = 1. - chi2_distr.cdf(chi2);
eprintln!(
"padding distribution test, n={} N={} chi2={} p={}",
n, N, chi2, p
);
if p >= P_GOOD {
break;
}
for (i, (&oi, &ei)) in izip!(&obs, &exp).enumerate() {
eprintln!("bi={:4} OI={:4} EI={}", i, oi, ei);
}
if p < P_BAD {
panic!("distribution is wrong (p < {:e})", P_BAD);
}
// This is statistically rather cheaty: we keep trying until we get a definite
// answer! But we radically increase the power of the test each time.
// If the distribution is really wrong, this test ought to find it soon enough,
// especially since we run this repeatedly in CI.
N *= 10;
}
}
}
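Taken together, the intended call pattern condenses to something like the sketch below. It restates the usage exercised by `timer_impl` above and by the reactor change in the following file; `drive_output`, `outgoing`, and `forward` are hypothetical names introduced purely for illustration, and the code assumes it lives inside `tor-proto`, since the `padding` module is crate-private.
// Illustrative sketch only, not part of this commit.  `forward` stands in for
// putting a cell onto the channel's output sink.
use futures::{select_biased, stream::FusedStream, StreamExt as _};
use tor_cell::chancell::msg::Padding;
use tor_rtcompat::SleepProvider;
use crate::channel::padding;
async fn drive_output<R, C>(runtime: R, mut outgoing: C, mut forward: impl FnMut(C::Item))
where
    R: SleepProvider,
    C: FusedStream + Unpin,
    C::Item: From<Padding>,
{
    // The Timer must be pinned before use; Box::pin is the simplest way.
    let mut padding_timer = Box::pin(padding::Timer::new(
        runtime,
        // Parameters from padding-spec.txt s2.2, as in the Channel constructor hunk above.
        padding::Parameters { low_ms: 1500, high_ms: 9500 },
    ));
    loop {
        select_biased! {
            // Highest priority: real traffic.  Each outgoing cell resets the
            // padding timeout.
            cell = outgoing.next() => {
                match cell {
                    Some(cell) => {
                        padding_timer.as_mut().note_cell_sent();
                        forward(cell);
                    }
                    None => break,
                }
            },
            // Lowest priority: the padding timer.  Once this yields, the
            // Padding cell must actually be sent.
            p = padding_timer.as_mut().next() => forward(p.into()),
        }
    }
}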

View File

@@ -17,19 +17,19 @@ use tor_rtcompat::SleepProvider;
use futures::channel::{mpsc, oneshot};
use futures::select;
use futures::sink::SinkExt;
use futures::stream::Stream;
use futures::Sink;
use futures::StreamExt as _;
use futures::{select, select_biased};
use tor_error::internal;
use std::fmt;
//use std::pin::Pin;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use crate::channel::{codec::CodecError, unique_id, ChannelDetails};
use crate::channel::{codec::CodecError, padding, unique_id, ChannelDetails};
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
use tracing::{debug, trace};
@@ -90,6 +90,8 @@ pub struct Reactor<S: SleepProvider> {
///
/// This should also be backed by a TLS connection if you want it to be secure.
pub(super) output: BoxedChannelSink,
/// Timer tracking when to generate channel padding
pub(super) padding_timer: Pin<Box<padding::Timer<S>>>,
/// A map from circuit ID to Sinks on which we can deliver cells.
pub(super) circs: CircMap,
/// Information shared with the frontend
@@ -99,9 +101,6 @@ pub struct Reactor<S: SleepProvider> {
/// What link protocol is the channel using?
#[allow(dead_code)] // We don't support protocols where this would matter
pub(super) link_protocol: u16,
/// Sleep Provider (dummy for now, this is going to be in the padding timer)
#[allow(dead_code)]
pub(super) sleep_prov: S,
}
/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
@@ -144,10 +143,32 @@ impl<S: SleepProvider> Reactor<S> {
// See if the output sink can have cells written to it yet.
// If so, see if we have to-be-transmitted cells.
ret = self.output.prepare_send_from(
ret = self.output.prepare_send_from(async {
// This runs if we will be able to write, so do the read:
self.cells.next()
) => {
select_biased! {
n = self.cells.next() => {
// Note transmission on *input* to the reactor, not ultimate
// transmission. Ideally we would tap into the TCP stream at the far
// end of our TLS or perhaps during encoding on entry to the TLS, but
// both of those would involve quite some plumbing. Doing it here in
// the reactor avoids additional inter-task communication, mutexes,
// etc. (And there is no real difference between doing it here, on input,
// and just below, on enqueueing into the `sendable`.)
//
// Padding is sent when the output channel is idle, and the effect of
// buffering is just that we might send it a little early because we
// measure idleness when we last put something into the output layers.
//
// We can revisit this if measurement shows it to be bad in practice.
//
// (We in any case need padding that we generate when idle to make it
// through to the output promptly, or it will be late and ineffective.)
self.padding_timer.as_mut().note_cell_sent();
n
},
p = self.padding_timer.as_mut().next() => Some(p.into()),
}
}) => {
let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
let msg = msg.ok_or(ReactorError::Shutdown)?;
sendable.send(msg).map_err(codec_err_to_chan)?;