Provide ChannelUsage and plumb it all the way down

Channel padding depends on what the channel is being used for.  We
therefore need to let the channel code know this information.

The implementation of the per-channel padding control logic will be in
the new note_usage function, which for now is simply a stub.

A future commit will introduce a `PaddingControlState` which lives in
the channel frontend; consult the doc comment for that type to see why
the plumbing through the channel manager terminates in the channel
frontend.
This commit is contained in:
Ian Jackson 2022-07-26 19:14:12 +01:00
parent 3c23c2333a
commit 8d44ef05dc
9 changed files with 133 additions and 29 deletions

View File

@ -2,6 +2,7 @@
use std::io;
use std::net::SocketAddr;
use std::result::Result as StdResult;
use std::sync::{Arc, Mutex};
use crate::{event::ChanMgrEventSender, Error};
@ -11,6 +12,7 @@ use tor_error::{bad_api_usage, internal};
use tor_linkspec::{HasAddrs, HasRelayIds, OwnedChanTarget};
use tor_llcrypto::pk;
use tor_proto::channel::params::ChannelsParamsUpdates;
use tor_proto::channel::ChannelUsage;
use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider};
use async_trait::async_trait;
@ -254,6 +256,9 @@ impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
fn reparameterize(&mut self, updates: Arc<ChannelsParamsUpdates>) -> tor_proto::Result<()> {
self.reparameterize(updates)
}
fn note_usage(&self, usage: ChannelUsage) -> StdResult<(), tor_error::Bug> {
self.note_usage(usage)
}
}
#[cfg(test)]

View File

