From 7f3f42673816be8924395c9db26e55bee5c9c412 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Tue, 14 Jun 2022 15:42:23 +0100 Subject: [PATCH] channel padding: Plumb settings from chanmgr --- Cargo.lock | 1 + crates/arti-client/src/client.rs | 2 +- crates/tor-chanmgr/Cargo.toml | 1 + crates/tor-chanmgr/src/builder.rs | 5 ++ crates/tor-chanmgr/src/lib.rs | 69 +++++++++++++++++++++++++-- crates/tor-chanmgr/src/mgr.rs | 46 +++++++++++++----- crates/tor-chanmgr/src/mgr/map.rs | 79 +++++++++++++++++++++++++++++++ crates/tor-proto/src/channel.rs | 14 +----- crates/tor-proto/src/lib.rs | 2 + 9 files changed, 192 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d84e95a8..6be2b765c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3514,6 +3514,7 @@ dependencies = [ "tor-error", "tor-linkspec", "tor-llcrypto", + "tor-netdir", "tor-proto", "tor-rtcompat", "tor-rtmock", diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index 59606dc0a..a3a6692de 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -388,7 +388,7 @@ impl TorClient { periodic_task_handles.extend( chanmgr - .launch_background_tasks(&runtime) + .launch_background_tasks(&runtime, dirmgr.clone().upcast_arc()) .map_err(ErrorDetail::ChanMgrSetup)? .into_iter(), ); diff --git a/crates/tor-chanmgr/Cargo.toml b/crates/tor-chanmgr/Cargo.toml index 039c58bfe..cab1ee2af 100644 --- a/crates/tor-chanmgr/Cargo.toml +++ b/crates/tor-chanmgr/Cargo.toml @@ -25,6 +25,7 @@ tor-basic-utils = { path = "../tor-basic-utils", version = "0.3.1" } tor-error = { path = "../tor-error", version = "0.3.1" } tor-linkspec = { path = "../tor-linkspec", version = "0.3.0" } tor-llcrypto = { path = "../tor-llcrypto", version = "0.3.0" } +tor-netdir = { path = "../tor-netdir", version = "0.3.0" } tor-proto = { path = "../tor-proto", version = "0.3.1" } tor-rtcompat = { path = "../tor-rtcompat", version = "0.4.0" } tracing = "0.1.18" diff --git a/crates/tor-chanmgr/src/builder.rs b/crates/tor-chanmgr/src/builder.rs index f02570653..4040128fb 100644 --- a/crates/tor-chanmgr/src/builder.rs +++ b/crates/tor-chanmgr/src/builder.rs @@ -6,10 +6,12 @@ use std::sync::{Arc, Mutex}; use crate::{event::ChanMgrEventSender, Error}; +use std::result::Result as StdResult; use std::time::Duration; use tor_error::{bad_api_usage, internal}; use tor_linkspec::{ChanTarget, OwnedChanTarget}; use tor_llcrypto::pk; +use tor_proto::channel::config::ChannelsConfigUpdates; use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider}; use async_trait::async_trait; @@ -236,6 +238,9 @@ impl crate::mgr::AbstractChannel for tor_proto::channel::Channel { fn duration_unused(&self) -> Option { self.duration_unused() } + fn reconfigure(&mut self, updates: Arc) -> StdResult<(), ()> { + self.reconfigure(updates).map_err(|_| ()) + } } #[cfg(test)] diff --git a/crates/tor-chanmgr/src/lib.rs b/crates/tor-chanmgr/src/lib.rs index 86a50b6bb..98ec4adb3 100644 --- a/crates/tor-chanmgr/src/lib.rs +++ b/crates/tor-chanmgr/src/lib.rs @@ -55,12 +55,16 @@ mod mgr; #[cfg(test)] mod testing; +use futures::select_biased; use futures::task::SpawnExt; use futures::StreamExt; +use std::convert::Infallible; use std::sync::{Arc, Weak}; use std::time::Duration; use tor_linkspec::{ChanTarget, OwnedChanTarget}; +use tor_netdir::NetDirProvider; use tor_proto::channel::Channel; +use tracing::{debug, error}; pub use err::Error; @@ -112,10 +116,22 @@ impl ChanMgr { } } - /// Launch the periodic daemon task required by the manager to function properly. + /// Launch the periodic daemon tasks required by the manager to function properly. /// - /// Returns a [`TaskHandle`] that can be used to manage the daemon task. - pub fn launch_background_tasks(self: &Arc, runtime: &R) -> Result> { + /// Returns a [`TaskHandle`] that can be used to manage + /// those daemon tasks that poll periodically. + pub fn launch_background_tasks( + self: &Arc, + runtime: &R, + netdir: Arc, + ) -> Result> { + runtime + .spawn(Self::continually_update_channels_config( + Arc::downgrade(self), + netdir, + )) + .map_err(|e| Error::from_spawn("channels config task", e))?; + let (sched, handle) = TaskSchedule::new(runtime.clone()); runtime .spawn(Self::continually_expire_channels( @@ -163,6 +179,53 @@ impl ChanMgr { self.mgr.expire_channels() } + /// Watch for things that ought to change the configuration of all channels in the client + /// + /// Currently this handles enabling and disabling channel padding. + /// + /// This is a daemon task that runs indefinitely in the background, + /// and exits when we find that `chanmgr` is dropped. + async fn continually_update_channels_config( + self_: Weak, + netdir: Arc, + ) { + use tor_netdir::DirEvent as DE; + let mut netdir_stream = netdir.events().fuse(); + let netdir = { + let weak = Arc::downgrade(&netdir); + drop(netdir); + weak + }; + let termination_reason = match async move { + loop { + select_biased! { + direvent = netdir_stream.next() => { + let direvent = direvent.ok_or("EOF on netdir provider event stream")?; + if ! matches!(direvent, DE::NewConsensus) { continue }; + let self_ = self_.upgrade().ok_or("channel manager gone away")?; + let netdir = netdir.upgrade().ok_or("netdir gone away")?; + let netdir = netdir.latest_netdir(); + let netdir = if let Some(nd) = netdir { nd } else { continue }; + self_.mgr.channels.process_updated_netdir(netdir).map_err(|e| { + error!("continually_update_channels_config: failed to process! {} {:?}", + &e, &e); + "error processing netdir" + })?; + } + } + } + } + .await + { + Ok::(x) => match x {}, + Err(m) => m, + }; + debug!( + "continually_update_channels_config: shutting down: {}", + termination_reason + ); + } + /// Periodically expire any channels that have been unused beyond /// the maximum duration allowed. /// diff --git a/crates/tor-chanmgr/src/mgr.rs b/crates/tor-chanmgr/src/mgr.rs index 4fe39f1cc..2f10511bd 100644 --- a/crates/tor-chanmgr/src/mgr.rs +++ b/crates/tor-chanmgr/src/mgr.rs @@ -8,8 +8,11 @@ use futures::channel::oneshot; use futures::future::{FutureExt, Shared}; use rand::Rng; use std::hash::Hash; +use std::result::Result as StdResult; +use std::sync::Arc; use std::time::Duration; use tor_error::internal; +use tor_proto::channel::config::ChannelsConfigUpdates; mod map; @@ -30,6 +33,14 @@ pub(crate) trait AbstractChannel: Clone { /// Return the amount of time a channel has not been in use. /// Return None if the channel is currently in use. fn duration_unused(&self) -> Option; + + /// Reconfigure this channel according to the provided `ChannelsConfigUpdates` + /// + /// The changed configuration may not be implemented "immediately", + /// but this will be done "reasonably soon". + /// + /// Returns `Err` (only) if the channel was closed earlier. + fn reconfigure(&mut self, updates: Arc) -> StdResult<(), ()>; } /// Trait to describe how channels are created. @@ -63,7 +74,7 @@ pub(crate) struct AbstractChanMgr { connector: CF, /// A map from ed25519 identity to channel, or to pending channel status. - channels: map::ChannelMap, + pub(crate) channels: map::ChannelMap, } /// Type alias for a future that we wait on to see when a pending @@ -195,18 +206,28 @@ impl AbstractChanMgr { }, // We need to launch a channel. Action::Launch(send) => match self.connector.build_channel(&target).await { - Ok(chan) => { + Ok(mut chan) => { // The channel got built: remember it, tell the // others, and return it. - self.channels.replace( - ident.clone(), - Open(OpenEntry { - channel: chan.clone(), - max_unused_duration: Duration::from_secs( - rand::thread_rng().gen_range(180..270), - ), - }), - )?; + self.channels + .replace_with_config(ident.clone(), |channels_config| { + // This isn't great. We switch to the newly-created channel + // just to tell it how and whether to do padding. Ideally we + // would pass the config at some suitable point during + // building. However, that would involve the channel taking a + // copy of the config, and that must happen in the same channel + // manager lock acquisition span as the one where we insert the + // channel into the table so it will receive updates. I.e., + // here. + chan.reconfigure(channels_config.total_update().into()) + .map_err(|()| internal!("new channel already closed"))?; + Ok(Open(OpenEntry { + channel: chan.clone(), + max_unused_duration: Duration::from_secs( + rand::thread_rng().gen_range(180..270), + ), + })) + })?; // It's okay if all the receivers went away: // that means that nobody was waiting for this channel. let _ignore_err = send.send(Ok(chan.clone())); @@ -297,6 +318,9 @@ mod test { fn duration_unused(&self) -> Option { None } + fn reconfigure(&mut self, _updates: Arc) -> StdResult<(), ()> { + Ok(()) + } } impl FakeChannel { diff --git a/crates/tor-chanmgr/src/mgr/map.rs b/crates/tor-chanmgr/src/mgr/map.rs index 28a99d638..7aa2e0f00 100644 --- a/crates/tor-chanmgr/src/mgr/map.rs +++ b/crates/tor-chanmgr/src/mgr/map.rs @@ -6,7 +6,9 @@ use super::{AbstractChannel, Pending}; use crate::{Error, Result}; use std::collections::{hash_map, HashMap}; +use std::sync::Arc; use tor_error::internal; +use tor_proto::ChannelsConfig; /// A map from channel id to channel state, plus necessary auxiliary state /// @@ -26,6 +28,15 @@ struct Inner { /// (Danger: this uses a blocking mutex close to async code. This mutex /// must never be held while an await is happening.) channels: HashMap>, + + /// Configuration for channels that we create, and that all existing channels are using + /// + /// Will be updated by a background task, which also notifies all existing + /// `Open` channels via `channels.` + /// + /// (Must be protected by the same lock as `channels`, or a channel might be + /// created using being-replaced configuration, but not get an update.) + channels_config: ChannelsConfig, } /// Structure that can only be constructed from within this module. @@ -131,9 +142,11 @@ impl ChannelState { impl ChannelMap { /// Create a new empty ChannelMap. pub(crate) fn new() -> Self { + let channels_config = ChannelsConfig::default(); ChannelMap { inner: std::sync::Mutex::new(Inner { channels: HashMap::new(), + channels_config, }), } } @@ -151,6 +164,7 @@ impl ChannelMap { /// Replace the channel state for `ident` with `newval`, and return the /// previous value if any. + #[cfg(test)] pub(crate) fn replace( &self, ident: C::Ident, @@ -161,6 +175,27 @@ impl ChannelMap { Ok(inner.channels.insert(ident, newval)) } + /// Replace the channel state for `ident` with the return value from `func`, + /// and return the previous value if any. + /// + /// Passes a snapshot of the current global channels configuration to `func`. + /// If that configuration is copied by `func` into an [`AbstractChannel`] + /// `func` must ensure that that `BastractChannel` is returned, + /// so that it will be properly registered and receive config updates. + pub(crate) fn replace_with_config( + &self, + ident: C::Ident, + func: F, + ) -> Result>> + where + F: FnOnce(&ChannelsConfig) -> Result>, + { + let mut inner = self.inner.lock()?; + let newval = func(&inner.channels_config)?; + newval.check_ident(&ident)?; + Ok(inner.channels.insert(ident, newval)) + } + /// Remove and return the state for `ident`, if any. pub(crate) fn remove(&self, ident: &C::Ident) -> Result>> { let mut inner = self.inner.lock()?; @@ -229,6 +264,44 @@ impl ChannelMap { } } + /// Handle a `NetDir` update (by reconfiguring channels as needed) + pub(crate) fn process_updated_netdir(&self, netdir: Arc) -> Result<()> { + use ChannelState as CS; + + let padding_parameters = { + // TODO use the netdir instead + let p = tor_proto::channel::padding::Parameters::default(); + + // Drop the `Arc` as soon as we have got what we need from it, + // before we take the channel map lock. + drop(netdir); + p + }; + + let mut inner = self.inner.lock()?; + let update = inner + .channels_config + .start_update() + .padding_parameters(padding_parameters) + .finish(); + let update = if let Some(u) = update { + u + } else { + return Ok(()); + }; + let update = Arc::new(update); + + for channel in inner.channels.values_mut() { + let channel = match channel { + CS::Open(OpenEntry { channel, .. }) => channel, + CS::Building(_) | CS::Poisoned(_) => continue, + }; + // Ignore error (which simply means the channel is closed or gone) + let _ = channel.reconfigure(update.clone()); + } + Ok(()) + } + /// Expire all channels that have been unused for too long. /// /// Return a Duration until the next time at which @@ -248,6 +321,9 @@ impl ChannelMap { mod test { #![allow(clippy::unwrap_used)] use super::*; + use std::result::Result as StdResult; + use std::sync::Arc; + use tor_proto::channel::config::ChannelsConfigUpdates; #[derive(Eq, PartialEq, Clone, Debug)] struct FakeChannel { ident: &'static str, @@ -265,6 +341,9 @@ mod test { fn duration_unused(&self) -> Option { self.unused_duration.map(Duration::from_secs) } + fn reconfigure(&mut self, _updates: Arc) -> StdResult<(), ()> { + Ok(()) + } } fn ch(ident: &'static str) -> ChannelState { let channel = FakeChannel { diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index 5a7bacc2e..c6ed59dec 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -282,18 +282,8 @@ impl Channel { details: Arc::clone(&details), }; - let mut padding_timer = Box::pin(padding::Timer::new_disabled( - sleep_prov, - Some(padding::Parameters { - // From padding-spec.txt s2.2 - // TODO support reduced padding - low_ms: 1500, - high_ms: 9500, - }), - )); - if interim_enable_by_env_var() { - padding_timer.as_mut().enable(); - } + // We start disabled; the channel manager will `reconfigure` us soon after creation. + let padding_timer = Box::pin(padding::Timer::new_disabled(sleep_prov, None)); let reactor = Reactor { control: control_rx, diff --git a/crates/tor-proto/src/lib.rs b/crates/tor-proto/src/lib.rs index 6ef6556bf..4fd976d61 100644 --- a/crates/tor-proto/src/lib.rs +++ b/crates/tor-proto/src/lib.rs @@ -124,6 +124,8 @@ mod util; pub use util::err::{Error, ResolveError}; pub use util::skew::ClockSkew; +pub use channel::config::ChannelsConfig; + /// A vector of bytes that gets cleared when it's dropped. type SecretBytes = zeroize::Zeroizing>;