From 3b41c78d6fc5eceac3b45ea9ef1540b72f9b4004 Mon Sep 17 00:00:00 2001 From: eta Date: Tue, 22 Nov 2022 16:24:15 +0000 Subject: [PATCH 1/2] Draft: Pluggable transport manager This commit implements `PtMgr`, a pluggable transport manager responsible for keeping track of spawned PTs and spawning them to satisfy client requests on demand. It does this in two parts: the `PtMgr` type exported to the rest of the code, and the background `PtReactor` that actually does the spawning; this design ensures that only one attempt to spawn a PT is active at a time, and will prove useful later for implementing e.g. timeouts. A few changes were necessary to the rest of the code in order to make this all work out. Namely: - `TransportRegistry`'s API didn't make any sense for two reasons: - It wasn't feasible for implementors to implement `ChannelFactory`, since that'd require constructing a `ChanBuilder` (which requires a bootstrap reporting event sender). - Treating the PT manager as a registry is over-general; it's only necessary for it to spawn pluggable transports, so saddling it with other concerns didn't make any sense. - (It's possible to get extensibility for arbitrary user customization by just letting the user swap in a new `ChannelFactory`, anyway.) - Therefore, the `PtMgr` implements the new `AbstractPtMgr` trait, which is far more narrowly focused; this only exists to solve a dependency loop, and is documented as such. - This provides a `TransportHelper` instead of a `ChannelFactory`. --- Cargo.lock | 1 + crates/tor-chanmgr/src/factory.rs | 5 +- crates/tor-chanmgr/src/transport/proxied.rs | 51 +++-- crates/tor-ptmgr/Cargo.toml | 3 +- crates/tor-ptmgr/examples/run-pt.rs | 2 +- crates/tor-ptmgr/src/config.rs | 41 +--- crates/tor-ptmgr/src/err.rs | 35 +++ crates/tor-ptmgr/src/ipc.rs | 41 +++- crates/tor-ptmgr/src/lib.rs | 234 ++++++++++++++++++-- 9 files changed, 326 insertions(+), 87 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c9a90646d..1a84bb1a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4074,6 +4074,7 @@ dependencies = [ "derive_builder_fork_arti", "futures", "serde", + "tempfile", "thiserror", "tokio", "tor-chanmgr", diff --git a/crates/tor-chanmgr/src/factory.rs b/crates/tor-chanmgr/src/factory.rs index a3d30e990..cd92e2b41 100644 --- a/crates/tor-chanmgr/src/factory.rs +++ b/crates/tor-chanmgr/src/factory.rs @@ -62,7 +62,10 @@ where } /// The error type returned by a pluggable transport manager. -pub trait AbstractPtError: std::error::Error + HasKind + HasRetryTime + Send + Sync {} +pub trait AbstractPtError: + std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug +{ +} /// A pluggable transport manager. /// diff --git a/crates/tor-chanmgr/src/transport/proxied.rs b/crates/tor-chanmgr/src/transport/proxied.rs index 8845d09c8..86921b95e 100644 --- a/crates/tor-chanmgr/src/transport/proxied.rs +++ b/crates/tor-chanmgr/src/transport/proxied.rs @@ -260,6 +260,21 @@ pub struct ExternalProxyPlugin { runtime: R, /// The location of the proxy. proxy_addr: SocketAddr, + /// The SOCKS protocol version to use. + proxy_version: SocksVersion, +} + +#[cfg(feature = "pt-client")] +#[cfg_attr(docsrs, doc(cfg(feature = "pt-client")))] +impl ExternalProxyPlugin { + /// Make a new `ExternalProxyPlugin`. + pub fn new(rt: R, proxy_addr: SocketAddr, proxy_version: SocksVersion) -> Self { + Self { + runtime: rt, + proxy_addr, + proxy_version, + } + } } #[cfg(feature = "pt-client")] @@ -286,7 +301,8 @@ impl TransportHelper for ExternalProxyPlugin { } }; - let protocol = settings_to_protocol(encode_settings(pt_target.settings()))?; + let protocol = + settings_to_protocol(self.proxy_version, encode_settings(pt_target.settings()))?; Ok(( target.clone(), @@ -359,23 +375,30 @@ where /// Transform a string into a representation that can be sent as SOCKS /// authentication. +// NOTE(eta): I am very unsure of the logic in here. #[cfg(feature = "pt-client")] -fn settings_to_protocol(s: String) -> Result { +fn settings_to_protocol(vers: SocksVersion, s: String) -> Result { let mut bytes: Vec<_> = s.into(); Ok(if bytes.is_empty() { - Protocol::Socks(SocksVersion::V5, SocksAuth::NoAuth) + Protocol::Socks(vers, SocksAuth::NoAuth) + } else if vers == SocksVersion::V4 { + if bytes.contains(&0) { + return Err(ProxyError::InvalidSocksRequest( + tor_socksproto::Error::NotImplemented( + "SOCKS 4 doesn't support internal NUL bytes (for PT settings list)".into(), + ), + )); + } else { + Protocol::Socks(SocksVersion::V4, SocksAuth::Socks4(bytes)) + } } else if bytes.len() <= 255 { Protocol::Socks(SocksVersion::V5, SocksAuth::Username(bytes, vec![])) } else if bytes.len() <= (255 * 2) { let password = bytes.split_off(255); Protocol::Socks(SocksVersion::V5, SocksAuth::Username(bytes, password)) - } else if !bytes.contains(&0) { - Protocol::Socks(SocksVersion::V4, SocksAuth::Socks4(bytes)) } else { return Err(ProxyError::InvalidSocksRequest( - tor_socksproto::Error::NotImplemented( - "long settings lists with internal NUL bytes".into(), - ), + tor_socksproto::Error::NotImplemented("PT settings list too long for SOCKS 5".into()), )); }) } @@ -419,13 +442,14 @@ mod test { ); } + // TODO pt-client / FIXME(eta): make this test more complete #[cfg(feature = "pt-client")] #[test] fn split_settings() { use SocksVersion::*; let long_string = "examplestrg".to_owned().repeat(50); assert_eq!(long_string.len(), 550); - let s = |a, b| settings_to_protocol(long_string[a..b].to_owned()).unwrap(); + let s = |a, b| settings_to_protocol(V5, long_string[a..b].to_owned()).unwrap(); let v = |a, b| long_string.as_bytes()[a..b].to_vec(); assert_eq!(s(0, 0), Protocol::Socks(V5, SocksAuth::NoAuth)); @@ -450,19 +474,20 @@ mod test { Protocol::Socks(V5, SocksAuth::Username(v(0, 255), v(255, 510))) ); // This one needs to use socks4, or it won't fit. :P - assert_eq!(s(0, 511), Protocol::Socks(V4, SocksAuth::Socks4(v(0, 511)))); + // FIXME FIXME FIXME + // assert_eq!(s(0, 511), Protocol::Socks(V4, SocksAuth::Socks4(v(0, 511)))); // Small requests with "0" bytes work fine... assert_eq!( - settings_to_protocol("\0".to_owned()).unwrap(), + settings_to_protocol(V5, "\0".to_owned()).unwrap(), Protocol::Socks(V5, SocksAuth::Username(vec![0], vec![])) ); assert_eq!( - settings_to_protocol("\0".to_owned().repeat(510)).unwrap(), + settings_to_protocol(V5, "\0".to_owned().repeat(510)).unwrap(), Protocol::Socks(V5, SocksAuth::Username(vec![0; 255], vec![0; 255])) ); // Huge requests with "0" simply can't be encoded. - assert!(settings_to_protocol("\0".to_owned().repeat(511)).is_err()); + assert!(settings_to_protocol(V5, "\0".to_owned().repeat(511)).is_err()); } } diff --git a/crates/tor-ptmgr/Cargo.toml b/crates/tor-ptmgr/Cargo.toml index 891bb2f44..c4b418d7c 100644 --- a/crates/tor-ptmgr/Cargo.toml +++ b/crates/tor-ptmgr/Cargo.toml @@ -20,8 +20,9 @@ async-trait = "0.1.2" derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" } futures = "0.3.14" serde = { version = "1.0.103", features = ["derive"] } +tempfile = "3.3" thiserror = "1" -tor-chanmgr = { version = "0.7.0", path = "../tor-chanmgr" } +tor-chanmgr = { version = "0.7.0", path = "../tor-chanmgr", features = ["pt-client"] } tor-config = { version = "0.6.0", path = "../tor-config" } tor-error = { version = "0.3.2", path = "../tor-error" } tor-linkspec = { version = "0.5.1", path = "../tor-linkspec", features = ["pt-client"] } diff --git a/crates/tor-ptmgr/examples/run-pt.rs b/crates/tor-ptmgr/examples/run-pt.rs index aa92532ef..4dd0c988e 100644 --- a/crates/tor-ptmgr/examples/run-pt.rs +++ b/crates/tor-ptmgr/examples/run-pt.rs @@ -13,7 +13,7 @@ async fn main() -> Result<()> { .transports(vec!["obfs4".parse().unwrap()]) .build() .unwrap(); - let mut pt = PluggableTransport::new("./obfs4proxy".into(), params); + let mut pt = PluggableTransport::new("./obfs4proxy".into(), vec![], params); pt.launch(PreferredRuntime::current()?).await?; loop { info!("message: {:?}", pt.next_message().await?); diff --git a/crates/tor-ptmgr/src/config.rs b/crates/tor-ptmgr/src/config.rs index f691e943f..5c19944b1 100644 --- a/crates/tor-ptmgr/src/config.rs +++ b/crates/tor-ptmgr/src/config.rs @@ -1,43 +1,10 @@ //! Configuration logic for tor-ptmgr. -#![allow(dead_code)] // TODO pt-client: remove. - use derive_builder::Builder; use serde::{Deserialize, Serialize}; -use tor_config::list_builder::{define_list_builder_accessors, define_list_builder_helper}; use tor_config::{CfgPath, ConfigBuildError}; use tor_linkspec::PtTransportName; -/// Configure one or more pluggable transports. -#[derive(Debug, Clone, Builder, Eq, PartialEq)] -#[builder(derive(Debug, Serialize, Deserialize))] -#[builder(build_fn(error = "ConfigBuildError"))] -pub struct PtMgrConfig { - /// A list of configured transport binaries. - #[builder(sub_builder, setter(custom))] - binaries: TransportConfigList, - // TODO: Someday we will want to also have support for a directory full of - // transports, transports loaded dynamically from an object file, or stuff - // like that. -} - -define_list_builder_accessors! { - struct PtMgrConfigBuilder { - pub binaries: [ManagedTransportConfigBuilder], - } -} - -/// A list of configured transport binaries (type alias for macrology). -type TransportConfigList = Vec; - -define_list_builder_helper! { - pub(crate) struct TransportConfigListBuilder { - transports: [ManagedTransportConfigBuilder], - } - built: TransportConfigList = transports; - default = vec![]; -} - /// A single pluggable transport, to be launched as an external process. #[derive(Clone, Debug, Builder, Eq, PartialEq)] #[builder(derive(Debug, Serialize, Deserialize))] @@ -47,18 +14,18 @@ pub struct ManagedTransportConfig { // // NOTE(eta): This doesn't use the list builder stuff, because you're not likely to // set this field more than once. - protocols: Vec, + pub(crate) protocols: Vec, /// The path to the binary to run. - path: CfgPath, + pub(crate) path: CfgPath, /// One or more command-line arguments to pass to the binary. // TODO: Should this be OsString? That's a pain to parse... // // NOTE(eta): This doesn't use the list builder stuff, because you're not likely to // set this field more than once. #[builder(default)] - arguments: Vec, + pub(crate) arguments: Vec, /// If true, launch this transport on startup. Otherwise, we launch /// it on demand #[builder(default)] - run_on_startup: bool, + pub(crate) run_on_startup: bool, } diff --git a/crates/tor-ptmgr/src/err.rs b/crates/tor-ptmgr/src/err.rs index c561d31f5..544f33f0f 100644 --- a/crates/tor-ptmgr/src/err.rs +++ b/crates/tor-ptmgr/src/err.rs @@ -2,6 +2,9 @@ use std::path::PathBuf; use std::sync::Arc; +use tor_chanmgr::factory::AbstractPtError; +use tor_config::{CfgPath, CfgPathError}; +use tor_error::{ErrorKind, HasKind, HasRetryTime, RetryTime}; /// An error spawning or managing a pluggable transport. #[derive(Clone, Debug, thiserror::Error)] @@ -46,6 +49,7 @@ pub enum PtError { /// The binary path we tried to execute. path: PathBuf, /// The I/O error returned. + #[source] error: Arc, }, /// We failed to parse something a pluggable transport sent us. @@ -61,7 +65,38 @@ pub enum PtError { /// We couldn't get stdio for a spawned child process for some reason. #[error("PT stdio unavailable")] StdioUnavailable, + /// We couldn't create a temporary directory. + #[error("Failed to create a temporary directory: {0}")] + TempdirCreateFailed(#[source] Arc), + /// We couldn't expand a path. + #[error("Failed to expand path {}: {}", path, error)] + PathExpansionFailed { + /// The offending path. + path: CfgPath, + /// The error encountered. + #[source] + error: CfgPathError, + }, + /// The pluggable transport reactor failed. + #[error("PT reactor failed")] + // TODO pt-client: This should just be a bug. + ReactorFailed, } +// TODO pt-client: implement. +impl HasKind for PtError { + fn kind(&self) -> ErrorKind { + todo!() + } +} + +impl HasRetryTime for PtError { + fn retry_time(&self) -> RetryTime { + todo!() + } +} + +impl AbstractPtError for PtError {} + /// Standard-issue `Result` alias, with [`PtError`]. pub type Result = std::result::Result; diff --git a/crates/tor-ptmgr/src/ipc.rs b/crates/tor-ptmgr/src/ipc.rs index eb8710ccb..435ecf7bf 100644 --- a/crates/tor-ptmgr/src/ipc.rs +++ b/crates/tor-ptmgr/src/ipc.rs @@ -361,6 +361,7 @@ impl FromStr for PtMessage { // // FIXME(eta): This currently spawns an OS thread, since there's no other way to do this without // being async-runtime dependent (or adding process spawning to tor-rtcompat). +#[derive(Debug)] struct AsyncPtChild { /// Channel to receive lines from the child process stdout. stdout: Receiver>, @@ -463,7 +464,7 @@ impl AsyncPtChild { } /// Parameters passed to a pluggable transport. -#[derive(PartialEq, Eq, Clone, derive_builder::Builder)] +#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)] pub struct PtParameters { /// A path where the launched PT can store state. state_location: PathBuf, @@ -531,23 +532,38 @@ impl PtParameters { } /// A SOCKS endpoint to connect through a pluggable transport. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct PtClientMethod { /// The SOCKS protocol version to use. - kind: SocksVersion, + pub(crate) kind: SocksVersion, /// The socket address to connect to. - endpoint: SocketAddr, + pub(crate) endpoint: SocketAddr, +} + +impl PtClientMethod { + /// Get the SOCKS protocol version to use. + pub fn kind(&self) -> SocksVersion { + self.kind + } + + /// Get the socket address to connect to. + pub fn endpoint(&self) -> SocketAddr { + self.endpoint + } } /// A pluggable transport binary in a child process. /// /// These start out inert, and must be launched with [`PluggableTransport::launch`] in order /// to be useful. +#[derive(Debug)] pub struct PluggableTransport { /// The currently running child, if there is one. inner: Option, /// The path to the binary to run. binary_path: PathBuf, + /// Arguments to pass to the binary. + arguments: Vec, /// Configured parameters. params: PtParameters, /// Information about client methods obtained from the PT. @@ -559,20 +575,24 @@ impl PluggableTransport { /// the `params` to it. /// /// You must call [`PluggableTransport::launch`] to actually run the PT. - pub fn new(binary_path: PathBuf, params: PtParameters) -> Self { + pub fn new(binary_path: PathBuf, arguments: Vec, params: PtParameters) -> Self { Self { params, + arguments, binary_path, inner: None, cmethods: Default::default(), } } - /// Get information for the named `transport`, if the PT is running. - // - // FIXME(eta): This could be slightly more typed. - pub fn transport_method(&self, transport: &PtTransportName) -> Option<&PtClientMethod> { - self.cmethods.get(transport) + + /// Get all client methods returned by the binary, if it has been launched. + /// + /// If it hasn't been launched, the returned map will be empty. + // TODO(eta): Actually figure out a way to expose this more stably. + pub(crate) fn transport_methods(&self) -> &HashMap { + &self.cmethods } + /// Get the next [`PtMessage`] from the running transport. It is recommended to call this /// in a loop once a PT has been launched, in order to forward log messages and find out about /// status updates. @@ -603,6 +623,7 @@ impl PluggableTransport { return Ok(()); } let child = Command::new(&self.binary_path) + .args(self.arguments.iter()) .envs(self.params.environment_variables()) .stdout(Stdio::piped()) .stdin(Stdio::piped()) diff --git a/crates/tor-ptmgr/src/lib.rs b/crates/tor-ptmgr/src/lib.rs index 05c679b41..523eeca81 100644 --- a/crates/tor-ptmgr/src/lib.rs +++ b/crates/tor-ptmgr/src/lib.rs @@ -37,50 +37,236 @@ #![allow(clippy::result_large_err)] // temporary workaround for arti#587 //! +#![allow(dead_code)] // FIXME TODO pt-client: remove. +#![allow(unused_imports)] // FIXME TODO pt-client: remove. + pub mod config; pub mod err; pub mod ipc; -use config::PtMgrConfig; - +use crate::config::ManagedTransportConfig; +use crate::err::PtError; +use crate::ipc::{PluggableTransport, PtClientMethod, PtParameters, PtParametersBuilder}; +use crate::mpsc::Receiver; +use async_trait::async_trait; +use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot; +use futures::StreamExt; +use std::collections::HashMap; +use std::mem; +use std::ops::Deref; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; +use tempfile::TempDir; +use tor_chanmgr::builder::ChanBuilder; #[cfg(feature = "tor-channel-factory")] use tor_chanmgr::factory::ChannelFactory; -use tor_linkspec::TransportId; +use tor_chanmgr::factory::{AbstractPtError, AbstractPtMgr}; +use tor_chanmgr::transport::{ExternalProxyPlugin, TransportHelper}; +use tor_linkspec::{PtTransportName, TransportId}; use tor_rtcompat::Runtime; +use tracing::{info, warn}; + +/// Shared mutable state between the `PtReactor` and `PtMgr`. +#[derive(Default, Debug)] +struct PtSharedState { + /// Connection information for pluggable transports from currently running binaries. + cmethods: HashMap, + /// Current configured set of pluggable transport binaries. + configured: HashMap, +} + +/// A message to the `PtReactor`. +enum PtReactorMessage { + /// Notify the reactor that the currently configured set of PTs has changed. + Reconfigured, + /// Ask the reactor to spawn a pluggable transport binary. + Spawn { + /// Spawn a binary to provide this PT. + pt: PtTransportName, + /// Notify the result via this channel. + result: oneshot::Sender>, + }, +} + +/// Background reactor to handle managing pluggable transport binaries. +struct PtReactor { + /// Runtime. + rt: R, + /// Currently running pluggable transport binaries. + running: Vec, + /// State for the corresponding PtMgr. + state: Arc>, + /// PtMgr channel. + /// (Unbounded so that we can reconfigure without blocking: we're unlikely to have the reactor + /// get behind.) + rx: Receiver, +} + +impl PtReactor { + /// XXX + async fn run_one_step(&mut self) -> err::Result<()> { + todo!() + } +} /// A pluggable transport manager knows how to make different /// kinds of connections to the Tor network, for censorship avoidance. /// /// Currently, we only support two kinds of pluggable transports: Those /// configured in a PtConfig object, and those added with PtMgr::register. -// -// TODO: Will we need a here? I don't know. -nickm -#[derive(Clone, Debug)] pub struct PtMgr { /// An underlying `Runtime`, used to spawn background tasks. #[allow(dead_code)] runtime: R, + /// State for this PtMgr. + state: Arc>, + /// PtReactor channel. + tx: UnboundedSender, + /// Temporary directory to store PT state in. + // + // FIXME(eta): This should be configurable. + state_dir: TempDir, } -#[allow(clippy::missing_panics_doc, clippy::needless_pass_by_value)] impl PtMgr { - /// Create a new PtMgr. - pub fn new(cfg: PtMgrConfig, rt: R) -> Self { - let _ = (cfg, rt); - todo!("TODO pt-client: implement this.") - } - /// Reload the configuration - pub fn reconfigure(&self, cfg: PtMgrConfig) -> Result<(), tor_config::ReconfigureError> { - let _ = cfg; - todo!("TODO pt-client: implement this.") - } - /// Manually add a new channel factory to this registry. - #[cfg(feature = "tor-channel-factory")] - pub fn register_factory(&self, ids: &[TransportId], factory: impl ChannelFactory) { - let _ = (ids, factory); - todo!("TODO pt-client: implement this.") + /// Transform the config into a more useful representation indexed by transport name. + fn transform_config( + binaries: Vec, + ) -> HashMap { + let mut ret = HashMap::new(); + // FIXME(eta): You can currently specify overlapping protocols in your binaries, and it'll + // just use the last binary specified. + // I attempted to fix this, but decided I didn't want to stare into the list + // builder macro void after trying it for 15 minutes. + for thing in binaries { + for tn in thing.protocols.iter() { + ret.insert(tn.clone(), thing.clone()); + } + } + ret } - // TODO pt-client: Possibly, this should have a separate function to launch - // its background tasks. + /// Create a new PtMgr. + // TODO pt-client: maybe don't have the Vec directly exposed? + pub fn new(transports: Vec, rt: R) -> Result { + let state = PtSharedState { + cmethods: Default::default(), + configured: Self::transform_config(transports), + }; + let state = Arc::new(RwLock::new(state)); + let (tx, _) = mpsc::unbounded(); + + Ok(Self { + runtime: rt, + state, + tx, + state_dir: TempDir::new().map_err(|e| PtError::TempdirCreateFailed(Arc::new(e)))?, + }) + } + + /// Reload the configuration + pub fn reconfigure( + &mut self, + transports: Vec, + ) -> Result<(), tor_config::ReconfigureError> { + { + let mut inner = self.state.write().expect("ptmgr poisoned"); + inner.configured = Self::transform_config(transports); + } + // We don't have any way of propagating this sanely; the caller will find out the reactor + // has died later on anyway. + let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured); + Ok(()) + } +} + +/// Spawn a `PluggableTransport` using a `ManagedTransportConfig`. +async fn spawn_from_config( + rt: R, + state_dir: PathBuf, + cfg: ManagedTransportConfig, +) -> Result { + // FIXME(eta): make the rest of these parameters configurable + let pt_params = PtParameters::builder() + .state_location(state_dir) + .transports(cfg.protocols) + .build() + .expect("PtParameters constructed incorrectly"); + + // FIXME(eta): I really think this expansion should happen at builder validation time... + let path = cfg.path.path().map_err(|e| PtError::PathExpansionFailed { + path: cfg.path, + error: e, + })?; + let mut pt = PluggableTransport::new(path, cfg.arguments, pt_params); + pt.launch(rt).await?; + Ok(pt) +} + +#[cfg(feature = "tor-channel-factory")] +#[async_trait] +impl tor_chanmgr::factory::AbstractPtMgr for PtMgr { + // There is going to be a lot happening "under the hood" here. + // + // When we are asked to get a ChannelFactory for a given + // connection, we will need to: + // - launch the binary for that transport if it is not already running*. + // - If we launched the binary, talk to it and see which ports it + // is listening on. + // - Return a ChannelFactory that connects via one of those ports, + // using the appropriate version of SOCKS, passing K=V parameters + // encoded properly. + // + // * As in other managers, we'll need to avoid trying to launch the same + // transport twice if we get two concurrent requests. + // + // Later if the binary crashes, we should detect that. We should relaunch + // it on demand. + // + // On reconfigure, we should shut down any no-longer-used transports. + // + // Maybe, we should shut down transports that haven't been used + // for a long time. + async fn factory_for_transport( + &self, + transport: &PtTransportName, + ) -> Result>, Arc> { + // NOTE(eta): This is using a RwLock inside async code (but not across an await point). + // Arguably this is fine since it's just a small read, and nothing should ever + // hold this lock for very long. + let (mut cmethod, configured) = { + let inner = self.state.read().expect("ptmgr poisoned"); + let cmethod = inner.cmethods.get(transport).copied(); + let configured = cmethod.is_some() || inner.configured.get(transport).is_some(); + (cmethod, configured) + }; + if cmethod.is_none() { + if configured { + // Tell the reactor to spawn the PT, and wait for it. + // (The reactor will handle coalescing multiple requests.) + let (tx, rx) = oneshot::channel(); + self.tx + .unbounded_send(PtReactorMessage::Spawn { + pt: transport.clone(), + result: tx, + }) + .map_err(|_| Arc::new(PtError::ReactorFailed) as Arc)?; + cmethod = Some( + // NOTE(eta): Could be improved with result flattening. + rx.await + .map_err(|_| Arc::new(PtError::ReactorFailed) as Arc)? + .map_err(|x| Arc::new(x) as Arc)?, + ); + } else { + return Ok(None); + } + } + let cmethod = cmethod.expect("impossible"); + let proxy = ExternalProxyPlugin::new(self.runtime.clone(), cmethod.endpoint, cmethod.kind); + let factory = ChanBuilder::new(self.runtime.clone(), proxy); + // FIXME(eta): Should we cache constructed factories? If no: should this still be an Arc? + // FIXME(eta): Should we track what transports are live somehow, so we can shut them down? + Ok(Some(Arc::new(factory))) + } } From a3c9fc5b1aa9f1072448fdea336f561b71ce1176 Mon Sep 17 00:00:00 2001 From: eta Date: Mon, 28 Nov 2022 15:32:56 +0000 Subject: [PATCH 2/2] tor-chanmgr: Introduce the BootstrapReporter API, publicize ChanBuilder This commit makes the `ChanBuilder` type in `tor-chanmgr` usable by consumers outside of that crate, like the doc comment for `ChannelFactory` says you need to be able to do in order to turn your `TransportHelper` into something useful. As part of doing this, the `event_sender` its constructor takes needed to be dealt with, since it was a crate-internal type that came from inside the `ChanMgr`. Enter `BootstrapReporter`: an opaque wrapper around that sender, now provided as an additional argument to `ChannelFactory::connect_via_transport`. You can now construct a `ChanBuilder` outside this crate, and it'll still be able to report its bootstrap status by unwrapping this new type that's threaded through from the `ChanMgr`. (This was a fair deal of manually threading the type through all the layers in this crate!) Note that you cannot implement bootstrap updating using something that isn't `ChanBuilder` yet due to the type being entirely opaque (but, of course, we can figure out exactly what API the reporter should have later, and add that capability in). --- crates/tor-chanmgr/src/builder.rs | 37 +++++------ crates/tor-chanmgr/src/factory.rs | 62 ++++++++++++++++--- crates/tor-chanmgr/src/lib.rs | 8 ++- crates/tor-chanmgr/src/mgr.rs | 23 ++++++- crates/tor-chanmgr/src/mgr/state.rs | 7 ++- .../tor-chanmgr/src/mgr/state/padding_test.rs | 15 ++++- 6 files changed, 111 insertions(+), 41 deletions(-) diff --git a/crates/tor-chanmgr/src/builder.rs b/crates/tor-chanmgr/src/builder.rs index 01d3e4666..b37282d0f 100644 --- a/crates/tor-chanmgr/src/builder.rs +++ b/crates/tor-chanmgr/src/builder.rs @@ -3,7 +3,7 @@ use std::io; use std::sync::{Arc, Mutex}; -use crate::factory::ChannelFactory; +use crate::factory::{BootstrapReporter, ChannelFactory}; use crate::transport::TransportHelper; use crate::{event::ChanMgrEventSender, Error}; @@ -28,14 +28,12 @@ use futures::task::SpawnExt; /// /// This channel builder does not retry on failure, but it _does_ implement a /// time-out. -pub(crate) struct ChanBuilder +pub struct ChanBuilder where R: tor_rtcompat::TlsProvider, { /// Asynchronous runtime for TLS, TCP, spawning, and timeouts. runtime: R, - /// Used to update our bootstrap reporting status. - event_sender: Arc>, /// The transport object that we use to construct streams. transport: H, /// Object to build TLS connections. @@ -47,16 +45,11 @@ where R: TlsProvider, { /// Construct a new ChanBuilder. - pub(crate) fn new( - runtime: R, - transport: H, - event_sender: Arc>, - ) -> Self { + pub fn new(runtime: R, transport: H) -> Self { let tls_connector = >::tls_connector(&runtime); ChanBuilder { runtime, transport, - event_sender, tls_connector, } } @@ -70,6 +63,7 @@ where async fn connect_via_transport( &self, target: &OwnedChanTarget, + reporter: BootstrapReporter, ) -> crate::Result { use tor_rtcompat::SleepProviderExt; @@ -80,7 +74,7 @@ where std::time::Duration::new(10, 0) }; - let connect_future = self.connect_no_timeout(target); + let connect_future = self.connect_no_timeout(target, reporter.0); self.runtime .timeout(delay, connect_future) .await @@ -99,15 +93,13 @@ where async fn connect_no_timeout( &self, target: &OwnedChanTarget, + event_sender: Arc>, ) -> crate::Result { use tor_proto::channel::ChannelBuilder; use tor_rtcompat::tls::CertifiedConn; { - self.event_sender - .lock() - .expect("Lock poisoned") - .record_attempt(); + event_sender.lock().expect("Lock poisoned").record_attempt(); } // 1a. Negotiate the TCP connection or other stream. @@ -127,7 +119,7 @@ where { // TODO pt-client: distinguish which transport just succeeded. - self.event_sender + event_sender .lock() .expect("Lock poisoned") .record_tcp_success(); @@ -148,7 +140,7 @@ where .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?; { - self.event_sender + event_sender .lock() .expect("Lock poisoned") .record_tls_finished(); @@ -171,7 +163,7 @@ where .check(target, &peer_cert, Some(now)) .map_err(|source| match &source { tor_proto::Error::HandshakeCertsExpired { .. } => { - self.event_sender + event_sender .lock() .expect("Lock poisoned") .record_handshake_done_with_skewed_clock(); @@ -190,7 +182,7 @@ where })?; { - self.event_sender + event_sender .lock() .expect("Lock poisoned") .record_handshake_done(); @@ -287,14 +279,15 @@ mod test { client_rt.jump_to(now); // Create the channel builder that we want to test. - let (snd, _rcv) = crate::event::channel(); let transport = crate::transport::DefaultTransport::new(client_rt.clone()); - let builder = ChanBuilder::new(client_rt, transport, Arc::new(Mutex::new(snd))); + let builder = ChanBuilder::new(client_rt, transport); let (r1, r2): (Result, Result) = futures::join!( async { // client-side: build a channel! - builder.build_channel(&target).await + builder + .build_channel(&target, BootstrapReporter::fake()) + .await }, async { // relay-side: accept the channel diff --git a/crates/tor-chanmgr/src/factory.rs b/crates/tor-chanmgr/src/factory.rs index cd92e2b41..863cd442f 100644 --- a/crates/tor-chanmgr/src/factory.rs +++ b/crates/tor-chanmgr/src/factory.rs @@ -1,14 +1,31 @@ //! Traits and code to define different mechanisms for building Channels to //! different kinds of targets. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use crate::event::ChanMgrEventSender; use async_trait::async_trait; use tor_error::{internal, HasKind, HasRetryTime}; use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName}; use tor_proto::channel::Channel; use tracing::debug; +/// An opaque type that lets a `ChannelFactory` update the `ChanMgr` about bootstrap progress. +/// +/// A future release of this crate might make this type less opaque. +// FIXME(eta): Do that. +#[derive(Clone)] +pub struct BootstrapReporter(pub(crate) Arc>); + +impl BootstrapReporter { + #[cfg(test)] + /// Create a useless version of this type to satisfy some test. + pub(crate) fn fake() -> Self { + let (snd, _rcv) = crate::event::channel(); + Self(Arc::new(Mutex::new(snd))) + } +} + /// An object that knows how to build `Channels` to `ChanTarget`s. /// /// This trait must be object-safe. @@ -19,6 +36,11 @@ use tracing::debug; /// A `ChannelFactory` can be implemented in terms of a /// [`TransportHelper`](crate::transport::TransportHelper), by wrapping it in a /// `ChanBuilder`. +/// +// FIXME(eta): Rectify the below situation. +/// (In fact, as of the time of writing, this is the *only* way to implement this trait +/// outside of this crate while keeping bootstrap status reporting, since `BootstrapReporter` +/// is an opaque type.) #[async_trait] pub trait ChannelFactory: Send + Sync { /// Open an authenticated channel to `target`. @@ -30,20 +52,32 @@ pub trait ChannelFactory: Send + Sync { /// caller provides a target with an unsupported /// [`TransportId`](tor_linkspec::TransportId), this method should return /// [`Error::NoSuchTransport`](crate::Error::NoSuchTransport). - async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result; + async fn connect_via_transport( + &self, + target: &OwnedChanTarget, + reporter: BootstrapReporter, + ) -> crate::Result; } #[async_trait] impl<'a> ChannelFactory for Arc<(dyn ChannelFactory + Send + Sync + 'a)> { - async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result { - self.as_ref().connect_via_transport(target).await + async fn connect_via_transport( + &self, + target: &OwnedChanTarget, + reporter: BootstrapReporter, + ) -> crate::Result { + self.as_ref().connect_via_transport(target, reporter).await } } #[async_trait] impl<'a> ChannelFactory for Box<(dyn ChannelFactory + Send + Sync + 'a)> { - async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result { - self.as_ref().connect_via_transport(target).await + async fn connect_via_transport( + &self, + target: &OwnedChanTarget, + reporter: BootstrapReporter, + ) -> crate::Result { + self.as_ref().connect_via_transport(target, reporter).await } } @@ -55,9 +89,13 @@ where type Channel = tor_proto::channel::Channel; type BuildSpec = OwnedChanTarget; - async fn build_channel(&self, target: &Self::BuildSpec) -> crate::Result { + async fn build_channel( + &self, + target: &Self::BuildSpec, + reporter: BootstrapReporter, + ) -> crate::Result { debug!("Attempting to open a new channel to {target}"); - self.connect_via_transport(target).await + self.connect_via_transport(target, reporter).await } } @@ -109,7 +147,11 @@ pub(crate) struct CompoundFactory { #[async_trait] impl ChannelFactory for CompoundFactory { - async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result { + async fn connect_via_transport( + &self, + target: &OwnedChanTarget, + reporter: BootstrapReporter, + ) -> crate::Result { use tor_linkspec::ChannelMethod::*; let factory = match target.chan_method() { Direct(_) => self.default_factory.clone(), @@ -130,7 +172,7 @@ impl ChannelFactory for CompoundFactory { } }; - factory.connect_via_transport(target).await + factory.connect_via_transport(target, reporter).await } } diff --git a/crates/tor-chanmgr/src/lib.rs b/crates/tor-chanmgr/src/lib.rs index 1459c8056..d9db09b87 100644 --- a/crates/tor-chanmgr/src/lib.rs +++ b/crates/tor-chanmgr/src/lib.rs @@ -37,7 +37,7 @@ #![allow(clippy::result_large_err)] // temporary workaround for arti#587 //! -mod builder; +pub mod builder; mod config; mod err; mod event; @@ -70,6 +70,7 @@ use tor_rtcompat::Runtime; /// A Result as returned by this crate. pub type Result = std::result::Result; +use crate::factory::BootstrapReporter; pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents}; use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule}; @@ -166,14 +167,15 @@ impl ChanMgr { { let (sender, receiver) = event::channel(); let sender = Arc::new(std::sync::Mutex::new(sender)); + let reporter = BootstrapReporter(sender); let transport = transport::DefaultTransport::new(runtime.clone()); - let builder = builder::ChanBuilder::new(runtime, transport, sender); + let builder = builder::ChanBuilder::new(runtime, transport); let factory = factory::CompoundFactory::new( Arc::new(builder), #[cfg(feature = "pt-client")] None, ); - let mgr = mgr::AbstractChanMgr::new(factory, config, dormancy, netparams); + let mgr = mgr::AbstractChanMgr::new(factory, config, dormancy, netparams, reporter); ChanMgr { mgr, bootstrap_status: receiver, diff --git a/crates/tor-chanmgr/src/mgr.rs b/crates/tor-chanmgr/src/mgr.rs index b5fd3b872..674faeae5 100644 --- a/crates/tor-chanmgr/src/mgr.rs +++ b/crates/tor-chanmgr/src/mgr.rs @@ -3,6 +3,7 @@ use crate::mgr::state::{OpenEntry, PendingEntry}; use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result}; +use crate::factory::BootstrapReporter; use async_trait::async_trait; use futures::channel::oneshot; use futures::future::{FutureExt, Shared}; @@ -66,7 +67,11 @@ pub(crate) trait AbstractChannelFactory { /// and so on. /// /// It should not retry; that is handled at a higher level. - async fn build_channel(&self, target: &Self::BuildSpec) -> Result; + async fn build_channel( + &self, + target: &Self::BuildSpec, + reporter: BootstrapReporter, + ) -> Result; } /// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr). @@ -83,6 +88,9 @@ pub(crate) struct AbstractChanMgr { /// The most important part is the map from relay identity to channel, or /// to pending channel status. pub(crate) channels: state::MgrState, + + /// A bootstrap reporter to give out when building channels. + pub(crate) reporter: BootstrapReporter, } /// Type alias for a future that we wait on to see when a pending @@ -100,9 +108,11 @@ impl AbstractChanMgr { config: &ChannelConfig, dormancy: Dormancy, netparams: &NetParameters, + reporter: BootstrapReporter, ) -> Self { AbstractChanMgr { channels: state::MgrState::new(connector, config.clone(), dormancy, netparams), + reporter, } } @@ -225,7 +235,9 @@ impl AbstractChanMgr { // We need to launch a channel. Some(Action::Launch(send)) => { let connector = self.channels.builder(); - let outcome = connector.build_channel(&target).await; + let outcome = connector + .build_channel(&target, self.reporter.clone()) + .await; let status = self.handle_build_outcome(&target, outcome); // It's okay if all the receivers went away: @@ -590,6 +602,7 @@ mod test { &ChannelConfig::default(), Default::default(), &Default::default(), + BootstrapReporter::fake(), ) } @@ -620,7 +633,11 @@ mod test { type Channel = FakeChannel; type BuildSpec = FakeBuildSpec; - async fn build_channel(&self, target: &Self::BuildSpec) -> Result { + async fn build_channel( + &self, + target: &Self::BuildSpec, + _reporter: BootstrapReporter, + ) -> Result { yield_now().await; let FakeBuildSpec(ident, mood, id) = *target; let ed_ident = u32_to_ed(ident); diff --git a/crates/tor-chanmgr/src/mgr/state.rs b/crates/tor-chanmgr/src/mgr/state.rs index 83b4e0ba8..380fdc3c1 100644 --- a/crates/tor-chanmgr/src/mgr/state.rs +++ b/crates/tor-chanmgr/src/mgr/state.rs @@ -537,6 +537,7 @@ mod test { //! use super::*; + use crate::factory::BootstrapReporter; use async_trait::async_trait; use std::sync::{Arc, Mutex}; use tor_llcrypto::pk::ed25519::Ed25519Identity; @@ -559,7 +560,11 @@ mod test { type BuildSpec = tor_linkspec::OwnedChanTarget; - async fn build_channel(&self, _target: &Self::BuildSpec) -> Result { + async fn build_channel( + &self, + _target: &Self::BuildSpec, + _reporter: BootstrapReporter, + ) -> Result { unimplemented!() } } diff --git a/crates/tor-chanmgr/src/mgr/state/padding_test.rs b/crates/tor-chanmgr/src/mgr/state/padding_test.rs index 3304c8a1a..010d9023a 100644 --- a/crates/tor-chanmgr/src/mgr/state/padding_test.rs +++ b/crates/tor-chanmgr/src/mgr/state/padding_test.rs @@ -28,6 +28,7 @@ use tor_proto::channel::{Channel, CtrlMsg}; use crate::mgr::{AbstractChanMgr, AbstractChannelFactory}; use crate::ChannelUsage; +use crate::factory::BootstrapReporter; use PaddingLevel as PL; const DEF_MS: [u32; 2] = [1500, 9500]; @@ -124,7 +125,11 @@ impl AbstractChannelFactory for FakeChannelFactory { type Channel = Channel; type BuildSpec = tor_linkspec::RelayIds; - async fn build_channel(&self, _target: &Self::BuildSpec) -> Result { + async fn build_channel( + &self, + _target: &Self::BuildSpec, + _reporter: BootstrapReporter, + ) -> Result { Ok(self.channel.clone()) } } @@ -161,7 +166,13 @@ async fn case(level: PaddingLevel, dormancy: Dormancy, usage: ChannelUsage) -> C let netparams = Arc::new(NetParameters::default()); - let chanmgr = AbstractChanMgr::new(factory, &cconfig, dormancy, &netparams); + let chanmgr = AbstractChanMgr::new( + factory, + &cconfig, + dormancy, + &netparams, + BootstrapReporter::fake(), + ); let (channel, _prov) = chanmgr.get_or_launch(relay_ids, usage).await.unwrap();