Merge branch 'channel' into 'main'
Plumb channel padding timing parameters from netdir to tor-proto See merge request tpo/core/arti!586
This commit is contained in:
commit
0bd375cb34
|
@ -3514,10 +3514,13 @@ dependencies = [
|
|||
"tor-error",
|
||||
"tor-linkspec",
|
||||
"tor-llcrypto",
|
||||
"tor-netdir",
|
||||
"tor-proto",
|
||||
"tor-rtcompat",
|
||||
"tor-rtmock",
|
||||
"tor-units",
|
||||
"tracing",
|
||||
"void",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -3869,6 +3872,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"cipher",
|
||||
"coarsetime",
|
||||
"derive_builder_fork_arti",
|
||||
"digest 0.10.3",
|
||||
"educe",
|
||||
"futures",
|
||||
|
@ -3897,6 +3901,7 @@ dependencies = [
|
|||
"tor-protover",
|
||||
"tor-rtcompat",
|
||||
"tor-rtmock",
|
||||
"tor-units",
|
||||
"tracing",
|
||||
"typenum",
|
||||
"zeroize",
|
||||
|
|
|
@ -388,7 +388,7 @@ impl<R: Runtime> TorClient<R> {
|
|||
|
||||
periodic_task_handles.extend(
|
||||
chanmgr
|
||||
.launch_background_tasks(&runtime)
|
||||
.launch_background_tasks(&runtime, dirmgr.clone().upcast_arc())
|
||||
.map_err(ErrorDetail::ChanMgrSetup)?
|
||||
.into_iter(),
|
||||
);
|
||||
|
|
|
@ -25,14 +25,17 @@ tor-basic-utils = { path = "../tor-basic-utils", version = "0.3.1" }
|
|||
tor-error = { path = "../tor-error", version = "0.3.1" }
|
||||
tor-linkspec = { path = "../tor-linkspec", version = "0.3.0" }
|
||||
tor-llcrypto = { path = "../tor-llcrypto", version = "0.3.0" }
|
||||
tor-netdir = { path = "../tor-netdir", version = "0.3.0" }
|
||||
tor-proto = { path = "../tor-proto", version = "0.3.1" }
|
||||
tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0" }
|
||||
tor-units = { path = "../tor-units", version = "0.3.0" }
|
||||
tracing = "0.1.18"
|
||||
void = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
float_eq = "1.0.0"
|
||||
futures-await-test = "0.3.0"
|
||||
hex-literal = "0.3"
|
||||
tor-netdir = { path = "../tor-netdir", version = "0.3.0", features = ["testing"] }
|
||||
tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0", features = ["tokio", "native-tls"] }
|
||||
tor-rtmock = { path = "../tor-rtmock", version = "0.4.0" }
|
||||
|
||||
|
|
|
@ -6,10 +6,12 @@ use std::sync::{Arc, Mutex};
|
|||
|
||||
use crate::{event::ChanMgrEventSender, Error};
|
||||
|
||||
use std::result::Result as StdResult;
|
||||
use std::time::Duration;
|
||||
use tor_error::{bad_api_usage, internal};
|
||||
use tor_linkspec::{ChanTarget, OwnedChanTarget};
|
||||
use tor_llcrypto::pk;
|
||||
use tor_proto::channel::params::ChannelsParamsUpdates;
|
||||
use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
@ -236,6 +238,9 @@ impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
|
|||
fn duration_unused(&self) -> Option<Duration> {
|
||||
self.duration_unused()
|
||||
}
|
||||
fn reparameterize(&mut self, updates: Arc<ChannelsParamsUpdates>) -> StdResult<(), ()> {
|
||||
self.reparameterize(updates).map_err(|_| ())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -55,12 +55,16 @@ mod mgr;
|
|||
#[cfg(test)]
|
||||
mod testing;
|
||||
|
||||
use futures::select_biased;
|
||||
use futures::task::SpawnExt;
|
||||
use futures::StreamExt;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
use tor_linkspec::{ChanTarget, OwnedChanTarget};
|
||||
use tor_netdir::NetDirProvider;
|
||||
use tor_proto::channel::Channel;
|
||||
use tracing::{debug, error};
|
||||
use void::{ResultVoidErrExt, Void};
|
||||
|
||||
pub use err::Error;
|
||||
|
||||
|
@ -112,10 +116,22 @@ impl<R: Runtime> ChanMgr<R> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Launch the periodic daemon task required by the manager to function properly.
|
||||
/// Launch the periodic daemon tasks required by the manager to function properly.
|
||||
///
|
||||
/// Returns a [`TaskHandle`] that can be used to manage the daemon task.
|
||||
pub fn launch_background_tasks(self: &Arc<Self>, runtime: &R) -> Result<Vec<TaskHandle>> {
|
||||
/// Returns a [`TaskHandle`] that can be used to manage
|
||||
/// those daemon tasks that poll periodically.
|
||||
pub fn launch_background_tasks(
|
||||
self: &Arc<Self>,
|
||||
runtime: &R,
|
||||
netdir: Arc<dyn NetDirProvider>,
|
||||
) -> Result<Vec<TaskHandle>> {
|
||||
runtime
|
||||
.spawn(Self::continually_update_channels_config(
|
||||
Arc::downgrade(self),
|
||||
netdir,
|
||||
))
|
||||
.map_err(|e| Error::from_spawn("channels config task", e))?;
|
||||
|
||||
let (sched, handle) = TaskSchedule::new(runtime.clone());
|
||||
runtime
|
||||
.spawn(Self::continually_expire_channels(
|
||||
|
@ -163,6 +179,49 @@ impl<R: Runtime> ChanMgr<R> {
|
|||
self.mgr.expire_channels()
|
||||
}
|
||||
|
||||
/// Watch for things that ought to change the configuration of all channels in the client
|
||||
///
|
||||
/// Currently this handles enabling and disabling channel padding.
|
||||
///
|
||||
/// This is a daemon task that runs indefinitely in the background,
|
||||
/// and exits when we find that `chanmgr` is dropped.
|
||||
async fn continually_update_channels_config(
|
||||
self_: Weak<Self>,
|
||||
netdir: Arc<dyn NetDirProvider>,
|
||||
) {
|
||||
use tor_netdir::DirEvent as DE;
|
||||
let mut netdir_stream = netdir.events().fuse();
|
||||
let netdir = {
|
||||
let weak = Arc::downgrade(&netdir);
|
||||
drop(netdir);
|
||||
weak
|
||||
};
|
||||
let termination_reason: std::result::Result<Void, &str> = async move {
|
||||
loop {
|
||||
select_biased! {
|
||||
direvent = netdir_stream.next() => {
|
||||
let direvent = direvent.ok_or("EOF on netdir provider event stream")?;
|
||||
if ! matches!(direvent, DE::NewConsensus) { continue };
|
||||
let self_ = self_.upgrade().ok_or("channel manager gone away")?;
|
||||
let netdir = netdir.upgrade().ok_or("netdir gone away")?;
|
||||
let netdir = netdir.latest_netdir();
|
||||
let netdir = if let Some(nd) = netdir { nd } else { continue };
|
||||
self_.mgr.channels.process_updated_netdir(netdir).map_err(|e| {
|
||||
error!("continually_update_channels_config: failed to process! {} {:?}",
|
||||
&e, &e);
|
||||
"error processing netdir"
|
||||
})?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.await;
|
||||
debug!(
|
||||
"continually_update_channels_config: shutting down: {}",
|
||||
termination_reason.void_unwrap_err()
|
||||
);
|
||||
}
|
||||
|
||||
/// Periodically expire any channels that have been unused beyond
|
||||
/// the maximum duration allowed.
|
||||
///
|
||||
|
|
|
@ -8,8 +8,11 @@ use futures::channel::oneshot;
|
|||
use futures::future::{FutureExt, Shared};
|
||||
use rand::Rng;
|
||||
use std::hash::Hash;
|
||||
use std::result::Result as StdResult;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tor_error::internal;
|
||||
use tor_proto::channel::params::ChannelsParamsUpdates;
|
||||
|
||||
mod map;
|
||||
|
||||
|
@ -30,6 +33,14 @@ pub(crate) trait AbstractChannel: Clone {
|
|||
/// Return the amount of time a channel has not been in use.
|
||||
/// Return None if the channel is currently in use.
|
||||
fn duration_unused(&self) -> Option<Duration>;
|
||||
|
||||
/// Reparameterise this channel according to the provided `ChannelsParamsUpdates`
|
||||
///
|
||||
/// The changed parameters may not be implemented "immediately",
|
||||
/// but this will be done "reasonably soon".
|
||||
///
|
||||
/// Returns `Err` (only) if the channel was closed earlier.
|
||||
fn reparameterize(&mut self, updates: Arc<ChannelsParamsUpdates>) -> StdResult<(), ()>;
|
||||
}
|
||||
|
||||
/// Trait to describe how channels are created.
|
||||
|
@ -63,7 +74,7 @@ pub(crate) struct AbstractChanMgr<CF: ChannelFactory> {
|
|||
connector: CF,
|
||||
|
||||
/// A map from ed25519 identity to channel, or to pending channel status.
|
||||
channels: map::ChannelMap<CF::Channel>,
|
||||
pub(crate) channels: map::ChannelMap<CF::Channel>,
|
||||
}
|
||||
|
||||
/// Type alias for a future that we wait on to see when a pending
|
||||
|
@ -195,18 +206,28 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> {
|
|||
},
|
||||
// We need to launch a channel.
|
||||
Action::Launch(send) => match self.connector.build_channel(&target).await {
|
||||
Ok(chan) => {
|
||||
Ok(mut chan) => {
|
||||
// The channel got built: remember it, tell the
|
||||
// others, and return it.
|
||||
self.channels.replace(
|
||||
ident.clone(),
|
||||
Open(OpenEntry {
|
||||
channel: chan.clone(),
|
||||
max_unused_duration: Duration::from_secs(
|
||||
rand::thread_rng().gen_range(180..270),
|
||||
),
|
||||
}),
|
||||
)?;
|
||||
self.channels
|
||||
.replace_with_params(ident.clone(), |channels_params| {
|
||||
// This isn't great. We context switch to the newly-created
|
||||
// channel just to tell it how and whether to do padding. Ideally
|
||||
// we would pass the params at some suitable point during
|
||||
// building. However, that would involve the channel taking a
|
||||
// copy of the params, and that must happen in the same channel
|
||||
// manager lock acquisition span as the one where we insert the
|
||||
// channel into the table so it will receive updates. I.e.,
|
||||
// here.
|
||||
chan.reparameterize(channels_params.total_update().into())
|
||||
.map_err(|()| internal!("new channel already closed"))?;
|
||||
Ok(Open(OpenEntry {
|
||||
channel: chan.clone(),
|
||||
max_unused_duration: Duration::from_secs(
|
||||
rand::thread_rng().gen_range(180..270),
|
||||
),
|
||||
}))
|
||||
})?;
|
||||
// It's okay if all the receivers went away:
|
||||
// that means that nobody was waiting for this channel.
|
||||
let _ignore_err = send.send(Ok(chan.clone()));
|
||||
|
@ -297,6 +318,9 @@ mod test {
|
|||
fn duration_unused(&self) -> Option<Duration> {
|
||||
None
|
||||
}
|
||||
fn reparameterize(&mut self, _updates: Arc<ChannelsParamsUpdates>) -> StdResult<(), ()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FakeChannel {
|
||||
|
|
|
@ -6,20 +6,42 @@ use super::{AbstractChannel, Pending};
|
|||
use crate::{Error, Result};
|
||||
|
||||
use std::collections::{hash_map, HashMap};
|
||||
use tor_error::internal;
|
||||
use std::result::Result as StdResult;
|
||||
use std::sync::Arc;
|
||||
use tor_error::{internal, into_internal};
|
||||
use tor_netdir::{params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND, NetDir};
|
||||
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
|
||||
use tor_proto::ChannelsParams;
|
||||
use tor_units::BoundedInt32;
|
||||
use tracing::info;
|
||||
|
||||
/// A map from channel id to channel state.
|
||||
/// A map from channel id to channel state, plus necessary auxiliary state
|
||||
///
|
||||
/// We make this a separate type instead of just using
|
||||
/// `Mutex<HashMap<...>>` to limit the amount of code that can see and
|
||||
/// lock the Mutex here. (We're using a blocking mutex close to async
|
||||
/// code, so we need to be careful.)
|
||||
pub(crate) struct ChannelMap<C: AbstractChannel> {
|
||||
/// The data, within a lock
|
||||
inner: std::sync::Mutex<Inner<C>>,
|
||||
}
|
||||
|
||||
/// A map from channel id to channel state, plus necessary auxiliary state - inside lock
|
||||
struct Inner<C: AbstractChannel> {
|
||||
/// A map from identity to channel, or to pending channel status.
|
||||
///
|
||||
/// (Danger: this uses a blocking mutex close to async code. This mutex
|
||||
/// must never be held while an await is happening.)
|
||||
channels: std::sync::Mutex<HashMap<C::Ident, ChannelState<C>>>,
|
||||
channels: HashMap<C::Ident, ChannelState<C>>,
|
||||
|
||||
/// Parameters for channels that we create, and that all existing channels are using
|
||||
///
|
||||
/// Will be updated by a background task, which also notifies all existing
|
||||
/// `Open` channels via `channels`.
|
||||
///
|
||||
/// (Must be protected by the same lock as `channels`, or a channel might be
|
||||
/// created using being-replaced parameters, but not get an update.)
|
||||
channels_params: ChannelsParams,
|
||||
}
|
||||
|
||||
/// Structure that can only be constructed from within this module.
|
||||
|
@ -70,9 +92,9 @@ impl<C: Clone> ChannelState<C> {
|
|||
/// For testing: either give the Open channel inside this state,
|
||||
/// or panic if there is none.
|
||||
#[cfg(test)]
|
||||
fn unwrap_open(&self) -> C {
|
||||
fn unwrap_open(&mut self) -> &mut C {
|
||||
match self {
|
||||
ChannelState::Open(ent) => ent.clone().channel,
|
||||
ChannelState::Open(ent) => &mut ent.channel,
|
||||
_ => panic!("Not an open channel"),
|
||||
}
|
||||
}
|
||||
|
@ -125,41 +147,71 @@ impl<C: AbstractChannel> ChannelState<C> {
|
|||
impl<C: AbstractChannel> ChannelMap<C> {
|
||||
/// Create a new empty ChannelMap.
|
||||
pub(crate) fn new() -> Self {
|
||||
let channels_params = ChannelsParams::default();
|
||||
ChannelMap {
|
||||
channels: std::sync::Mutex::new(HashMap::new()),
|
||||
inner: std::sync::Mutex::new(Inner {
|
||||
channels: HashMap::new(),
|
||||
channels_params,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the channel state for the given identity, if any.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn get(&self, ident: &C::Ident) -> Result<Option<ChannelState<C>>> {
|
||||
let map = self.channels.lock()?;
|
||||
map.get(ident).map(ChannelState::clone_ref).transpose()
|
||||
let inner = self.inner.lock()?;
|
||||
inner
|
||||
.channels
|
||||
.get(ident)
|
||||
.map(ChannelState::clone_ref)
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Replace the channel state for `ident` with `newval`, and return the
|
||||
/// previous value if any.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn replace(
|
||||
&self,
|
||||
ident: C::Ident,
|
||||
newval: ChannelState<C>,
|
||||
) -> Result<Option<ChannelState<C>>> {
|
||||
newval.check_ident(&ident)?;
|
||||
let mut map = self.channels.lock()?;
|
||||
Ok(map.insert(ident, newval))
|
||||
let mut inner = self.inner.lock()?;
|
||||
Ok(inner.channels.insert(ident, newval))
|
||||
}
|
||||
|
||||
/// Replace the channel state for `ident` with the return value from `func`,
|
||||
/// and return the previous value if any.
|
||||
///
|
||||
/// Passes a snapshot of the current global channels parameters to `func`.
|
||||
/// If those parameters are copied by `func` into an [`AbstractChannel`]
|
||||
/// `func` must ensure that that `AbstractChannel` is returned,
|
||||
/// so that it will be properly registered and receive params updates.
|
||||
pub(crate) fn replace_with_params<F>(
|
||||
&self,
|
||||
ident: C::Ident,
|
||||
func: F,
|
||||
) -> Result<Option<ChannelState<C>>>
|
||||
where
|
||||
F: FnOnce(&ChannelsParams) -> Result<ChannelState<C>>,
|
||||
{
|
||||
let mut inner = self.inner.lock()?;
|
||||
let newval = func(&inner.channels_params)?;
|
||||
newval.check_ident(&ident)?;
|
||||
Ok(inner.channels.insert(ident, newval))
|
||||
}
|
||||
|
||||
/// Remove and return the state for `ident`, if any.
|
||||
pub(crate) fn remove(&self, ident: &C::Ident) -> Result<Option<ChannelState<C>>> {
|
||||
let mut map = self.channels.lock()?;
|
||||
Ok(map.remove(ident))
|
||||
let mut inner = self.inner.lock()?;
|
||||
Ok(inner.channels.remove(ident))
|
||||
}
|
||||
|
||||
/// Remove every unusable state from the map.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn remove_unusable(&self) -> Result<()> {
|
||||
let mut map = self.channels.lock()?;
|
||||
map.retain(|_, state| match state {
|
||||
let mut inner = self.inner.lock()?;
|
||||
inner.channels.retain(|_, state| match state {
|
||||
ChannelState::Poisoned(_) => false,
|
||||
ChannelState::Open(ent) => ent.channel.is_usable(),
|
||||
ChannelState::Building(_) => true,
|
||||
|
@ -186,8 +238,8 @@ impl<C: AbstractChannel> ChannelMap<C> {
|
|||
F: FnOnce(Option<ChannelState<C>>) -> (Option<ChannelState<C>>, V),
|
||||
{
|
||||
use hash_map::Entry::*;
|
||||
let mut map = self.channels.lock()?;
|
||||
let entry = map.entry(ident.clone());
|
||||
let mut inner = self.inner.lock()?;
|
||||
let entry = inner.channels.entry(ident.clone());
|
||||
match entry {
|
||||
Occupied(mut occupied) => {
|
||||
// Temporarily replace the entry for this identity with
|
||||
|
@ -217,29 +269,113 @@ impl<C: AbstractChannel> ChannelMap<C> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Handle a `NetDir` update (by reparameterising channels as needed)
|
||||
pub(crate) fn process_updated_netdir(&self, netdir: Arc<tor_netdir::NetDir>) -> Result<()> {
|
||||
use ChannelState as CS;
|
||||
|
||||
// TODO support dormant mode
|
||||
// TODO when entering/leaving dormant mode, send CELL_PADDING_NEGOTIATE to peers
|
||||
|
||||
// TODO when we support operation as a relay, inter-relay channels ought
|
||||
// not to get padding.
|
||||
let padding_parameters = {
|
||||
let mut p = PaddingParametersBuilder::default();
|
||||
update_padding_parameters_from_netdir(&mut p, &netdir).unwrap_or_else(|e| {
|
||||
info!(
|
||||
"consensus channel padding parameters wrong, using defaults: {}",
|
||||
&e,
|
||||
);
|
||||
});
|
||||
let p = p
|
||||
.build()
|
||||
.map_err(into_internal!("failed to build padding parameters"))?;
|
||||
|
||||
// Drop the `Arc<NetDir>` as soon as we have got what we need from it,
|
||||
// before we take the channel map lock.
|
||||
drop(netdir);
|
||||
p
|
||||
};
|
||||
|
||||
let mut inner = self.inner.lock()?;
|
||||
let update = inner
|
||||
.channels_params
|
||||
.start_update()
|
||||
.padding_parameters(padding_parameters)
|
||||
.finish();
|
||||
let update = if let Some(u) = update {
|
||||
u
|
||||
} else {
|
||||
return Ok(());
|
||||
};
|
||||
let update = Arc::new(update);
|
||||
|
||||
for channel in inner.channels.values_mut() {
|
||||
let channel = match channel {
|
||||
CS::Open(OpenEntry { channel, .. }) => channel,
|
||||
CS::Building(_) | CS::Poisoned(_) => continue,
|
||||
};
|
||||
// Ignore error (which simply means the channel is closed or gone)
|
||||
let _ = channel.reparameterize(update.clone());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Expire all channels that have been unused for too long.
|
||||
///
|
||||
/// Return a Duration until the next time at which
|
||||
/// a channel _could_ expire.
|
||||
pub(crate) fn expire_channels(&self) -> Duration {
|
||||
let mut ret = Duration::from_secs(180);
|
||||
self.channels
|
||||
self.inner
|
||||
.lock()
|
||||
.expect("Poisoned lock")
|
||||
.channels
|
||||
.retain(|_id, chan| !chan.ready_to_expire(&mut ret));
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a `NetDir`, update a `PaddingParametersBuilder` with channel padding parameters
|
||||
fn update_padding_parameters_from_netdir(
|
||||
p: &mut PaddingParametersBuilder,
|
||||
netdir: &NetDir,
|
||||
) -> StdResult<(), &'static str> {
|
||||
let params = netdir.params();
|
||||
// TODO support reduced padding via global client config,
|
||||
// TODO and with reduced padding, send CELL_PADDING_NEGOTIATE
|
||||
let (low, high) = (¶ms.nf_ito_low, ¶ms.nf_ito_high);
|
||||
|
||||
let conv_timing_param =
|
||||
|bounded: BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>| bounded.get().try_into();
|
||||
let low = low
|
||||
.try_map(conv_timing_param)
|
||||
.map_err(|_| "low value out of range?!")?;
|
||||
let high = high
|
||||
.try_map(conv_timing_param)
|
||||
.map_err(|_| "high value out of range?!")?;
|
||||
|
||||
if high > low {
|
||||
return Err("high > low");
|
||||
}
|
||||
|
||||
p.low_ms(low);
|
||||
p.high_ms(high);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
#![allow(clippy::unwrap_used)]
|
||||
use super::*;
|
||||
use std::result::Result as StdResult;
|
||||
use std::sync::Arc;
|
||||
use tor_proto::channel::params::ChannelsParamsUpdates;
|
||||
#[derive(Eq, PartialEq, Clone, Debug)]
|
||||
struct FakeChannel {
|
||||
ident: &'static str,
|
||||
usable: bool,
|
||||
unused_duration: Option<u64>,
|
||||
params_update: Option<Arc<ChannelsParamsUpdates>>,
|
||||
}
|
||||
impl AbstractChannel for FakeChannel {
|
||||
type Ident = u8;
|
||||
|
@ -252,12 +388,17 @@ mod test {
|
|||
fn duration_unused(&self) -> Option<Duration> {
|
||||
self.unused_duration.map(Duration::from_secs)
|
||||
}
|
||||
fn reparameterize(&mut self, update: Arc<ChannelsParamsUpdates>) -> StdResult<(), ()> {
|
||||
self.params_update = Some(update);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
|
||||
let channel = FakeChannel {
|
||||
ident,
|
||||
usable: true,
|
||||
unused_duration: None,
|
||||
params_update: None,
|
||||
};
|
||||
ChannelState::Open(OpenEntry {
|
||||
channel,
|
||||
|
@ -273,6 +414,7 @@ mod test {
|
|||
ident,
|
||||
usable: true,
|
||||
unused_duration,
|
||||
params_update: None,
|
||||
};
|
||||
ChannelState::Open(OpenEntry {
|
||||
channel,
|
||||
|
@ -284,6 +426,7 @@ mod test {
|
|||
ident,
|
||||
usable: false,
|
||||
unused_duration: None,
|
||||
params_update: None,
|
||||
};
|
||||
ChannelState::Open(OpenEntry {
|
||||
channel,
|
||||
|
@ -390,6 +533,57 @@ mod test {
|
|||
assert!(matches!(map.get(&b'G'), Err(Error::Internal(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reparameterise_via_netdir() {
|
||||
let map = ChannelMap::new();
|
||||
|
||||
// Set some non-default parameters so that we can tell when an update happens
|
||||
let _ = map
|
||||
.inner
|
||||
.lock()
|
||||
.unwrap()
|
||||
.channels_params
|
||||
.start_update()
|
||||
.padding_parameters(
|
||||
PaddingParametersBuilder::default()
|
||||
.low_ms(1234.into())
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.finish();
|
||||
|
||||
assert!(map.replace(b't', ch("track")).unwrap().is_none());
|
||||
|
||||
let netdir = tor_netdir::testnet::construct_netdir()
|
||||
.unwrap_if_sufficient()
|
||||
.unwrap();
|
||||
let netdir = Arc::new(netdir);
|
||||
|
||||
let with_ch = |f: &dyn Fn(&mut FakeChannel)| {
|
||||
let mut inner = map.inner.lock().unwrap();
|
||||
let ch = inner.channels.get_mut(&b't').unwrap().unwrap_open();
|
||||
f(ch);
|
||||
};
|
||||
|
||||
eprintln!("-- process a default netdir, which should send an update --");
|
||||
map.process_updated_netdir(netdir.clone()).unwrap();
|
||||
with_ch(&|ch| {
|
||||
assert_eq!(
|
||||
format!("{:?}", ch.params_update.take().unwrap()),
|
||||
// evade field visibility by (ab)using Debug impl
|
||||
"ChannelsParamsUpdates { padding_enable: None, \
|
||||
padding_parameters: Some(Parameters { \
|
||||
low_ms: IntegerMilliseconds { value: 1500 }, \
|
||||
high_ms: IntegerMilliseconds { value: 9500 } }) }"
|
||||
);
|
||||
});
|
||||
eprintln!();
|
||||
|
||||
eprintln!("-- process a default netdir again, which should *not* send an update --");
|
||||
map.process_updated_netdir(netdir).unwrap();
|
||||
with_ch(&|ch| assert_eq!(ch.params_update, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn expire_channels() {
|
||||
let map = ChannelMap::new();
|
||||
|
|
|
@ -22,6 +22,18 @@ use tor_units::{
|
|||
BoundedInt32, IntegerDays, IntegerMilliseconds, IntegerSeconds, Percentage, SendMeVersion,
|
||||
};
|
||||
|
||||
/// Upper limit for channel padding timeouts
|
||||
///
|
||||
/// This is just a safety catch which might help prevent integer overflow,
|
||||
/// and also might prevent a client getting permantently stuck in a state
|
||||
/// where it ought to send padding but never does.
|
||||
///
|
||||
/// The actual value is stolen from C Tor as per
|
||||
/// <https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/586#note_2813638>
|
||||
/// pending an update to the specifications
|
||||
/// <https://gitlab.torproject.org/tpo/core/torspec/-/issues/120>
|
||||
pub const CHANNEL_PADDING_TIMEOUT_UPPER_BOUND: i32 = 60_000;
|
||||
|
||||
/// An object that can be constructed from an i32, with saturating semantics.
|
||||
pub trait FromInt32Saturating {
|
||||
/// Construct an instance of this object from `val`.
|
||||
|
@ -263,6 +275,19 @@ pub struct NetParameters {
|
|||
pub min_circuit_path_threshold: Percentage<BoundedInt32<25, 95>> = (60)
|
||||
from "min_paths_for_circs_pct",
|
||||
|
||||
/// Channel padding, low end of random padding interval, milliseconds
|
||||
pub nf_ito_low: IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>> = (1500)
|
||||
from "nf_ito_low",
|
||||
/// Channel padding, high end of random padding interval, milliseconds
|
||||
pub nf_ito_high: IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>> = (9500)
|
||||
from "nf_ito_high",
|
||||
/// Channel padding, low end of random padding interval (reduced padding) milliseconds
|
||||
pub nf_ito_low_reduced: IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>> = (9000)
|
||||
from "nf_ito_low_reduced",
|
||||
/// Channel padding, high end of random padding interval (reduced padding) , milliseconds
|
||||
pub nf_ito_high_reduced: IntegerMilliseconds<BoundedInt32<0, CHANNEL_PADDING_TIMEOUT_UPPER_BOUND>> = (14000)
|
||||
from "nf_ito_high_reduced",
|
||||
|
||||
/// The minimum sendme version to accept.
|
||||
pub sendme_accept_min_version: SendMeVersion = (0)
|
||||
from "sendme_accept_min_version",
|
||||
|
@ -483,6 +508,10 @@ mod test {
|
|||
("ExtendByEd25519ID", 0),
|
||||
("min_paths_for_circs_pct", 51),
|
||||
("nf_conntimeout_clients", 606),
|
||||
("nf_ito_low", 1_000),
|
||||
("nf_ito_high", 20_000),
|
||||
("nf_ito_low_reduced", 3_000),
|
||||
("nf_ito_high_reduced", 40_000),
|
||||
("sendme_accept_min_version", 31),
|
||||
("sendme_emit_min_version", 32),
|
||||
];
|
||||
|
@ -497,6 +526,10 @@ mod test {
|
|||
assert_eq!(p.cbt_min_circs_for_estimate.get(), 5);
|
||||
assert_eq!(p.cbt_timeout_quantile.as_percent().get(), 61);
|
||||
assert_eq!(p.cbt_abandon_quantile.as_percent().get(), 15);
|
||||
assert_eq!(p.nf_ito_low.as_millis().get(), 1_000);
|
||||
assert_eq!(p.nf_ito_high.as_millis().get(), 20_000);
|
||||
assert_eq!(p.nf_ito_low_reduced.as_millis().get(), 3_000);
|
||||
assert_eq!(p.nf_ito_high_reduced.as_millis().get(), 40_000);
|
||||
assert_eq!(
|
||||
Duration::try_from(p.unused_client_circ_timeout_while_learning_cbt).unwrap(),
|
||||
Duration::from_secs(1900)
|
||||
|
|
|
@ -25,6 +25,7 @@ asynchronous-codec = "0.6.0"
|
|||
bytes = "1"
|
||||
cipher = "0.4.1"
|
||||
coarsetime = "0.1.20"
|
||||
derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" }
|
||||
digest = "0.10.0"
|
||||
educe = "0.4.6"
|
||||
futures = "0.3.14"
|
||||
|
@ -49,6 +50,7 @@ tor-llcrypto = { path = "../tor-llcrypto", version = "0.3.0" }
|
|||
tor-protover = { path = "../tor-protover", version = "0.3.0" }
|
||||
tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0" }
|
||||
tor-rtmock = { path = "../tor-rtmock", version = "0.4.0" }
|
||||
tor-units = { path = "../tor-units", version = "0.3.0" }
|
||||
tracing = "0.1.18"
|
||||
typenum = "1.12"
|
||||
zeroize = "1"
|
||||
|
|
|
@ -1,2 +1,3 @@
|
|||
BREAKING: Channels now require a SleepProvider
|
||||
BREAKING: `Error::ResolveError` now contains an enum instead of a String
|
||||
BREAKING: `Error::ChanelClosed` now contains a unit error `ChannelClosed`
|
||||
|
|
|
@ -59,17 +59,21 @@ pub const CHANNEL_BUFFER_SIZE: usize = 128;
|
|||
mod circmap;
|
||||
mod codec;
|
||||
mod handshake;
|
||||
mod padding;
|
||||
pub mod padding;
|
||||
pub mod params;
|
||||
mod reactor;
|
||||
mod unique_id;
|
||||
|
||||
pub use crate::channel::params::*;
|
||||
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, CtrlMsg, Reactor};
|
||||
pub use crate::channel::unique_id::UniqId;
|
||||
use crate::circuit::celltypes::CreateResponse;
|
||||
use crate::util::err::ChannelClosed;
|
||||
use crate::util::ts::OptTimestamp;
|
||||
use crate::{circuit, ClockSkew};
|
||||
use crate::{Error, Result};
|
||||
use std::pin::Pin;
|
||||
use std::result::Result as StdResult;
|
||||
use std::time::Duration;
|
||||
use tor_cell::chancell::{msg, ChanCell, CircId};
|
||||
use tor_error::internal;
|
||||
|
@ -152,13 +156,13 @@ impl Sink<ChanCell> for Channel {
|
|||
let this = self.get_mut();
|
||||
Pin::new(&mut this.cell_tx)
|
||||
.poll_ready(cx)
|
||||
.map_err(|_| Error::ChannelClosed)
|
||||
.map_err(|_| ChannelClosed.into())
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, cell: ChanCell) -> Result<()> {
|
||||
let this = self.get_mut();
|
||||
if this.details.closed.load(Ordering::SeqCst) {
|
||||
return Err(Error::ChannelClosed);
|
||||
return Err(ChannelClosed.into());
|
||||
}
|
||||
this.check_cell(&cell)?;
|
||||
{
|
||||
|
@ -176,21 +180,21 @@ impl Sink<ChanCell> for Channel {
|
|||
|
||||
Pin::new(&mut this.cell_tx)
|
||||
.start_send(cell)
|
||||
.map_err(|_| Error::ChannelClosed)
|
||||
.map_err(|_| ChannelClosed.into())
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
let this = self.get_mut();
|
||||
Pin::new(&mut this.cell_tx)
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| Error::ChannelClosed)
|
||||
.map_err(|_| ChannelClosed.into())
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||
let this = self.get_mut();
|
||||
Pin::new(&mut this.cell_tx)
|
||||
.poll_close(cx)
|
||||
.map_err(|_| Error::ChannelClosed)
|
||||
.map_err(|_| ChannelClosed.into())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,18 +283,8 @@ impl Channel {
|
|||
details: Arc::clone(&details),
|
||||
};
|
||||
|
||||
let mut padding_timer = Box::pin(padding::Timer::new_disabled(
|
||||
sleep_prov,
|
||||
padding::Parameters {
|
||||
// From padding-spec.txt s2.2
|
||||
// TODO support reduced padding
|
||||
low_ms: 1500,
|
||||
high_ms: 9500,
|
||||
},
|
||||
));
|
||||
if std::env::var("ARTI_EXPERIMENTAL_CHANNEL_PADDING").unwrap_or_default() != "" {
|
||||
padding_timer.as_mut().enable();
|
||||
}
|
||||
// We start disabled; the channel manager will `reconfigure` us soon after creation.
|
||||
let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov, None));
|
||||
|
||||
let reactor = Reactor {
|
||||
control: control_rx,
|
||||
|
@ -339,6 +333,18 @@ impl Channel {
|
|||
self.details.clock_skew
|
||||
}
|
||||
|
||||
/// Reparameterise (update parameters; reconfigure)
|
||||
///
|
||||
/// Returns `Err` if the channel was closed earlier
|
||||
pub fn reparameterize(
|
||||
&mut self,
|
||||
updates: Arc<ChannelsParamsUpdates>,
|
||||
) -> StdResult<(), ChannelClosed> {
|
||||
self.control
|
||||
.unbounded_send(CtrlMsg::ConfigUpdate(updates))
|
||||
.map_err(|_| ChannelClosed)
|
||||
}
|
||||
|
||||
/// Return an error if this channel is somehow mismatched with the
|
||||
/// given target.
|
||||
pub fn check_match<T: ChanTarget + ?Sized>(&self, target: &T) -> Result<()> {
|
||||
|
@ -423,7 +429,7 @@ impl Channel {
|
|||
&self,
|
||||
) -> Result<(circuit::PendingClientCirc, circuit::reactor::Reactor)> {
|
||||
if self.is_closing() {
|
||||
return Err(Error::ChannelClosed);
|
||||
return Err(ChannelClosed.into());
|
||||
}
|
||||
|
||||
// TODO: blocking is risky, but so is unbounded.
|
||||
|
@ -437,8 +443,8 @@ impl Channel {
|
|||
sender,
|
||||
tx,
|
||||
})
|
||||
.map_err(|_| Error::ChannelClosed)?;
|
||||
let (id, circ_unique_id) = rx.await.map_err(|_| Error::ChannelClosed)??;
|
||||
.map_err(|_| ChannelClosed)?;
|
||||
let (id, circ_unique_id) = rx.await.map_err(|_| ChannelClosed)??;
|
||||
|
||||
trace!("{}: Allocated CircId {}", circ_unique_id, id);
|
||||
|
||||
|
@ -468,7 +474,7 @@ impl Channel {
|
|||
pub fn close_circuit(&self, circid: CircId) -> Result<()> {
|
||||
self.control
|
||||
.unbounded_send(CtrlMsg::CloseCircuit(circid))
|
||||
.map_err(|_| Error::ChannelClosed)?;
|
||||
.map_err(|_| ChannelClosed)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ use std::pin::Pin;
|
|||
// TODO, coarsetime maybe? But see arti#496 and also we want to use the mockable SleepProvider
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use derive_builder::Builder;
|
||||
use educe::Educe;
|
||||
use futures::future::{self, FusedFuture};
|
||||
use futures::FutureExt;
|
||||
|
@ -15,6 +16,7 @@ use tracing::error;
|
|||
|
||||
use tor_cell::chancell::msg::Padding;
|
||||
use tor_rtcompat::SleepProvider;
|
||||
use tor_units::IntegerMilliseconds;
|
||||
|
||||
/// Timer that organises wakeups when channel padding should be sent
|
||||
///
|
||||
|
@ -31,7 +33,9 @@ pub(crate) struct Timer<R: SleepProvider> {
|
|||
sleep_prov: R,
|
||||
|
||||
/// Parameters controlling distribution of padding time intervals
|
||||
parameters: PreparedParameters,
|
||||
///
|
||||
/// Can be `None` to mean the timing parameters are set to infinity.
|
||||
parameters: Option<PreparedParameters>,
|
||||
|
||||
/// Gap that we intend to leave between last sent cell, and the padding
|
||||
///
|
||||
|
@ -91,12 +95,22 @@ pub(crate) struct Timer<R: SleepProvider> {
|
|||
}
|
||||
|
||||
/// Timing parameters, as described in `padding-spec.txt`
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
pub(crate) struct Parameters {
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Builder)]
|
||||
pub struct Parameters {
|
||||
/// Low end of the distribution of `X`
|
||||
pub(crate) low_ms: u32,
|
||||
#[builder(default = "1500.into()")]
|
||||
pub(crate) low_ms: IntegerMilliseconds<u32>,
|
||||
/// High end of the distribution of `X` (inclusive)
|
||||
pub(crate) high_ms: u32,
|
||||
#[builder(default = "9500.into()")]
|
||||
pub(crate) high_ms: IntegerMilliseconds<u32>,
|
||||
}
|
||||
|
||||
impl Default for Parameters {
|
||||
fn default() -> Self {
|
||||
ParametersBuilder::default()
|
||||
.build()
|
||||
.expect("could not build default channel padding Parameters")
|
||||
}
|
||||
}
|
||||
|
||||
/// Timing parameters, "compiled" into a form which can be sampled more efficiently
|
||||
|
@ -129,18 +143,23 @@ impl<R: SleepProvider> Timer<R> {
|
|||
/// Create a new `Timer`
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new(sleep_prov: R, parameters: Parameters) -> Self {
|
||||
let mut self_ = Self::new_disabled(sleep_prov, parameters);
|
||||
// We would like to call select_fresh_timeout but we don't have
|
||||
// (and can't have) Pin<&mut self>
|
||||
self_.selected_timeout = Some(self_.parameters.select_timeout());
|
||||
self_
|
||||
let parameters = parameters.prepare();
|
||||
let selected_timeout = parameters.select_timeout();
|
||||
// Too different to new_disabled to share its code, sadly.
|
||||
Timer {
|
||||
sleep_prov,
|
||||
parameters: Some(parameters),
|
||||
selected_timeout: Some(selected_timeout),
|
||||
trigger_at: None,
|
||||
waker: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new `Timer` which starts out disabled
|
||||
pub(crate) fn new_disabled(sleep_prov: R, parameters: Parameters) -> Self {
|
||||
pub(crate) fn new_disabled(sleep_prov: R, parameters: Option<Parameters>) -> Self {
|
||||
Timer {
|
||||
sleep_prov,
|
||||
parameters: parameters.prepare(),
|
||||
parameters: parameters.map(|p| p.prepare()),
|
||||
selected_timeout: None,
|
||||
trigger_at: None,
|
||||
waker: None,
|
||||
|
@ -166,25 +185,34 @@ impl<R: SleepProvider> Timer<R> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Set this `Timer`'s parameters
|
||||
///
|
||||
/// Will not enable or disable the timer; that must be done separately if desired.
|
||||
///
|
||||
/// The effect may not be immediate: if we are already in a gap between cells,
|
||||
/// that existing gap may not be adjusted.
|
||||
/// (We don't *restart* the timer since that would very likely result in a gap
|
||||
/// longer than either of the configured values.)
|
||||
///
|
||||
/// Idempotent.
|
||||
pub(crate) fn reconfigure(self: &mut Pin<&mut Self>, parameters: &Parameters) {
|
||||
*self.as_mut().project().parameters = Some(parameters.prepare());
|
||||
}
|
||||
|
||||
/// Enquire whether this `Timer` is currently enabled
|
||||
pub(crate) fn is_enabled(&self) -> bool {
|
||||
self.selected_timeout.is_some()
|
||||
}
|
||||
|
||||
/// Select a fresh timeout (and enable)
|
||||
fn select_fresh_timeout(self: Pin<&mut Self>) -> Duration {
|
||||
/// Select a fresh timeout (and enable, if possible)
|
||||
fn select_fresh_timeout(self: Pin<&mut Self>) {
|
||||
let mut self_ = self.project();
|
||||
let timeout = self_.parameters.select_timeout();
|
||||
*self_.selected_timeout = Some(timeout);
|
||||
let timeout = self_.parameters.as_ref().map(|p| p.select_timeout());
|
||||
*self_.selected_timeout = timeout;
|
||||
// This is no longer valid; recalculate it on next poll
|
||||
*self_.trigger_at = None;
|
||||
// Timeout might be earlier, so we will need a new waker too.
|
||||
// (Technically this is not possible in a bad way right now, since any stale waker
|
||||
// must be older, and so earlier, albeit from a previous random timeout.
|
||||
// However in the future we may want to be able to adjust the parameters at runtime
|
||||
// and then a stale waker might be harmfully too late.)
|
||||
self_.waker.set(None);
|
||||
timeout
|
||||
}
|
||||
|
||||
/// Note that data has been sent (ie, reset the timeout, delaying the next padding)
|
||||
|
@ -303,8 +331,8 @@ impl Parameters {
|
|||
fn prepare(self) -> PreparedParameters {
|
||||
PreparedParameters {
|
||||
x_distribution_ms: rand::distributions::Uniform::new_inclusive(
|
||||
self.low_ms,
|
||||
self.high_ms,
|
||||
self.low_ms.as_millis(),
|
||||
self.high_ms.as_millis(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
@ -356,8 +384,8 @@ mod test {
|
|||
let runtime = tor_rtmock::MockSleepRuntime::new(runtime);
|
||||
|
||||
let parameters = Parameters {
|
||||
low_ms: 1000,
|
||||
high_ms: 1000,
|
||||
low_ms: 1000.into(),
|
||||
high_ms: 1000.into(),
|
||||
};
|
||||
|
||||
let () = runtime.block_on(async {
|
||||
|
@ -431,6 +459,28 @@ mod test {
|
|||
dbg!(timer.as_mut().project().trigger_at);
|
||||
assert_eq! { false, timer.is_enabled() }
|
||||
});
|
||||
|
||||
let () = runtime.block_on(async {
|
||||
let timer = Timer::new_disabled(runtime.clone(), None);
|
||||
assert! { timer.parameters.is_none() };
|
||||
pin!(timer);
|
||||
assert_not_ready(&mut timer).await;
|
||||
assert! { timer.as_mut().selected_timeout.is_none() };
|
||||
assert! { timer.as_mut().trigger_at.is_none() };
|
||||
});
|
||||
|
||||
let () = runtime.block_on(async {
|
||||
let timer = Timer::new_disabled(runtime.clone(), Some(parameters));
|
||||
assert! { timer.parameters.is_some() };
|
||||
pin!(timer);
|
||||
assert_not_ready(&mut timer).await;
|
||||
runtime.advance(Duration::from_millis(3000)).await;
|
||||
assert_not_ready(&mut timer).await;
|
||||
timer.as_mut().enable();
|
||||
assert_not_ready(&mut timer).await;
|
||||
runtime.advance(Duration::from_millis(3000)).await;
|
||||
assert_is_ready(&mut timer).await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -503,8 +553,8 @@ mod test {
|
|||
let mut obs = [0_u32; n];
|
||||
|
||||
let params = Parameters {
|
||||
low_ms: min,
|
||||
high_ms: max - 1, // convert exclusive to inclusive
|
||||
low_ms: min.into(),
|
||||
high_ms: (max - 1).into(), // convert exclusive to inclusive
|
||||
}
|
||||
.prepare();
|
||||
|
||||
|
|
|
@ -0,0 +1,195 @@
|
|||
//! Parameters influencing all channels in a Tor client
|
||||
|
||||
use educe::Educe;
|
||||
|
||||
use super::padding;
|
||||
|
||||
/// Generate most of the module: things which contain or process all params fields (or each one)
|
||||
///
|
||||
/// There is one call to this macro, which has as argument
|
||||
/// the body of `struct ChannelsParams`, with the following differences:
|
||||
///
|
||||
/// * field visibility specifiers are not specified; they are provided by the macro
|
||||
/// * non-doc attributes that ought to be applied to fields in `ChannelsParams`
|
||||
/// are prefixed with `field`, e.g. `#[field educe(Default ...)]`;
|
||||
/// this allows applying doc attributes to other items too.
|
||||
///
|
||||
/// Generates, fairly straightforwardly:
|
||||
///
|
||||
/// ```ignore
|
||||
/// pub struct ChannelsParams { ... } // containing the fields as specified
|
||||
/// pub struct ChannelsParamsUpdates { ... } // containing `Option` of each field
|
||||
/// pub fn ChannelsParams::total_update(&self) -> ChannelsParamsUpdates;
|
||||
/// pub fn ChannelsParamsUpdatesBuilder::$field(self, new_value: _) -> Self;
|
||||
/// ```
|
||||
///
|
||||
/// Within the macro body, we indent the per-field `$( )*` with 2 spaces.
|
||||
macro_rules! define_channels_params_and_automatic_impls { { $(
|
||||
$( #[doc $($doc_attr:tt)*] )*
|
||||
$( #[field $other_attr:meta] )*
|
||||
$field:ident : $ty:ty
|
||||
),* $(,)? } => {
|
||||
|
||||
/// Initial, and, overall, parameters for channels
|
||||
///
|
||||
/// This is used both to generate the initial parameters,
|
||||
/// and to handle updates:
|
||||
/// when used for handling updates,
|
||||
/// it contains the last parameters that has been implemented.
|
||||
///
|
||||
/// Central code managing all channels will contain a `ChannelsParams`,
|
||||
/// and use `ChannelsParamsUpdatesBuilder` to both update that params
|
||||
/// and generate `ChannelsParamsUpdates` messages representing the changes.
|
||||
///
|
||||
/// `Default` is a placeholder to use pending availability of a netdir etc.
|
||||
#[derive(Debug, Educe, Clone, Eq, PartialEq)]
|
||||
#[educe(Default)]
|
||||
pub struct ChannelsParams {
|
||||
$(
|
||||
$( #[doc $($doc_attr)*] )*
|
||||
$( #[$other_attr] )*
|
||||
pub(crate) $field: $ty,
|
||||
)*
|
||||
}
|
||||
|
||||
/// Reparameterisation message
|
||||
///
|
||||
/// Can contain updates to each of the fields in `ChannelsParams`.
|
||||
/// Constructed via [`ChannelsParamsUpdatesBuilder`],
|
||||
/// which is obtained from [`ChannelsParams::start_update`].
|
||||
///
|
||||
/// Sent to all channel implementations, when they ought to change their behaviour.
|
||||
#[derive(Debug, Default, Clone, Eq, PartialEq)]
|
||||
pub struct ChannelsParamsUpdates {
|
||||
$(
|
||||
/// New value, if it has changed.
|
||||
///
|
||||
/// Having this contain `Option` allows the sender of an update to promise
|
||||
/// that the value hasn't changed, and thereby allows the channel implementation
|
||||
/// to avoid touching state that it doesn't need to (eg, timers).
|
||||
pub(crate) $field: Option<$ty>,
|
||||
)*
|
||||
}
|
||||
|
||||
impl ChannelsParams {
|
||||
/// Create an update message which sets *all* of the settings in `self`
|
||||
///
|
||||
/// Used during channel startup.
|
||||
#[must_use = "total_update makes an updates message that must be sent to have effect"]
|
||||
pub fn total_update(&self) -> ChannelsParamsUpdates {
|
||||
ChannelsParamsUpdates {
|
||||
$(
|
||||
$field: Some(self.$field.clone()),
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'c> ChannelsParamsUpdatesBuilder<'c> {
|
||||
$(
|
||||
$( #[doc $($doc_attr)*] )*
|
||||
///
|
||||
/// (Adds this setting to the update, if it has changed.)
|
||||
pub fn $field(mut self, new_value: $ty) -> Self {
|
||||
if &new_value != &self.params.$field {
|
||||
self
|
||||
.update
|
||||
.get_or_insert_with(|| Default::default())
|
||||
.$field = Some(new_value.clone());
|
||||
self.params.$field = new_value;
|
||||
}
|
||||
self
|
||||
}
|
||||
)*
|
||||
}
|
||||
} }
|
||||
|
||||
define_channels_params_and_automatic_impls! {
|
||||
/// Whether to send padding
|
||||
#[field educe(Default(expression = "interim_enable_by_env_var()"))]
|
||||
padding_enable: bool,
|
||||
|
||||
/// Padding timing parameters
|
||||
///
|
||||
/// This is in abeyance if `send_padding` is `false`;
|
||||
/// we still pass it because the usual case is that padding is enabled/disabled
|
||||
/// rather than the parameters changing,
|
||||
/// so the padding timer always keeps parameters, even when disabled.
|
||||
padding_parameters: padding::Parameters
|
||||
}
|
||||
|
||||
/// Placeholder function for saying whether to enable channel padding
|
||||
///
|
||||
/// This will be abolished in due course.
|
||||
pub(crate) fn interim_enable_by_env_var() -> bool {
|
||||
std::env::var("ARTI_EXPERIMENTAL_CHANNEL_PADDING").unwrap_or_default() != ""
|
||||
}
|
||||
|
||||
/// Builder for a channels params update
|
||||
///
|
||||
/// Obtain this from `ChannelsParams::update`,
|
||||
/// call zero or more setter methods,
|
||||
/// call [`finish`](ChannelsParamsUpdatesBuilder::finish),
|
||||
/// and then send the resulting message.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if dropped. Instead, call `finish`.
|
||||
pub struct ChannelsParamsUpdatesBuilder<'c> {
|
||||
/// Tracking the existing params
|
||||
params: &'c mut ChannelsParams,
|
||||
|
||||
/// The update we are building
|
||||
///
|
||||
/// `None` means nothing has changed yet.
|
||||
update: Option<ChannelsParamsUpdates>,
|
||||
|
||||
/// Make it hard to write code paths that drop this
|
||||
drop_bomb: bool,
|
||||
}
|
||||
|
||||
impl ChannelsParams {
|
||||
/// Start building an update to channel parameters
|
||||
///
|
||||
/// The builder **must not be dropped**, once created;
|
||||
/// instead, [`finish`](ChannelsParamsUpdatesBuilder::finish) must be called.
|
||||
/// So prepare your new values first, perhaps fallibly,
|
||||
/// and only then create and use the builder and send the update, infallibly.
|
||||
///
|
||||
/// (This is because the builder uses `self: ChannelsParams`
|
||||
/// to track which values have changed,
|
||||
/// and the values in `self` are updated immediately by the field update methods.)
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// [`ChannelsParamsUpdatesBuilder`] panics if it is dropped.
|
||||
pub fn start_update(&mut self) -> ChannelsParamsUpdatesBuilder {
|
||||
ChannelsParamsUpdatesBuilder {
|
||||
params: self,
|
||||
update: None,
|
||||
drop_bomb: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'c> Drop for ChannelsParamsUpdatesBuilder<'c> {
|
||||
fn drop(&mut self) {
|
||||
assert!(!self.drop_bomb, "ChannelsParamsUpdatesBuilder dropped");
|
||||
}
|
||||
}
|
||||
|
||||
impl<'c> ChannelsParamsUpdatesBuilder<'c> {
|
||||
/// Finalise the update
|
||||
///
|
||||
/// If nothing actually changed, returns `None`.
|
||||
/// (Tracking this, and returning `None`, allows us to avoid bothering
|
||||
/// every channel with a null update.)
|
||||
///
|
||||
/// If `Some` is returned, the update **must** be implemented,
|
||||
/// since the underlying tracking [`ChannelsParams`] has already been updated.
|
||||
#[must_use = "the update from finish() must be sent, to avoid losing params changes"]
|
||||
pub fn finish(mut self) -> Option<ChannelsParamsUpdates> {
|
||||
self.drop_bomb = false;
|
||||
self.update.take()
|
||||
}
|
||||
}
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
use super::circmap::{CircEnt, CircMap};
|
||||
use crate::circuit::halfcirc::HalfCirc;
|
||||
use crate::util::err::ReactorError;
|
||||
use crate::util::err::{ChannelClosed, ReactorError};
|
||||
use crate::{Error, Result};
|
||||
use tor_basic_utils::futures::SinkExt as _;
|
||||
use tor_cell::chancell::msg::{Destroy, DestroyReason};
|
||||
|
@ -29,7 +29,7 @@ use std::pin::Pin;
|
|||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::channel::{codec::CodecError, padding, unique_id, ChannelDetails};
|
||||
use crate::channel::{codec::CodecError, padding, params::*, unique_id, ChannelDetails};
|
||||
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
|
@ -68,6 +68,15 @@ pub(super) enum CtrlMsg {
|
|||
/// Oneshot channel to send the new circuit's identifiers down.
|
||||
tx: ReactorResultChannel<(CircId, crate::circuit::UniqId)>,
|
||||
},
|
||||
/// Enable/disable/reconfigure channel padding
|
||||
///
|
||||
/// The sender of these messages is responsible for the optimisation of
|
||||
/// ensuring that "no-change" messages are elided.
|
||||
/// (This is implemented in `ChannelsParamsUpdatesBuilder`.)
|
||||
///
|
||||
/// These updates are done via a control message to avoid adding additional branches to the
|
||||
/// main reactor `select!`.
|
||||
ConfigUpdate(Arc<ChannelsParamsUpdates>),
|
||||
}
|
||||
|
||||
/// Object to handle incoming cells and background tasks on a channel.
|
||||
|
@ -121,7 +130,7 @@ impl<S: SleepProvider> Reactor<S> {
|
|||
/// used again.
|
||||
pub async fn run(mut self) -> Result<()> {
|
||||
if self.details.closed.load(Ordering::SeqCst) {
|
||||
return Err(Error::ChannelClosed);
|
||||
return Err(ChannelClosed.into());
|
||||
}
|
||||
debug!("{}: Running reactor", &self);
|
||||
let result: Result<()> = loop {
|
||||
|
@ -215,6 +224,24 @@ impl<S: SleepProvider> Reactor<S> {
|
|||
let _ = tx.send(ret); // don't care about other side going away
|
||||
self.update_disused_since();
|
||||
}
|
||||
CtrlMsg::ConfigUpdate(updates) => {
|
||||
let ChannelsParamsUpdates {
|
||||
// List all the fields explicitly; that way the compiler will warn us
|
||||
// if one is added and we fail to handle it here.
|
||||
padding_enable,
|
||||
padding_parameters,
|
||||
} = &*updates;
|
||||
if let Some(parameters) = padding_parameters {
|
||||
self.padding_timer.as_mut().reconfigure(parameters);
|
||||
}
|
||||
if let Some(enable) = padding_enable {
|
||||
if *enable {
|
||||
self.padding_timer.as_mut().enable();
|
||||
} else {
|
||||
self.padding_timer.as_mut().disable();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ use crate::crypto::cell::{
|
|||
ClientLayer, CryptInit, HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt,
|
||||
OutboundClientLayer, RelayCellBody, Tor1RelayCrypto,
|
||||
};
|
||||
use crate::util::err::ReactorError;
|
||||
use crate::util::err::{ChannelClosed, ReactorError};
|
||||
use crate::{Error, Result};
|
||||
use std::collections::VecDeque;
|
||||
use std::marker::PhantomData;
|
||||
|
@ -633,7 +633,7 @@ impl Reactor {
|
|||
|
||||
let _ = Pin::new(&mut self.channel)
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| Error::ChannelClosed)?;
|
||||
.map_err(|_| ChannelClosed)?;
|
||||
if create_message.is_some() {
|
||||
Poll::Ready(Ok(create_message))
|
||||
} else if did_things {
|
||||
|
@ -673,7 +673,7 @@ impl Reactor {
|
|||
futures::future::poll_fn(|cx| -> Poll<Result<()>> {
|
||||
let _ = Pin::new(&mut self.channel)
|
||||
.poll_flush(cx)
|
||||
.map_err(|_| Error::ChannelClosed)?;
|
||||
.map_err(|_| ChannelClosed)?;
|
||||
Poll::Ready(Ok(()))
|
||||
})
|
||||
.await?;
|
||||
|
|
|
@ -124,6 +124,8 @@ mod util;
|
|||
pub use util::err::{Error, ResolveError};
|
||||
pub use util::skew::ClockSkew;
|
||||
|
||||
pub use channel::params::ChannelsParams;
|
||||
|
||||
/// A vector of bytes that gets cleared when it's dropped.
|
||||
type SecretBytes = zeroize::Zeroizing<Vec<u8>>;
|
||||
|
||||
|
|
|
@ -59,8 +59,8 @@ pub enum Error {
|
|||
#[error("circuit protocol violation: {0}")]
|
||||
CircProto(String),
|
||||
/// Channel is closed.
|
||||
#[error("channel closed")]
|
||||
ChannelClosed,
|
||||
#[error("{0}")]
|
||||
ChannelClosed(#[from] ChannelClosed),
|
||||
/// Circuit is closed.
|
||||
#[error("circuit closed")]
|
||||
CircuitClosed,
|
||||
|
@ -95,6 +95,17 @@ pub enum Error {
|
|||
ResolveError(ResolveError),
|
||||
}
|
||||
|
||||
/// Error which indicates that the channel was closed
|
||||
#[derive(Error, Debug, Clone)]
|
||||
#[error("channel closed")]
|
||||
pub struct ChannelClosed;
|
||||
|
||||
impl HasKind for ChannelClosed {
|
||||
fn kind(&self) -> ErrorKind {
|
||||
ErrorKind::CircuitCollapse
|
||||
}
|
||||
}
|
||||
|
||||
/// Details about an error received while resolving a domain
|
||||
#[derive(Error, Debug, Clone)]
|
||||
#[non_exhaustive]
|
||||
|
@ -135,7 +146,7 @@ impl From<Error> for std::io::Error {
|
|||
|
||||
EndReceived(end_reason) => end_reason.into(),
|
||||
|
||||
ChannelClosed | CircuitClosed => ErrorKind::ConnectionReset,
|
||||
CircuitClosed => ErrorKind::ConnectionReset,
|
||||
|
||||
BytesErr(_)
|
||||
| BadCellAuth
|
||||
|
@ -143,6 +154,7 @@ impl From<Error> for std::io::Error {
|
|||
| HandshakeProto(_)
|
||||
| ChanProto(_)
|
||||
| HandshakeCertsExpired { .. }
|
||||
| ChannelClosed(_)
|
||||
| CircProto(_)
|
||||
| CellErr(_)
|
||||
| ChanMismatch(_)
|
||||
|
@ -175,7 +187,8 @@ impl HasKind for Error {
|
|||
E::HandshakeCertsExpired { .. } => EK::ClockSkew,
|
||||
E::ChanProto(_) => EK::TorProtocolViolation,
|
||||
E::CircProto(_) => EK::TorProtocolViolation,
|
||||
E::ChannelClosed | E::CircuitClosed => EK::CircuitCollapse,
|
||||
E::ChannelClosed(e) => e.kind(),
|
||||
E::CircuitClosed => EK::CircuitCollapse,
|
||||
E::IdRangeFull => EK::BadApiUsage,
|
||||
E::CircRefused(_) => EK::CircuitRefused,
|
||||
E::BadStreamAddress => EK::BadApiUsage,
|
||||
|
@ -205,6 +218,11 @@ impl From<Error> for ReactorError {
|
|||
ReactorError::Err(e)
|
||||
}
|
||||
}
|
||||
impl From<ChannelClosed> for ReactorError {
|
||||
fn from(e: ChannelClosed) -> ReactorError {
|
||||
ReactorError::Err(e.into())
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
impl ReactorError {
|
||||
/// Tests only: assert that this is an Error, and return it.
|
||||
|
|
|
@ -307,17 +307,43 @@ impl<const H: i32, const L: i32> TryFrom<i32> for Percentage<BoundedInt32<H, L>>
|
|||
)]
|
||||
/// This type represents an integer number of milliseconds.
|
||||
///
|
||||
/// The underlying type should implement TryInto<u64>.
|
||||
/// The underlying type should usually implement TryInto<u64>.
|
||||
pub struct IntegerMilliseconds<T> {
|
||||
/// Interior Value. Should Implement TryInto<u64> to be useful.
|
||||
value: T,
|
||||
}
|
||||
|
||||
impl<T: TryInto<u64>> IntegerMilliseconds<T> {
|
||||
impl<T> IntegerMilliseconds<T> {
|
||||
/// Public Constructor
|
||||
pub fn new(value: T) -> Self {
|
||||
IntegerMilliseconds { value }
|
||||
}
|
||||
|
||||
/// Deconstructor
|
||||
///
|
||||
/// Use only in contexts where it's no longer possible to
|
||||
/// use the Rust type system to ensure secs vs ms vs us correctness.
|
||||
pub fn as_millis(self) -> T {
|
||||
self.value
|
||||
}
|
||||
|
||||
/// Map the inner value (useful for conversion)
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use tor_units::{BoundedInt32, IntegerMilliseconds};
|
||||
///
|
||||
/// let value: IntegerMilliseconds<i32> = 42.into();
|
||||
/// let value: IntegerMilliseconds<BoundedInt32<0,1000>>
|
||||
/// = value.try_map(TryInto::try_into).unwrap();
|
||||
/// ```
|
||||
pub fn try_map<U, F, E>(self, f: F) -> Result<IntegerMilliseconds<U>, E>
|
||||
where
|
||||
F: FnOnce(T) -> Result<U, E>,
|
||||
{
|
||||
Ok(IntegerMilliseconds::new(f(self.value)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TryInto<u64>> TryFrom<IntegerMilliseconds<T>> for Duration {
|
||||
|
@ -339,17 +365,41 @@ impl<const H: i32, const L: i32> TryFrom<i32> for IntegerMilliseconds<BoundedInt
|
|||
)]
|
||||
/// This type represents an integer number of seconds.
|
||||
///
|
||||
/// The underlying type should implement TryInto<u64>.
|
||||
/// The underlying type should usually implement TryInto<u64>.
|
||||
pub struct IntegerSeconds<T> {
|
||||
/// Interior Value. Should Implement TryInto<u64> to be useful.
|
||||
value: T,
|
||||
}
|
||||
|
||||
impl<T: TryInto<u64>> IntegerSeconds<T> {
|
||||
impl<T> IntegerSeconds<T> {
|
||||
/// Public Constructor
|
||||
pub fn new(value: T) -> Self {
|
||||
IntegerSeconds { value }
|
||||
}
|
||||
|
||||
/// Deconstructor
|
||||
///
|
||||
/// Use only in contexts where it's no longer possible to
|
||||
/// use the Rust type system to ensure secs vs ms vs us correctness.
|
||||
pub fn as_secs(self) -> T {
|
||||
self.value
|
||||
}
|
||||
|
||||
/// Map the inner value (useful for conversion)
|
||||
///
|
||||
/// ```
|
||||
/// use tor_units::{BoundedInt32, IntegerSeconds};
|
||||
///
|
||||
/// let value: IntegerSeconds<i32> = 42.into();
|
||||
/// let value: IntegerSeconds<BoundedInt32<0,1000>>
|
||||
/// = value.try_map(TryInto::try_into).unwrap();
|
||||
/// ```
|
||||
pub fn try_map<U, F, E>(self, f: F) -> Result<IntegerSeconds<U>, E>
|
||||
where
|
||||
F: FnOnce(T) -> Result<U, E>,
|
||||
{
|
||||
Ok(IntegerSeconds::new(f(self.value)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TryInto<u64>> TryFrom<IntegerSeconds<T>> for Duration {
|
||||
|
@ -369,7 +419,7 @@ impl<const H: i32, const L: i32> TryFrom<i32> for IntegerSeconds<BoundedInt32<H,
|
|||
#[derive(Copy, Clone, From, FromStr, Display, Debug, PartialEq, Eq, Ord, PartialOrd)]
|
||||
/// This type represents an integer number of days.
|
||||
///
|
||||
/// The underlying type should implement TryInto<u64>.
|
||||
/// The underlying type should usually implement TryInto<u64>.
|
||||
pub struct IntegerDays<T> {
|
||||
/// Interior Value. Should Implement TryInto<u64> to be useful.
|
||||
value: T,
|
||||
|
@ -380,6 +430,30 @@ impl<T> IntegerDays<T> {
|
|||
pub fn new(value: T) -> Self {
|
||||
IntegerDays { value }
|
||||
}
|
||||
|
||||
/// Deconstructor
|
||||
///
|
||||
/// Use only in contexts where it's no longer possible to
|
||||
/// use the Rust type system to ensure secs vs ms vs us correctness.
|
||||
pub fn as_days(self) -> T {
|
||||
self.value
|
||||
}
|
||||
|
||||
/// Map the inner value (useful for conversion)
|
||||
///
|
||||
/// ```
|
||||
/// use tor_units::{BoundedInt32, IntegerDays};
|
||||
///
|
||||
/// let value: IntegerDays<i32> = 42.into();
|
||||
/// let value: IntegerDays<BoundedInt32<0,1000>>
|
||||
/// = value.try_map(TryInto::try_into).unwrap();
|
||||
/// ```
|
||||
pub fn try_map<U, F, E>(self, f: F) -> Result<IntegerDays<U>, E>
|
||||
where
|
||||
F: FnOnce(T) -> Result<U, E>,
|
||||
{
|
||||
Ok(IntegerDays::new(f(self.value)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: TryInto<u64>> TryFrom<IntegerDays<T>> for Duration {
|
||||
|
|
Loading…
Reference in New Issue