diff --git a/Cargo.lock b/Cargo.lock index 71c551551..f99079250 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4630,6 +4630,7 @@ version = "0.2.3" dependencies = [ "async-trait", "futures", + "postage", "rand_core 0.6.4", "thiserror", "tor-cell", @@ -4642,6 +4643,7 @@ dependencies = [ "tor-netdir", "tor-proto", "tor-rtcompat", + "tracing", ] [[package]] diff --git a/crates/tor-cell/semver.md b/crates/tor-cell/semver.md index c8d977be2..d187e5957 100644 --- a/crates/tor-cell/semver.md +++ b/crates/tor-cell/semver.md @@ -1 +1,2 @@ ADDED: establish_intro functions now take any impl>. +ADDED: Iterator over IntroEstablishedExt. diff --git a/crates/tor-cell/src/relaycell/hs.rs b/crates/tor-cell/src/relaycell/hs.rs index 2aa07dc61..45510c8e9 100644 --- a/crates/tor-cell/src/relaycell/hs.rs +++ b/crates/tor-cell/src/relaycell/hs.rs @@ -310,7 +310,8 @@ decl_extension_group! { /// /// (Currently, no extensions of this type are recognized) #[derive(Debug,Clone)] - enum IntroEstablishedExt [ IntroEstablishedExtType ] { + #[non_exhaustive] + pub enum IntroEstablishedExt [ IntroEstablishedExtType ] { } } @@ -327,6 +328,11 @@ impl IntroEstablished { pub fn new() -> Self { Self::default() } + + /// Return an iterator over the extensions declared in this message. + pub fn iter_extensions(&self) -> impl Iterator { + self.extensions.iter() + } } impl Body for IntroEstablished { diff --git a/crates/tor-cell/src/relaycell/hs/ext.rs b/crates/tor-cell/src/relaycell/hs/ext.rs index 1795359bc..fd4e7951e 100644 --- a/crates/tor-cell/src/relaycell/hs/ext.rs +++ b/crates/tor-cell/src/relaycell/hs/ext.rs @@ -102,7 +102,7 @@ impl ExtList { } } -/// An unrecognized extension for some HS-related message. +/// An unrecognized or unencoded extension for some HS-related message. #[derive(Clone, Debug)] pub struct UnrecognizedExt { /// The field type ID for this extension. @@ -147,7 +147,8 @@ macro_rules! decl_extension_group { $( $(#[$cmeta])* $case($case), )* - /// An extension of a type we do not recognize. + /// An extension of a type we do not recognize, or which we have not + /// encoded. Unrecognized(UnrecognizedExt<$type_id>) } impl Readable for $id { diff --git a/crates/tor-hsservice/Cargo.toml b/crates/tor-hsservice/Cargo.toml index e358f03b6..fdfb6501b 100644 --- a/crates/tor-hsservice/Cargo.toml +++ b/crates/tor-hsservice/Cargo.toml @@ -27,6 +27,7 @@ full = [ [dependencies] async-trait = "0.1.54" futures = "0.3.14" +postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] } rand_core = "0.6.2" thiserror = "1" tor-cell = { version = "0.12.1", path = "../tor-cell", features = ["hs"] } @@ -39,5 +40,6 @@ tor-llcrypto = { version = "0.5.2", path = "../tor-llcrypto" } tor-netdir = { version = "0.9.3", path = "../tor-netdir" } tor-proto = { version = "0.12.0", path = "../tor-proto", features = ["hs-service", "send-control-msg"] } tor-rtcompat = { version = "0.9.1", path = "../tor-rtcompat" } +tracing = "0.1.36" [dev-dependencies] diff --git a/crates/tor-hsservice/src/svc/ipt_establish.rs b/crates/tor-hsservice/src/svc/ipt_establish.rs index 7504b835e..c3a8f8819 100644 --- a/crates/tor-hsservice/src/svc/ipt_establish.rs +++ b/crates/tor-hsservice/src/svc/ipt_establish.rs @@ -6,29 +6,38 @@ #![allow(clippy::needless_pass_by_value)] // TODO HSS remove -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use futures::{ - channel::{mpsc, oneshot}, + channel::{ + mpsc::{self, UnboundedReceiver}, + oneshot, + }, StreamExt as _, }; use tor_cell::relaycell::{ - hs::est_intro::EstablishIntroDetails, + hs::est_intro::{self, EstablishIntroDetails}, msg::{AnyRelayMsg, IntroEstablished, Introduce2}, RelayMsg as _, }; use tor_circmgr::hspool::HsCircPool; -use tor_error::{internal, into_internal}; +use tor_error::{debug_report, internal, into_internal}; use tor_hscrypto::pk::HsIntroPtSessionIdKeypair; -use tor_linkspec::CircTarget; +use tor_linkspec::{ChanTarget as _, OwnedCircTarget}; use tor_netdir::{NetDir, NetDirProvider, Relay}; -use tor_proto::circuit::{ConversationInHandler, MetaCellDisposition}; -use tor_rtcompat::Runtime; +use tor_proto::circuit::{ClientCirc, ConversationInHandler, MetaCellDisposition}; +use tor_rtcompat::{Runtime, SleepProviderExt as _}; +use tracing::debug; use crate::RendRequest; /// Handle onto the task which is establishing and maintaining one IPT -pub(crate) struct IptEstablisher {} +pub(crate) struct IptEstablisher { + /// A stream of events that is produced whenever we have a change in the + /// IptStatus for this intro point. Note that this stream is potentially + /// lossy. + status_events: postage::watch::Receiver, +} /// When the `IptEstablisher` is dropped it is torn down /// @@ -69,6 +78,10 @@ pub(crate) enum IptError { #[error("Unable to construct signed ESTABLISH_INTRO message")] CreateEstablishIntro(#[source] tor_cell::Error), + /// We encountered a timeout after building the circuit. + #[error("Timeout during ESTABLISH_INTRO handshake.")] + EstablishTimeout, + /// We encountered an error while sending our establish_intro /// message. #[error("Unable to send an ESTABLISH_INTRO message")] @@ -81,11 +94,41 @@ pub(crate) enum IptError { // Circuit,... ReceiveAck, + /// We received an invalid INTRO_ESTABLISHED message. + #[error("Got an invalid INTRO_ESTABLISHED message")] + BadEstablished, + /// We encountered a programming error. #[error("Internal error")] Bug(#[from] tor_error::Bug), } +impl tor_error::HasKind for IptError { + fn kind(&self) -> tor_error::ErrorKind { + use tor_error::ErrorKind as EK; + use IptError as E; + match self { + E::NoNetdir(_) => EK::BootstrapRequired, // TODO HSS maybe not right. + E::NetdirProviderShutdown => EK::ArtiShuttingDown, + E::BuildCircuit(e) => e.kind(), + E::EstablishTimeout => EK::TorNetworkTimeout, // TODO HSS right? + E::SendEstablishIntro(e) => e.kind(), + E::ReceiveAck => EK::RemoteProtocolViolation, // TODO HSS not always right. + E::BadEstablished => EK::RemoteProtocolViolation, + E::CreateEstablishIntro(_) => EK::Internal, + E::Bug(e) => e.kind(), + } + } +} + +impl IptError { + /// Return true if this error appears to be the introduction point's fault. + fn is_ipt_failure(&self) -> bool { + // TODO HSS: actually test something here. + true + } +} + impl IptEstablisher { /// Try to set up, and maintain, an IPT at `Relay` /// @@ -94,14 +137,6 @@ impl IptEstablisher { circ_pool: Arc>, dirprovider: Arc, relay: &Relay<'_>, - // Not a postage::watch since we want to count `Good` to `Faulty` - // transitions - // - // (The alternative would be to count them as part of this structure and - // use a postage watch.) - // - // bounded sender with a fixed small bound; OK to stall waiting for manager to catch up - status: mpsc::Sender, // TODO HSS: this needs to take some configuration ) -> Result { todo!() @@ -122,7 +157,7 @@ impl IptEstablisher { /// `hssvc-ipt-algorithms.md`. /// /// TODO HSS Make that file unneeded. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] pub(crate) enum IptStatusStatus { /// We are (re)establishing our connection to the IPT /// @@ -139,11 +174,11 @@ pub(crate) enum IptStatusStatus { /// `Err(IptWantsToRetire)` indicates that the IPT Establisher wants to retire this IPT /// /// This happens when the IPT has had (too) many rendezvous requests. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct IptWantsToRetire; /// The current status of an introduction point. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct IptStatus { /// The current state of this introduction point as defined by /// `hssvc-ipt-algorithms.md`. @@ -151,11 +186,42 @@ pub(crate) struct IptStatus { /// TODO HSS Make that file unneeded. pub(crate) status: IptStatusStatus, + /// How many times have we transitioned into a Faulty state? + /// + /// (This is not the same as the total number of failed attempts, since it + /// does not count times we retry from a Faulty state.) + pub(crate) n_faults: u32, + /// The current status of whether this introduction point circuit wants to be /// retired based on having processed too many requests. pub(crate) wants_to_retire: Result<(), IptWantsToRetire>, } +impl IptStatus { + /// Record that we have successfully connected to an introduction point. + fn note_open(&mut self) { + self.status = IptStatusStatus::Good; + } + + /// Record that we are trying to connect to an introduction point. + fn note_attempt(&mut self) { + use IptStatusStatus::*; + self.status = match self.status { + Establishing | Good => Establishing, + Faulty => Faulty, // We don't change status if we think we're broken. + } + } + + /// Record that an error has occurred. + fn note_error(&mut self, err: &IptError) { + use IptStatusStatus::*; + if err.is_ipt_failure() && self.status == Good { + self.n_faults += 1; + self.status = Faulty; + } + } +} + tor_cell::restricted_msg! { /// An acceptable message to receive from an introduction point. enum IptMsg : RelayMsg { @@ -164,96 +230,191 @@ tor_cell::restricted_msg! { } } -/// Try, once, to make a circuit to a single relay and establish an introduction -/// point there. +/// A set of extensions to send with our `ESTABLISH_INTRO` message. /// -/// Does not retry. Does not time out except via `HsCircPool`. -async fn establish_intro_once( +/// NOTE: we eventually might want to support unrecognized extensions. But +/// that's potentially troublesome, since the set of extensions we sent might +/// have an affect on how we validate the reply. +#[derive(Clone, Debug)] +pub(crate) struct EstIntroExtensionSet { + /// Parameters related to rate-limiting to prevent denial-of-service + /// attacks. + dos_params: Option, +} + +/// Implementation structure for the task that implements an IptEstablisher. +struct Reactor { + /// A copy of our runtime, used for timeouts and sleeping. + runtime: R, + /// A pool used to create circuits to the introduction point. pool: Arc>, + /// A provider used to select the other relays in the circuit. netdir_provider: Arc, - target: T, - ipt_sid_keypair: &HsIntroPtSessionIdKeypair, - // TODO HSS: Take other extensions. Ideally we would take not only - // DoSParams, but a full IntoIterator or ExtList. - // But both of those types are private in tor-cell. We should consider - // whether that's what we want. -) -> Result<(), IptError> -where - R: Runtime, - T: CircTarget, -{ - let circuit = { - let netdir = - wait_for_netdir(netdir_provider.as_ref(), tor_netdir::Timeliness::Timely).await?; - let kind = tor_circmgr::hspool::HsCircKind::SvcIntro; - pool.get_or_launch_specific(netdir.as_ref(), kind, target) - .await - .map_err(IptError::BuildCircuit)? - // note that netdir is dropped here, to avoid holding on to it any - // longer than necessary. - }; - let intro_pt_hop = circuit - .last_hop_num() - .map_err(into_internal!("Somehow built a circuit with no hops!?"))?; + /// The target introduction point. + /// + /// TODO: Should this instead be an identity that we look up in the netdir + /// provider? + target: OwnedCircTarget, + /// The keypair to use when establishing the introduction point. + /// + /// Knowledge of this private key prevents anybody else from impersonating + /// us to the introduction point. + ipt_sid_keypair: HsIntroPtSessionIdKeypair, + /// The extensions to use when establishing the introduction point. + /// + /// TODO: Should this be able to change over time if we re-establish this + /// intro point? + extensions: EstIntroExtensionSet, +} - let establish_intro = { - let ipt_sid_id = ipt_sid_keypair.as_ref().public.into(); - let details = EstablishIntroDetails::new(ipt_sid_id); - let circuit_binding_key = circuit - .binding_key(intro_pt_hop) - .ok_or(internal!("No binding key for introduction point!?"))?; - if false { - // TODO HSS: Insert extensions as needed. +/// An open session with a single introduction point. +// +// TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't. +pub(crate) struct IntroPtSession { + /// The circuit to the introduction point, on which we're receiving + /// Introduce2 messages. + intro_circ: Arc, + + /// The stream that will receive Introduce2 messages. + /// + /// TODO: we'll likely want to refactor this. @diziet favors having + /// `establish_intro_once` take a Sink as an argument, but I think that we + /// may need to keep this separate so that we can keep the ability to + /// start/stop the stream of Introduce2 messages, and/or detect when it's + /// closed. If we don't need to do that, we can refactor. + introduce_rx: UnboundedReceiver, + // TODO HSS: How shall we know if the other side has closed the circuit? We + // can either wait for introduce_rx to close, or we can use + // ClientCirc::wait_for_close, if we stabilize it. +} + +/// How long to allow for an introduction point to get established? +const ESTABLISH_TIMEOUT: Duration = Duration::new(10, 0); // TODO use a better timeout, taken from circuit estimator. + +/// How long to wait after a single failure. +const DELAY_ON_FAILURE: Duration = Duration::new(2, 0); // TODO use stochastic jitter. + +impl Reactor { + /// Run forever, keeping an introduction point established. + /// + /// TODO: If we're running this in its own task, we'll want some way to + /// cancel it. + async fn keep_intro_established( + &self, + mut status_tx: postage::watch::Sender, + ) -> Result<(), IptError> { + loop { + status_tx.borrow_mut().note_attempt(); + let outcome = self + .runtime + .timeout(ESTABLISH_TIMEOUT, self.establish_intro_once()) + .await + .unwrap_or(Err(IptError::EstablishTimeout)); + + match self.establish_intro_once().await { + Ok(session) => { + status_tx.borrow_mut().note_open(); + debug!( + "Successfully established introduction point with {}", + self.target.display_chan_target() + ); + + // TODO HSS: let session continue until it dies, actually + // implementing it. + } + Err(e) => { + status_tx.borrow_mut().note_error(&e); + debug_report!( + e, + "Problem establishing introduction point with {}", + self.target.display_chan_target() + ); + self.runtime.sleep(DELAY_ON_FAILURE).await; + } + } } - let body: Vec = details - .sign_and_encode(ipt_sid_keypair.as_ref(), circuit_binding_key.hs_mac()) - .map_err(IptError::CreateEstablishIntro)?; + } - // TODO HSS: This is ugly, but it is the sensible way to munge the above - // body into a format that AnyRelayCell will accept without doing a - // redundant parse step. - // - // One alternative would be allowing start_conversation to take an `impl - // RelayMsg` rather than an AnyRelayMsg. - // - // Or possibly, when we feel like it, we could rename one or more of - // these "Unrecognized"s to Unparsed or Uninterpreted. If we do that, however, we'll - // potentially face breaking changes up and down our crate stack. - AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new( - tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO, - body, - )) - }; + /// Try, once, to make a circuit to a single relay and establish an introduction + /// point there. + /// + /// Does not retry. Does not time out except via `HsCircPool`. + async fn establish_intro_once(&self) -> Result { + let circuit = { + let netdir = wait_for_netdir( + self.netdir_provider.as_ref(), + tor_netdir::Timeliness::Timely, + ) + .await?; + let kind = tor_circmgr::hspool::HsCircKind::SvcIntro; + self.pool + .get_or_launch_specific(netdir.as_ref(), kind, self.target.clone()) + .await + .map_err(IptError::BuildCircuit)? + // note that netdir is dropped here, to avoid holding on to it any + // longer than necessary. + }; + let intro_pt_hop = circuit + .last_hop_num() + .map_err(into_internal!("Somehow built a circuit with no hops!?"))?; - let (established_tx, established_rx) = oneshot::channel(); - let (introduce_tx, introduce_rx) = mpsc::unbounded(); + let establish_intro = { + let ipt_sid_id = self.ipt_sid_keypair.as_ref().public.into(); + let mut details = EstablishIntroDetails::new(ipt_sid_id); + if let Some(dos_params) = &self.extensions.dos_params { + details.set_extension_dos(dos_params.clone()); + } + let circuit_binding_key = circuit + .binding_key(intro_pt_hop) + .ok_or(internal!("No binding key for introduction point!?"))?; + let body: Vec = details + .sign_and_encode(self.ipt_sid_keypair.as_ref(), circuit_binding_key.hs_mac()) + .map_err(IptError::CreateEstablishIntro)?; - let handler = IptMsgHandler { - established_tx: Some(established_tx), - introduce_tx, - }; - let conversation = circuit - .start_conversation(Some(establish_intro), handler, intro_pt_hop) - .await - .map_err(IptError::SendEstablishIntro)?; - // At this point, we have `await`ed for the Conversation to exist, so we know - // that the message was sent. We have to wait for any actual `established` - // message, though. + // TODO HSS: This is ugly, but it is the sensible way to munge the above + // body into a format that AnyRelayCell will accept without doing a + // redundant parse step. + // + // One alternative would be allowing start_conversation to take an `impl + // RelayMsg` rather than an AnyRelayMsg. + // + // Or possibly, when we feel like it, we could rename one or more of + // these "Unrecognized"s to Unparsed or Uninterpreted. If we do that, however, we'll + // potentially face breaking changes up and down our crate stack. + AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new( + tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO, + body, + )) + }; - let established = established_rx.await.map_err(|_| IptError::ReceiveAck)?; + let (established_tx, established_rx) = oneshot::channel(); + let (introduce_tx, introduce_rx) = mpsc::unbounded(); - // TODO HSS: handle all the extension data in the established field. + let handler = IptMsgHandler { + established_tx: Some(established_tx), + introduce_tx, + }; + let conversation = circuit + .start_conversation(Some(establish_intro), handler, intro_pt_hop) + .await + .map_err(IptError::SendEstablishIntro)?; + // At this point, we have `await`ed for the Conversation to exist, so we know + // that the message was sent. We have to wait for any actual `established` + // message, though. - // TODO HSS: Return the introduce_rx stream along with any related types. - // Or should we have taken introduce_tx as an argument? (@diziet endorses - // the "take it as an argument" idea.) + let established = established_rx.await.map_err(|_| IptError::ReceiveAck)?; - // TODO HSS: Return the circuit too, of course. + if established.iter_extensions().next().is_some() { + // We do not support any extensions from the introduction point; if it + // sent us any, that's a protocol violation. + return Err(IptError::BadEstablished); + } - // TODO HSS: How shall we know if the other side has closed the circuit? We could wait - // for introduce_rx.next() to yield None, but that will only work if we use - // one mpsc::Sender per circuit... - todo!() + Ok(IntroPtSession { + intro_circ: circuit, + introduce_rx, + }) + } } /// Get a NetDir from `provider`, waiting until one exists.