Merge branch 'ptmgr-cleanup' into 'main'

Post-merge cleanups for PtMgr

Closes #667 and #659

See merge request tpo/core/arti!893
This commit is contained in:
Nick Mathewson 2022-11-29 14:46:55 +00:00
commit f08cfb6567
12 changed files with 182 additions and 53 deletions

2
Cargo.lock generated
View File

@ -4087,7 +4087,6 @@ dependencies = [
"derive_builder_fork_arti",
"futures",
"serde",
"tempfile",
"thiserror",
"tokio",
"tor-chanmgr",
@ -4098,6 +4097,7 @@ dependencies = [
"tor-socksproto",
"tracing",
"tracing-subscriber",
"visibility",
]
[[package]]

View File

@ -36,7 +36,7 @@ async-std = ["tor-rtcompat/async-std"]
bridge-client = ["tor-guardmgr/bridge-client", "tor-dirmgr/bridge-client"]
tokio = ["tor-rtcompat/tokio", "tor-proto/tokio"]
native-tls = ["tor-rtcompat/native-tls"]
pt-client = ["bridge-client", "tor-guardmgr/pt-client"]
pt-client = ["bridge-client", "tor-chanmgr/pt-client", "tor-guardmgr/pt-client"]
rustls = ["tor-rtcompat/rustls"]
static = ["static-sqlite", "static-native-tls"]

View File

@ -89,6 +89,9 @@ pub struct TorClient<R: Runtime> {
// which we can't do when the Arc is inside.
#[cfg(feature = "bridge-client")]
bridge_desc_mgr: Arc<Mutex<Option<Arc<BridgeDescMgr<R>>>>>,
/// Pluggable transport manager.
#[cfg(feature = "pt-client")]
pt_mgr: Arc<tor_ptmgr::PtMgr<R>>,
/// Guard manager
#[cfg_attr(not(feature = "bridge-client"), allow(dead_code))]
guardmgr: GuardMgr<R>,
@ -445,6 +448,23 @@ impl<R: Runtime> TorClient<R> {
let guardmgr = tor_guardmgr::GuardMgr::new(runtime.clone(), statemgr.clone(), &config)
.map_err(ErrorDetail::GuardMgrSetup)?;
#[cfg(feature = "pt-client")]
let pt_mgr = {
let mut pt_state_dir = config.storage.expand_state_dir()?;
pt_state_dir.push("pt_state");
config.storage.permissions().make_directory(&pt_state_dir)?;
let mgr = Arc::new(tor_ptmgr::PtMgr::new(
config.bridges.transports.clone(),
pt_state_dir,
runtime.clone(),
)?);
chanmgr.set_pt_mgr(mgr.clone());
mgr
};
let circmgr = tor_circmgr::CircMgr::new(
&config,
statemgr.clone(),
@ -484,6 +504,9 @@ impl<R: Runtime> TorClient<R> {
#[cfg(feature = "bridge-client")]
let bridge_desc_mgr = Arc::new(Mutex::new(None));
// TODO pt-client: We'll need to tell the ptmgr to launch any background
// tasks, if it has not already done so.
runtime
.spawn(tasks_monitor_dormant(
dormant_recv,
@ -519,6 +542,8 @@ impl<R: Runtime> TorClient<R> {
dirmgr,
#[cfg(feature = "bridge-client")]
bridge_desc_mgr,
#[cfg(feature = "pt-client")]
pt_mgr,
guardmgr,
statemgr,
addrcfg: Arc::new(addr_cfg.into()),
@ -710,6 +735,11 @@ impl<R: Runtime> TorClient<R> {
.reconfigure(&new_config.channel, how, netparams)
.map_err(wrap_err)?;
#[cfg(feature = "pt-client")]
self.pt_mgr
.reconfigure(how, new_config.bridges.transports.clone())
.map_err(wrap_err)?;
if how == tor_config::Reconfigure::CheckAllOrNothing {
return Ok(());
}

View File

@ -44,6 +44,9 @@ pub mod dir {
}
/// Types for configuring pluggable transports.
//
// TODO pt-client: Could we make these ignored when pt-client is not enabled? If
// so, we could make the ptmgr dependency optional, which is something we wanted.
pub mod pt {
pub use tor_ptmgr::config::{ManagedTransportConfig, ManagedTransportConfigBuilder};
}
@ -242,7 +245,7 @@ pub struct BridgesConfig {
/// Configured list of pluggable transports.
#[builder(sub_builder, setter(custom))]
#[builder_field_attr(serde(default))]
transports: TransportConfigList,
pub(crate) transports: TransportConfigList,
}
/// A list of configured transport binaries (type alias for macrology).

View File

@ -198,6 +198,15 @@ enum ErrorDetail {
#[error("Unable to change configuration")]
Reconfigure(#[from] tor_config::ReconfigureError),
/// Problem creating or launching a pluggable transport.
#[cfg(feature="pt-client")]
#[error("Problem with a pluggable transport")]
PluggableTransport(#[from] tor_ptmgr::err::PtError),
/// We encountered a problem while inspecting or creating a directory.
#[error("Filesystem permissions problem")]
FsMistrust(#[from] fs_mistrust::Error),
/// Unable to spawn task
#[error("Unable to spawn {spawning}")]
Spawn {
@ -300,6 +309,8 @@ impl tor_error::HasKind for ErrorDetail {
E::DirMgrSetup(e) => e.kind(),
E::StateMgrSetup(e) => e.kind(),
E::DirMgrBootstrap(e) => e.kind(),
#[cfg(feature = "pt-client")]
E::PluggableTransport(e) => e.kind(),
E::StreamFailed { cause, .. } => cause.kind(),
E::StateAccess(e) => e.kind(),
E::Configuration(e) => e.kind(),
@ -310,6 +321,7 @@ impl tor_error::HasKind for ErrorDetail {
E::LocalAddress => EK::ForbiddenStreamTarget,
E::ChanMgrSetup(e) => e.kind(),
E::NoDir { error, .. } => error.kind(),
E::FsMistrust(_) => EK::FsPermissions,
E::Bug(e) => e.kind(),
}
}

View File

@ -277,21 +277,18 @@ impl<R: Runtime> ChanMgr<R> {
/// This method can be used to e.g. tell Arti to use a proxy for
/// outgoing connections.
#[cfg(feature = "experimental-api")]
pub fn set_default_transport(&self, factory: impl factory::ChannelFactory + 'static) {
pub fn set_default_transport(&self, factory: Arc<dyn factory::ChannelFactory + 'static>) {
// TODO pt-client: Perhaps we actually want to take this as part of the constructor instead?
// TODO pt-client: It's not clear to me that we really need this method.
// TODO pt-client: Should this method take an ArcFactory instead?
self.mgr
.with_mut_builder(|f| f.replace_default_factory(Arc::new(factory)));
.with_mut_builder(|f| f.replace_default_factory(factory));
}
/// Replace the transport registry with one that may know about
/// more transports.
#[cfg(feature = "pt-client")]
pub fn set_pt_mgr(&self, ptmgr: impl factory::AbstractPtMgr + 'static) {
// TODO pt-client: Should this method take an ArcPtMgr instead?
self.mgr
.with_mut_builder(|f| f.replace_ptmgr(Arc::new(ptmgr)));
pub fn set_pt_mgr(&self, ptmgr: Arc<dyn factory::AbstractPtMgr + 'static>) {
self.mgr.with_mut_builder(|f| f.replace_ptmgr(ptmgr));
}
/// Watch for things that ought to change the configuration of all channels in the client

View File

@ -341,6 +341,11 @@ pub enum ErrorKind {
#[display(fmt = "local authentication refused.")]
LocalProtocolFailed,
/// A problem occurred when launching or communicating with an external
/// process running on this computer.
#[display(fmt = "local plug-in tool failed")]
LocalPluginFailed,
/// A relay had an identity other than the one we expected.
///
/// This could indicate a MITM attack, but more likely indicates that the

View File

@ -14,13 +14,16 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
[features]
default = ["tor-channel-factory"]
tor-channel-factory = []
full = ["tor-channel-factory"]
experimental = ["experimental-api"]
experimental-api = ["visibility"]
[dependencies]
async-trait = "0.1.2"
derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" }
futures = "0.3.14"
serde = { version = "1.0.103", features = ["derive"] }
tempfile = "3.3"
thiserror = "1"
tor-chanmgr = { version = "0.7.0", path = "../tor-chanmgr", features = ["pt-client"] }
tor-config = { version = "0.6.0", path = "../tor-config" }
@ -29,6 +32,7 @@ tor-linkspec = { version = "0.5.1", path = "../tor-linkspec", features = ["pt-cl
tor-rtcompat = { version = "0.7.0", path = "../tor-rtcompat" }
tor-socksproto = { version = "0.5.1", path = "../tor-socksproto" }
tracing = "0.1.18"
visibility = { version = "0.0.1", optional = true }
[dev-dependencies]
anyhow = "1.0.23"
@ -42,3 +46,7 @@ tokio = { version = "1.7", features = [
] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.7.0", features = ["tokio", "native-tls"] }
tracing-subscriber = "0.3.0"
[[example]]
name = "run-pt"
required-features = ["experimental-api"]

View File

@ -21,18 +21,38 @@ transports
## Limitations
TODO pt-client: Currently, the APIs for this crate make it quite
TODO: Currently, the APIs for this crate make it quite
tor-specific. Notably, it can only return Channels! It would be good
instead to adapt it so that it was more generally useful by other projects
that want to use pluggable transports in rust. For now, I have put the
Tor-channel-specific stuff behind a `tor-channel-factory` feature, but there
are no APIs for using PTs without that feature currently. That should
change.
change. (See issue [arti#666](https://gitlab.torproject.org/tpo/core/arti/-/issues/666))
TODO pt-client: Nothing in this crate is actually implemented yet.
TODO pt-client: The first version of this crate will probably only conform
to the old Tor pluggable transport protocol, and not to more recent variants
TODO: The first version of this crate will probably only conform
to the original Tor pluggable transport protocol, and not to more recent variants
as documented at `pluggabletransports.info`
## Feature flags
### Additive features
* `tor-channel-factory`: Build with support for a ChannelFactory implementation
that allows this crate's use with Tor. (Currently, this is the only way to
use the crate; see "Limitations" section above.)
* `full` -- Build with all the features above.
### Experimental and unstable features
Note that the APIs enabled by these features are NOT covered by semantic
versioning guarantees: we might break them or remove them between patch
versions.
* `experimental-api` -- build with experimental, unstable API support.
* `experimental` -- Build with all experimental features above, along with
all experimental features from other arti crates.
License: MIT OR Apache-2.0

View File

@ -78,21 +78,55 @@ pub enum PtError {
error: CfgPathError,
},
/// The pluggable transport reactor failed.
#[error("PT reactor failed")]
// TODO pt-client: This should just be a bug.
ReactorFailed,
#[error("Internal error")]
Internal(#[from] tor_error::Bug),
}
// TODO pt-client: implement.
impl HasKind for PtError {
fn kind(&self) -> ErrorKind {
todo!()
use ErrorKind as EK;
use PtError as E;
match self {
E::ClientTransportsUnsupported(_) => EK::InvalidConfig,
E::ChildProtocolViolation(_)
| E::ProtocolViolation(_)
| E::UnsupportedVersion
| E::IpcParseFailed { .. } => EK::LocalProtocolViolation,
E::Timeout
| E::ClientTransportFailed { .. }
| E::ChildGone
| E::ChildReadFailed(_)
| E::ChildSpawnFailed { .. }
| E::StdioUnavailable
| E::ProxyError(_) => EK::LocalPluginFailed,
E::TempdirCreateFailed(_) => EK::FsPermissions,
E::PathExpansionFailed { .. } => EK::InvalidConfig,
E::Internal(e) => e.kind(),
}
}
}
impl HasRetryTime for PtError {
fn retry_time(&self) -> RetryTime {
todo!()
use PtError as E;
use RetryTime as RT;
match self {
E::ClientTransportsUnsupported(_)
| E::ClientTransportFailed { .. }
| E::ChildProtocolViolation(_)
| E::ProtocolViolation(_)
| E::ChildSpawnFailed { .. }
| E::IpcParseFailed { .. }
| E::UnsupportedVersion
| E::Internal(_)
| E::PathExpansionFailed { .. } => RT::Never,
E::TempdirCreateFailed(_)
| E::StdioUnavailable
| E::Timeout
| E::ProxyError(_)
| E::ChildGone
| E::ChildReadFailed(_) => RT::AfterWaiting,
}
}
}

View File

@ -47,10 +47,10 @@ pub struct PtStatus {
/// A message sent from a pluggable transport child process.
///
/// For more in-depth information about these messages, consult pt-spec.txt.
// TODO pt-client: this shouldn't be `pub`
#[derive(PartialEq, Eq, Debug, Clone)]
#[non_exhaustive]
pub enum PtMessage {
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) enum PtMessage {
/// `VERSION-ERROR`: No compatible pluggable transport specification version was provided.
VersionError(String),
/// `VERSION`: Specifies the version the binary is using for the IPC protocol.
@ -589,6 +589,7 @@ impl PluggableTransport {
///
/// If it hasn't been launched, the returned map will be empty.
// TODO(eta): Actually figure out a way to expose this more stably.
#[allow(dead_code)] // TODO: remove unless this turns out to be useful.
pub(crate) fn transport_methods(&self) -> &HashMap<PtTransportName, PtClientMethod> {
&self.cmethods
}
@ -600,7 +601,8 @@ impl PluggableTransport {
// FIXME(eta): This API will probably go away and get replaced with something better.
// In particular, we'd want to cache `Status` messages from before this method
// was called.
pub async fn next_message(&mut self) -> err::Result<PtMessage> {
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) async fn next_message(&mut self) -> err::Result<PtMessage> {
let inner = self.inner.as_mut().ok_or(PtError::ChildGone)?;
let ret = inner.recv().await;
if let Err(PtError::ChildGone) | Err(PtError::ChildReadFailed { .. }) = ret {
@ -622,6 +624,11 @@ impl PluggableTransport {
);
return Ok(());
}
info!(
"Launching pluggable transport at {} for {:?}",
self.binary_path.display(),
self.params.transports
);
let child = Command::new(&self.binary_path)
.args(self.arguments.iter())
.envs(self.params.environment_variables())

View File

@ -37,8 +37,7 @@
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
#![allow(dead_code)] // FIXME TODO pt-client: remove.
#![allow(unused_imports)] // FIXME TODO pt-client: remove.
#![allow(dead_code)] // FIXME TODO pt-client remove after implementing reactor.
pub mod config;
pub mod err;
@ -46,26 +45,25 @@ pub mod ipc;
use crate::config::ManagedTransportConfig;
use crate::err::PtError;
use crate::ipc::{PluggableTransport, PtClientMethod, PtParameters, PtParametersBuilder};
use crate::ipc::{PluggableTransport, PtClientMethod, PtParameters};
use crate::mpsc::Receiver;
use async_trait::async_trait;
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::channel::mpsc::{self, UnboundedSender};
use futures::channel::oneshot;
use futures::StreamExt;
use std::collections::HashMap;
use std::mem;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tempfile::TempDir;
use tor_chanmgr::builder::ChanBuilder;
#[cfg(feature = "tor-channel-factory")]
use tor_chanmgr::factory::ChannelFactory;
use tor_chanmgr::factory::{AbstractPtError, AbstractPtMgr};
use tor_chanmgr::transport::{ExternalProxyPlugin, TransportHelper};
use tor_linkspec::{PtTransportName, TransportId};
use tor_linkspec::PtTransportName;
use tor_rtcompat::Runtime;
use tracing::{info, warn};
use tracing::warn;
#[cfg(feature = "tor-channel-factory")]
use {
async_trait::async_trait,
tor_chanmgr::{
builder::ChanBuilder,
factory::{AbstractPtError, ChannelFactory},
transport::ExternalProxyPlugin,
},
};
/// Shared mutable state between the `PtReactor` and `PtMgr`.
#[derive(Default, Debug)]
@ -123,10 +121,8 @@ pub struct PtMgr<R> {
state: Arc<RwLock<PtSharedState>>,
/// PtReactor channel.
tx: UnboundedSender<PtReactorMessage>,
/// Temporary directory to store PT state in.
//
// FIXME(eta): This should be configurable.
state_dir: TempDir,
/// Directory to store PT state in.
state_dir: PathBuf,
}
impl<R: Runtime> PtMgr<R> {
@ -149,7 +145,11 @@ impl<R: Runtime> PtMgr<R> {
/// Create a new PtMgr.
// TODO pt-client: maybe don't have the Vec directly exposed?
pub fn new(transports: Vec<ManagedTransportConfig>, rt: R) -> Result<Self, PtError> {
pub fn new(
transports: Vec<ManagedTransportConfig>,
state_dir: PathBuf,
rt: R,
) -> Result<Self, PtError> {
let state = PtSharedState {
cmethods: Default::default(),
configured: Self::transform_config(transports),
@ -161,18 +161,23 @@ impl<R: Runtime> PtMgr<R> {
runtime: rt,
state,
tx,
state_dir: TempDir::new().map_err(|e| PtError::TempdirCreateFailed(Arc::new(e)))?,
state_dir,
})
}
/// Reload the configuration
pub fn reconfigure(
&mut self,
&self,
how: tor_config::Reconfigure,
transports: Vec<ManagedTransportConfig>,
) -> Result<(), tor_config::ReconfigureError> {
let configured = Self::transform_config(transports);
if how == tor_config::Reconfigure::CheckAllOrNothing {
return Ok(());
}
{
let mut inner = self.state.write().expect("ptmgr poisoned");
inner.configured = Self::transform_config(transports);
inner.configured = configured;
}
// We don't have any way of propagating this sanely; the caller will find out the reactor
// has died later on anyway.
@ -251,11 +256,19 @@ impl<R: Runtime> tor_chanmgr::factory::AbstractPtMgr for PtMgr<R> {
pt: transport.clone(),
result: tx,
})
.map_err(|_| Arc::new(PtError::ReactorFailed) as Arc<dyn AbstractPtError>)?;
.map_err(|_| {
Arc::new(PtError::Internal(tor_error::internal!(
"PT reactor closed unexpectedly"
))) as Arc<dyn AbstractPtError>
})?;
cmethod = Some(
// NOTE(eta): Could be improved with result flattening.
rx.await
.map_err(|_| Arc::new(PtError::ReactorFailed) as Arc<dyn AbstractPtError>)?
.map_err(|_| {
Arc::new(PtError::Internal(tor_error::internal!(
"PT reactor closed unexpectedly"
))) as Arc<dyn AbstractPtError>
})?
.map_err(|x| Arc::new(x) as Arc<dyn AbstractPtError>)?,
);
} else {