circmgr: Make hspool size dynamic

Previously we'd always try to keep 8 circuits ready.  That doesn't
make sense if we are super-busy.  Instead, if we run out of
circuits, we double the amount that we try to keep ready, and if we
never go under 80% of our target number, we half the number we try
to keep ready.

We limit the rate of change here, to make sure that we aren't
flapping too much or shrinking too aggressively.

This algorithm is still a mite arbitrary, and will need tuning in
the future.
This commit is contained in:
Nick Mathewson 2023-06-16 11:34:54 -04:00
parent 69179c5dfe
commit 686d5cf209
2 changed files with 109 additions and 29 deletions

View File

@ -236,13 +236,9 @@ impl<R: Runtime> HsCircPool<R> {
circuit_compatible_with_target(netdir, subnet_config, circ, target.as_ref())
});
/// Tell the background task to fire immediately if we have fewer than
/// this many circuits left, or if we found nothing. Chosen arbitrarily.
///
/// TODO HS: This should change dynamically, and probably be a fixed
/// fraction of TARGET_N.
const LAUNCH_THRESHOLD: usize = 2;
if inner.pool.len() < LAUNCH_THRESHOLD || found_usable_circ.is_none() {
// Tell the background task to fire immediately if we have very few circuits
// circuits left, or if we found nothing.
if inner.pool.very_low() || found_usable_circ.is_none() {
let handle = self.launcher_handle.get().ok_or_else(|| {
Error::from(bad_api_usage!("The circuit launcher wasn't initialized"))
})?;
@ -370,10 +366,6 @@ async fn launch_hs_circuits_as_needed<R: Runtime>(
netdir_provider: Weak<dyn NetDirProvider + 'static>,
mut schedule: TaskSchedule<R>,
) {
/// Number of circuits to keep in the pool. Chosen arbitrarily.
//
// TODO HS: This should instead change dynamically based on observed needs.
const TARGET_N: usize = 8;
/// Default delay when not told to fire explicitly. Chosen arbitrarily.
const DELAY: Duration = Duration::from_secs(30);
@ -384,15 +376,14 @@ async fn launch_hs_circuits_as_needed<R: Runtime>(
break;
}
};
let now = pool.circmgr.mgr.peek_runtime().now();
pool.remove_closed();
let mut n_to_launch = pool
.inner
.lock()
.expect("poisoned lock")
.pool
.len()
.saturating_sub(TARGET_N);
let mut max_attempts = TARGET_N * 2;
let mut n_to_launch = {
let mut inner = pool.inner.lock().expect("poisioned_lock");
inner.pool.update_target_size(now);
inner.pool.n_to_launch()
};
let mut max_attempts = n_to_launch * 2;
'inner: while n_to_launch > 1 {
max_attempts -= 1;
if max_attempts == 0 {

View File

@ -1,23 +1,56 @@
//! An internal pool object that we use to implement HsCircPool.
use std::sync::Arc;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use rand::{seq::IteratorRandom, Rng};
use tor_proto::circuit::ClientCirc;
/// A collection of circuits used to fulfil onion-service-related requests.
#[derive(Default)]
pub(super) struct Pool {
/// The collection of circuits themselves, in no particular order.
circuits: Vec<Arc<ClientCirc>>,
/// The number of elements that we would like to have in our pool.
///
/// We do not discard when we are _above_ this threshold, but we do
/// try to build when we are low.
target: usize,
/// True if we have exhausted our pool since the last time we decided
/// whether to change our target level.
have_been_exhausted: bool,
/// True if we have been under 4/5 of our target since the last time we
/// decided whether to change it.
have_been_under_highwater: bool,
/// Last time when we changed our target size.
last_changed_target: Option<Instant>,
}
/// Our default (and minimum) target pool size.
const DEFAULT_TARGET: usize = 4;
/// Our maximum target pool size. We will never let our target grow above this
/// value.
const MAX_TARGET: usize = 512;
impl Default for Pool {
fn default() -> Self {
Self {
circuits: Vec::new(),
target: DEFAULT_TARGET,
have_been_exhausted: false,
have_been_under_highwater: false,
last_changed_target: None,
}
}
}
impl Pool {
/// Return the number of circuits in this pool.
pub(super) fn len(&self) -> usize {
self.circuits.len()
}
/// Add `circ` to this pool
pub(super) fn insert(&mut self, circ: Arc<ClientCirc>) {
self.circuits.push(circ);
@ -31,6 +64,16 @@ impl Pool {
self.circuits.retain(f);
}
/// Return true if we ar very low on circuits and should build more immediately.
pub(super) fn very_low(&self) -> bool {
self.circuits.len() <= self.target / 3
}
/// Return the number of sircuits we would currently like to launch.
pub(super) fn n_to_launch(&self) -> usize {
self.target.saturating_sub(self.circuits.len())
}
/// If there is any circuit in this pool for which `f` returns true, return one such circuit at random, and remove it from the pool.
pub(super) fn take_one_where<R, F>(&mut self, rng: &mut R, f: F) -> Option<Arc<ClientCirc>>
where
@ -41,12 +84,58 @@ impl Pool {
// expense of searching every circuit. That could certainly be costly
// if `circuits` is large! Perhaps we should instead stop at the first
// matching circuit we find.
let (idx, _) = self
let rv = match self
.circuits
.iter()
.enumerate()
.filter(|(_, c)| f(c))
.choose(rng)?;
Some(self.circuits.remove(idx))
.choose(rng)
{
Some((idx, _)) => Some(self.circuits.remove(idx)),
None => None,
};
if self.circuits.is_empty() {
self.have_been_exhausted = true;
self.have_been_under_highwater = true;
} else if self.circuits.len() < self.target * 4 / 5 {
self.have_been_under_highwater = true;
}
rv
}
/// Update the target size for our pool.
pub(super) fn update_target_size(&mut self, now: Instant) {
/// Minimum amount of time that must elapse between a change and a
/// decision to grow our pool. We use this to control the rate of
/// growth and make sure that we are allowing enough time for circuits
/// to complete.
const MIN_TIME_TO_GROW: Duration = Duration::from_secs(120);
/// Minimum amount of time that must elapse between a target change and
/// a decisions to shrink our target. We use this to make sure that we
/// aren't shrinking too rapidly, and that we are allowing enough time
/// for the pool to actually get used.
const MIN_TIME_TO_SHRINK: Duration = Duration::from_secs(600);
let last_changed = self.last_changed_target.get_or_insert(now);
let time_since_last_change = now.saturating_duration_since(*last_changed);
if self.have_been_exhausted {
if time_since_last_change < MIN_TIME_TO_GROW {
return;
}
self.target *= 2;
} else if !self.have_been_under_highwater {
if time_since_last_change < MIN_TIME_TO_SHRINK {
return;
}
self.target /= 2;
}
self.last_changed_target = Some(now);
self.target = self.target.clamp(DEFAULT_TARGET, MAX_TARGET);
self.have_been_exhausted = false;
self.have_been_under_highwater = false;
}
}