Have ChannelBuilder use TransportHelper.

This lets us build channels using different TransportHelpers,
including the (new) default TransportHelper, which just uses the old
connect_to_one() code.
This commit is contained in:
Nick Mathewson 2022-10-11 15:24:10 -04:00
parent 4d25049473
commit e21ac24c77
4 changed files with 92 additions and 27 deletions

View File

@ -0,0 +1 @@
BREAKING: Error::Io.peer is now a PtTargetAddr.

View File

@ -4,6 +4,7 @@ use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use crate::factory::TransportHelper;
use crate::{event::ChanMgrEventSender, Error};
use safelog::sensitive as sv;
@ -27,21 +28,30 @@ static CONNECTION_DELAY: Duration = Duration::from_millis(150);
///
/// This is a separate type so that we can keep our channel management
/// code network-agnostic.
pub(crate) struct ChanBuilder<R: Runtime> {
pub(crate) struct ChanBuilder<R: Runtime, H: TransportHelper>
where
R: tor_rtcompat::TlsProvider<H::Stream>,
{
/// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
runtime: R,
/// Used to update our bootstrap reporting status.
event_sender: Mutex<ChanMgrEventSender>,
/// The transport object that we use to construct streams.
transport: H,
/// Object to build TLS connections.
tls_connector: <R as TlsProvider<R::TcpStream>>::Connector,
tls_connector: <R as TlsProvider<H::Stream>>::Connector,
}
impl<R: Runtime> ChanBuilder<R> {
impl<R: Runtime, H: TransportHelper> ChanBuilder<R, H>
where
R: TlsProvider<H::Stream>,
{
/// Construct a new ChanBuilder.
pub(crate) fn new(runtime: R, event_sender: ChanMgrEventSender) -> Self {
let tls_connector = runtime.tls_connector();
pub(crate) fn new(runtime: R, transport: H, event_sender: ChanMgrEventSender) -> Self {
let tls_connector = <R as TlsProvider<H::Stream>>::tls_connector(&runtime);
ChanBuilder {
runtime,
transport,
event_sender: Mutex::new(event_sender),
tls_connector,
}
@ -49,7 +59,11 @@ impl<R: Runtime> ChanBuilder<R> {
}
#[async_trait]
impl<R: Runtime> crate::mgr::AbstractChannelFactory for ChanBuilder<R> {
impl<R: Runtime, H: TransportHelper + Sync + Send> crate::mgr::AbstractChannelFactory
for ChanBuilder<R, H>
where
R: tor_rtcompat::TlsProvider<H::Stream>,
{
type Channel = tor_proto::channel::Channel;
type BuildSpec = OwnedChanTarget;
@ -130,7 +144,58 @@ async fn connect_to_one<R: Runtime>(
})
}
impl<R: Runtime> ChanBuilder<R> {
/// A default transport object that opens TCP connections for a
/// `ChannelMethod::Direct`.
///
/// It opens almost-simultaneous parallel TCP connections to each address, and
/// chooses the first one to succeed.
#[derive(Clone, Debug)]
pub(crate) struct DefaultTransport<R: Runtime> {
/// The runtime that we use for connecting.
runtime: R,
}
impl<R: Runtime> DefaultTransport<R> {
/// Construct a new DefaultTransport
pub(crate) fn new(runtime: R) -> Self {
Self { runtime }
}
}
#[async_trait]
impl<R: Runtime> crate::factory::TransportHelper for DefaultTransport<R> {
type Stream = <R as TcpProvider>::TcpStream;
/// Implements the transport: makes a TCP connection (possibly
/// tunneled over whatever protocol) if possible.
async fn connect(
&self,
target: &OwnedChanTarget,
) -> crate::Result<(OwnedChanTarget, Self::Stream)> {
let direct_addrs: Vec<_> = match target.chan_method() {
ChannelMethod::Direct(addrs) => addrs,
#[allow(unreachable_patterns)]
_ => {
return Err(Error::UnusableTarget(bad_api_usage!(
"Used default transport implementation for an unsupported transport."
)))
}
};
let (stream, addr) = connect_to_one(&self.runtime, &direct_addrs).await?;
let using_target = match target.restrict_addr(&addr) {
Ok(v) => v,
Err(v) => v,
};
Ok((using_target, stream))
}
}
impl<R: Runtime, H: TransportHelper> ChanBuilder<R, H>
where
R: tor_rtcompat::TlsProvider<H::Stream>,
{
/// As build_channel, but don't include a timeout.
async fn build_channel_notimeout(
&self,
@ -139,7 +204,6 @@ impl<R: Runtime> ChanBuilder<R> {
use tor_proto::channel::ChannelBuilder;
use tor_rtcompat::tls::CertifiedConn;
// 1. Negotiate the TLS connection.
{
self.event_sender
.lock()
@ -147,33 +211,31 @@ impl<R: Runtime> ChanBuilder<R> {
.record_attempt();
}
// TODO pt-client: right now this only handles the Direct method.
let direct_addrs: Vec<_> = match target.chan_method() {
ChannelMethod::Direct(addrs) => addrs,
#[allow(unreachable_patterns)] // TODO pt-client
_ => vec![], // TODO pt-client
};
let (stream, addr) = connect_to_one(&self.runtime, &direct_addrs).await?;
let using_target = match target.restrict_addr(&addr) {
Ok(v) => v,
Err(v) => v,
};
// 1a. Negotiate the TCP connection or other stream.
let (using_target, stream) = self.transport.connect(target).await?;
let using_method = using_target.chan_method();
let peer = using_target.chan_method().target_addr();
let peer_ref = &peer;
let map_ioe = |action: &'static str| {
move |ioe: io::Error| Error::Io {
action,
peer: addr,
peer: peer_ref.clone(),
source: ioe.into(),
}
};
{
// TODO pt-client: distinguish which transport just succeeded.
self.event_sender
.lock()
.expect("Lock poisoned")
.record_tcp_success();
}
// 1b. Negotiate TLS.
// TODO: add a random hostname here if it will be used for SNI?
let tls = self
.tls_connector
@ -195,7 +257,7 @@ impl<R: Runtime> ChanBuilder<R> {
// 2. Set up the channel.
let mut builder = ChannelBuilder::new();
builder.set_declared_method(tor_linkspec::ChannelMethod::Direct(vec![addr]));
builder.set_declared_method(using_method);
let chan = builder
.launch(
tls,
@ -333,7 +395,8 @@ mod test {
// Create the channel builder that we want to test.
let (snd, _rcv) = crate::event::channel();
let builder = ChanBuilder::new(client_rt, snd);
let transport = DefaultTransport::new(client_rt.clone());
let builder = ChanBuilder::new(client_rt, transport, snd);
let (r1, r2): (Result<Channel>, Result<LocalStream>) = futures::join!(
async {

View File

@ -7,7 +7,7 @@ use futures::task::SpawnError;
use thiserror::Error;
use tor_error::{internal, ErrorKind};
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_linkspec::{ChanTarget, OwnedChanTarget, PtTargetAddr};
use tor_proto::ClockSkew;
/// An error returned by a channel manager.
@ -46,10 +46,10 @@ pub enum Error {
},
/// Network IO error or TLS error
#[error("Network IO error, or TLS error, in {action}, talking to {peer}")]
#[error("Network IO error, or TLS error, in {action}, talking to {peer:?}")]
Io {
/// Who we were talking to
peer: SocketAddr,
peer: Option<PtTargetAddr>,
/// What we were doing
action: &'static str,

View File

@ -111,7 +111,7 @@ use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
/// get one if it exists.
pub struct ChanMgr<R: Runtime> {
/// Internal channel manager object that does the actual work.
mgr: mgr::AbstractChanMgr<builder::ChanBuilder<R>>,
mgr: mgr::AbstractChanMgr<builder::ChanBuilder<R, builder::DefaultTransport<R>>>,
/// Stream of [`ConnStatus`] events.
bootstrap_status: event::ConnStatusEvents,
@ -189,7 +189,8 @@ impl<R: Runtime> ChanMgr<R> {
netparams: &NetParameters,
) -> Self {
let (sender, receiver) = event::channel();
let builder = builder::ChanBuilder::new(runtime, sender);
let transport = builder::DefaultTransport::new(runtime.clone());
let builder = builder::ChanBuilder::new(runtime, transport, sender);
let mgr = mgr::AbstractChanMgr::new(builder, config, dormancy, netparams);
ChanMgr {
mgr,