Wire up more of IptEstablisher.

It now supports running in a loop, trying to establish an
introduction point, and reporting status.
This commit is contained in:
Nick Mathewson 2023-08-15 19:13:27 -04:00
parent ff2d0cffab
commit 0ee63cb04b
3 changed files with 113 additions and 7 deletions

1
Cargo.lock generated
View File

@ -4695,6 +4695,7 @@ dependencies = [
"tor-netdir",
"tor-proto",
"tor-rtcompat",
"tracing",
]
[[package]]

View File

@ -40,5 +40,6 @@ tor-llcrypto = { version = "0.5.2", path = "../tor-llcrypto" }
tor-netdir = { version = "0.9.3", path = "../tor-netdir" }
tor-proto = { version = "0.12.0", path = "../tor-proto", features = ["hs-service", "send-control-msg"] }
tor-rtcompat = { version = "0.9.1", path = "../tor-rtcompat" }
tracing = "0.1.36"
[dev-dependencies]

View File

@ -6,7 +6,7 @@
#![allow(clippy::needless_pass_by_value)] // TODO HSS remove
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use futures::{
channel::{
@ -21,12 +21,13 @@ use tor_cell::relaycell::{
RelayMsg as _,
};
use tor_circmgr::hspool::HsCircPool;
use tor_error::{internal, into_internal};
use tor_error::{debug_report, internal, into_internal};
use tor_hscrypto::pk::HsIntroPtSessionIdKeypair;
use tor_linkspec::OwnedCircTarget;
use tor_linkspec::{ChanTarget as _, OwnedCircTarget};
use tor_netdir::{NetDir, NetDirProvider, Relay};
use tor_proto::circuit::{ClientCirc, ConversationInHandler, MetaCellDisposition};
use tor_rtcompat::Runtime;
use tor_rtcompat::{Runtime, SleepProviderExt as _};
use tracing::debug;
use crate::RendRequest;
@ -77,6 +78,10 @@ pub(crate) enum IptError {
#[error("Unable to construct signed ESTABLISH_INTRO message")]
CreateEstablishIntro(#[source] tor_cell::Error),
/// We encountered a timeout after building the circuit.
#[error("Timeout during ESTABLISH_INTRO handshake.")]
EstablishTimeout,
/// We encountered an error while sending our establish_intro
/// message.
#[error("Unable to send an ESTABLISH_INTRO message")]
@ -98,6 +103,32 @@ pub(crate) enum IptError {
Bug(#[from] tor_error::Bug),
}
impl tor_error::HasKind for IptError {
fn kind(&self) -> tor_error::ErrorKind {
use tor_error::ErrorKind as EK;
use IptError as E;
match self {
E::NoNetdir(_) => EK::BootstrapRequired, // TODO HSS maybe not right.
E::NetdirProviderShutdown => EK::ArtiShuttingDown,
E::BuildCircuit(e) => e.kind(),
E::EstablishTimeout => EK::TorNetworkTimeout, // TODO HSS right?
E::SendEstablishIntro(e) => e.kind(),
E::ReceiveAck => EK::RemoteProtocolViolation, // TODO HSS not always right.
E::BadEstablished => EK::RemoteProtocolViolation,
E::CreateEstablishIntro(_) => EK::Internal,
E::Bug(e) => e.kind(),
}
}
}
impl IptError {
/// Return true if this error appears to be the introduction point's fault.
fn is_ipt_failure(&self) -> bool {
// TODO HSS: actually test something here.
true
}
}
impl IptEstablisher {
/// Try to set up, and maintain, an IPT at `Relay`
///
@ -166,6 +197,31 @@ pub(crate) struct IptStatus {
pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,
}
impl IptStatus {
/// Record that we have successfully connected to an introduction point.
fn note_open(&mut self) {
self.status = IptStatusStatus::Good;
}
/// Record that we are trying to connect to an introduction point.
fn note_attempt(&mut self) {
use IptStatusStatus::*;
self.status = match self.status {
Establishing | Good => Establishing,
Faulty => Faulty, // We don't change status if we think we're broken.
}
}
/// Record that an error has occurred.
fn note_error(&mut self, err: &IptError) {
use IptStatusStatus::*;
if err.is_ipt_failure() && self.status == Good {
self.n_faults += 1;
self.status = Faulty;
}
}
}
tor_cell::restricted_msg! {
/// An acceptable message to receive from an introduction point.
enum IptMsg : RelayMsg {
@ -187,7 +243,9 @@ pub(crate) struct EstIntroExtensionSet {
}
/// Implementation structure for the task that implements an IptEstablisher.
struct IptEstablisherReactor<R: Runtime> {
struct Reactor<R: Runtime> {
/// A copy of our runtime, used for timeouts and sleeping.
runtime: R,
/// A pool used to create circuits to the introduction point.
pool: Arc<HsCircPool<R>>,
/// A provider used to select the other relays in the circuit.
@ -230,7 +288,53 @@ pub(crate) struct IntroPtSession {
// ClientCirc::wait_for_close, if we stabilize it.
}
impl<R: Runtime> IptEstablisherReactor<R> {
/// How long to allow for an introduction point to get established?
const ESTABLISH_TIMEOUT: Duration = Duration::new(10, 0); // TODO use a better timeout, taken from circuit estimator.
/// How long to wait after a single failure.
const DELAY_ON_FAILURE: Duration = Duration::new(2, 0); // TODO use stochastic jitter.
impl<R: Runtime> Reactor<R> {
/// Run forever, keeping an introduction point established.
///
/// TODO: If we're running this in its own task, we'll want some way to
/// cancel it.
async fn keep_intro_established(
&self,
mut status_tx: postage::watch::Sender<IptStatus>,
) -> Result<(), IptError> {
loop {
status_tx.borrow_mut().note_attempt();
let outcome = self
.runtime
.timeout(ESTABLISH_TIMEOUT, self.establish_intro_once())
.await
.unwrap_or(Err(IptError::EstablishTimeout));
match self.establish_intro_once().await {
Ok(session) => {
status_tx.borrow_mut().note_open();
debug!(
"Successfully established introduction point with {}",
self.target.display_chan_target()
);
// TODO HSS: let session continue until it dies, actually
// implementing it.
}
Err(e) => {
status_tx.borrow_mut().note_error(&e);
debug_report!(
e,
"Problem establishing introduction point with {}",
self.target.display_chan_target()
);
self.runtime.sleep(DELAY_ON_FAILURE).await;
}
}
}
}
/// Try, once, to make a circuit to a single relay and establish an introduction
/// point there.
///