diff --git a/crates/tor-chanmgr/semver.md b/crates/tor-chanmgr/semver.md new file mode 100644 index 000000000..4a3b30c32 --- /dev/null +++ b/crates/tor-chanmgr/semver.md @@ -0,0 +1,2 @@ +ADDED: `proxied::Protocol` +ADDED: `proxied::settings_to_protocol` diff --git a/crates/tor-chanmgr/src/transport.rs b/crates/tor-chanmgr/src/transport.rs index bfcaa3795..b5cedf480 100644 --- a/crates/tor-chanmgr/src/transport.rs +++ b/crates/tor-chanmgr/src/transport.rs @@ -5,7 +5,7 @@ use futures::{AsyncRead, AsyncWrite}; use tor_linkspec::OwnedChanTarget; pub(crate) mod default; -pub(crate) mod proxied; +pub mod proxied; pub(crate) use default::DefaultTransport; diff --git a/crates/tor-chanmgr/src/transport/proxied.rs b/crates/tor-chanmgr/src/transport/proxied.rs index 68fd818a6..13da15b17 100644 --- a/crates/tor-chanmgr/src/transport/proxied.rs +++ b/crates/tor-chanmgr/src/transport/proxied.rs @@ -40,7 +40,7 @@ use tor_linkspec::{ChannelMethod, HasChanMethod, OwnedChanTarget}; /// Information about what proxy protocol to use, and how to use it. #[derive(Clone, Debug, Eq, PartialEq)] #[non_exhaustive] -pub(crate) enum Protocol { +pub enum Protocol { /// Connect via SOCKS 4, SOCKS 4a, or SOCKS 5. Socks(SocksVersion, SocksAuth), } @@ -410,7 +410,7 @@ where /// authentication. // NOTE(eta): I am very unsure of the logic in here. #[cfg(feature = "pt-client")] -fn settings_to_protocol(vers: SocksVersion, s: String) -> Result { +pub fn settings_to_protocol(vers: SocksVersion, s: String) -> Result { let mut bytes: Vec<_> = s.into(); Ok(if bytes.is_empty() { Protocol::Socks(vers, SocksAuth::NoAuth) diff --git a/crates/tor-ptmgr/examples/run-pt.rs b/crates/tor-ptmgr/examples/run-pt.rs index 4dd0c988e..5657e81f5 100644 --- a/crates/tor-ptmgr/examples/run-pt.rs +++ b/crates/tor-ptmgr/examples/run-pt.rs @@ -1,19 +1,25 @@ //! Very very very basic soak test that runs obfs4proxy. use anyhow::Result; -use tor_ptmgr::ipc::{PluggableTransport, PtParameters}; +use tor_ptmgr::ipc::{ + PluggableClientTransport, PluggableTransport, PtClientParameters, PtCommonParameters, +}; use tor_rtcompat::PreferredRuntime; use tracing::info; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); - let params = PtParameters::builder() + let common_params = PtCommonParameters::builder() .state_location("/tmp/arti-pt".into()) + .build() + .unwrap(); + let client_params = PtClientParameters::builder() .transports(vec!["obfs4".parse().unwrap()]) .build() .unwrap(); - let mut pt = PluggableTransport::new("./obfs4proxy".into(), vec![], params); + let mut pt = + PluggableClientTransport::new("./obfs4proxy".into(), vec![], common_params, client_params); pt.launch(PreferredRuntime::current()?).await?; loop { info!("message: {:?}", pt.next_message().await?); diff --git a/crates/tor-ptmgr/semver.md b/crates/tor-ptmgr/semver.md new file mode 100644 index 000000000..7330f5b5d --- /dev/null +++ b/crates/tor-ptmgr/semver.md @@ -0,0 +1,6 @@ +ADDED: `ipc::PtServerParameters` +ADDED: `trait ipc::PluggableTransport` providing `.transport_methods()` and `.next_message()` +ADDED: `ipc::PtMessage` +ADDED: `ipc::PluggableServerTransport` +BREAKING: Split `ipc::PtParameters` into `ipc::PtCommonParameters` and `ipc::PtClientParameters` +BREAKING: Renamed `ipc::PluggableTransport` struct into `ipc::PluggableClientTransport`; `ipc::PluggableTransport` is a trait now. diff --git a/crates/tor-ptmgr/src/ipc.rs b/crates/tor-ptmgr/src/ipc.rs index d11a6a2c4..a9b4ff8b5 100644 --- a/crates/tor-ptmgr/src/ipc.rs +++ b/crates/tor-ptmgr/src/ipc.rs @@ -51,7 +51,7 @@ pub struct PtStatus { #[derive(PartialEq, Eq, Debug, Clone)] #[non_exhaustive] #[cfg_attr(feature = "experimental-api", visibility::make(pub))] -pub(crate) enum PtMessage { +pub enum PtMessage { /// `VERSION-ERROR`: No compatible pluggable transport specification version was provided. VersionError(String), /// `VERSION`: Specifies the version the binary is using for the IPC protocol. @@ -358,142 +358,248 @@ impl FromStr for PtMessage { } } -/// A handle to receive lines from a pluggable transport process' stdout asynchronously. -// -// FIXME(eta): This currently spawns an OS thread, since there's no other way to do this without -// being async-runtime dependent (or adding process spawning to tor-rtcompat). -#[derive(Debug)] -struct AsyncPtChild { - /// Channel to receive lines from the child process stdout. - stdout: Receiver>, - /// Identifier to put in logging messages. - identifier: String, -} +/// Sealed trait to protect private types and default trait implementations +pub(crate) mod sealed { + use super::*; -impl AsyncPtChild { - /// Wrap an OS child process by spawning a worker thread to forward output from the child - /// to the asynchronous runtime via use of a channel. - fn new(mut child: Child, identifier: String) -> Result { - let (stdin, stdout) = ( - child.stdin.take().ok_or_else(|| { - PtError::Internal(internal!("Created child process without stdin pipe")) - })?, - child.stdout.take().ok_or_else(|| { - PtError::Internal(internal!("Created child process without stdout pipe")) - })?, - ); - let (mut tx, rx) = mpsc::channel(PT_STDIO_BUFFER); - let ident = identifier.clone(); - thread::spawn(move || { - let reader = BufReader::new(stdout); - let _stdin = stdin; - let mut noted_full = false; - // Forward lines from the blocking reader to the async channel. - for line in reader.lines() { - let err = line.is_err(); - match &line { - Ok(l) => trace!("<-- PT {}: {:?}", ident, l), - Err(e) => trace!("<-- PT {}: Error: {:?}", ident, e), - } - if let Err(e) = tx.try_send(line) { - if e.is_disconnected() { - debug!("PT {} is disconnected; shutting it down.", ident); - // Channel dropped, so shut down the pluggable transport process. - break; - } - // The other kind of error is "full", which we can't do anything about. - // Just throw the line away. - if !noted_full { - noted_full = true; // warn only once per PT. - warn!( - "Bug: Message queue for PT {} became full; dropping message", - ident - ); - } - } - if err { - // Encountered an error reading, so ensure the process is shut down (it's - // probably "broken pipe" anyway, so this is slightly redundant, but the - // rest of the code assumes errors are nonrecoverable). - break; - } - } - // Has it already quit? If so, just exit now. - if let Ok(Some(_)) = child.try_wait() { - // FIXME(eta): We currently throw away the exit code, which might be useful - // for debugging purposes! - debug!("PT {} has exited.", ident); - return; - } - // Otherwise, tell it to exit. - // Dropping stdin should tell the PT to exit, since we set the correct environment - // variable for that to happen. - trace!("Asking PT {} to exit, nicely.", ident); - drop(_stdin); - // Give it some time to exit. - thread::sleep(GRACEFUL_EXIT_TIME); - match child.try_wait() { - Ok(None) => { - // Kill it. - debug!("Sending kill signal to PT {}", ident); - if let Err(e) = child.kill() { - warn_report!(e, "Failed to kill() spawned PT {}", ident); - } - } - Ok(Some(_)) => { - debug!("PT {} shut down successfully.", ident); - } // It exited. - Err(e) => { - warn_report!(e, "Failed to call try_wait() on spawned PT {}", ident); - } - } - }); - Ok(AsyncPtChild { - stdout: rx, - identifier, - }) + /// A handle to receive lines from a pluggable transport process' stdout asynchronously. + // + // FIXME(eta): This currently spawns an OS thread, since there's no other way to do this without + // being async-runtime dependent (or adding process spawning to tor-rtcompat). + #[derive(Debug)] + pub struct AsyncPtChild { + /// Channel to receive lines from the child process stdout. + stdout: Receiver>, + /// Identifier to put in logging messages. + pub identifier: String, } - /// Receive a message from the pluggable transport binary asynchronously. - /// - /// Note: This will convert `PtMessage::Log` into a tracing log call automatically. - async fn recv(&mut self) -> err::Result { - loop { - match self.stdout.next().await { - None => return Err(PtError::ChildGone), - Some(Ok(line)) => { - let line = line - .parse::() - .map_err(|e| PtError::IpcParseFailed { - line, - error: e.into(), - })?; - if let PtMessage::Log { severity, message } = line { - // FIXME(eta): I wanted to make this integrate with `tracing` more nicely, - // but gave up after 15 minutes of clicking through spaghetti. - match &severity as &str { - "error" => error!("[pt {}] {}", self.identifier, message), - "warning" => warn!("[pt {}] {}", self.identifier, message), - "notice" => info!("[pt {}] {}", self.identifier, message), - "info" => debug!("[pt {}] {}", self.identifier, message), - "debug" => trace!("[pt {}] {}", self.identifier, message), - x => warn!("[pt] {} {} {}", self.identifier, x, message), + impl AsyncPtChild { + /// Wrap an OS child process by spawning a worker thread to forward output from the child + /// to the asynchronous runtime via use of a channel. + pub fn new(mut child: Child, identifier: String) -> Result { + let (stdin, stdout) = ( + child.stdin.take().ok_or_else(|| { + PtError::Internal(internal!("Created child process without stdin pipe")) + })?, + child.stdout.take().ok_or_else(|| { + PtError::Internal(internal!("Created child process without stdout pipe")) + })?, + ); + let (mut tx, rx) = mpsc::channel(PT_STDIO_BUFFER); + let ident = identifier.clone(); + thread::spawn(move || { + let reader = BufReader::new(stdout); + let _stdin = stdin; + let mut noted_full = false; + // Forward lines from the blocking reader to the async channel. + for line in reader.lines() { + let err = line.is_err(); + match &line { + Ok(l) => trace!("<-- PT {}: {:?}", ident, l), + Err(e) => trace!("<-- PT {}: Error: {:?}", ident, e), + } + if let Err(e) = tx.try_send(line) { + if e.is_disconnected() { + debug!("PT {} is disconnected; shutting it down.", ident); + // Channel dropped, so shut down the pluggable transport process. + break; } - } else { - return Ok(line); + // The other kind of error is "full", which we can't do anything about. + // Just throw the line away. + if !noted_full { + noted_full = true; // warn only once per PT. + warn!( + "Bug: Message queue for PT {} became full; dropping message", + ident + ); + } + } + if err { + // Encountered an error reading, so ensure the process is shut down (it's + // probably "broken pipe" anyway, so this is slightly redundant, but the + // rest of the code assumes errors are nonrecoverable). + break; } } - Some(Err(e)) => { - return Err(PtError::ChildReadFailed(Arc::new(e))); + // Has it already quit? If so, just exit now. + if let Ok(Some(_)) = child.try_wait() { + // FIXME(eta): We currently throw away the exit code, which might be useful + // for debugging purposes! + debug!("PT {} has exited.", ident); + return; + } + // Otherwise, tell it to exit. + // Dropping stdin should tell the PT to exit, since we set the correct environment + // variable for that to happen. + trace!("Asking PT {} to exit, nicely.", ident); + drop(_stdin); + // Give it some time to exit. + thread::sleep(GRACEFUL_EXIT_TIME); + match child.try_wait() { + Ok(None) => { + // Kill it. + debug!("Sending kill signal to PT {}", ident); + if let Err(e) = child.kill() { + warn_report!(e, "Failed to kill() spawned PT {}", ident); + } + } + Ok(Some(_)) => { + debug!("PT {} shut down successfully.", ident); + } // It exited. + Err(e) => { + warn_report!(e, "Failed to call try_wait() on spawned PT {}", ident); + } + } + }); + Ok(AsyncPtChild { + stdout: rx, + identifier, + }) + } + + /// Receive a message from the pluggable transport binary asynchronously. + /// + /// Note: This will convert `PtMessage::Log` into a tracing log call automatically. + pub async fn recv(&mut self) -> err::Result { + loop { + match self.stdout.next().await { + None => return Err(PtError::ChildGone), + Some(Ok(line)) => { + let line = + line.parse::() + .map_err(|e| PtError::IpcParseFailed { + line, + error: e.into(), + })?; + if let PtMessage::Log { severity, message } = line { + // FIXME(eta): I wanted to make this integrate with `tracing` more nicely, + // but gave up after 15 minutes of clicking through spaghetti. + match &severity as &str { + "error" => error!("[pt {}] {}", self.identifier, message), + "warning" => warn!("[pt {}] {}", self.identifier, message), + "notice" => info!("[pt {}] {}", self.identifier, message), + "info" => debug!("[pt {}] {}", self.identifier, message), + "debug" => trace!("[pt {}] {}", self.identifier, message), + x => warn!("[pt] {} {} {}", self.identifier, x, message), + } + } else { + return Ok(line); + } + } + Some(Err(e)) => { + return Err(PtError::ChildReadFailed(Arc::new(e))); + } + } + } + } + } + + /// Defines some helper methods that are required later on + #[async_trait::async_trait] + pub trait PluggableTransportPrivate { + /// Return the [`AsyncPtChild`] if it exists + fn inner(&mut self) -> Result<&mut AsyncPtChild, PtError>; + + /// Set the [`AsyncPtChild`] + fn set_inner(&mut self, newval: Option); + + /// Return a loggable identifier for this transport. + fn identifier(&self) -> &str; + + /// Attempt to launch the PT and return the corresponding `[AsyncPtChild]` + fn get_child_from_pt_launch( + inner: &Option, + transports: &Vec, + binary_path: &PathBuf, + arguments: &[String], + all_env_vars: HashMap, + ) -> Result { + if inner.is_some() { + let warning_msg = + format!("Attempted to launch PT binary for {:?} twice.", transports); + warn!("{warning_msg}"); + // WARN: this may not be the correct error to throw here + return Err(PtError::ChildProtocolViolation(warning_msg)); + } + info!( + "Launching pluggable transport at {} for {:?}", + binary_path.display(), + transports + ); + let child = Command::new(binary_path) + .args(arguments.iter()) + .envs(all_env_vars) + .stdout(Stdio::piped()) + .stdin(Stdio::piped()) + .spawn() + .map_err(|e| PtError::ChildSpawnFailed { + path: binary_path.clone(), + error: Arc::new(e), + })?; + + let identifier = crate::pt_identifier(binary_path)?; + sealed::AsyncPtChild::new(child, identifier) + } + + /// Consolidates some of the [`PtMessage`] potential matches to + /// deduplicate code + /// + /// Note that getting a [`PtMessage`] from this method implies that + /// the method was unable to match it and thus you should continue handling + /// the message. Getting [`None`] after error handling means that a match + /// was found and the appropriate action was successfully taken, and you don't + /// need to worry about it. + async fn try_match_common_messages( + rt: &R, + deadline: Instant, + async_child: &mut AsyncPtChild, + ) -> Result, PtError> { + match rt + .timeout( + // FIXME(eta): It'd be nice if SleepProviderExt took an `Instant` natively. + deadline.saturating_duration_since(Instant::now()), + async_child.recv(), + ) + .await + .map_err(|_| PtError::Timeout)?? + { + PtMessage::VersionError(e) => { + if e != "no-version" { + warn!("weird VERSION-ERROR: {}", e); + } + return Err(PtError::UnsupportedVersion); + } + PtMessage::Version(vers) => { + if vers != "1" { + return Err(PtError::ProtocolViolation(format!( + "stated version is {}, asked for 1", + vers + ))); + } + Ok(None) + } + PtMessage::EnvError(e) => return Err(PtError::ChildProtocolViolation(e)), + PtMessage::ProxyError(e) => return Err(PtError::ProxyError(e)), + // TODO(eta): We don't do anything with these right now! + PtMessage::Status(_) => Ok(None), + PtMessage::Unknown(x) => { + warn!("unknown PT line: {}", x); + Ok(None) + } + // Return the PtMessage as it is for further processing + // TODO: handle [`PtError::ProtocolViolation`] here somehow + x => { + return Ok(Some(x)); } } } } } -/// Parameters passed to a pluggable transport. +/// Common parameters passed to a pluggable transport. #[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)] -pub struct PtParameters { +pub struct PtCommonParameters { /// A path where the launched PT can store state. state_location: PathBuf, /// An IPv4 address to bind outgoing connections to (if specified). @@ -506,28 +612,21 @@ pub struct PtParameters { /// Leaving this out will mean the PT uses a sane default. #[builder(default)] outbound_bind_v6: Option, - /// A SOCKS URI specifying a proxy to use. - #[builder(default)] - proxy_uri: Option, - /// A list of transports to initialise. - /// - /// The PT launch will fail if all transports are not successfully initialised. - transports: Vec, /// The maximum time we should wait for a pluggable transport binary to report successful /// initialization. If `None`, a default value is used. #[builder(default)] timeout: Option, } -impl PtParameters { - /// Return a new `PtParametersBuilder` for constructing a set of parameters. - pub fn builder() -> PtParametersBuilder { - PtParametersBuilder::default() +impl PtCommonParameters { + /// Return a new `PtCommonParametersBuilder` for constructing a set of parameters. + pub fn builder() -> PtCommonParametersBuilder { + PtCommonParametersBuilder::default() } /// Convert these parameters into a set of environment variables to be passed to the PT binary /// in accordance with the specification. - fn environment_variables(&self) -> HashMap { + fn common_environment_variables(&self) -> HashMap { let mut ret = HashMap::new(); ret.insert("TOR_PT_MANAGED_TRANSPORT_VER".into(), "1".into()); ret.insert( @@ -548,6 +647,35 @@ impl PtParameters { format!("[{}]", v6).into(), ); } + ret + } +} + +/// Parameters passed only to a pluggable transport client. +#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)] +pub struct PtClientParameters { + /// A SOCKS URI specifying a proxy to use. + #[builder(default)] + proxy_uri: Option, + /// A list of transports to initialise. + /// + /// The PT launch will fail if all transports are not successfully initialised. + transports: Vec, +} + +impl PtClientParameters { + /// Return a new `PtClientParametersBuilder` for constructing a set of parameters. + pub fn builder() -> PtClientParametersBuilder { + PtClientParametersBuilder::default() + } + + /// Convert these parameters into a set of environment variables to be passed to the PT binary + /// in accordance with the specification. + fn environment_variables( + &self, + common_params: &PtCommonParameters, + ) -> HashMap { + let mut ret = common_params.common_environment_variables(); if let Some(ref proxy_uri) = self.proxy_uri { ret.insert("TOR_PT_PROXY".into(), proxy_uri.clone().into()); } @@ -559,6 +687,65 @@ impl PtParameters { } } +/// Parameters passed only to a pluggable transport server. +#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)] +pub struct PtServerParameters { + /// A list of transports to initialise. + /// + /// The PT launch will fail if all transports are not successfully initialised. + transports: Vec, + /// Transport options for each server transport + #[builder(default)] + server_transport_options: String, + /// Set host:port on which the server transport should listen for connections + #[builder(default)] + server_bindaddr: String, + /// Set host:port on which the server transport should forward requests + #[builder(default)] + server_orport: Option, + /// Set host:port on which the server transport should forward requests (extended ORPORT) + #[builder(default)] + server_extended_orport: Option, +} + +impl PtServerParameters { + /// Return a new `PtServerParametersBuilder` for constructing a set of parameters. + pub fn builder() -> PtServerParametersBuilder { + PtServerParametersBuilder::default() + } + + /// Convert these parameters into a set of environment variables to be passed to the PT binary + /// in accordance with the specification. + fn environment_variables( + &self, + common_params: &PtCommonParameters, + ) -> HashMap { + let mut ret = common_params.common_environment_variables(); + ret.insert( + "TOR_PT_SERVER_TRANSPORTS".into(), + self.transports.iter().join(",").into(), + ); + ret.insert( + "TOR_PT_SERVER_TRANSPORT_OPTIONS".into(), + self.server_transport_options.clone().into(), + ); + ret.insert( + "TOR_PT_SERVER_BINDADDR".into(), + self.server_bindaddr.clone().into(), + ); + if let Some(ref server_orport) = self.server_orport { + ret.insert("TOR_PT_ORPORT".into(), server_orport.into()); + } + if let Some(ref server_extended_orport) = self.server_extended_orport { + ret.insert( + "TOR_PT_EXTENDED_SERVER_PORT".into(), + server_extended_orport.into(), + ); + } + ret + } +} + /// A SOCKS endpoint to connect through a pluggable transport. #[derive(Debug, Clone, PartialEq, Eq)] pub struct PtClientMethod { @@ -580,54 +767,15 @@ impl PtClientMethod { } } -/// A pluggable transport binary in a child process. -/// -/// These start out inert, and must be launched with [`PluggableTransport::launch`] in order -/// to be useful. -#[derive(Debug)] -pub struct PluggableTransport { - /// The currently running child, if there is one. - inner: Option, - /// The path to the binary to run. - pub(crate) binary_path: PathBuf, - /// Arguments to pass to the binary. - arguments: Vec, - /// Configured parameters. - params: PtParameters, - /// Information about client methods obtained from the PT. - cmethods: HashMap, -} - -impl PluggableTransport { - /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing - /// the `params` to it. - /// - /// You must call [`PluggableTransport::launch`] to actually run the PT. - pub fn new(binary_path: PathBuf, arguments: Vec, params: PtParameters) -> Self { - Self { - params, - arguments, - binary_path, - inner: None, - cmethods: Default::default(), - } - } - +/// Common functionality implemented to allow code reuse +#[async_trait::async_trait] +#[cfg_attr(feature = "experimental-api", visibility::make(pub))] +pub trait PluggableTransport: sealed::PluggableTransportPrivate { /// Get all client methods returned by the binary, if it has been launched. /// /// If it hasn't been launched, the returned map will be empty. // TODO(eta): Actually figure out a way to expose this more stably. - pub(crate) fn transport_methods(&self) -> &HashMap { - &self.cmethods - } - - /// Return a loggable identifier for this transport. - pub(crate) fn identifier(&self) -> &str { - match &self.inner { - Some(child) => &child.identifier, - None => "", - } - } + fn transport_methods(&self) -> &HashMap; /// Get the next [`PtMessage`] from the running transport. It is recommended to call this /// in a loop once a PT has been launched, in order to forward log messages and find out about @@ -636,9 +784,8 @@ impl PluggableTransport { // FIXME(eta): This API will probably go away and get replaced with something better. // In particular, we'd want to cache `Status` messages from before this method // was called. - #[cfg_attr(feature = "experimental-api", visibility::make(pub))] - pub(crate) async fn next_message(&mut self) -> err::Result { - let inner = self.inner.as_mut().ok_or(PtError::ChildGone)?; + async fn next_message(&mut self) -> err::Result { + let inner = self.inner()?; let ret = inner.recv().await; if let Err(PtError::ChildGone) | Err(PtError::ChildReadFailed { .. }) = &ret { // FIXME(eta): Currently this lets the caller still think the methods work by calling @@ -648,148 +795,176 @@ impl PluggableTransport { self.identifier(), ret ); - self.inner = None; + self.set_inner(None); } ret } +} +/// A pluggable transport binary in a child process. +/// +/// These start out inert, and must be launched with [`PluggableClientTransport::launch`] in order +/// to be useful. +#[derive(Debug)] +pub struct PluggableClientTransport { + /// The currently running child, if there is one. + inner: Option, + /// The path to the binary to run. + pub(crate) binary_path: PathBuf, + /// Arguments to pass to the binary. + arguments: Vec, + /// Configured parameters. + common_params: PtCommonParameters, + /// Configured client-only parameters. + client_params: PtClientParameters, + /// Information about client methods obtained from the PT. + cmethods: HashMap, +} + +impl PluggableTransport for PluggableClientTransport { + fn transport_methods(&self) -> &HashMap { + &self.cmethods + } +} + +impl sealed::PluggableTransportPrivate for PluggableClientTransport { + fn inner(&mut self) -> Result<&mut sealed::AsyncPtChild, PtError> { + self.inner.as_mut().ok_or(PtError::ChildGone) + } + fn set_inner(&mut self, newval: Option) { + self.inner = newval; + } + fn identifier(&self) -> &str { + match &self.inner { + Some(child) => &child.identifier, + None => "", + } + } +} + +impl PluggableClientTransport { + /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing + /// the `params` to it. + /// + /// You must call [`PluggableClientTransport::launch`] to actually run the PT. + pub fn new( + binary_path: PathBuf, + arguments: Vec, + common_params: PtCommonParameters, + client_params: PtClientParameters, + ) -> Self { + Self { + common_params, + client_params, + arguments, + binary_path, + inner: None, + cmethods: Default::default(), + } + } + /// Launch the pluggable transport, executing the binary. /// /// Will return an error if the launch fails, one of the transports fail, not all transports /// were launched, or the launch times out. pub async fn launch(&mut self, rt: R) -> err::Result<()> { - if self.inner.is_some() { - warn!( - "Attempted to launch PT binary for {:?} twice.", - self.params.transports - ); - return Ok(()); - } - info!( - "Launching pluggable transport at {} for {:?}", - self.binary_path.display(), - self.params.transports - ); - let child = Command::new(&self.binary_path) - .args(self.arguments.iter()) - .envs(self.params.environment_variables()) - .stdout(Stdio::piped()) - .stdin(Stdio::piped()) - .spawn() - .map_err(|e| PtError::ChildSpawnFailed { - path: self.binary_path.clone(), - error: Arc::new(e), - })?; + let all_env_vars = self + .client_params + .environment_variables(&self.common_params); - let identifier = crate::pt_identifier(&self.binary_path)?; + let mut async_child = + ::get_child_from_pt_launch( + &self.inner, + &self.client_params.transports, + &self.binary_path, + &self.arguments, + all_env_vars, + )?; - let mut async_child = AsyncPtChild::new(child, identifier)?; - - let deadline = Instant::now() + self.params.timeout.unwrap_or(PT_START_TIMEOUT); + let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT); let mut cmethods = HashMap::new(); - let mut proxy_done = self.params.proxy_uri.is_none(); + let mut proxy_done = self.client_params.proxy_uri.is_none(); loop { - match rt - .timeout( - // FIXME(eta): It'd be nice if SleepProviderExt took an `Instant` natively. - deadline.saturating_duration_since(Instant::now()), - async_child.recv(), - ) - .await - .map_err(|_| PtError::Timeout)?? - { - PtMessage::VersionError(e) => { - if e != "no-version" { - warn!("weird VERSION-ERROR: {}", e); - } - return Err(PtError::UnsupportedVersion); - } - PtMessage::Version(vers) => { - if vers != "1" { - return Err(PtError::ProtocolViolation(format!( - "stated version is {}, asked for 1", - vers - ))); - } - } - PtMessage::EnvError(e) => return Err(PtError::ChildProtocolViolation(e)), - PtMessage::ProxyDone => { - if proxy_done { - return Err(PtError::ProtocolViolation( - "binary initiated proxy when not asked (or twice)".into(), - )); - } - info!("PT binary now proxying connections via supplied URI"); - proxy_done = true; - } - PtMessage::ProxyError(e) => return Err(PtError::ProxyError(e)), - PtMessage::ClientTransportLaunched { - transport, - protocol, - endpoint, - } => { - if !self.params.transports.contains(&transport) { - return Err(PtError::ProtocolViolation(format!( - "binary launched unwanted transport '{}'", - transport - ))); - } - let protocol = match &protocol as &str { - "socks4" => SocksVersion::V4, - "socks5" => SocksVersion::V5, + match ::try_match_common_messages(&rt, deadline, &mut async_child).await { + Ok(maybe_message) => if let Some(message) = maybe_message { + match message { + PtMessage::ProxyDone => { + if proxy_done { + return Err(PtError::ProtocolViolation( + "binary initiated proxy when not asked (or twice)".into(), + )); + } + info!("PT binary now proxying connections via supplied URI"); + proxy_done = true; + } + // TODO HSS: unify most of the handling of ClientTransportLaunched with ServerTransportLaunched + // TODO HSS: unify most of the handling of ClientTransportFailed with ServerTransportFailed + // TODO HSS: unify most of the handling of ClientTransportsDone with ServerTransportsDone + PtMessage::ClientTransportLaunched { + transport, + protocol, + endpoint, + } => { + if !self.client_params.transports.contains(&transport) { + return Err(PtError::ProtocolViolation(format!( + "binary launched unwanted transport '{}'", + transport + ))); + } + let protocol = match &protocol as &str { + "socks4" => SocksVersion::V4, + "socks5" => SocksVersion::V5, + x => { + return Err(PtError::ProtocolViolation(format!( + "unknown CMETHOD protocol '{}'", + x + ))) + } + }; + let method = PtClientMethod { + kind: protocol, + endpoint, + }; + info!("Transport '{}' uses method {:?}", transport, method); + cmethods.insert(transport, method); + } + PtMessage::ClientTransportFailed { transport, message } => { + warn!( + "PT {} unable to launch {}. It said: {:?}", + async_child.identifier, transport, message + ); + return Err(PtError::ClientTransportGaveError { + transport: transport.to_string(), + message, + }); + } + PtMessage::ClientTransportsDone => { + let unsupported = self + .client_params + .transports + .iter() + .filter(|&x| !cmethods.contains_key(x)) + .map(|x| x.to_string()) + .collect::>(); + if !unsupported.is_empty() { + warn!( + "PT binary failed to initialise transports: {:?}", + unsupported + ); + return Err(PtError::ClientTransportsUnsupported(unsupported)); + } + info!("PT binary initialisation done"); + break; + } x => { return Err(PtError::ProtocolViolation(format!( - "unknown CMETHOD protocol '{}'", + "received unexpected {:?}", x - ))) + ))); } - }; - let method = PtClientMethod { - kind: protocol, - endpoint, - }; - info!("Transport '{}' uses method {:?}", transport, method); - cmethods.insert(transport, method); - } - PtMessage::ClientTransportFailed { transport, message } => { - warn!( - "PT {} unable to launch {}. It said: {:?}", - async_child.identifier, transport, message - ); - return Err(PtError::ClientTransportGaveError { - transport: transport.to_string(), - message, - }); - } - PtMessage::ClientTransportsDone => { - let unsupported = self - .params - .transports - .iter() - .filter(|&x| !cmethods.contains_key(x)) - .map(|x| x.to_string()) - .collect::>(); - if !unsupported.is_empty() { - warn!( - "PT binary failed to initialise transports: {:?}", - unsupported - ); - return Err(PtError::ClientTransportsUnsupported(unsupported)); } - info!("PT binary initialisation done"); - break; - } - // TODO(eta): We don't do anything with these right now! - PtMessage::Status(_) => {} - PtMessage::Unknown(x) => { - warn!("unknown PT line: {}", x); - } - x => { - return Err(PtError::ProtocolViolation(format!( - "received unexpected {:?}", - x - ))); } + Err(e) => return Err(e) } } self.cmethods = cmethods; @@ -799,6 +974,160 @@ impl PluggableTransport { } } +/// A pluggable transport server binary in a child process. +/// +/// These start out inert, and must be launched with [`PluggableServerTransport::launch`] in order +/// to be useful. +#[derive(Debug)] +pub struct PluggableServerTransport { + /// The currently running child, if there is one. + inner: Option, + /// The path to the binary to run. + pub(crate) binary_path: PathBuf, + /// Arguments to pass to the binary. + arguments: Vec, + /// Configured parameters. + common_params: PtCommonParameters, + /// Configured server-only parameters. + server_params: PtServerParameters, + /// Information about server methods obtained from the PT. + smethods: HashMap, +} + +impl sealed::PluggableTransportPrivate for PluggableServerTransport { + fn inner(&mut self) -> Result<&mut sealed::AsyncPtChild, PtError> { + self.inner.as_mut().ok_or(PtError::ChildGone) + } + fn set_inner(&mut self, newval: Option) { + self.inner = newval; + } + fn identifier(&self) -> &str { + match &self.inner { + Some(child) => &child.identifier, + None => "", + } + } +} + +impl PluggableTransport for PluggableServerTransport { + fn transport_methods(&self) -> &HashMap { + &self.smethods + } +} + +impl PluggableServerTransport { + /// Create a new pluggable transport wrapper, wrapping the binary at `binary_path` and passing + /// the `params` to it. + /// + /// You must call [`PluggableServerTransport::launch`] to actually run the PT. + pub fn new( + binary_path: PathBuf, + arguments: Vec, + common_params: PtCommonParameters, + server_params: PtServerParameters, + ) -> Self { + Self { + common_params, + server_params, + arguments, + binary_path, + inner: None, + smethods: Default::default(), + } + } + + /// Launch the pluggable transport, executing the binary. + /// + /// Will return an error if the launch fails, one of the transports fail, not all transports + /// were launched, or the launch times out. + pub async fn launch(&mut self, rt: R) -> err::Result<()> { + let all_env_vars = self + .server_params + .environment_variables(&self.common_params); + + let mut async_child = + ::get_child_from_pt_launch( + &self.inner, + &self.server_params.transports, + &self.binary_path, + &self.arguments, + all_env_vars, + )?; + + let deadline = Instant::now() + self.common_params.timeout.unwrap_or(PT_START_TIMEOUT); + let mut smethods = HashMap::new(); + + loop { + match ::try_match_common_messages(&rt, deadline, &mut async_child).await { + Ok(maybe_message) => if let Some(message) = maybe_message { + match message { + + PtMessage::ServerTransportLaunched { + transport, + endpoint, + options: _options, + } => { + if !self.server_params.transports.contains(&transport) { + return Err(PtError::ProtocolViolation(format!( + "binary launched unwanted transport '{}'", + transport + ))); + } + let protocol = SocksVersion::V5; + let method = PtClientMethod { + kind: protocol, + endpoint, + }; + info!("Transport '{}' uses method {:?}", transport, method); + smethods.insert(transport, method); + } + + PtMessage::ServerTransportFailed { transport, message } => { + warn!( + "PT {} unable to launch {}. It said: {:?}", + async_child.identifier, transport, message + ); + return Err(PtError::ClientTransportGaveError { + transport: transport.to_string(), + message, + }); + } + PtMessage::ServerTransportsDone => { + let unsupported = self + .server_params + .transports + .iter() + .filter(|&x| !smethods.contains_key(x)) + .map(|x| x.to_string()) + .collect::>(); + if !unsupported.is_empty() { + warn!( + "PT binary failed to initialise transports: {:?}", + unsupported + ); + return Err(PtError::ClientTransportsUnsupported(unsupported)); + } + info!("PT binary initialisation done"); + break; + } + x => { + return Err(PtError::ProtocolViolation(format!( + "received unexpected {:?}", + x + ))); + } + } + } + Err(e) => return Err(e) + } + } + self.smethods = smethods; + self.inner = Some(async_child); + // TODO(eta): We need to expose the log and status messages after this function exits! + Ok(()) + } +} + #[cfg(test)] mod test { // @@ begin test lint list maintained by maint/add_warning @@ diff --git a/crates/tor-ptmgr/src/lib.rs b/crates/tor-ptmgr/src/lib.rs index 1a6686b35..18f74ae2c 100644 --- a/crates/tor-ptmgr/src/lib.rs +++ b/crates/tor-ptmgr/src/lib.rs @@ -46,7 +46,10 @@ pub mod ipc; use crate::config::ManagedTransportConfig; use crate::err::PtError; -use crate::ipc::{PluggableTransport, PtClientMethod, PtParameters}; +use crate::ipc::{ + sealed::PluggableTransportPrivate, PluggableClientTransport, PluggableTransport, + PtClientMethod, PtClientParameters, PtCommonParameters, +}; use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot; use futures::stream::FuturesUnordered; @@ -94,14 +97,14 @@ enum PtReactorMessage { } /// The result of a spawn attempt: the list of transports the spawned binary covers, and the result. -type SpawnResult = (Vec, err::Result); +type SpawnResult = (Vec, err::Result); /// Background reactor to handle managing pluggable transport binaries. struct PtReactor { /// Runtime. rt: R, /// Currently running pluggable transport binaries. - running: Vec, + running: Vec, /// A map of asked-for transports. /// /// If a transport name has an entry, we will append any additional requests for that entry. @@ -148,7 +151,7 @@ impl PtReactor { fn handle_spawned( &mut self, covers: Vec, - result: err::Result, + result: err::Result, ) { match result { Err(e) => { @@ -184,7 +187,7 @@ impl PtReactor { } /// Called to remove a pluggable transport from the shared state. - fn remove_pt(&self, pt: PluggableTransport) { + fn remove_pt(&self, pt: PluggableClientTransport) { let mut state = self.state.write().expect("ptmgr state poisoned"); for transport in pt.transport_methods().keys() { state.cmethods.remove(transport); @@ -381,7 +384,7 @@ async fn spawn_from_config( rt: R, state_dir: PathBuf, cfg: ManagedTransportConfig, -) -> Result { +) -> Result { // FIXME(eta): I really think this expansion should happen at builder validation time... let binary_path = cfg.path.path().map_err(|e| PtError::PathExpansionFailed { path: cfg.path, @@ -399,13 +402,22 @@ async fn spawn_from_config( })?; // FIXME(eta): make the rest of these parameters configurable - let pt_params = PtParameters::builder() + let pt_common_params = PtCommonParameters::builder() .state_location(new_state_dir) + .build() + .expect("PtCommonParameters constructed incorrectly"); + + let pt_client_params = PtClientParameters::builder() .transports(cfg.protocols) .build() - .expect("PtParameters constructed incorrectly"); + .expect("PtClientParameters constructed incorrectly"); - let mut pt = PluggableTransport::new(binary_path, cfg.arguments, pt_params); + let mut pt = PluggableClientTransport::new( + binary_path, + cfg.arguments, + pt_common_params, + pt_client_params, + ); pt.launch(rt).await?; Ok(pt) }