channel padding: Introduce ChannelsConfig and reconfigure facility
Nothing geenrates config updates yet.
This commit is contained in:
parent
33ef338fe2
commit
b5218a0c0e
|
@ -58,11 +58,13 @@ pub const CHANNEL_BUFFER_SIZE: usize = 128;
|
|||
|
||||
mod circmap;
|
||||
mod codec;
|
||||
pub mod config;
|
||||
mod handshake;
|
||||
pub mod padding;
|
||||
mod reactor;
|
||||
mod unique_id;
|
||||
|
||||
pub use crate::channel::config::*;
|
||||
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, CtrlMsg, Reactor};
|
||||
pub use crate::channel::unique_id::UniqId;
|
||||
use crate::circuit::celltypes::CreateResponse;
|
||||
|
@ -70,6 +72,7 @@ use crate::util::ts::OptTimestamp;
|
|||
use crate::{circuit, ClockSkew};
|
||||
use crate::{Error, Result};
|
||||
use std::pin::Pin;
|
||||
use std::result::Result as StdResult;
|
||||
use std::time::Duration;
|
||||
use tor_cell::chancell::{msg, ChanCell, CircId};
|
||||
use tor_error::internal;
|
||||
|
@ -288,7 +291,7 @@ impl Channel {
|
|||
high_ms: 9500,
|
||||
}),
|
||||
));
|
||||
if std::env::var("ARTI_EXPERIMENTAL_CHANNEL_PADDING").unwrap_or_default() != "" {
|
||||
if interim_enable_by_env_var() {
|
||||
padding_timer.as_mut().enable();
|
||||
}
|
||||
|
||||
|
@ -339,6 +342,18 @@ impl Channel {
|
|||
self.details.clock_skew
|
||||
}
|
||||
|
||||
/// Reconfigure
|
||||
///
|
||||
/// Returns `Err` if the channel was closed earlier
|
||||
pub fn reconfigure(
|
||||
&mut self,
|
||||
updates: Arc<ChannelsConfigUpdates>,
|
||||
) -> StdResult<(), mpsc::SendError> {
|
||||
self.control
|
||||
.unbounded_send(CtrlMsg::ConfigUpdate(updates))
|
||||
.map_err(|e| e.into_send_error())
|
||||
}
|
||||
|
||||
/// Return an error if this channel is somehow mismatched with the
|
||||
/// given target.
|
||||
pub fn check_match<T: ChanTarget + ?Sized>(&self, target: &T) -> Result<()> {
|
||||
|
|
|
@ -0,0 +1,195 @@
|
|||
//! Configuration influencing all channels in a Tor client
|
||||
|
||||
use educe::Educe;
|
||||
|
||||
use super::padding;
|
||||
|
||||
/// Generate most of the module: things which contain or process all config fields (or each one)
|
||||
///
|
||||
/// There is one call to this macro, which has as argument
|
||||
/// the body of `struct ChannelsConfig`, with the following differences:
|
||||
///
|
||||
/// * field visibility specifiers are not specified; they are provided by the macro
|
||||
/// * non-doc attributes that ought to be applied to fields in `ChannelsConfig`
|
||||
/// are prefixed with `field`, e.g. `#[field educe(Default ...)]`;
|
||||
/// this allows applying doc attributes to other items too.
|
||||
///
|
||||
/// Generates, fairly straightforwardly:
|
||||
///
|
||||
/// ```ignore
|
||||
/// pub struct ChannelsConfig { ... } // containing the fields as specified
|
||||
/// pub struct ChannelsConfigUpdates { ... } // containing `Option` of each field
|
||||
/// pub fn ChannelsConfig::total_update(&self) -> ChannelsConfigUpdates;
|
||||
/// pub fn ChannelsConfigUpdatesBuilder::$field(self, new_value: _) -> Self;
|
||||
/// ```
|
||||
///
|
||||
/// Within the macro body, we indent the per-field `$( )*` with 2 spaces.
|
||||
macro_rules! define_channels_config_and_automatic_impls { { $(
|
||||
$( #[doc $($doc_attr:tt)*] )*
|
||||
$( #[field $other_attr:meta] )*
|
||||
$field:ident : $ty:ty
|
||||
),* $(,)? } => {
|
||||
|
||||
/// Initial, and, overall, configuration for channels
|
||||
///
|
||||
/// This is used both to generate the initial configuration,
|
||||
/// and to handle updates:
|
||||
/// when used for handling updates,
|
||||
/// it contains the last configuration that has bee implemented.
|
||||
///
|
||||
/// Central code managing all channels will contain a `ChannelsConfig`,
|
||||
/// and use `ChannelsConfigUpdatesBuilder` to both update that config
|
||||
/// and generate `ChannelsConfigUpdates` messages representing the changes.
|
||||
///
|
||||
/// `Default` is a placeholder to use pending availability of a netdir etc.
|
||||
#[derive(Debug, Educe, Clone, Eq, PartialEq)]
|
||||
#[educe(Default)]
|
||||
pub struct ChannelsConfig {
|
||||
$(
|
||||
$( #[doc $($doc_attr)*] )*
|
||||
$( #[$other_attr] )*
|
||||
pub(crate) $field: $ty,
|
||||
)*
|
||||
}
|
||||
|
||||
/// Reconfiguration message
|
||||
///
|
||||
/// Can contain updates to each of the fields in `ChannelsConfig`.
|
||||
/// Constructed via [`ChannelsConfigUpdatesBuilder`],
|
||||
/// which is obtained from [`ChannelsConfig::start_update`].
|
||||
///
|
||||
/// Sent to all channel implementations, when they ought to change their behaviour.
|
||||
#[derive(Debug, Default, Clone, Eq, PartialEq)]
|
||||
pub struct ChannelsConfigUpdates {
|
||||
$(
|
||||
/// New value, if it has changed.
|
||||
///
|
||||
/// Having this contain `Option` allows the sender of an update to promise
|
||||
/// that the value hasn't changed, and thereby allows the channel implementation
|
||||
/// to avoid touching state that it doesn't need to (eg, timers).
|
||||
pub(crate) $field: Option<$ty>,
|
||||
)*
|
||||
}
|
||||
|
||||
impl ChannelsConfig {
|
||||
/// Create an update message which sets *all* of the settings in `self`
|
||||
///
|
||||
/// Used during channel startup.
|
||||
#[must_use = "total_update makes an updates message that must be sent to have effect"]
|
||||
pub fn total_update(&self) -> ChannelsConfigUpdates {
|
||||
ChannelsConfigUpdates {
|
||||
$(
|
||||
$field: Some(self.$field.clone()),
|
||||
)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'c> ChannelsConfigUpdatesBuilder<'c> {
|
||||
$(
|
||||
$( #[doc $($doc_attr)*] )*
|
||||
///
|
||||
/// (Adds this setting to the update, if it has changed.)
|
||||
pub fn $field(mut self, new_value: $ty) -> Self {
|
||||
if &new_value != &self.config.$field {
|
||||
self
|
||||
.update
|
||||
.get_or_insert_with(|| Default::default())
|
||||
.$field = Some(new_value.clone());
|
||||
self.config.$field = new_value;
|
||||
}
|
||||
self
|
||||
}
|
||||
)*
|
||||
}
|
||||
} }
|
||||
|
||||
define_channels_config_and_automatic_impls! {
|
||||
/// Whether to send padding
|
||||
#[field educe(Default(expression = "interim_enable_by_env_var()"))]
|
||||
padding_enable: bool,
|
||||
|
||||
/// Padding timing parameters
|
||||
///
|
||||
/// This is in abeyance if `send_padding` is `false`;
|
||||
/// we still pass it because the usual case is that padding is enabled/disabled
|
||||
/// rather than the parameters changing,
|
||||
/// so the padding timer always keeps parameters, even when disabled.
|
||||
padding_parameters: padding::Parameters
|
||||
}
|
||||
|
||||
/// Placeholder function for saying whether to enable channel padding
|
||||
///
|
||||
/// This will be abolished in due course.
|
||||
pub(crate) fn interim_enable_by_env_var() -> bool {
|
||||
std::env::var("ARTI_EXPERIMENTAL_CHANNEL_PADDING").unwrap_or_default() != ""
|
||||
}
|
||||
|
||||
/// Builder for a channels config update
|
||||
///
|
||||
/// Obtain this from `ChannelsConfig::update`,
|
||||
/// call zero or more setter methods,
|
||||
/// call [`finish`](ChannelsConfigUpdatesBuilder::finish),
|
||||
/// and then send the resulting message.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if dropped. Instead, call `finish`.
|
||||
pub struct ChannelsConfigUpdatesBuilder<'c> {
|
||||
/// Tracking the existing config
|
||||
config: &'c mut ChannelsConfig,
|
||||
|
||||
/// The update we are building
|
||||
///
|
||||
/// `None` means nothing has changed yet.
|
||||
update: Option<ChannelsConfigUpdates>,
|
||||
|
||||
/// Make it hard to write code paths that drop this
|
||||
drop_bomb: bool,
|
||||
}
|
||||
|
||||
impl ChannelsConfig {
|
||||
/// Start building an update to channel configuration
|
||||
///
|
||||
/// The builder **must not be dropped**, once created;
|
||||
/// instead, [`finish`](ChannelsConfigUpdatesBuilder::finish) must be called.
|
||||
/// So prepare your new values first, perhaps fallibly,
|
||||
/// and only then create and use the builder and send the update, infallibly.
|
||||
///
|
||||
/// (This is because the builder uses `self: ChannelsConfig`
|
||||
/// to track which values have changed,
|
||||
/// and the values in `self` are updated immediately by the field update methods.)
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// [`ChannelsConfigUpdatesBuilder`] panics if it is dropped.
|
||||
pub fn start_update(&mut self) -> ChannelsConfigUpdatesBuilder {
|
||||
ChannelsConfigUpdatesBuilder {
|
||||
config: self,
|
||||
update: None,
|
||||
drop_bomb: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'c> Drop for ChannelsConfigUpdatesBuilder<'c> {
|
||||
fn drop(&mut self) {
|
||||
assert!(!self.drop_bomb, "ChannelsConfigUpdatesBuilder dropped");
|
||||
}
|
||||
}
|
||||
|
||||
impl<'c> ChannelsConfigUpdatesBuilder<'c> {
|
||||
/// Finalise the update
|
||||
///
|
||||
/// If nothing actually changed, returns `None`.
|
||||
/// (Tracking this, and returning `None`, allows us to avoid bothering
|
||||
/// every channel with a null update.)
|
||||
///
|
||||
/// If `Some` is returned, the update **must** be implemented,
|
||||
/// since the underlying tracking [`ChannelsConfig`] has already been updated.
|
||||
#[must_use = "the update from finish() must be sent, to avoid losing config changes"]
|
||||
pub fn finish(mut self) -> Option<ChannelsConfigUpdates> {
|
||||
self.drop_bomb = false;
|
||||
self.update.take()
|
||||
}
|
||||
}
|
|
@ -194,7 +194,6 @@ impl<R: SleepProvider> Timer<R> {
|
|||
/// longer than either of the configured values.)
|
||||
///
|
||||
/// Idempotent.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn reconfigure(self: &mut Pin<&mut Self>, parameters: &Parameters) {
|
||||
*self.as_mut().project().parameters = Some(parameters.prepare());
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ use std::pin::Pin;
|
|||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::channel::{codec::CodecError, padding, unique_id, ChannelDetails};
|
||||
use crate::channel::{codec::CodecError, config::*, padding, unique_id, ChannelDetails};
|
||||
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
|
@ -68,6 +68,18 @@ pub(super) enum CtrlMsg {
|
|||
/// Oneshot channel to send the new circuit's identifiers down.
|
||||
tx: ReactorResultChannel<(CircId, crate::circuit::UniqId)>,
|
||||
},
|
||||
/// Enable/disable/reconfigure channel padding
|
||||
///
|
||||
/// `bool` is whether to send padding.
|
||||
/// `padding::Parameters` is an optional update to the timing parameters;
|
||||
/// if not present, previously stored parameters should be used.
|
||||
///
|
||||
/// The sender of these messages is responsible for the optimisation of
|
||||
/// ensuring that "no-change" messages are elided.
|
||||
///
|
||||
/// This is done via a control message to avoid adding additional branches to the
|
||||
/// main reactor `select!`.
|
||||
ConfigUpdate(Arc<ChannelsConfigUpdates>),
|
||||
}
|
||||
|
||||
/// Object to handle incoming cells and background tasks on a channel.
|
||||
|
@ -215,6 +227,24 @@ impl<S: SleepProvider> Reactor<S> {
|
|||
let _ = tx.send(ret); // don't care about other side going away
|
||||
self.update_disused_since();
|
||||
}
|
||||
CtrlMsg::ConfigUpdate(updates) => {
|
||||
let ChannelsConfigUpdates {
|
||||
// List all the fields explicitly; that way the compiler will warn us
|
||||
// if one is added and we fail to handle it here.
|
||||
padding_enable,
|
||||
padding_parameters,
|
||||
} = &*updates;
|
||||
if let Some(parameters) = padding_parameters {
|
||||
self.padding_timer.as_mut().reconfigure(parameters);
|
||||
}
|
||||
if let Some(enable) = padding_enable {
|
||||
if *enable {
|
||||
self.padding_timer.as_mut().enable();
|
||||
} else {
|
||||
self.padding_timer.as_mut().disable();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue