chanmgr: Move Timeout functionality into a decorator object.

This commit is contained in:
Nick Mathewson 2022-10-12 09:21:53 -04:00
parent e21ac24c77
commit 15108be5ce
2 changed files with 51 additions and 12 deletions

View File

@ -4,7 +4,7 @@ use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use crate::factory::TransportHelper;
use crate::factory::{ChannelFactory, TransportHelper};
use crate::{event::ChanMgrEventSender, Error};
use safelog::sensitive as sv;
@ -13,6 +13,7 @@ use tor_error::{bad_api_usage, internal};
use tor_linkspec::{ChannelMethod, HasChanMethod, HasRelayIds, OwnedChanTarget};
use tor_llcrypto::pk;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
use tor_rtcompat::SleepProvider;
use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider};
use async_trait::async_trait;
@ -59,22 +60,56 @@ where
}
#[async_trait]
impl<R: Runtime, H: TransportHelper + Sync + Send> crate::mgr::AbstractChannelFactory
for ChanBuilder<R, H>
impl<CF> crate::mgr::AbstractChannelFactory for CF
where
R: tor_rtcompat::TlsProvider<H::Stream>,
CF: ChannelFactory + Sync,
{
type Channel = tor_proto::channel::Channel;
type BuildSpec = OwnedChanTarget;
async fn build_channel(&self, target: &Self::BuildSpec) -> crate::Result<Self::Channel> {
self.connect_via_transport(target).await
}
}
/// Wrapper to apply a timeout to an inner [`ChannelFactory`]
#[derive(Clone, Debug)]
pub(crate) struct TimeoutChannelFactory<R, F> {
/// The inner factory to which we're applying a timeout.
inner: F,
/// The runtime we use for sleeping.
runtime: R,
}
impl<R, F> TimeoutChannelFactory<R, F> {
/// Construct a new TimeoutChannelFactory to apply a standard timeout to a given ChannelFactory.
pub(crate) fn new(runtime: R, inner: F) -> Self {
Self { inner, runtime }
}
}
#[async_trait]
impl<R, F> crate::factory::ChannelFactory for TimeoutChannelFactory<R, F>
where
R: SleepProvider + Sync + Send,
F: crate::factory::ChannelFactory + Sync + Send,
{
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
) -> crate::Result<tor_proto::channel::Channel> {
use tor_rtcompat::SleepProviderExt;
// TODO: make this an option. And make a better value.
let five_seconds = std::time::Duration::new(5, 0);
let delay = if target.chan_method().is_direct() {
std::time::Duration::new(5, 0)
} else {
std::time::Duration::new(10, 0)
};
let connect_future = self.inner.connect_via_transport(target);
self.runtime
.timeout(five_seconds, self.build_channel_notimeout(target))
.timeout(delay, connect_future)
.await
.map_err(|_| Error::ChanTimeout {
peer: target.clone(),
@ -192,12 +227,13 @@ impl<R: Runtime> crate::factory::TransportHelper for DefaultTransport<R> {
}
}
impl<R: Runtime, H: TransportHelper> ChanBuilder<R, H>
#[async_trait]
impl<R: Runtime, H: TransportHelper> ChannelFactory for ChanBuilder<R, H>
where
R: tor_rtcompat::TlsProvider<H::Stream>,
R: tor_rtcompat::TlsProvider<H::Stream> + Send + Sync,
H: Send + Sync,
{
/// As build_channel, but don't include a timeout.
async fn build_channel_notimeout(
async fn connect_via_transport(
&self,
target: &OwnedChanTarget,
) -> crate::Result<tor_proto::channel::Channel> {

View File

@ -111,7 +111,9 @@ use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
/// get one if it exists.
pub struct ChanMgr<R: Runtime> {
/// Internal channel manager object that does the actual work.
mgr: mgr::AbstractChanMgr<builder::ChanBuilder<R, builder::DefaultTransport<R>>>,
mgr: mgr::AbstractChanMgr<
builder::TimeoutChannelFactory<R, builder::ChanBuilder<R, builder::DefaultTransport<R>>>,
>,
/// Stream of [`ConnStatus`] events.
bootstrap_status: event::ConnStatusEvents,
@ -190,7 +192,8 @@ impl<R: Runtime> ChanMgr<R> {
) -> Self {
let (sender, receiver) = event::channel();
let transport = builder::DefaultTransport::new(runtime.clone());
let builder = builder::ChanBuilder::new(runtime, transport, sender);
let builder = builder::ChanBuilder::new(runtime.clone(), transport, sender);
let builder = builder::TimeoutChannelFactory::new(runtime, builder);
let mgr = mgr::AbstractChanMgr::new(builder, config, dormancy, netparams);
ChanMgr {
mgr,