Merge branch 'ptmgr-draft' into 'main'

Pluggable transport manager

See merge request tpo/core/arti!886
This commit is contained in:
Nick Mathewson 2022-11-28 19:44:18 +00:00
commit d6642ef6ac
14 changed files with 437 additions and 128 deletions

1
Cargo.lock generated
View File

@ -4074,6 +4074,7 @@ dependencies = [
"derive_builder_fork_arti",
"futures",
"serde",
"tempfile",
"thiserror",
"tokio",
"tor-chanmgr",

View File

@ -3,7 +3,7 @@
use std::io;
use std::sync::{Arc, Mutex};
use crate::factory::ChannelFactory;
use crate::factory::{BootstrapReporter, ChannelFactory};
use crate::transport::TransportHelper;
use crate::{event::ChanMgrEventSender, Error};
@ -28,14 +28,12 @@ use futures::task::SpawnExt;
///
/// This channel builder does not retry on failure, but it _does_ implement a
/// time-out.
pub(crate) struct ChanBuilder<R: Runtime, H: TransportHelper>
pub struct ChanBuilder<R: Runtime, H: TransportHelper>
where
R: tor_rtcompat::TlsProvider<H::Stream>,
{
/// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
runtime: R,
/// Used to update our bootstrap reporting status.
event_sender: Arc<Mutex<ChanMgrEventSender>>,
/// The transport object that we use to construct streams.
transport: H,
/// Object to build TLS connections.
@ -47,16 +45,11 @@ where
R: TlsProvider<H::Stream>,
{
/// Construct a new ChanBuilder.
pub(crate) fn new(
runtime: R,
transport: H,
event_sender: Arc<Mutex<ChanMgrEventSender>>,
) -> Self {
pub fn new(runtime: R, transport: H) -> Self {
let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
ChanBuilder {
runtime,
transport,
event_sender,
tls_connector,
}
}
@ -70,6 +63,7 @@ where
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
reporter: BootstrapReporter,
) -> crate::Result<tor_proto::channel::Channel> {
use tor_rtcompat::SleepProviderExt;
@ -80,7 +74,7 @@ where
std::time::Duration::new(10, 0)
};
let connect_future = self.connect_no_timeout(target);
let connect_future = self.connect_no_timeout(target, reporter.0);
self.runtime
.timeout(delay, connect_future)
.await
@ -99,15 +93,13 @@ where
async fn connect_no_timeout(
&self,
target: &OwnedChanTarget,
event_sender: Arc<Mutex<ChanMgrEventSender>>,
) -> crate::Result<tor_proto::channel::Channel> {
use tor_proto::channel::ChannelBuilder;
use tor_rtcompat::tls::CertifiedConn;
{
self.event_sender
.lock()
.expect("Lock poisoned")
.record_attempt();
event_sender.lock().expect("Lock poisoned").record_attempt();
}
// 1a. Negotiate the TCP connection or other stream.
@ -127,7 +119,7 @@ where
{
// TODO pt-client: distinguish which transport just succeeded.
self.event_sender
event_sender
.lock()
.expect("Lock poisoned")
.record_tcp_success();
@ -148,7 +140,7 @@ where
.ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
{
self.event_sender
event_sender
.lock()
.expect("Lock poisoned")
.record_tls_finished();
@ -171,7 +163,7 @@ where
.check(target, &peer_cert, Some(now))
.map_err(|source| match &source {
tor_proto::Error::HandshakeCertsExpired { .. } => {
self.event_sender
event_sender
.lock()
.expect("Lock poisoned")
.record_handshake_done_with_skewed_clock();
@ -190,7 +182,7 @@ where
})?;
{
self.event_sender
event_sender
.lock()
.expect("Lock poisoned")
.record_handshake_done();
@ -287,14 +279,15 @@ mod test {
client_rt.jump_to(now);
// Create the channel builder that we want to test.
let (snd, _rcv) = crate::event::channel();
let transport = crate::transport::DefaultTransport::new(client_rt.clone());
let builder = ChanBuilder::new(client_rt, transport, Arc::new(Mutex::new(snd)));
let builder = ChanBuilder::new(client_rt, transport);
let (r1, r2): (Result<Channel>, Result<LocalStream>) = futures::join!(
async {
// client-side: build a channel!
builder.build_channel(&target).await
builder
.build_channel(&target, BootstrapReporter::fake())
.await
},
async {
// relay-side: accept the channel

View File

@ -1,14 +1,31 @@
//! Traits and code to define different mechanisms for building Channels to
//! different kinds of targets.
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use crate::event::ChanMgrEventSender;
use async_trait::async_trait;
use tor_error::{internal, HasKind, HasRetryTime};
use tor_linkspec::{HasChanMethod, OwnedChanTarget, PtTransportName};
use tor_proto::channel::Channel;
use tracing::debug;
/// An opaque type that lets a `ChannelFactory` update the `ChanMgr` about bootstrap progress.
///
/// A future release of this crate might make this type less opaque.
// FIXME(eta): Do that.
#[derive(Clone)]
pub struct BootstrapReporter(pub(crate) Arc<Mutex<ChanMgrEventSender>>);
impl BootstrapReporter {
#[cfg(test)]
/// Create a useless version of this type to satisfy some test.
pub(crate) fn fake() -> Self {
let (snd, _rcv) = crate::event::channel();
Self(Arc::new(Mutex::new(snd)))
}
}
/// An object that knows how to build `Channels` to `ChanTarget`s.
///
/// This trait must be object-safe.
@ -19,6 +36,11 @@ use tracing::debug;
/// A `ChannelFactory` can be implemented in terms of a
/// [`TransportHelper`](crate::transport::TransportHelper), by wrapping it in a
/// `ChanBuilder`.
///
// FIXME(eta): Rectify the below situation.
/// (In fact, as of the time of writing, this is the *only* way to implement this trait
/// outside of this crate while keeping bootstrap status reporting, since `BootstrapReporter`
/// is an opaque type.)
#[async_trait]
pub trait ChannelFactory: Send + Sync {
/// Open an authenticated channel to `target`.
@ -30,20 +52,32 @@ pub trait ChannelFactory: Send + Sync {
/// caller provides a target with an unsupported
/// [`TransportId`](tor_linkspec::TransportId), this method should return
/// [`Error::NoSuchTransport`](crate::Error::NoSuchTransport).
async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result<Channel>;
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
reporter: BootstrapReporter,
) -> crate::Result<Channel>;
}
#[async_trait]
impl<'a> ChannelFactory for Arc<(dyn ChannelFactory + Send + Sync + 'a)> {
async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result<Channel> {
self.as_ref().connect_via_transport(target).await
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
reporter: BootstrapReporter,
) -> crate::Result<Channel> {
self.as_ref().connect_via_transport(target, reporter).await
}
}
#[async_trait]
impl<'a> ChannelFactory for Box<(dyn ChannelFactory + Send + Sync + 'a)> {
async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result<Channel> {
self.as_ref().connect_via_transport(target).await
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
reporter: BootstrapReporter,
) -> crate::Result<Channel> {
self.as_ref().connect_via_transport(target, reporter).await
}
}
@ -55,14 +89,21 @@ where
type Channel = tor_proto::channel::Channel;
type BuildSpec = OwnedChanTarget;
async fn build_channel(&self, target: &Self::BuildSpec) -> crate::Result<Self::Channel> {
async fn build_channel(
&self,
target: &Self::BuildSpec,
reporter: BootstrapReporter,
) -> crate::Result<Self::Channel> {
debug!("Attempting to open a new channel to {target}");
self.connect_via_transport(target).await
self.connect_via_transport(target, reporter).await
}
}
/// The error type returned by a pluggable transport manager.
pub trait AbstractPtError: std::error::Error + HasKind + HasRetryTime + Send + Sync {}
pub trait AbstractPtError:
std::error::Error + HasKind + HasRetryTime + Send + Sync + std::fmt::Debug
{
}
/// A pluggable transport manager.
///
@ -106,7 +147,11 @@ pub(crate) struct CompoundFactory {
#[async_trait]
impl ChannelFactory for CompoundFactory {
async fn connect_via_transport(&self, target: &OwnedChanTarget) -> crate::Result<Channel> {
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
reporter: BootstrapReporter,
) -> crate::Result<Channel> {
use tor_linkspec::ChannelMethod::*;
let factory = match target.chan_method() {
Direct(_) => self.default_factory.clone(),
@ -127,7 +172,7 @@ impl ChannelFactory for CompoundFactory {
}
};
factory.connect_via_transport(target).await
factory.connect_via_transport(target, reporter).await
}
}

View File

@ -37,7 +37,7 @@
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
mod builder;
pub mod builder;
mod config;
mod err;
mod event;
@ -70,6 +70,7 @@ use tor_rtcompat::Runtime;
/// A Result as returned by this crate.
pub type Result<T> = std::result::Result<T, Error>;
use crate::factory::BootstrapReporter;
pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
@ -166,14 +167,15 @@ impl<R: Runtime> ChanMgr<R> {
{
let (sender, receiver) = event::channel();
let sender = Arc::new(std::sync::Mutex::new(sender));
let reporter = BootstrapReporter(sender);
let transport = transport::DefaultTransport::new(runtime.clone());
let builder = builder::ChanBuilder::new(runtime, transport, sender);
let builder = builder::ChanBuilder::new(runtime, transport);
let factory = factory::CompoundFactory::new(
Arc::new(builder),
#[cfg(feature = "pt-client")]
None,
);
let mgr = mgr::AbstractChanMgr::new(factory, config, dormancy, netparams);
let mgr = mgr::AbstractChanMgr::new(factory, config, dormancy, netparams, reporter);
ChanMgr {
mgr,
bootstrap_status: receiver,

View File

@ -3,6 +3,7 @@
use crate::mgr::state::{OpenEntry, PendingEntry};
use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
use crate::factory::BootstrapReporter;
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::{FutureExt, Shared};
@ -66,7 +67,11 @@ pub(crate) trait AbstractChannelFactory {
/// and so on.
///
/// It should not retry; that is handled at a higher level.
async fn build_channel(&self, target: &Self::BuildSpec) -> Result<Self::Channel>;
async fn build_channel(
&self,
target: &Self::BuildSpec,
reporter: BootstrapReporter,
) -> Result<Self::Channel>;
}
/// A type- and network-agnostic implementation for [`ChanMgr`](crate::ChanMgr).
@ -83,6 +88,9 @@ pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
/// The most important part is the map from relay identity to channel, or
/// to pending channel status.
pub(crate) channels: state::MgrState<CF>,
/// A bootstrap reporter to give out when building channels.
pub(crate) reporter: BootstrapReporter,
}
/// Type alias for a future that we wait on to see when a pending
@ -100,9 +108,11 @@ impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
config: &ChannelConfig,
dormancy: Dormancy,
netparams: &NetParameters,
reporter: BootstrapReporter,
) -> Self {
AbstractChanMgr {
channels: state::MgrState::new(connector, config.clone(), dormancy, netparams),
reporter,
}
}
@ -225,7 +235,9 @@ impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
// We need to launch a channel.
Some(Action::Launch(send)) => {
let connector = self.channels.builder();
let outcome = connector.build_channel(&target).await;
let outcome = connector
.build_channel(&target, self.reporter.clone())
.await;
let status = self.handle_build_outcome(&target, outcome);
// It's okay if all the receivers went away:
@ -590,6 +602,7 @@ mod test {
&ChannelConfig::default(),
Default::default(),
&Default::default(),
BootstrapReporter::fake(),
)
}
@ -620,7 +633,11 @@ mod test {
type Channel = FakeChannel;
type BuildSpec = FakeBuildSpec;
async fn build_channel(&self, target: &Self::BuildSpec) -> Result<FakeChannel> {
async fn build_channel(
&self,
target: &Self::BuildSpec,
_reporter: BootstrapReporter,
) -> Result<FakeChannel> {
yield_now().await;
let FakeBuildSpec(ident, mood, id) = *target;
let ed_ident = u32_to_ed(ident);

View File

@ -537,6 +537,7 @@ mod test {
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use crate::factory::BootstrapReporter;
use async_trait::async_trait;
use std::sync::{Arc, Mutex};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
@ -559,7 +560,11 @@ mod test {
type BuildSpec = tor_linkspec::OwnedChanTarget;
async fn build_channel(&self, _target: &Self::BuildSpec) -> Result<FakeChannel> {
async fn build_channel(
&self,
_target: &Self::BuildSpec,
_reporter: BootstrapReporter,
) -> Result<FakeChannel> {
unimplemented!()
}
}

View File

@ -28,6 +28,7 @@ use tor_proto::channel::{Channel, CtrlMsg};
use crate::mgr::{AbstractChanMgr, AbstractChannelFactory};
use crate::ChannelUsage;
use crate::factory::BootstrapReporter;
use PaddingLevel as PL;
const DEF_MS: [u32; 2] = [1500, 9500];
@ -124,7 +125,11 @@ impl AbstractChannelFactory for FakeChannelFactory {
type Channel = Channel;
type BuildSpec = tor_linkspec::RelayIds;
async fn build_channel(&self, _target: &Self::BuildSpec) -> Result<Self::Channel> {
async fn build_channel(
&self,
_target: &Self::BuildSpec,
_reporter: BootstrapReporter,
) -> Result<Self::Channel> {
Ok(self.channel.clone())
}
}
@ -161,7 +166,13 @@ async fn case(level: PaddingLevel, dormancy: Dormancy, usage: ChannelUsage) -> C
let netparams = Arc::new(NetParameters::default());
let chanmgr = AbstractChanMgr::new(factory, &cconfig, dormancy, &netparams);
let chanmgr = AbstractChanMgr::new(
factory,
&cconfig,
dormancy,
&netparams,
BootstrapReporter::fake(),
);
let (channel, _prov) = chanmgr.get_or_launch(relay_ids, usage).await.unwrap();

View File

@ -260,6 +260,21 @@ pub struct ExternalProxyPlugin<R> {
runtime: R,
/// The location of the proxy.
proxy_addr: SocketAddr,
/// The SOCKS protocol version to use.
proxy_version: SocksVersion,
}
#[cfg(feature = "pt-client")]
#[cfg_attr(docsrs, doc(cfg(feature = "pt-client")))]
impl<R: TcpProvider + Send + Sync> ExternalProxyPlugin<R> {
/// Make a new `ExternalProxyPlugin`.
pub fn new(rt: R, proxy_addr: SocketAddr, proxy_version: SocksVersion) -> Self {
Self {
runtime: rt,
proxy_addr,
proxy_version,
}
}
}
#[cfg(feature = "pt-client")]
@ -286,7 +301,8 @@ impl<R: TcpProvider + Send + Sync> TransportHelper for ExternalProxyPlugin<R> {
}
};
let protocol = settings_to_protocol(encode_settings(pt_target.settings()))?;
let protocol =
settings_to_protocol(self.proxy_version, encode_settings(pt_target.settings()))?;
Ok((
target.clone(),
@ -359,23 +375,30 @@ where
/// Transform a string into a representation that can be sent as SOCKS
/// authentication.
// NOTE(eta): I am very unsure of the logic in here.
#[cfg(feature = "pt-client")]
fn settings_to_protocol(s: String) -> Result<Protocol, ProxyError> {
fn settings_to_protocol(vers: SocksVersion, s: String) -> Result<Protocol, ProxyError> {
let mut bytes: Vec<_> = s.into();
Ok(if bytes.is_empty() {
Protocol::Socks(SocksVersion::V5, SocksAuth::NoAuth)
Protocol::Socks(vers, SocksAuth::NoAuth)
} else if vers == SocksVersion::V4 {
if bytes.contains(&0) {
return Err(ProxyError::InvalidSocksRequest(
tor_socksproto::Error::NotImplemented(
"SOCKS 4 doesn't support internal NUL bytes (for PT settings list)".into(),
),
));
} else {
Protocol::Socks(SocksVersion::V4, SocksAuth::Socks4(bytes))
}
} else if bytes.len() <= 255 {
Protocol::Socks(SocksVersion::V5, SocksAuth::Username(bytes, vec![]))
} else if bytes.len() <= (255 * 2) {
let password = bytes.split_off(255);
Protocol::Socks(SocksVersion::V5, SocksAuth::Username(bytes, password))
} else if !bytes.contains(&0) {
Protocol::Socks(SocksVersion::V4, SocksAuth::Socks4(bytes))
} else {
return Err(ProxyError::InvalidSocksRequest(
tor_socksproto::Error::NotImplemented(
"long settings lists with internal NUL bytes".into(),
),
tor_socksproto::Error::NotImplemented("PT settings list too long for SOCKS 5".into()),
));
})
}
@ -419,13 +442,14 @@ mod test {
);
}
// TODO pt-client / FIXME(eta): make this test more complete
#[cfg(feature = "pt-client")]
#[test]
fn split_settings() {
use SocksVersion::*;
let long_string = "examplestrg".to_owned().repeat(50);
assert_eq!(long_string.len(), 550);
let s = |a, b| settings_to_protocol(long_string[a..b].to_owned()).unwrap();
let s = |a, b| settings_to_protocol(V5, long_string[a..b].to_owned()).unwrap();
let v = |a, b| long_string.as_bytes()[a..b].to_vec();
assert_eq!(s(0, 0), Protocol::Socks(V5, SocksAuth::NoAuth));
@ -450,19 +474,20 @@ mod test {
Protocol::Socks(V5, SocksAuth::Username(v(0, 255), v(255, 510)))
);
// This one needs to use socks4, or it won't fit. :P
assert_eq!(s(0, 511), Protocol::Socks(V4, SocksAuth::Socks4(v(0, 511))));
// FIXME FIXME FIXME
// assert_eq!(s(0, 511), Protocol::Socks(V4, SocksAuth::Socks4(v(0, 511))));
// Small requests with "0" bytes work fine...
assert_eq!(
settings_to_protocol("\0".to_owned()).unwrap(),
settings_to_protocol(V5, "\0".to_owned()).unwrap(),
Protocol::Socks(V5, SocksAuth::Username(vec![0], vec![]))
);
assert_eq!(
settings_to_protocol("\0".to_owned().repeat(510)).unwrap(),
settings_to_protocol(V5, "\0".to_owned().repeat(510)).unwrap(),
Protocol::Socks(V5, SocksAuth::Username(vec![0; 255], vec![0; 255]))
);
// Huge requests with "0" simply can't be encoded.
assert!(settings_to_protocol("\0".to_owned().repeat(511)).is_err());
assert!(settings_to_protocol(V5, "\0".to_owned().repeat(511)).is_err());
}
}

View File

@ -20,8 +20,9 @@ async-trait = "0.1.2"
derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" }
futures = "0.3.14"
serde = { version = "1.0.103", features = ["derive"] }
tempfile = "3.3"
thiserror = "1"
tor-chanmgr = { version = "0.7.0", path = "../tor-chanmgr" }
tor-chanmgr = { version = "0.7.0", path = "../tor-chanmgr", features = ["pt-client"] }
tor-config = { version = "0.6.0", path = "../tor-config" }
tor-error = { version = "0.3.2", path = "../tor-error" }
tor-linkspec = { version = "0.5.1", path = "../tor-linkspec", features = ["pt-client"] }

View File

@ -13,7 +13,7 @@ async fn main() -> Result<()> {
.transports(vec!["obfs4".parse().unwrap()])
.build()
.unwrap();
let mut pt = PluggableTransport::new("./obfs4proxy".into(), params);
let mut pt = PluggableTransport::new("./obfs4proxy".into(), vec![], params);
pt.launch(PreferredRuntime::current()?).await?;
loop {
info!("message: {:?}", pt.next_message().await?);

View File

@ -1,43 +1,10 @@
//! Configuration logic for tor-ptmgr.
#![allow(dead_code)] // TODO pt-client: remove.
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use tor_config::list_builder::{define_list_builder_accessors, define_list_builder_helper};
use tor_config::{CfgPath, ConfigBuildError};
use tor_linkspec::PtTransportName;
/// Configure one or more pluggable transports.
#[derive(Debug, Clone, Builder, Eq, PartialEq)]
#[builder(derive(Debug, Serialize, Deserialize))]
#[builder(build_fn(error = "ConfigBuildError"))]
pub struct PtMgrConfig {
/// A list of configured transport binaries.
#[builder(sub_builder, setter(custom))]
binaries: TransportConfigList,
// TODO: Someday we will want to also have support for a directory full of
// transports, transports loaded dynamically from an object file, or stuff
// like that.
}
define_list_builder_accessors! {
struct PtMgrConfigBuilder {
pub binaries: [ManagedTransportConfigBuilder],
}
}
/// A list of configured transport binaries (type alias for macrology).
type TransportConfigList = Vec<ManagedTransportConfig>;
define_list_builder_helper! {
pub(crate) struct TransportConfigListBuilder {
transports: [ManagedTransportConfigBuilder],
}
built: TransportConfigList = transports;
default = vec![];
}
/// A single pluggable transport, to be launched as an external process.
#[derive(Clone, Debug, Builder, Eq, PartialEq)]
#[builder(derive(Debug, Serialize, Deserialize))]
@ -47,18 +14,18 @@ pub struct ManagedTransportConfig {
//
// NOTE(eta): This doesn't use the list builder stuff, because you're not likely to
// set this field more than once.
protocols: Vec<PtTransportName>,
pub(crate) protocols: Vec<PtTransportName>,
/// The path to the binary to run.
path: CfgPath,
pub(crate) path: CfgPath,
/// One or more command-line arguments to pass to the binary.
// TODO: Should this be OsString? That's a pain to parse...
//
// NOTE(eta): This doesn't use the list builder stuff, because you're not likely to
// set this field more than once.
#[builder(default)]
arguments: Vec<String>,
pub(crate) arguments: Vec<String>,
/// If true, launch this transport on startup. Otherwise, we launch
/// it on demand
#[builder(default)]
run_on_startup: bool,
pub(crate) run_on_startup: bool,
}

View File

@ -2,6 +2,9 @@
use std::path::PathBuf;
use std::sync::Arc;
use tor_chanmgr::factory::AbstractPtError;
use tor_config::{CfgPath, CfgPathError};
use tor_error::{ErrorKind, HasKind, HasRetryTime, RetryTime};
/// An error spawning or managing a pluggable transport.
#[derive(Clone, Debug, thiserror::Error)]
@ -46,6 +49,7 @@ pub enum PtError {
/// The binary path we tried to execute.
path: PathBuf,
/// The I/O error returned.
#[source]
error: Arc<std::io::Error>,
},
/// We failed to parse something a pluggable transport sent us.
@ -61,7 +65,38 @@ pub enum PtError {
/// We couldn't get stdio for a spawned child process for some reason.
#[error("PT stdio unavailable")]
StdioUnavailable,
/// We couldn't create a temporary directory.
#[error("Failed to create a temporary directory: {0}")]
TempdirCreateFailed(#[source] Arc<std::io::Error>),
/// We couldn't expand a path.
#[error("Failed to expand path {}: {}", path, error)]
PathExpansionFailed {
/// The offending path.
path: CfgPath,
/// The error encountered.
#[source]
error: CfgPathError,
},
/// The pluggable transport reactor failed.
#[error("PT reactor failed")]
// TODO pt-client: This should just be a bug.
ReactorFailed,
}
// TODO pt-client: implement.
impl HasKind for PtError {
fn kind(&self) -> ErrorKind {
todo!()
}
}
impl HasRetryTime for PtError {
fn retry_time(&self) -> RetryTime {
todo!()
}
}
impl AbstractPtError for PtError {}
/// Standard-issue `Result` alias, with [`PtError`].
pub type Result<T> = std::result::Result<T, PtError>;

View File

@ -361,6 +361,7 @@ impl FromStr for PtMessage {
//
// FIXME(eta): This currently spawns an OS thread, since there's no other way to do this without
// being async-runtime dependent (or adding process spawning to tor-rtcompat).
#[derive(Debug)]
struct AsyncPtChild {
/// Channel to receive lines from the child process stdout.
stdout: Receiver<io::Result<String>>,
@ -463,7 +464,7 @@ impl AsyncPtChild {
}
/// Parameters passed to a pluggable transport.
#[derive(PartialEq, Eq, Clone, derive_builder::Builder)]
#[derive(PartialEq, Eq, Clone, Debug, derive_builder::Builder)]
pub struct PtParameters {
/// A path where the launched PT can store state.
state_location: PathBuf,
@ -531,23 +532,38 @@ impl PtParameters {
}
/// A SOCKS endpoint to connect through a pluggable transport.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct PtClientMethod {
/// The SOCKS protocol version to use.
kind: SocksVersion,
pub(crate) kind: SocksVersion,
/// The socket address to connect to.
endpoint: SocketAddr,
pub(crate) endpoint: SocketAddr,
}
impl PtClientMethod {
/// Get the SOCKS protocol version to use.
pub fn kind(&self) -> SocksVersion {
self.kind
}
/// Get the socket address to connect to.
pub fn endpoint(&self) -> SocketAddr {
self.endpoint
}
}
/// A pluggable transport binary in a child process.
///
/// These start out inert, and must be launched with [`PluggableTransport::launch`] in order
/// to be useful.
#[derive(Debug)]
pub struct PluggableTransport {
/// The currently running child, if there is one.
inner: Option<AsyncPtChild>,
/// The path to the binary to run.
binary_path: PathBuf,
/// Arguments to pass to the binary.
arguments: Vec<String>,
/// Configured parameters.
params: PtParameters,
/// Information about client methods obtained from the PT.
@ -559,20 +575,24 @@ impl PluggableTransport {
/// the `params` to it.
///
/// You must call [`PluggableTransport::launch`] to actually run the PT.
pub fn new(binary_path: PathBuf, params: PtParameters) -> Self {
pub fn new(binary_path: PathBuf, arguments: Vec<String>, params: PtParameters) -> Self {
Self {
params,
arguments,
binary_path,
inner: None,
cmethods: Default::default(),
}
}
/// Get information for the named `transport`, if the PT is running.
//
// FIXME(eta): This could be slightly more typed.
pub fn transport_method(&self, transport: &PtTransportName) -> Option<&PtClientMethod> {
self.cmethods.get(transport)
/// Get all client methods returned by the binary, if it has been launched.
///
/// If it hasn't been launched, the returned map will be empty.
// TODO(eta): Actually figure out a way to expose this more stably.
pub(crate) fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
&self.cmethods
}
/// Get the next [`PtMessage`] from the running transport. It is recommended to call this
/// in a loop once a PT has been launched, in order to forward log messages and find out about
/// status updates.
@ -603,6 +623,7 @@ impl PluggableTransport {
return Ok(());
}
let child = Command::new(&self.binary_path)
.args(self.arguments.iter())
.envs(self.params.environment_variables())
.stdout(Stdio::piped())
.stdin(Stdio::piped())

View File

@ -37,50 +37,236 @@
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
#![allow(dead_code)] // FIXME TODO pt-client: remove.
#![allow(unused_imports)] // FIXME TODO pt-client: remove.
pub mod config;
pub mod err;
pub mod ipc;
use config::PtMgrConfig;
use crate::config::ManagedTransportConfig;
use crate::err::PtError;
use crate::ipc::{PluggableTransport, PtClientMethod, PtParameters, PtParametersBuilder};
use crate::mpsc::Receiver;
use async_trait::async_trait;
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
use futures::StreamExt;
use std::collections::HashMap;
use std::mem;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tempfile::TempDir;
use tor_chanmgr::builder::ChanBuilder;
#[cfg(feature = "tor-channel-factory")]
use tor_chanmgr::factory::ChannelFactory;
use tor_linkspec::TransportId;
use tor_chanmgr::factory::{AbstractPtError, AbstractPtMgr};
use tor_chanmgr::transport::{ExternalProxyPlugin, TransportHelper};
use tor_linkspec::{PtTransportName, TransportId};
use tor_rtcompat::Runtime;
use tracing::{info, warn};
/// Shared mutable state between the `PtReactor` and `PtMgr`.
#[derive(Default, Debug)]
struct PtSharedState {
/// Connection information for pluggable transports from currently running binaries.
cmethods: HashMap<PtTransportName, PtClientMethod>,
/// Current configured set of pluggable transport binaries.
configured: HashMap<PtTransportName, ManagedTransportConfig>,
}
/// A message to the `PtReactor`.
enum PtReactorMessage {
/// Notify the reactor that the currently configured set of PTs has changed.
Reconfigured,
/// Ask the reactor to spawn a pluggable transport binary.
Spawn {
/// Spawn a binary to provide this PT.
pt: PtTransportName,
/// Notify the result via this channel.
result: oneshot::Sender<err::Result<PtClientMethod>>,
},
}
/// Background reactor to handle managing pluggable transport binaries.
struct PtReactor<R> {
/// Runtime.
rt: R,
/// Currently running pluggable transport binaries.
running: Vec<PluggableTransport>,
/// State for the corresponding PtMgr.
state: Arc<RwLock<PtSharedState>>,
/// PtMgr channel.
/// (Unbounded so that we can reconfigure without blocking: we're unlikely to have the reactor
/// get behind.)
rx: Receiver<PtReactorMessage>,
}
impl<R: Runtime> PtReactor<R> {
/// XXX
async fn run_one_step(&mut self) -> err::Result<()> {
todo!()
}
}
/// A pluggable transport manager knows how to make different
/// kinds of connections to the Tor network, for censorship avoidance.
///
/// Currently, we only support two kinds of pluggable transports: Those
/// configured in a PtConfig object, and those added with PtMgr::register.
//
// TODO: Will we need a <R:Runtime constraint> here? I don't know. -nickm
#[derive(Clone, Debug)]
pub struct PtMgr<R> {
/// An underlying `Runtime`, used to spawn background tasks.
#[allow(dead_code)]
runtime: R,
/// State for this PtMgr.
state: Arc<RwLock<PtSharedState>>,
/// PtReactor channel.
tx: UnboundedSender<PtReactorMessage>,
/// Temporary directory to store PT state in.
//
// FIXME(eta): This should be configurable.
state_dir: TempDir,
}
#[allow(clippy::missing_panics_doc, clippy::needless_pass_by_value)]
impl<R: Runtime> PtMgr<R> {
/// Create a new PtMgr.
pub fn new(cfg: PtMgrConfig, rt: R) -> Self {
let _ = (cfg, rt);
todo!("TODO pt-client: implement this.")
}
/// Reload the configuration
pub fn reconfigure(&self, cfg: PtMgrConfig) -> Result<(), tor_config::ReconfigureError> {
let _ = cfg;
todo!("TODO pt-client: implement this.")
}
/// Manually add a new channel factory to this registry.
#[cfg(feature = "tor-channel-factory")]
pub fn register_factory(&self, ids: &[TransportId], factory: impl ChannelFactory) {
let _ = (ids, factory);
todo!("TODO pt-client: implement this.")
/// Transform the config into a more useful representation indexed by transport name.
fn transform_config(
binaries: Vec<ManagedTransportConfig>,
) -> HashMap<PtTransportName, ManagedTransportConfig> {
let mut ret = HashMap::new();
// FIXME(eta): You can currently specify overlapping protocols in your binaries, and it'll
// just use the last binary specified.
// I attempted to fix this, but decided I didn't want to stare into the list
// builder macro void after trying it for 15 minutes.
for thing in binaries {
for tn in thing.protocols.iter() {
ret.insert(tn.clone(), thing.clone());
}
}
ret
}
// TODO pt-client: Possibly, this should have a separate function to launch
// its background tasks.
/// Create a new PtMgr.
// TODO pt-client: maybe don't have the Vec directly exposed?
pub fn new(transports: Vec<ManagedTransportConfig>, rt: R) -> Result<Self, PtError> {
let state = PtSharedState {
cmethods: Default::default(),
configured: Self::transform_config(transports),
};
let state = Arc::new(RwLock::new(state));
let (tx, _) = mpsc::unbounded();
Ok(Self {
runtime: rt,
state,
tx,
state_dir: TempDir::new().map_err(|e| PtError::TempdirCreateFailed(Arc::new(e)))?,
})
}
/// Reload the configuration
pub fn reconfigure(
&mut self,
transports: Vec<ManagedTransportConfig>,
) -> Result<(), tor_config::ReconfigureError> {
{
let mut inner = self.state.write().expect("ptmgr poisoned");
inner.configured = Self::transform_config(transports);
}
// We don't have any way of propagating this sanely; the caller will find out the reactor
// has died later on anyway.
let _ = self.tx.unbounded_send(PtReactorMessage::Reconfigured);
Ok(())
}
}
/// Spawn a `PluggableTransport` using a `ManagedTransportConfig`.
async fn spawn_from_config<R: Runtime>(
rt: R,
state_dir: PathBuf,
cfg: ManagedTransportConfig,
) -> Result<PluggableTransport, PtError> {
// FIXME(eta): make the rest of these parameters configurable
let pt_params = PtParameters::builder()
.state_location(state_dir)
.transports(cfg.protocols)
.build()
.expect("PtParameters constructed incorrectly");
// FIXME(eta): I really think this expansion should happen at builder validation time...
let path = cfg.path.path().map_err(|e| PtError::PathExpansionFailed {
path: cfg.path,
error: e,
})?;
let mut pt = PluggableTransport::new(path, cfg.arguments, pt_params);
pt.launch(rt).await?;
Ok(pt)
}
#[cfg(feature = "tor-channel-factory")]
#[async_trait]
impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
// There is going to be a lot happening "under the hood" here.
//
// When we are asked to get a ChannelFactory for a given
// connection, we will need to:
// - launch the binary for that transport if it is not already running*.
// - If we launched the binary, talk to it and see which ports it
// is listening on.
// - Return a ChannelFactory that connects via one of those ports,
// using the appropriate version of SOCKS, passing K=V parameters
// encoded properly.
//
// * As in other managers, we'll need to avoid trying to launch the same
// transport twice if we get two concurrent requests.
//
// Later if the binary crashes, we should detect that. We should relaunch
// it on demand.
//
// On reconfigure, we should shut down any no-longer-used transports.
//
// Maybe, we should shut down transports that haven't been used
// for a long time.
async fn factory_for_transport(
&self,
transport: &PtTransportName,
) -> Result<Option<Arc<dyn ChannelFactory + Send + Sync>>, Arc<dyn AbstractPtError>> {
// NOTE(eta): This is using a RwLock inside async code (but not across an await point).
// Arguably this is fine since it's just a small read, and nothing should ever
// hold this lock for very long.
let (mut cmethod, configured) = {
let inner = self.state.read().expect("ptmgr poisoned");
let cmethod = inner.cmethods.get(transport).copied();
let configured = cmethod.is_some() || inner.configured.get(transport).is_some();
(cmethod, configured)
};
if cmethod.is_none() {
if configured {
// Tell the reactor to spawn the PT, and wait for it.
// (The reactor will handle coalescing multiple requests.)
let (tx, rx) = oneshot::channel();
self.tx
.unbounded_send(PtReactorMessage::Spawn {
pt: transport.clone(),
result: tx,
})
.map_err(|_| Arc::new(PtError::ReactorFailed) as Arc<dyn AbstractPtError>)?;
cmethod = Some(
// NOTE(eta): Could be improved with result flattening.
rx.await
.map_err(|_| Arc::new(PtError::ReactorFailed) as Arc<dyn AbstractPtError>)?
.map_err(|x| Arc::new(x) as Arc<dyn AbstractPtError>)?,
);
} else {
return Ok(None);
}
}
let cmethod = cmethod.expect("impossible");
let proxy = ExternalProxyPlugin::new(self.runtime.clone(), cmethod.endpoint, cmethod.kind);
let factory = ChanBuilder::new(self.runtime.clone(), proxy);
// FIXME(eta): Should we cache constructed factories? If no: should this still be an Arc?
// FIXME(eta): Should we track what transports are live somehow, so we can shut them down?
Ok(Some(Arc::new(factory)))
}
}