Allow configurability on preemptive circuits

This commit is contained in:
Neel Chauhan 2021-12-07 15:04:41 +00:00 committed by eta
parent 68d4070038
commit 0e9c2d274e
10 changed files with 226 additions and 41 deletions

1
Cargo.lock generated
View File

@ -121,6 +121,7 @@ dependencies = [
"tempfile",
"thiserror",
"toml",
"tor-circmgr",
"tor-config",
]

View File

@ -14,8 +14,8 @@ pub use tor_config::ConfigBuildError;
/// Types for configuring how Tor circuits are built.
pub mod circ {
pub use tor_circmgr::{
CircMgrConfig, CircMgrConfigBuilder, CircuitTiming, CircuitTimingBuilder, PathConfig,
PathConfigBuilder,
CircMgrConfig, CircMgrConfigBuilder, CircuitPreemptive, CircuitPreemptiveBuilder,
CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
};
}
@ -185,6 +185,9 @@ pub struct TorClientConfig {
/// Information about how to build paths through the network.
path_rules: circ::PathConfig,
/// Information about preemptive circuits.
circuit_preemptive: circ::CircuitPreemptive,
/// Information about how to retry and expire circuits and request for circuits.
circuit_timing: circ::CircuitTiming,
@ -246,6 +249,8 @@ pub struct TorClientConfigBuilder {
/// Inner builder for the `path_rules` section.
path_rules: circ::PathConfigBuilder,
/// Inner builder for the `circuit_timing` section.
circuit_preemptive: circ::CircuitPreemptiveBuilder,
/// Inner builder for the `circuit_timing` section.
circuit_timing: circ::CircuitTimingBuilder,
/// Inner builder for the `address_filter` section.
address_filter: ClientAddrConfigBuilder,
@ -268,6 +273,10 @@ impl TorClientConfigBuilder {
.path_rules
.build()
.map_err(|e| e.within("path_rules"))?;
let circuit_preemptive = self
.circuit_preemptive
.build()
.map_err(|e| e.within("circuit_preemptive"))?;
let circuit_timing = self
.circuit_timing
.build()
@ -283,6 +292,7 @@ impl TorClientConfigBuilder {
download_schedule,
override_net_params,
path_rules,
circuit_preemptive,
circuit_timing,
address_filter,
})
@ -356,6 +366,13 @@ impl TorClientConfigBuilder {
&mut self.path_rules
}
/// Return a mutable reference to a [`CircuitPreemptiveBuilder`](circ::CircuitPreemptiveBuilder).
///
/// This section overrides Arti's rules for preemptive circuits.
pub fn circuit_preemptive(&mut self) -> &mut circ::CircuitPreemptiveBuilder {
&mut self.circuit_preemptive
}
/// Return a mutable reference to a [`CircuitTimingBuilder`](circ::CircuitTimingBuilder).
///
/// This section overrides Arti's rules for deciding how long to use
@ -382,6 +399,7 @@ impl From<TorClientConfig> for TorClientConfigBuilder {
download_schedule,
override_net_params,
path_rules,
circuit_preemptive,
circuit_timing,
address_filter,
} = cfg;
@ -392,6 +410,7 @@ impl From<TorClientConfig> for TorClientConfigBuilder {
download_schedule: download_schedule.into(),
override_net_params,
path_rules: path_rules.into(),
circuit_preemptive: circuit_preemptive.into(),
circuit_timing: circuit_timing.into(),
address_filter: address_filter.into(),
}

View File

@ -12,6 +12,7 @@ repository="https://gitlab.torproject.org/tpo/core/arti.git/"
[dependencies]
arti-client = { package="arti-client", path = "../arti-client", version = "0.0.2"}
tor-circmgr = { package="tor-circmgr", path="../tor-circmgr", version = "0.0.2"}
tor-config = { package="tor-config", path = "../tor-config", version = "0.0.2", features = ["expand-paths"]}
config = { version = "0.11.0", default-features = false, features = ["toml"] }
once_cell = "1.7.2"

View File

@ -83,6 +83,22 @@ retry_microdescs = { num_retries = 3, initial_delay = "1 sec", parallelism = 4 }
ipv4_subnet_family_prefix = 16
ipv6_subnet_family_prefix = 32
# Rules for preemptive circuits.
[circuit_preemptive]
# How many circuits should we have before we stop opening circuits
# preemptively?
threshold = 12
# Which exit ports should we preemptively build circuits through?
ports = [80, 443]
# When we see a new port, how long should we have an fast exit for that port?
duration = 3600
# How many circuits should we have at minimum for a exit port?
min_exit_circs_for_port = 2
# Rules for how long circuits should survive, and how long pending
# requests should wait for a circuit.
[circuit_timing]

View File

@ -152,6 +152,9 @@ pub struct ArtiConfig {
/// Information about how to build paths through the network.
path_rules: circ::PathConfig,
/// Information about preemptive circuits
circuit_preemptive: circ::CircuitPreemptive,
/// Information about how to retry and expire circuits and request for circuits.
circuit_timing: circ::CircuitTiming,
@ -166,6 +169,7 @@ impl From<ArtiConfig> for TorClientConfigBuilder {
storage,
address_filter,
path_rules,
circuit_preemptive,
circuit_timing,
override_net_params,
download_schedule,
@ -175,6 +179,7 @@ impl From<ArtiConfig> for TorClientConfigBuilder {
*builder.storage() = storage.into();
*builder.address_filter() = address_filter.into();
*builder.path_rules() = path_rules.into();
*builder.circuit_preemptive() = circuit_preemptive.into();
*builder.circuit_timing() = circuit_timing.into();
*builder.override_net_params() = override_net_params;
*builder.download_schedule() = download_schedule.into();
@ -228,6 +233,8 @@ pub struct ArtiConfigBuilder {
override_net_params: HashMap<String, i32>,
/// Builder for the path_rules section.
path_rules: circ::PathConfigBuilder,
/// Builder for the circuit_preemptive section.
circuit_preemptive: circ::CircuitPreemptiveBuilder,
/// Builder for the circuit_timing section.
circuit_timing: circ::CircuitTimingBuilder,
/// Builder for the address_filter section.
@ -253,6 +260,10 @@ impl ArtiConfigBuilder {
.path_rules
.build()
.map_err(|e| e.within("path_rules"))?;
let circuit_preemptive = self
.circuit_preemptive
.build()
.map_err(|e| e.within("circuit_preemptive"))?;
let circuit_timing = self
.circuit_timing
.build()
@ -269,6 +280,7 @@ impl ArtiConfigBuilder {
download_schedule,
override_net_params,
path_rules,
circuit_preemptive,
circuit_timing,
address_filter,
})
@ -339,6 +351,13 @@ impl ArtiConfigBuilder {
&mut self.path_rules
}
/// Return a mutable reference to a [`CircuitPreemptiveBuilder`](circ::CircuitPreemptiveBuilder).
///
/// This section overrides Arti's rules for preemptive circuits.
pub fn circuit_preemptive(&mut self) -> &mut circ::CircuitPreemptiveBuilder {
&mut self.circuit_preemptive
}
/// Return a mutable reference to a [`CircuitTimingBuilder`](circ::CircuitTimingBuilder).
///
/// This section overrides Arti's rules for deciding how long to use
@ -367,6 +386,7 @@ impl From<ArtiConfig> for ArtiConfigBuilder {
download_schedule: cfg.download_schedule.into(),
override_net_params: cfg.override_net_params,
path_rules: cfg.path_rules.into(),
circuit_preemptive: cfg.circuit_preemptive.into(),
circuit_timing: cfg.circuit_timing.into(),
address_filter: cfg.address_filter.into(),
}
@ -439,6 +459,11 @@ mod test {
bld.path_rules()
.ipv4_subnet_family_prefix(20)
.ipv6_subnet_family_prefix(48);
bld.circuit_preemptive()
.threshold(12)
.ports(vec![80, 443])
.duration(60 * 60)
.min_exit_circs_for_port(2);
bld.circuit_timing()
.max_dirtiness(90 * sec)
.request_timeout(10 * sec)

View File

@ -77,6 +77,37 @@ impl From<PathConfig> for PathConfigBuilder {
}
}
/// Configuration for preemptive circuits.
///
/// This type is immutable once constructed. To create an object of this
/// type, use [`CircuitPreemptiveBuilder`].
#[derive(Debug, Clone, Builder, Deserialize, Eq, PartialEq)]
#[builder(build_fn(error = "ConfigBuildError"))]
#[serde(deny_unknown_fields)]
pub struct CircuitPreemptive {
/// How many circuits should we have before we stop opening circuits
/// preemptively?
#[builder(default = "default_preemptive_threshold()")]
#[serde(default = "default_preemptive_threshold")]
pub(crate) threshold: usize,
/// Which exit ports should we preemptively build circuits through?
#[builder(default = "default_preemptive_ports()")]
#[serde(default = "default_preemptive_ports")]
pub(crate) ports: Vec<u16>,
/// When we see a new port, how long should we have a fast exit for
/// that port?
#[builder(default = "default_preemptive_duration()")]
#[serde(default = "default_preemptive_duration")]
pub(crate) duration: u64,
/// How many circuits should we have at minimum for an exit port?
#[builder(default = "default_preemptive_min_exit_circs_for_port()")]
#[serde(default = "default_preemptive_min_exit_circs_for_port")]
pub(crate) min_exit_circs_for_port: usize,
}
/// Configuration for circuit timeouts, expiration, and so on.
///
/// This type is immutable once constructed. To create an object of this
@ -113,6 +144,26 @@ pub struct CircuitTiming {
pub(crate) request_loyalty: Duration,
}
/// Return default threshold
fn default_preemptive_threshold() -> usize {
12
}
/// Return default target ports
fn default_preemptive_ports() -> Vec<u16> {
vec![80, 443]
}
/// Return default duration
fn default_preemptive_duration() -> u64 {
60 * 60
}
/// Return minimum circuits for an exit port
fn default_preemptive_min_exit_circs_for_port() -> usize {
2
}
/// Return the default value for `max_dirtiness`.
fn default_max_dirtiness() -> Duration {
Duration::from_secs(60 * 10)
@ -162,6 +213,33 @@ impl From<CircuitTiming> for CircuitTimingBuilder {
}
}
impl Default for CircuitPreemptive {
fn default() -> Self {
CircuitPreemptiveBuilder::default()
.build()
.expect("preemptive circuit defaults")
}
}
impl CircuitPreemptive {
/// Return a new [`CircuitPreemptiveBuilder`]
pub fn builder() -> CircuitPreemptiveBuilder {
CircuitPreemptiveBuilder::default()
}
}
impl From<CircuitPreemptive> for CircuitPreemptiveBuilder {
fn from(cfg: CircuitPreemptive) -> CircuitPreemptiveBuilder {
let mut builder = CircuitPreemptiveBuilder::default();
builder
.threshold(cfg.threshold)
.ports(cfg.ports)
.duration(cfg.duration)
.min_exit_circs_for_port(cfg.min_exit_circs_for_port);
builder
}
}
/// Configuration for a circuit manager.
///
/// This configuration includes information about how to build paths
@ -183,6 +261,10 @@ pub struct CircMgrConfig {
/// Timing and retry information related to circuits themselves.
#[builder(default)]
pub(crate) circuit_timing: CircuitTiming,
/// Information related to preemptive circuits.
#[builder(default)]
pub(crate) circuit_preemptive: CircuitPreemptive,
}
impl CircMgrConfig {

View File

@ -75,8 +75,8 @@ pub use err::Error;
pub use usage::{IsolationToken, StreamIsolation, StreamIsolationBuilder, TargetPort};
pub use config::{
CircMgrConfig, CircMgrConfigBuilder, CircuitTiming, CircuitTimingBuilder, PathConfig,
PathConfigBuilder,
CircMgrConfig, CircMgrConfigBuilder, CircuitPreemptive, CircuitPreemptiveBuilder,
CircuitTiming, CircuitTimingBuilder, PathConfig, PathConfigBuilder,
};
use crate::preemptive::PreemptiveCircuitPredictor;
@ -91,9 +91,6 @@ type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::Pareto
/// Key used to load timeout state information.
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
/// If we have this number or more circuits open, we don't build circuits preemptively any more.
const PREEMPTIVE_CIRCUIT_THRESHOLD: usize = 12;
/// Represents what we know about the Tor network.
///
/// This can either be a complete directory, or a list of fallbacks.
@ -157,6 +154,9 @@ pub struct CircMgr<R: Runtime> {
mgr: Arc<mgr::AbstractCircMgr<build::CircuitBuilder<R>, R>>,
/// A preemptive circuit predictor, for, uh, building circuits preemptively.
predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
/// How many circuits should we have before we stop opening circuits
/// preemptively?
threshold: usize,
}
impl<R: Runtime> CircMgr<R> {
@ -173,13 +173,18 @@ impl<R: Runtime> CircMgr<R> {
let CircMgrConfig {
path_rules,
circuit_timing,
circuit_preemptive,
} = config;
// TODO(eta): don't hardcode!
let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(vec![
TargetPort::ipv4(80),
TargetPort::ipv4(443),
])));
let ports = circuit_preemptive
.ports
.iter()
.map(|p| TargetPort::ipv4(*p))
.collect();
let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
ports,
Duration::from_secs(circuit_preemptive.duration),
)));
let guardmgr = tor_guardmgr::GuardMgr::new(runtime.clone(), storage.clone())?;
@ -196,6 +201,7 @@ impl<R: Runtime> CircMgr<R> {
let circmgr = Arc::new(CircMgr {
mgr: Arc::new(mgr),
predictor: preemptive,
threshold: circuit_preemptive.threshold,
});
runtime.spawn(continually_expire_circuits(
@ -294,7 +300,7 @@ impl<R: Runtime> CircMgr<R> {
/// should ideally be refactored to be internal to this crate, and not be a
/// public API here.
pub async fn launch_circuits_preemptively(&self, netdir: DirInfo<'_>) {
if self.mgr.n_circs() >= PREEMPTIVE_CIRCUIT_THRESHOLD {
if self.mgr.n_circs() >= self.threshold {
return;
}
debug!("Checking preemptive circuit predictions.");

View File

@ -478,6 +478,9 @@ struct CircList<B: AbstractCircBuilder> {
impl<B: AbstractCircBuilder> CircList<B> {
/// Make a new empty `CircList`
///
/// XXXX: We need the exit_circs_for_port since it's used by find_open()
/// which in turn calls find_supported() which uses this variable
fn new() -> Self {
CircList {
open_circs: HashMap::new(),
@ -660,6 +663,7 @@ enum Action<B: AbstractCircBuilder> {
impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
/// Construct a new AbstractCircMgr.
/// XXX: We need the exit_circs_for_port for CircList
pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
let circs = sync::Mutex::new(CircList::new());
let dflt_params = tor_netdir::params::NetParameters::default();
@ -1862,7 +1866,7 @@ mod test {
assert_eq!(
SupportedCircUsage::find_supported(
vec![&mut entry_none, &mut entry_web].into_iter(),
&usage_web
&usage_web,
),
vec![&mut entry_web_c]
);
@ -1870,7 +1874,7 @@ mod test {
assert_eq!(
SupportedCircUsage::find_supported(
vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
&usage_web
&usage_web,
),
vec![&mut entry_web_c, &mut entry_full_c]
);
@ -1879,8 +1883,12 @@ mod test {
let usage_preemptive_web = TargetCircUsage::Preemptive {
port: Some(TargetPort::ipv4(80)),
circs: 2,
};
let usage_preemptive_dns = TargetCircUsage::Preemptive {
port: None,
circs: 2,
};
let usage_preemptive_dns = TargetCircUsage::Preemptive { port: None };
// shouldn't return anything unless there are >=2 circuits

View File

@ -11,29 +11,33 @@ pub(crate) struct PreemptiveCircuitPredictor {
/// used to resolve DNS names instead of building a stream), to the last time we encountered
/// such usage.
usages: HashMap<Option<TargetPort>, Instant>,
/// How long should we have a fast exit for a port?
duration: Duration,
}
impl PreemptiveCircuitPredictor {
/// Create a new predictor, starting out with a set of ports we think are likely to be used.
pub(crate) fn new(starting_ports: Vec<TargetPort>) -> Self {
pub(crate) fn new(starting_ports: Vec<TargetPort>, dur: Duration) -> Self {
let mut usages = HashMap::new();
for sp in starting_ports {
usages.insert(Some(sp), Instant::now());
}
// We want to build circuits for resolving DNS, too.
usages.insert(None, Instant::now());
Self { usages }
let duration = dur;
Self { usages, duration }
}
/// Make some predictions for what circuits should be built.
pub(crate) fn predict(&self) -> Vec<TargetCircUsage> {
// path-spec.txt § 2.1.1: "[Tor] tries to have two fast exit circuits available for every
// port seen within the past hour" (although they can be shared)
let hour_ago = Instant::now() - Duration::from_secs(60 * 60);
let duration = Instant::now() - self.duration;
self.usages
.iter()
.filter(|(_, &time)| time > hour_ago)
.map(|(&port, _)| TargetCircUsage::Preemptive { port })
.filter(|(_, &time)| time > duration)
.map(|(&port, _)| TargetCircUsage::Preemptive { port, circs: 2 })
.collect()
}
@ -50,29 +54,39 @@ mod test {
#[test]
fn predicts_starting_ports() {
let predictor = PreemptiveCircuitPredictor::new(vec![]);
let predictor = PreemptiveCircuitPredictor::new(vec![], Duration::from_secs(2));
let mut results = predictor.predict();
results.sort();
assert_eq!(
predictor.predict(),
vec![TargetCircUsage::Preemptive { port: None }]
vec![TargetCircUsage::Preemptive {
port: None,
circs: 2
}]
);
let predictor =
PreemptiveCircuitPredictor::new(vec![TargetPort::ipv4(80), TargetPort::ipv6(80)]);
let predictor = PreemptiveCircuitPredictor::new(
vec![TargetPort::ipv4(80), TargetPort::ipv6(80)],
Duration::from_secs(2),
);
let mut results = predictor.predict();
results.sort();
assert_eq!(
results,
vec![
TargetCircUsage::Preemptive { port: None },
TargetCircUsage::Preemptive {
port: Some(TargetPort::ipv4(80))
port: None,
circs: 2
},
TargetCircUsage::Preemptive {
port: Some(TargetPort::ipv6(80))
port: Some(TargetPort::ipv4(80)),
circs: 2
},
TargetCircUsage::Preemptive {
port: Some(TargetPort::ipv6(80)),
circs: 2
},
]
);
@ -80,11 +94,14 @@ mod test {
#[test]
fn predicts_used_ports() {
let mut predictor = PreemptiveCircuitPredictor::new(vec![]);
let mut predictor = PreemptiveCircuitPredictor::new(vec![], Duration::from_secs(2));
assert_eq!(
predictor.predict(),
vec![TargetCircUsage::Preemptive { port: None }]
vec![TargetCircUsage::Preemptive {
port: None,
circs: 2
}]
);
predictor.note_usage(Some(TargetPort::ipv4(1234)), Instant::now());
@ -94,9 +111,13 @@ mod test {
assert_eq!(
results,
vec![
TargetCircUsage::Preemptive { port: None },
TargetCircUsage::Preemptive {
port: Some(TargetPort::ipv4(1234))
port: None,
circs: 2
},
TargetCircUsage::Preemptive {
port: Some(TargetPort::ipv4(1234)),
circs: 2
}
]
);
@ -104,14 +125,17 @@ mod test {
#[test]
fn does_not_predict_old_ports() {
let mut predictor = PreemptiveCircuitPredictor::new(vec![]);
let mut predictor = PreemptiveCircuitPredictor::new(vec![], Duration::from_secs(2));
let more_than_an_hour_ago = Instant::now() - Duration::from_secs(60 * 60 + 1);
predictor.note_usage(Some(TargetPort::ipv4(2345)), more_than_an_hour_ago);
assert_eq!(
predictor.predict(),
vec![TargetCircUsage::Preemptive { port: None }]
vec![TargetCircUsage::Preemptive {
port: None,
circs: 2
}]
);
}
}

View File

@ -1,6 +1,7 @@
//! Code related to tracking what activities a circuit can be used for.
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::debug;
@ -27,7 +28,7 @@ pub(crate) struct ExitPolicy {
///
/// Ordinarily, this is a TCP port, plus a flag to indicate whether we
/// must support IPv4 or IPv6.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord, Serialize)]
pub struct TargetPort {
/// True if this is a request to connect to an IPv6 address
ipv6: bool,
@ -235,6 +236,8 @@ pub(crate) enum TargetCircUsage {
///
/// If this is `None`, we just want a circuit capable of doing DNS resolution.
port: Option<TargetPort>,
/// The number of exit circuits needed for a port
circs: usize,
},
}
@ -278,7 +281,7 @@ impl TargetCircUsage {
let (path, mon, usable) = DirPathBuilder::new().pick_path(rng, netdir, guards)?;
Ok((path, SupportedCircUsage::Dir, mon, usable))
}
TargetCircUsage::Preemptive { port } => {
TargetCircUsage::Preemptive { port, .. } => {
// FIXME(eta): this is copypasta from `TargetCircUsage::Exit`.
let (path, mon, usable) = ExitPathBuilder::from_target_ports(port.iter().copied())
.pick_path(rng, netdir, guards, config)?;
@ -352,7 +355,7 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage {
i1.map(|i1| i1.may_share_circuit(i2)).unwrap_or(true)
&& p2.iter().all(|port| p1.allows_port(*port))
}
(Exit { policy, isolation }, TargetCircUsage::Preemptive { port }) => {
(Exit { policy, isolation }, TargetCircUsage::Preemptive { port, .. }) => {
if isolation.is_some() {
// If the circuit has a stream isolation token, we might not be able to use it
// for new streams that don't share it.
@ -403,7 +406,7 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage {
usage: &TargetCircUsage,
) -> Vec<&'b mut OpenEntry<Self, C>> {
match usage {
TargetCircUsage::Preemptive { .. } => {
TargetCircUsage::Preemptive { circs, .. } => {
let supported = abstract_spec_find_supported(list, usage);
// We need to have at least two circuits that support `port` in order
// to reuse them; otherwise, we must create a new circuit, so
@ -413,7 +416,7 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage {
usage,
supported.len()
);
if supported.len() >= 2 {
if supported.len() >= *circs {
supported
} else {
vec![]