circmgr::hspool: Move the Mutex into an intermediary Inner struct
This will be helpful as we complexify the pool behavior a bit.
This commit is contained in:
parent
1c470fd483
commit
69179c5dfe
|
@ -4,7 +4,7 @@
|
|||
mod pool;
|
||||
|
||||
use std::{
|
||||
sync::{Arc, Weak},
|
||||
sync::{Arc, Mutex, Weak},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
|
@ -48,14 +48,20 @@ pub enum HsCircKind {
|
|||
pub struct HsCircPool<R: Runtime> {
|
||||
/// An underlying circuit manager, used for constructing circuits.
|
||||
circmgr: Arc<CircMgr<R>>,
|
||||
/// A collection of pre-constructed circuits.
|
||||
pool: pool::Pool,
|
||||
/// A task handle for making the background circuit launcher fire early.
|
||||
//
|
||||
// TODO: I think we may want to move this into the same Mutex as Pool
|
||||
// eventually. But for now, this is fine, since it's just an implementation
|
||||
// detail.
|
||||
launcher_handle: OnceCell<TaskHandle>,
|
||||
/// The mutable state of this pool.
|
||||
inner: Mutex<Inner>,
|
||||
}
|
||||
|
||||
/// The mutable state of an [`HsCircPool`]
|
||||
struct Inner {
|
||||
/// A collection of pre-constructed circuits.
|
||||
pool: pool::Pool,
|
||||
}
|
||||
|
||||
impl<R: Runtime> HsCircPool<R> {
|
||||
|
@ -67,8 +73,8 @@ impl<R: Runtime> HsCircPool<R> {
|
|||
let pool = pool::Pool::default();
|
||||
Arc::new(Self {
|
||||
circmgr,
|
||||
pool,
|
||||
launcher_handle: OnceCell::new(),
|
||||
inner: Mutex::new(Inner { pool }),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -224,23 +230,26 @@ impl<R: Runtime> HsCircPool<R> {
|
|||
target,
|
||||
relay: netdir.by_ids(target),
|
||||
});
|
||||
let found_usable_circ = self.pool.take_one_where(&mut rand::thread_rng(), |circ| {
|
||||
circuit_compatible_with_target(netdir, subnet_config, circ, target.as_ref())
|
||||
});
|
||||
|
||||
/// Tell the background task to fire immediately if we have fewer than
|
||||
/// this many circuits left, or if we found nothing. Chosen arbitrarily.
|
||||
///
|
||||
/// TODO HS: This should change dynamically, and probably be a fixed
|
||||
/// fraction of TARGET_N.
|
||||
const LAUNCH_THRESHOLD: usize = 2;
|
||||
if self.pool.len() < LAUNCH_THRESHOLD || found_usable_circ.is_none() {
|
||||
let handle = self.launcher_handle.get().ok_or_else(|| {
|
||||
Error::from(bad_api_usage!("The circuit launcher wasn't initialized"))
|
||||
})?;
|
||||
handle.fire();
|
||||
}
|
||||
let found_usable_circ = {
|
||||
let mut inner = self.inner.lock().expect("lock poisoned");
|
||||
let found_usable_circ = inner.pool.take_one_where(&mut rand::thread_rng(), |circ| {
|
||||
circuit_compatible_with_target(netdir, subnet_config, circ, target.as_ref())
|
||||
});
|
||||
|
||||
/// Tell the background task to fire immediately if we have fewer than
|
||||
/// this many circuits left, or if we found nothing. Chosen arbitrarily.
|
||||
///
|
||||
/// TODO HS: This should change dynamically, and probably be a fixed
|
||||
/// fraction of TARGET_N.
|
||||
const LAUNCH_THRESHOLD: usize = 2;
|
||||
if inner.pool.len() < LAUNCH_THRESHOLD || found_usable_circ.is_none() {
|
||||
let handle = self.launcher_handle.get().ok_or_else(|| {
|
||||
Error::from(bad_api_usage!("The circuit launcher wasn't initialized"))
|
||||
})?;
|
||||
handle.fire();
|
||||
}
|
||||
found_usable_circ
|
||||
};
|
||||
// Return the circuit we found before, if any.
|
||||
if let Some(circuit) = found_usable_circ {
|
||||
return Ok(circuit);
|
||||
|
@ -257,13 +266,16 @@ impl<R: Runtime> HsCircPool<R> {
|
|||
|
||||
/// Internal: Remove every closed circuit from this pool.
|
||||
fn remove_closed(&self) {
|
||||
self.pool.retain(|circ| !circ.is_closing());
|
||||
let mut inner = self.inner.lock().expect("lock poisoned");
|
||||
inner.pool.retain(|circ| !circ.is_closing());
|
||||
}
|
||||
|
||||
/// Internal: Remove every circuit form this pool for which any relay is not
|
||||
/// listed in `netdir`.
|
||||
fn remove_unlisted(&self, netdir: &NetDir) {
|
||||
self.pool
|
||||
let mut inner = self.inner.lock().expect("lock poisoned");
|
||||
inner
|
||||
.pool
|
||||
.retain(|circ| all_circ_relays_are_listed_in(circ, netdir));
|
||||
}
|
||||
}
|
||||
|
@ -373,7 +385,13 @@ async fn launch_hs_circuits_as_needed<R: Runtime>(
|
|||
}
|
||||
};
|
||||
pool.remove_closed();
|
||||
let mut n_to_launch = pool.pool.len().saturating_sub(TARGET_N);
|
||||
let mut n_to_launch = pool
|
||||
.inner
|
||||
.lock()
|
||||
.expect("poisoned lock")
|
||||
.pool
|
||||
.len()
|
||||
.saturating_sub(TARGET_N);
|
||||
let mut max_attempts = TARGET_N * 2;
|
||||
'inner: while n_to_launch > 1 {
|
||||
max_attempts -= 1;
|
||||
|
@ -394,7 +412,7 @@ async fn launch_hs_circuits_as_needed<R: Runtime>(
|
|||
// TODO HS: We should catch panics, here or in launch_hs_unmanaged.
|
||||
match pool.circmgr.launch_hs_unmanaged(no_target, &netdir).await {
|
||||
Ok(circ) => {
|
||||
pool.pool.insert(circ);
|
||||
pool.inner.lock().expect("poisoned lock").pool.insert(circ);
|
||||
n_to_launch -= 1;
|
||||
}
|
||||
Err(err) => {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! An internal pool object that we use to implement HsCircPool.
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
|
||||
use rand::{seq::IteratorRandom, Rng};
|
||||
use tor_proto::circuit::ClientCirc;
|
||||
|
@ -9,44 +9,44 @@ use tor_proto::circuit::ClientCirc;
|
|||
#[derive(Default)]
|
||||
pub(super) struct Pool {
|
||||
/// The collection of circuits themselves, in no particular order.
|
||||
circuits: Mutex<Vec<Arc<ClientCirc>>>,
|
||||
circuits: Vec<Arc<ClientCirc>>,
|
||||
}
|
||||
|
||||
impl Pool {
|
||||
/// Return the number of circuits in this pool.
|
||||
pub(super) fn len(&self) -> usize {
|
||||
self.circuits.lock().expect("lock poisoned").len()
|
||||
self.circuits.len()
|
||||
}
|
||||
|
||||
/// Add `circ` to this pool
|
||||
pub(super) fn insert(&self, circ: Arc<ClientCirc>) {
|
||||
self.circuits.lock().expect("lock poisoned").push(circ);
|
||||
pub(super) fn insert(&mut self, circ: Arc<ClientCirc>) {
|
||||
self.circuits.push(circ);
|
||||
}
|
||||
|
||||
/// Remove every circuit from this pool for which `f` returns false.
|
||||
pub(super) fn retain<F>(&self, f: F)
|
||||
pub(super) fn retain<F>(&mut self, f: F)
|
||||
where
|
||||
F: FnMut(&Arc<ClientCirc>) -> bool,
|
||||
{
|
||||
self.circuits.lock().expect("lock poisoned").retain(f);
|
||||
self.circuits.retain(f);
|
||||
}
|
||||
|
||||
/// If there is any circuit in this pool for which `f` returns true, return one such circuit at random, and remove it from the pool.
|
||||
pub(super) fn take_one_where<R, F>(&self, rng: &mut R, f: F) -> Option<Arc<ClientCirc>>
|
||||
pub(super) fn take_one_where<R, F>(&mut self, rng: &mut R, f: F) -> Option<Arc<ClientCirc>>
|
||||
where
|
||||
R: Rng,
|
||||
F: Fn(&Arc<ClientCirc>) -> bool,
|
||||
{
|
||||
let mut circuits = self.circuits.lock().expect("lock poisoned");
|
||||
// TODO HS: This ensures that we take a circuit at random, but at the
|
||||
// expense of searching every circuit. That could certainly be costly
|
||||
// if `circuits` is large! Perhaps we should instead stop at the first
|
||||
// matching circuit we find.
|
||||
let (idx, _) = circuits
|
||||
let (idx, _) = self
|
||||
.circuits
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, c)| f(c))
|
||||
.choose(rng)?;
|
||||
Some(circuits.remove(idx))
|
||||
Some(self.circuits.remove(idx))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue