Connect the timeout estimator code to our circuit builder.

This currently leaves a bit to be desired, since the logic is kind
of gnarly.  I'm not sure I want to be using so many Arc<>s.
This commit is contained in:
Nick Mathewson 2021-07-08 09:40:30 -04:00
parent cd51d8bb8b
commit 3e894218aa
5 changed files with 134 additions and 32 deletions

View File

@ -1,12 +1,18 @@
//! Facilities to build circuits directly, instead of via a circuit manager. //! Facilities to build circuits directly, instead of via a circuit manager.
use crate::path::{OwnedPath, TorPath}; use crate::path::{OwnedPath, TorPath};
use crate::Result; use crate::timeouts::{pareto::ParetoTimeoutEstimator, Action, TimeoutEstimator};
use crate::{Error, Result};
use futures::channel::oneshot;
use futures::task::SpawnExt; use futures::task::SpawnExt;
use rand::{CryptoRng, Rng}; use futures::Future;
use rand::{rngs::StdRng, CryptoRng, Rng, SeedableRng};
use std::convert::TryInto; use std::convert::TryInto;
use std::sync::Arc; use std::sync::{
use std::time::Duration; atomic::{AtomicU32, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use tor_chanmgr::ChanMgr; use tor_chanmgr::ChanMgr;
use tor_proto::circuit::{CircParameters, ClientCirc}; use tor_proto::circuit::{CircParameters, ClientCirc};
use tor_rtcompat::{Runtime, SleepProviderExt}; use tor_rtcompat::{Runtime, SleepProviderExt};
@ -21,25 +27,38 @@ use tor_rtcompat::{Runtime, SleepProviderExt};
pub struct CircuitBuilder<R: Runtime> { pub struct CircuitBuilder<R: Runtime> {
/// The runtime used by this circuit builder. /// The runtime used by this circuit builder.
runtime: R, runtime: R,
/// A channel manager that this circuit builder uses to make chanels. /// A channel manager that this circuit builder uses to make channels.
chanmgr: Arc<ChanMgr<R>>, chanmgr: Arc<ChanMgr<R>>,
/// An estimator to determine the correct timeouts for circuit building.
timeouts: Box<dyn TimeoutEstimator + Send + Sync>,
} }
impl<R: Runtime> CircuitBuilder<R> { impl<R: Runtime> CircuitBuilder<R> {
/// Construct a new [`CircuitBuilder`]. /// Construct a new [`CircuitBuilder`].
pub fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>) -> Self { pub fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>) -> Self {
CircuitBuilder { runtime, chanmgr } // XXXX make this configurable and changeable.
let timeouts = Box::new(ParetoTimeoutEstimator::default());
CircuitBuilder {
runtime,
chanmgr,
timeouts,
}
} }
/// Build a circuit, without performing any timeout operations. /// Build a circuit, without performing any timeout operations.
///
/// After each hop is built, increments n_hops_built. (TODO: Find
/// a better design there.)
async fn build_notimeout<RNG: CryptoRng + Rng>( async fn build_notimeout<RNG: CryptoRng + Rng>(
&self, self: Arc<Self>,
path: &OwnedPath, path: OwnedPath,
params: &CircParameters, params: CircParameters,
rng: &mut RNG, start_time: Instant,
n_hops_built: Arc<AtomicU32>,
mut rng: RNG,
) -> Result<Arc<ClientCirc>> { ) -> Result<Arc<ClientCirc>> {
let chan = self.chanmgr.get_or_launch(path.first_hop()?).await?; let chan = self.chanmgr.get_or_launch(path.first_hop()?).await?;
let (pending_circ, reactor) = chan.new_circ(rng).await?; let (pending_circ, reactor) = chan.new_circ(&mut rng).await?;
self.runtime.spawn(async { self.runtime.spawn(async {
let _ = reactor.run().await; let _ = reactor.run().await;
@ -47,16 +66,31 @@ impl<R: Runtime> CircuitBuilder<R> {
match path { match path {
OwnedPath::ChannelOnly(_) => { OwnedPath::ChannelOnly(_) => {
let circ = pending_circ.create_firsthop_fast(rng, params).await?; let circ = pending_circ.create_firsthop_fast(&mut rng, &params).await?;
self.timeouts
.note_hop_completed(0, Instant::now() - start_time, true);
n_hops_built.fetch_add(1, Ordering::SeqCst);
Ok(circ) Ok(circ)
} }
OwnedPath::Normal(p) => { OwnedPath::Normal(p) => {
assert!(!p.is_empty()); assert!(!p.is_empty());
let n_hops = p.len() as u8;
let circ = pending_circ let circ = pending_circ
.create_firsthop_ntor(rng, &p[0], params) .create_firsthop_ntor(&mut rng, &p[0], &params)
.await?; .await?;
self.timeouts
.note_hop_completed(0, Instant::now() - start_time, n_hops == 0);
n_hops_built.fetch_add(1, Ordering::SeqCst);
let mut hop_num = 1;
for relay in p[1..].iter() { for relay in p[1..].iter() {
circ.extend_ntor(rng, relay, params).await?; circ.extend_ntor(&mut rng, relay, &params).await?;
n_hops_built.fetch_add(1, Ordering::SeqCst);
self.timeouts.note_hop_completed(
hop_num,
Instant::now() - start_time,
hop_num == n_hops,
);
hop_num += 1;
} }
Ok(circ) Ok(circ)
} }
@ -64,18 +98,37 @@ impl<R: Runtime> CircuitBuilder<R> {
} }
/// Build a circuit from an [`OwnedPath`]. /// Build a circuit from an [`OwnedPath`].
pub(crate) async fn build_owned<RNG: CryptoRng + Rng>( pub(crate) async fn build_owned<RNG: CryptoRng + Rng + Send + 'static>(
&self, self: &Arc<Self>,
path: &OwnedPath, path: OwnedPath,
params: &CircParameters, params: &CircParameters,
rng: &mut RNG, rng: RNG,
) -> Result<Arc<ClientCirc>> { ) -> Result<Arc<ClientCirc>> {
let delay = Duration::from_secs(5); // TODO: make this configurable and inferred. let action = Action::BuildCircuit { length: path.len() };
let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
let start_time = Instant::now();
let build_future = self.build_notimeout(path, params, rng); // TODO: This is probably not the best way for build_notimeout to
let circuit = self.runtime.timeout(delay, build_future).await??; // tell us how many hops it managed to build, but at least it is
// isolated here.
let hops_built = Arc::new(AtomicU32::new(0));
Ok(circuit) let self_clone = Arc::clone(self);
let params = params.clone();
let circuit_future =
self_clone.build_notimeout(path, params, start_time, Arc::clone(&hops_built), rng);
match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
Ok(circuit) => Ok(circuit),
Err(Error::CircTimeout) => {
let n_built = hops_built.load(Ordering::SeqCst);
self.timeouts
.note_circ_timeout(n_built as u8, Instant::now() - start_time);
Err(Error::CircTimeout)
}
Err(e) => Err(e),
}
} }
/// Try to construct a new circuit from a given path, using appropriate /// Try to construct a new circuit from a given path, using appropriate
@ -85,12 +138,53 @@ impl<R: Runtime> CircuitBuilder<R> {
/// circuit manager; if you don't hang on it it, it will /// circuit manager; if you don't hang on it it, it will
/// automatically go away when the last reference is dropped. /// automatically go away when the last reference is dropped.
pub async fn build<RNG: CryptoRng + Rng>( pub async fn build<RNG: CryptoRng + Rng>(
&self, self: &Arc<Self>,
path: &TorPath<'_>, path: &TorPath<'_>,
params: &CircParameters, params: &CircParameters,
rng: &mut RNG, rng: &mut RNG,
) -> Result<Arc<ClientCirc>> { ) -> Result<Arc<ClientCirc>> {
let rng = StdRng::from_rng(rng).expect("couldn't construct temporary rng");
let owned = path.try_into()?; let owned = path.try_into()?;
self.build_owned(&owned, params, rng).await self.build_owned(owned, params, rng).await
} }
} }
/// Helper function: spawn a future as a background task, and run it with
/// two separate timeouts.
///
/// If the future does not complete by `timeout`, then return a
/// timeout error immediately, but keep running the future in the
/// background.
///
/// If the future does not complete by `abandon`, then abandon the
/// future completely.
async fn double_timeout<R, F, T>(
runtime: &R,
fut: F,
timeout: Duration,
abandon: Duration,
) -> Result<T>
where
R: Runtime,
F: Future<Output = Result<T>> + Send + 'static,
T: Send + 'static,
{
let (snd, rcv) = oneshot::channel();
let rt = runtime.clone();
runtime.spawn(async move {
let result = rt.timeout(abandon, fut).await;
let _ignore_cancelled_error = snd.send(result);
})?;
let outcome = runtime.timeout(timeout, rcv).await;
// 4 layers of error to collapse:
// One from the receiver being cancelled.
// One from the outer timeout.
// One from the inner timeout.
// One from the actual future's result.
//
// (Technically, we could refrain from unwrapping the future's result,
// but doing it this way helps make it more certain that we really are
// collapsing all the layers into one.)
Ok(outcome????)
}

View File

@ -34,7 +34,7 @@ pub(crate) struct Plan {
} }
#[async_trait] #[async_trait]
impl<R: Runtime> crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilder<R> { impl<R: Runtime> crate::mgr::AbstractCircBuilder for Arc<crate::build::CircuitBuilder<R>> {
type Circ = ClientCirc; type Circ = ClientCirc;
type Spec = SupportedCircUsage; type Spec = SupportedCircUsage;
type Plan = Plan; type Plan = Plan;
@ -62,10 +62,9 @@ impl<R: Runtime> crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilde
path, path,
params, params,
} = plan; } = plan;
let mut rng = let rng = StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
let circuit = self.build_owned(&path, &params, &mut rng).await?; let circuit = self.build_owned(path, &params, rng).await?;
Ok((final_spec, circuit)) Ok((final_spec, circuit))
} }

View File

@ -138,14 +138,14 @@ impl<'a> DirInfo<'a> {
#[derive(Clone)] #[derive(Clone)]
pub struct CircMgr<R: Runtime> { pub struct CircMgr<R: Runtime> {
/// The underlying circuit manager object that implements our behavior. /// The underlying circuit manager object that implements our behavior.
mgr: Arc<mgr::AbstractCircMgr<build::CircuitBuilder<R>, R>>, mgr: Arc<mgr::AbstractCircMgr<Arc<build::CircuitBuilder<R>>, R>>,
} }
impl<R: Runtime> CircMgr<R> { impl<R: Runtime> CircMgr<R> {
/// Construct a new circuit manager. /// Construct a new circuit manager.
pub fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>) -> Self { pub fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>) -> Self {
let builder = build::CircuitBuilder::new(runtime.clone(), chanmgr); let builder = build::CircuitBuilder::new(runtime.clone(), chanmgr);
let mgr = mgr::AbstractCircMgr::new(builder, runtime); let mgr = mgr::AbstractCircMgr::new(Arc::new(builder), runtime);
CircMgr { mgr: Arc::new(mgr) } CircMgr { mgr: Arc::new(mgr) }
} }

View File

@ -122,6 +122,15 @@ impl OwnedPath {
OwnedPath::Normal(p) => Ok(&p[0]), OwnedPath::Normal(p) => Ok(&p[0]),
} }
} }
/// Return the number of hops in this path.
#[allow(clippy::len_without_is_empty)]
pub(crate) fn len(&self) -> usize {
match self {
OwnedPath::ChannelOnly(_) => 1,
OwnedPath::Normal(p) => p.len(),
}
}
} }
/// For testing: make sure that `path` is the same when it is an owned /// For testing: make sure that `path` is the same when it is an owned

View File

@ -10,11 +10,11 @@
use std::time::Duration; use std::time::Duration;
mod pareto; pub(crate) mod pareto;
/// An object that calculates circuit timeout thresholds from the history /// An object that calculates circuit timeout thresholds from the history
/// of circuit build times. /// of circuit build times.
trait TimeoutEstimator { pub(crate) trait TimeoutEstimator {
/// Record that a given circuit hop has completed. /// Record that a given circuit hop has completed.
/// ///
/// The `hop` number is a zero-indexed value for which hop just completed. /// The `hop` number is a zero-indexed value for which hop just completed.