Implement a basic form of RFC 8305 ("happy eyeballs") for channels

This makes Arti usable in IPv6-only environments (arti#92) by letting us
attempt multiple connections to a given relay using all of its
addresses instead of just using the first (probably IPv4) one, using the
strategy from RFC 8305 § 5.

This isn't a complete implementation of Happy Eyeballs; ideally, we'd
sort the address list before doing concurrent connections. However, it
works (and has been tested inside an IPv6-only container inside eta's
network :p)
This commit is contained in:
eta 2022-03-04 15:58:21 +00:00
parent eb66d0af6f
commit c98d9dc5fe
2 changed files with 84 additions and 21 deletions

View File

@ -1,7 +1,8 @@
//! Implement a concrete type to build channels.
use std::io;
use std::sync::Mutex;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use crate::{event::ChanMgrEventSender, Error};
@ -9,10 +10,16 @@ use std::time::Duration;
use tor_error::{bad_api_usage, internal};
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_llcrypto::pk;
use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider};
use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::task::SpawnExt;
use futures::StreamExt;
use futures::{FutureExt, TryFutureExt};
/// Time to wait between starting parallel connections to the same relay.
static CONNECTION_DELAY: Duration = Duration::from_millis(150);
/// TLS-based channel builder.
///
@ -56,6 +63,68 @@ impl<R: Runtime> crate::mgr::ChannelFactory for ChanBuilder<R> {
}
}
/// Connect to one of the addresses in `addrs` by running connections in parallel until one works.
///
/// This implements a basic version of RFC 8305 "happy eyeballs".
async fn connect_to_one<R: Runtime>(
rt: &R,
addrs: &[SocketAddr],
) -> crate::Result<(<R as TcpProvider>::TcpStream, SocketAddr)> {
// We need *some* addresses to connect to.
if addrs.is_empty() {
return Err(Error::UnusableTarget(bad_api_usage!(
"No addresses for chosen relay"
)));
}
// Turn each address into a future that waits (i * CONNECTION_DELAY), then
// attempts to connect to the address using the runtime (where i is the
// array index). Shove all of these into a `FuturesUnordered`, polling them
// simultaneously and returning the results in completion order.
//
// This is basically the concurrent-connection stuff from RFC 8305, ish.
// TODO(eta): sort the addresses first?
let mut connections = addrs
.iter()
.enumerate()
.map(|(i, a)| {
let delay = rt.sleep(CONNECTION_DELAY * i as u32);
delay.then(move |_| {
tracing::info!("Connecting to {}", a);
rt.connect(a)
.map_ok(move |stream| (stream, *a))
.map_err(move |e| (e, *a))
})
})
.collect::<FuturesUnordered<_>>();
let mut ret = None;
let mut errors = vec![];
while let Some(result) = connections.next().await {
match result {
Ok(s) => {
// We got a stream (and address).
ret = Some(s);
break;
}
Err((e, a)) => {
// We got a failure on one of the streams. Store the error.
// TODO(eta): ideally we'd start the next connection attempt immediately.
tracing::warn!("Connection to {} failed: {}", a, e);
errors.push((e, a));
}
}
}
// Ensure we don't continue trying to make connections.
drop(connections);
ret.ok_or_else(|| Error::ChannelBuild {
addresses: errors.into_iter().map(|(e, a)| (a, Arc::new(e))).collect(),
})
}
impl<R: Runtime> ChanBuilder<R> {
/// As build_channel, but don't include a timeout.
async fn build_channel_notimeout(
@ -66,16 +135,6 @@ impl<R: Runtime> ChanBuilder<R> {
use tor_rtcompat::tls::CertifiedConn;
// 1. Negotiate the TLS connection.
// TODO: This just uses the first address. Instead we could be
// smarter, or use "happy eyeballs", or whatever. Maybe we will
// want to refactor as we do so?
let addr = target.addrs().get(0).ok_or_else(|| {
Error::UnusableTarget(bad_api_usage!("No addresses for chosen relay"))
})?;
tracing::info!("Negotiating TLS with {}", addr);
{
self.event_sender
.lock()
@ -83,21 +142,16 @@ impl<R: Runtime> ChanBuilder<R> {
.record_attempt();
}
let (stream, addr) = connect_to_one(&self.runtime, target.addrs()).await?;
let map_ioe = |action: &'static str| {
move |ioe: io::Error| Error::Io {
action,
peer: *addr,
peer: addr,
source: ioe.into(),
}
};
// Establish a TCP connection.
let stream = self
.runtime
.connect(addr)
.await
.map_err(map_ioe("connect"))?;
{
self.event_sender
.lock()
@ -126,7 +180,7 @@ impl<R: Runtime> ChanBuilder<R> {
// 2. Set up the channel.
let mut builder = ChannelBuilder::new();
builder.set_declared_addr(*addr);
builder.set_declared_addr(addr);
let chan = builder.launch(tls).connect().await?;
let now = self.runtime.wallclock();
let chan = chan.check(target, &peer_cert, Some(now))?;

View File

@ -42,6 +42,14 @@ pub enum Error {
source: Arc<std::io::Error>,
},
/// Failed to build a channel, after trying multiple addresses.
#[error("Channel build failed: [(address, error)] = {addresses:?}")]
ChannelBuild {
/// The list of addresses we tried to connect to, coupled with
/// the error we encountered connecting to each one.
addresses: Vec<(SocketAddr, Arc<std::io::Error>)>,
},
/// Unable to spawn task
#[error("unable to spawn {spawning}")]
Spawn {
@ -79,6 +87,7 @@ impl tor_error::HasKind for Error {
E::Proto(e) => e.kind(),
E::PendingFailed => EK::TorAccessFailed,
E::UnusableTarget(_) | E::Internal(_) => EK::Internal,
Error::ChannelBuild { .. } => EK::TorAccessFailed,
}
}
}