@ -67,7 +67,7 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_netdir::{NetDir, NetDirProvider};
use tor_proto::channel::Channel;
use tor_proto::channel::{Channel, ChannelUsage};
use tracing::{debug, error};
use void::{ResultVoidErrExt, Void};
@ -168,6 +168,7 @@ impl<R: Runtime> ChanMgr<R> {
pub async fn get_or_launch<T: ChanTarget + ?Sized>(
&self,
target: &T,
usage: ChannelUsage,
) -> Result<(Channel, ChanProvenance)> {
// TODO(nickm): We will need to change the way that we index our map
// when we eventually support channels that are _not_ primarily
@ -177,7 +178,10 @@ impl<R: Runtime> ChanMgr<R> {
let ed_identity = target.ed_identity().ok_or(Error::MissingId)?;
let targetinfo = OwnedChanTarget::from_chan_target(target);
let (chan, provenance) = self.mgr.get_or_launch(*ed_identity, targetinfo).await?;
let (chan, provenance) = self
.mgr
.get_or_launch(*ed_identity, targetinfo, usage)
.await?;
// Double-check the match to make sure that the RSA identity is
// what we wanted too.
chan.check_match(target)

View File

@ -14,6 +14,7 @@ use std::time::Duration;
use tor_error::internal;
use tor_netdir::NetDir;
use tor_proto::channel::params::ChannelsParamsUpdates;
use tor_proto::channel::ChannelUsage;
mod map;
@ -40,6 +41,9 @@ pub(crate) trait AbstractChannel: Clone {
/// The changed parameters may not be implemented "immediately",
/// but this will be done "reasonably soon".
fn reparameterize(&mut self, updates: Arc<ChannelsParamsUpdates>) -> tor_proto::Result<()>;
/// Note that this channel is about to be used for `usage`
fn note_usage(&self, usage: ChannelUsage) -> StdResult<(), tor_error::Bug>;
}
/// Trait to describe how channels are created.
@ -120,8 +124,10 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> {
&self,
ident: <<CF as ChannelFactory>::Channel as AbstractChannel>::Ident,
target: CF::BuildSpec,
usage: ChannelUsage,
) -> Result<(CF::Channel, ChanProvenance)> {
let chan = self.get_or_launch_internal(ident, target).await?;
chan.0.note_usage(usage)?;
Ok(chan)
}
@ -319,6 +325,7 @@ mod test {
use std::time::Duration;
use tor_error::bad_api_usage;
use tor_proto::channel::ChannelUsage as CU;
use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime};
struct FakeChannelFactory<RT> {
@ -353,6 +360,9 @@ mod test {
fn reparameterize(&mut self, _updates: Arc<ChannelsParamsUpdates>) -> tor_proto::Result<()> {
Ok(())
}
fn note_usage(&self, _usage: ChannelUsage) -> StdResult<(), tor_error::Bug> {
Ok(())
}
}
impl FakeChannel {
@ -403,8 +413,8 @@ mod test {
test_with_one_runtime!(|runtime| async {
let mgr = new_test_abstract_chanmgr(runtime);
let target = (413, '!');
let chan1 = mgr.get_or_launch(413, target).await.unwrap().0;
let chan2 = mgr.get_or_launch(413, target).await.unwrap().0;
let chan1 = mgr.get_or_launch(413, target, CU::Exit).await.unwrap().0;
let chan2 = mgr.get_or_launch(413, target, CU::Exit).await.unwrap().0;
assert_eq!(chan1, chan2);
@ -420,7 +430,7 @@ mod test {
// This is set up to always fail.
let target = (999, '❌');
let res1 = mgr.get_or_launch(999, target).await;
let res1 = mgr.get_or_launch(999, target, CU::Exit).await;
assert!(matches!(res1, Err(Error::UnusableTarget(_))));
let chan3 = mgr.get_nowait(&999);
@ -437,12 +447,12 @@ mod test {
// concurrently. Right now it seems that they don't actually
// interact.
let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!(
mgr.get_or_launch(3, (3, 'a')),
mgr.get_or_launch(3, (3, 'b')),
mgr.get_or_launch(44, (44, 'a')),
mgr.get_or_launch(44, (44, 'b')),
mgr.get_or_launch(86, (86, '❌')),
mgr.get_or_launch(86, (86, '🔥')),
mgr.get_or_launch(3, (3, 'a'), CU::Exit),
mgr.get_or_launch(3, (3, 'b'), CU::Exit),
mgr.get_or_launch(44, (44, 'a'), CU::Exit),
mgr.get_or_launch(44, (44, 'b'), CU::Exit),
mgr.get_or_launch(86, (86, '❌'), CU::Exit),
mgr.get_or_launch(86, (86, '🔥'), CU::Exit),
);
let ch3a = ch3a.unwrap();
let ch3b = ch3b.unwrap();
@ -466,9 +476,9 @@ mod test {
let mgr = new_test_abstract_chanmgr(runtime);
let (ch3, ch4, ch5) = join!(
mgr.get_or_launch(3, (3, 'a')),
mgr.get_or_launch(4, (4, 'a')),
mgr.get_or_launch(5, (5, 'a')),
mgr.get_or_launch(3, (3, 'a'), CU::Exit),
mgr.get_or_launch(4, (4, 'a'), CU::Exit),
mgr.get_or_launch(5, (5, 'a'), CU::Exit),
);
let ch3 = ch3.unwrap().0;
@ -478,7 +488,7 @@ mod test {
ch3.start_closing();
ch5.start_closing();
let ch3_new = mgr.get_or_launch(3, (3, 'b')).await.unwrap().0;
let ch3_new = mgr.get_or_launch(3, (3, 'b'), CU::Exit).await.unwrap().0;
assert_ne!(ch3, ch3_new);
assert_eq!(ch3_new.mood, 'b');

View File

@ -404,6 +404,7 @@ mod test {
use super::*;
use std::sync::Arc;
use tor_proto::channel::params::ChannelsParamsUpdates;
use tor_proto::channel::ChannelUsage;
fn new_test_channel_map<C: AbstractChannel>() -> ChannelMap<C> {
ChannelMap::new(Default::default())
@ -431,6 +432,9 @@ mod test {
self.params_update = Some(update);
Ok(())
}
fn note_usage(&self, _usage: ChannelUsage) -> StdResult<(), tor_error::Bug> {
Ok(())
}
}
fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
let channel = FakeChannel {

View File

@ -15,6 +15,7 @@ use std::time::{Duration, Instant};
use tor_chanmgr::{ChanMgr, ChanProvenance};
use tor_guardmgr::GuardStatus;
use tor_linkspec::{ChanTarget, OwnedChanTarget, OwnedCircTarget};
use tor_proto::channel::ChannelUsage;
use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
use tor_rtcompat::{Runtime, SleepProviderExt};
@ -42,6 +43,7 @@ pub(crate) trait Buildable: Sized {
guard_status: &GuardStatusHandle,
ct: &OwnedChanTarget,
params: &CircParameters,
usage: ChannelUsage,
) -> Result<Self>;
/// Launch a new circuit through a given relay, given a circuit target
@ -52,6 +54,7 @@ pub(crate) trait Buildable: Sized {
guard_status: &GuardStatusHandle,
ct: &OwnedCircTarget,
params: &CircParameters,
usage: ChannelUsage,
) -> Result<Self>;
/// Extend this circuit-like object by one hop, to the location described
@ -74,9 +77,10 @@ async fn create_common<RT: Runtime, CT: ChanTarget>(
rt: &RT,
target: &CT,
guard_status: &GuardStatusHandle,
usage: ChannelUsage,
) -> Result<PendingClientCirc> {
// Get or construct the channel.
let result = chanmgr.get_or_launch(target).await;
let result = chanmgr.get_or_launch(target, usage).await;
// Report the clock skew if appropriate, and exit if there has been an error.
let chan = match result {
@ -118,8 +122,9 @@ impl Buildable for ClientCirc {
guard_status: &GuardStatusHandle,
ct: &OwnedChanTarget,
params: &CircParameters,
usage: ChannelUsage,
) -> Result<Self> {
let circ = create_common(chanmgr, rt, ct, guard_status).await?;
let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
circ.create_firsthop_fast(params)
.await
.map_err(|error| Error::Protocol {
@ -134,8 +139,9 @@ impl Buildable for ClientCirc {
guard_status: &GuardStatusHandle,
ct: &OwnedCircTarget,
params: &CircParameters,
usage: ChannelUsage,
) -> Result<Self> {
let circ = create_common(chanmgr, rt, ct, guard_status).await?;
let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?;
circ.create_firsthop_ntor(ct, params.clone())
.await
.map_err(|error| Error::Protocol {
@ -208,6 +214,7 @@ impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
start_time: Instant,
n_hops_built: Arc<AtomicU32>,
guard_status: Arc<GuardStatusHandle>,
usage: ChannelUsage,
) -> Result<C> {
match path {
OwnedPath::ChannelOnly(target) => {
@ -219,6 +226,7 @@ impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
&guard_status,
&target,
&params,
usage,
)
.await?;
self.timeouts
@ -231,8 +239,15 @@ impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
let n_hops = p.len() as u8;
// If we fail now, it's the guard's fault.
guard_status.pending(GuardStatus::Failure);
let circ =
C::create(&self.chanmgr, &self.runtime, &guard_status, &p[0], &params).await?;
let circ = C::create(
&self.chanmgr,
&self.runtime,
&guard_status,
&p[0],
&params,
usage,
)
.await?;
self.timeouts
.note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
// If we fail after this point, we can't tell whether it's
@ -261,6 +276,7 @@ impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
path: OwnedPath,
params: &CircParameters,
guard_status: Arc<GuardStatusHandle>,
usage: ChannelUsage,
) -> Result<C> {
let action = Action::BuildCircuit { length: path.len() };
let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
@ -280,6 +296,7 @@ impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
start_time,
Arc::clone(&hops_built),
guard_status,
usage,
);
match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
@ -397,8 +414,11 @@ impl<R: Runtime> CircuitBuilder<R> {
path: OwnedPath,
params: &CircParameters,
guard_status: Arc<GuardStatusHandle>,
usage: ChannelUsage,
) -> Result<ClientCirc> {
self.builder.build_owned(path, params, guard_status).await
self.builder
.build_owned(path, params, guard_status, usage)
.await
}
/// Try to construct a new circuit from a given path, using appropriate
@ -407,9 +427,15 @@ impl<R: Runtime> CircuitBuilder<R> {
/// This circuit is _not_ automatically registered with any
/// circuit manager; if you don't hang on it it, it will
/// automatically go away when the last reference is dropped.
pub async fn build(&self, path: &TorPath<'_>, params: &CircParameters) -> Result<ClientCirc> {
pub async fn build(
&self,
path: &TorPath<'_>,
params: &CircParameters,
usage: ChannelUsage,
) -> Result<ClientCirc> {
let owned = path.try_into()?;
self.build_owned(owned, params, Arc::new(None.into())).await
self.build_owned(owned, params, Arc::new(None.into()), usage)
.await
}
/// Return true if this builder is currently learning timeout info.
@ -484,6 +510,7 @@ mod test {
use std::sync::Mutex;
use tor_linkspec::{HasRelayIds, RelayIdType, RelayIds};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_proto::channel::ChannelUsage as CU;
use tor_rtcompat::{test_with_all_runtimes, SleepProvider};
use tracing::trace;
@ -661,6 +688,7 @@ mod test {
_guard_status: &GuardStatusHandle,
ct: &OwnedChanTarget,
_: &CircParameters,
_usage: ChannelUsage,
) -> Result<Self> {
let (d1, d2) = timeouts_from_chantarget(ct);
rt.sleep(d1).await;
@ -680,6 +708,7 @@ mod test {
_guard_status: &GuardStatusHandle,
ct: &OwnedCircTarget,
_: &CircParameters,
_usage: ChannelUsage,
) -> Result<Self> {
let (d1, d2) = timeouts_from_chantarget(ct);
rt.sleep(d1).await;
@ -795,6 +824,7 @@ mod test {
advance_initial: Duration,
path: OwnedPath,
advance_on_timeout: Option<(Duration, Duration)>,
usage: ChannelUsage,
) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
let chanmgr = Arc::new(ChanMgr::new(rt.clone(), Default::default()));
// always has 3 second timeout, 100 second abandon.
@ -814,7 +844,7 @@ mod test {
rt.block_advance("manually controlling advances");
rt.allow_one_advance(advance_initial);
let outcome = rt
.wait_for(Arc::new(builder).build_owned(path, &params, gs()))
.wait_for(Arc::new(builder).build_owned(path, &params, gs(), usage))
.await;
// Now we wait for a success to finally, finally be reported.
@ -837,7 +867,7 @@ mod test {
let path = OwnedPath::ChannelOnly(chan_t(id_100ms));
let (outcome, timeouts) =
run_builder_test(rt, Duration::from_millis(100), path, None).await;
run_builder_test(rt, Duration::from_millis(100), path, None, CU::Exit).await;
let circ = outcome.unwrap();
assert!(circ.onehop);
assert_eq!(circ.hops.len(), 1);
@ -863,7 +893,7 @@ mod test {
OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_300ms)]);
let (outcome, timeouts) =
run_builder_test(rt, Duration::from_millis(100), path, None).await;
run_builder_test(rt, Duration::from_millis(100), path, None, CU::Exit).await;
let circ = outcome.unwrap();
assert!(!circ.onehop);
assert_eq!(circ.hops.len(), 3);
@ -891,7 +921,7 @@ mod test {
let path = OwnedPath::Normal(vec![circ_t(id_100ms), circ_t(id_200ms), circ_t(id_hour)]);
let (outcome, timeouts) =
run_builder_test(rt, Duration::from_millis(100), path, None).await;
run_builder_test(rt, Duration::from_millis(100), path, None, CU::Exit).await;
assert!(matches!(outcome, Err(Error::CircTimeout)));
assert_eq!(timeouts.len(), 1);
@ -922,6 +952,7 @@ mod test {
Duration::from_millis(100),
path,
Some(timeout_advance),
CU::Exit,
)
.await;
assert!(matches!(outcome, Err(Error::CircTimeout)));

View File

@ -1,6 +1,6 @@
//! Implement traits from [`crate::mgr`] for the circuit types we use.
use crate::mgr::{self, MockablePlan};
use crate::mgr::{self, AbstractSpec, MockablePlan};
use crate::path::OwnedPath;
use crate::usage::{SupportedCircUsage, TargetCircUsage};
use crate::{DirInfo, Error, Result};
@ -101,7 +101,12 @@ impl<R: Runtime> crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilde
// This will probably require a different API for circuit
// construction.
match self
.build_owned(path, &params, Arc::clone(&guard_status))
.build_owned(
path,
&params,
Arc::clone(&guard_status),
final_spec.channel_usage(),
)
.await
{
Ok(circuit) => {

View File

@ -29,6 +29,7 @@ use retry_error::RetryError;
use tor_basic_utils::retry::RetryDelay;
use tor_config::MutCfg;
use tor_error::{internal, AbsRetryTime, HasRetryTime};
use tor_proto::channel::ChannelUsage;
use tor_rtcompat::{Runtime, SleepProviderExt};
use async_trait::async_trait;
@ -114,6 +115,9 @@ pub(crate) trait AbstractSpec: Clone + Debug {
) -> Vec<&'b mut OpenEntry<Self, C>> {
abstract_spec_find_supported(list, usage)
}
/// How the circuit will be used, for use by the channel
fn channel_usage(&self) -> ChannelUsage;
}
/// An error type returned by [`AbstractSpec::restrict_mut`]
@ -1512,6 +1516,9 @@ mod test {
self.isolation = new_iso;
Ok(())
}
fn channel_usage(&self) -> ChannelUsage {
ChannelUsage::Exit
}
}
impl FakeSpec {

View File

@ -11,6 +11,7 @@ use crate::path::{dirpath::DirPathBuilder, exitpath::ExitPathBuilder, TorPath};
use tor_guardmgr::{GuardMgr, GuardMonitor, GuardUsable};
use tor_netdir::Relay;
use tor_netdoc::types::policy::PortPolicy;
use tor_proto::channel::ChannelUsage;
use tor_rtcompat::Runtime;
use crate::isolation::{IsolationHelper, StreamIsolation};
@ -355,6 +356,16 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage {
_ => abstract_spec_find_supported(list, usage),
}
}
fn channel_usage(&self) -> ChannelUsage {
use ChannelUsage as CU;
use SupportedCircUsage as SCU;
match self {
SCU::Dir => CU::Dir,
SCU::Exit { .. } => CU::Exit,
SCU::NoUsage => CU::UselessCircuit,
}
}
}
#[cfg(test)]

View File

@ -121,6 +121,27 @@ pub struct Channel {
details: Arc<ChannelDetails>,
}
/// How a channel is going to be used
///
/// A channel may be used in multiple ways. Each time it is (re)used, a separate
/// ChannelUsage is passed in.
///
/// This has the same variants as `tor_circmgr::usage::TargCircUsage`
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum ChannelUsage {
/// Use for BEGINDIR-based non-anonymous directory connections
Dir,
/// Use to exit
///
/// Includes a circuit being constructed preemptively.
Exit,
/// For a channel which is for circuit(s) which cannot be used
UselessCircuit,
}
/// This is information shared between the reactor and the frontend.
///
/// This exists to make `Channel` cheap to clone, which is desirable because every circuit wants
@ -329,6 +350,12 @@ impl Channel {
Ok(())
}
/// Note that this channel is about to be used for `usage`
pub fn note_usage(&self, _usage: ChannelUsage) -> StdResult<(), tor_error::Bug> {
// TODO
Ok(())
}
/// Reparameterise (update parameters; reconfigure)
///
/// Returns `Err` if the channel was closed earlier