Flatten TimeoutChannelFactory into ChannelBuilder.

This commit is contained in:
Nick Mathewson 2022-10-13 08:32:39 -04:00
parent 69b64a2795
commit 7b58126706
2 changed files with 30 additions and 48 deletions

View File

@ -13,7 +13,6 @@ use tor_error::{bad_api_usage, internal};
use tor_linkspec::{ChannelMethod, HasChanMethod, HasRelayIds, OwnedChanTarget};
use tor_llcrypto::pk;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
use tor_rtcompat::SleepProvider;
use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider};
use async_trait::async_trait;
@ -76,51 +75,6 @@ where
}
}
/// Wrapper to apply a timeout to an inner [`ChannelFactory`]
#[derive(Clone, Debug)]
pub(crate) struct TimeoutChannelFactory<R, F> {
/// The inner factory to which we're applying a timeout.
inner: F,
/// The runtime we use for sleeping.
runtime: R,
}
impl<R, F> TimeoutChannelFactory<R, F> {
/// Construct a new TimeoutChannelFactory to apply a standard timeout to a given ChannelFactory.
pub(crate) fn new(runtime: R, inner: F) -> Self {
Self { inner, runtime }
}
}
#[async_trait]
impl<R, F> crate::factory::ChannelFactory for TimeoutChannelFactory<R, F>
where
R: SleepProvider + Sync + Send,
F: crate::factory::ChannelFactory + Sync + Send,
{
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
) -> crate::Result<tor_proto::channel::Channel> {
use tor_rtcompat::SleepProviderExt;
// TODO: make this an option. And make a better value.
let delay = if target.chan_method().is_direct() {
std::time::Duration::new(5, 0)
} else {
std::time::Duration::new(10, 0)
};
let connect_future = self.inner.connect_via_transport(target);
self.runtime
.timeout(delay, connect_future)
.await
.map_err(|_| Error::ChanTimeout {
peer: target.clone(),
})?
}
}
/// Connect to one of the addresses in `addrs` by running connections in parallel until one works.
///
/// This implements a basic version of RFC 8305 "happy eyeballs".
@ -240,6 +194,35 @@ where
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
) -> crate::Result<tor_proto::channel::Channel> {
use tor_rtcompat::SleepProviderExt;
// TODO: make this an option. And make a better value.
let delay = if target.chan_method().is_direct() {
std::time::Duration::new(5, 0)
} else {
std::time::Duration::new(10, 0)
};
let connect_future = self.connect_no_timeout(target);
self.runtime
.timeout(delay, connect_future)
.await
.map_err(|_| Error::ChanTimeout {
peer: target.clone(),
})?
}
}
impl<R: Runtime, H: TransportHelper> ChanBuilder<R, H>
where
R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
H: Send + Sync,
{
/// Perform the work of `connect_via_transport`, but without enforcing a timeout.
async fn connect_no_timeout(
&self,
target: &OwnedChanTarget,
) -> crate::Result<tor_proto::channel::Channel> {
use tor_proto::channel::ChannelBuilder;
use tor_rtcompat::tls::CertifiedConn;

View File

@ -199,8 +199,7 @@ impl<R: Runtime> ChanMgr<R> {
let (sender, receiver) = event::channel();
let sender = Arc::new(std::sync::Mutex::new(sender));
let transport = builder::DefaultTransport::new(runtime.clone());
let builder = builder::ChanBuilder::new(runtime.clone(), transport, sender);
let builder = builder::TimeoutChannelFactory::new(runtime, builder);
let builder = builder::ChanBuilder::new(runtime, transport, sender);
let builder: Box<dyn ChannelFactory + Send + Sync + 'static> = Box::new(builder);
let mgr = mgr::AbstractChanMgr::new(builder, config, dormancy, netparams);
ChanMgr {