Merge branch 'more_introducing' into 'main'

Write more of IptEstablisher.

See merge request tpo/core/arti!1510
This commit is contained in:
Nick Mathewson 2023-08-16 13:02:29 +00:00
commit 2a3fe5bf20
6 changed files with 272 additions and 99 deletions

2
Cargo.lock generated
View File

@ -4630,6 +4630,7 @@ version = "0.2.3"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"futures", "futures",
"postage",
"rand_core 0.6.4", "rand_core 0.6.4",
"thiserror", "thiserror",
"tor-cell", "tor-cell",
@ -4642,6 +4643,7 @@ dependencies = [
"tor-netdir", "tor-netdir",
"tor-proto", "tor-proto",
"tor-rtcompat", "tor-rtcompat",
"tracing",
] ]
[[package]] [[package]]

View File

@ -1 +1,2 @@
ADDED: establish_intro functions now take any impl<Into<HsMacKey>>. ADDED: establish_intro functions now take any impl<Into<HsMacKey>>.
ADDED: Iterator over IntroEstablishedExt.

View File

@ -310,7 +310,8 @@ decl_extension_group! {
/// ///
/// (Currently, no extensions of this type are recognized) /// (Currently, no extensions of this type are recognized)
#[derive(Debug,Clone)] #[derive(Debug,Clone)]
enum IntroEstablishedExt [ IntroEstablishedExtType ] { #[non_exhaustive]
pub enum IntroEstablishedExt [ IntroEstablishedExtType ] {
} }
} }
@ -327,6 +328,11 @@ impl IntroEstablished {
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
} }
/// Return an iterator over the extensions declared in this message.
pub fn iter_extensions(&self) -> impl Iterator<Item = &IntroEstablishedExt> {
self.extensions.iter()
}
} }
impl Body for IntroEstablished { impl Body for IntroEstablished {

View File

@ -102,7 +102,7 @@ impl<T: ExtGroup> ExtList<T> {
} }
} }
/// An unrecognized extension for some HS-related message. /// An unrecognized or unencoded extension for some HS-related message.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct UnrecognizedExt<ID> { pub struct UnrecognizedExt<ID> {
/// The field type ID for this extension. /// The field type ID for this extension.
@ -147,7 +147,8 @@ macro_rules! decl_extension_group {
$( $(#[$cmeta])* $( $(#[$cmeta])*
$case($case), $case($case),
)* )*
/// An extension of a type we do not recognize. /// An extension of a type we do not recognize, or which we have not
/// encoded.
Unrecognized(UnrecognizedExt<$type_id>) Unrecognized(UnrecognizedExt<$type_id>)
} }
impl Readable for $id { impl Readable for $id {

View File

@ -27,6 +27,7 @@ full = [
[dependencies] [dependencies]
async-trait = "0.1.54" async-trait = "0.1.54"
futures = "0.3.14" futures = "0.3.14"
postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] }
rand_core = "0.6.2" rand_core = "0.6.2"
thiserror = "1" thiserror = "1"
tor-cell = { version = "0.12.1", path = "../tor-cell", features = ["hs"] } tor-cell = { version = "0.12.1", path = "../tor-cell", features = ["hs"] }
@ -39,5 +40,6 @@ tor-llcrypto = { version = "0.5.2", path = "../tor-llcrypto" }
tor-netdir = { version = "0.9.3", path = "../tor-netdir" } tor-netdir = { version = "0.9.3", path = "../tor-netdir" }
tor-proto = { version = "0.12.0", path = "../tor-proto", features = ["hs-service", "send-control-msg"] } tor-proto = { version = "0.12.0", path = "../tor-proto", features = ["hs-service", "send-control-msg"] }
tor-rtcompat = { version = "0.9.1", path = "../tor-rtcompat" } tor-rtcompat = { version = "0.9.1", path = "../tor-rtcompat" }
tracing = "0.1.36"
[dev-dependencies] [dev-dependencies]

View File

@ -6,29 +6,38 @@
#![allow(clippy::needless_pass_by_value)] // TODO HSS remove #![allow(clippy::needless_pass_by_value)] // TODO HSS remove
use std::sync::Arc; use std::{sync::Arc, time::Duration};
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{
mpsc::{self, UnboundedReceiver},
oneshot,
},
StreamExt as _, StreamExt as _,
}; };
use tor_cell::relaycell::{ use tor_cell::relaycell::{
hs::est_intro::EstablishIntroDetails, hs::est_intro::{self, EstablishIntroDetails},
msg::{AnyRelayMsg, IntroEstablished, Introduce2}, msg::{AnyRelayMsg, IntroEstablished, Introduce2},
RelayMsg as _, RelayMsg as _,
}; };
use tor_circmgr::hspool::HsCircPool; use tor_circmgr::hspool::HsCircPool;
use tor_error::{internal, into_internal}; use tor_error::{debug_report, internal, into_internal};
use tor_hscrypto::pk::HsIntroPtSessionIdKeypair; use tor_hscrypto::pk::HsIntroPtSessionIdKeypair;
use tor_linkspec::CircTarget; use tor_linkspec::{ChanTarget as _, OwnedCircTarget};
use tor_netdir::{NetDir, NetDirProvider, Relay}; use tor_netdir::{NetDir, NetDirProvider, Relay};
use tor_proto::circuit::{ConversationInHandler, MetaCellDisposition}; use tor_proto::circuit::{ClientCirc, ConversationInHandler, MetaCellDisposition};
use tor_rtcompat::Runtime; use tor_rtcompat::{Runtime, SleepProviderExt as _};
use tracing::debug;
use crate::RendRequest; use crate::RendRequest;
/// Handle onto the task which is establishing and maintaining one IPT /// Handle onto the task which is establishing and maintaining one IPT
pub(crate) struct IptEstablisher {} pub(crate) struct IptEstablisher {
/// A stream of events that is produced whenever we have a change in the
/// IptStatus for this intro point. Note that this stream is potentially
/// lossy.
status_events: postage::watch::Receiver<IptStatus>,
}
/// When the `IptEstablisher` is dropped it is torn down /// When the `IptEstablisher` is dropped it is torn down
/// ///
@ -69,6 +78,10 @@ pub(crate) enum IptError {
#[error("Unable to construct signed ESTABLISH_INTRO message")] #[error("Unable to construct signed ESTABLISH_INTRO message")]
CreateEstablishIntro(#[source] tor_cell::Error), CreateEstablishIntro(#[source] tor_cell::Error),
/// We encountered a timeout after building the circuit.
#[error("Timeout during ESTABLISH_INTRO handshake.")]
EstablishTimeout,
/// We encountered an error while sending our establish_intro /// We encountered an error while sending our establish_intro
/// message. /// message.
#[error("Unable to send an ESTABLISH_INTRO message")] #[error("Unable to send an ESTABLISH_INTRO message")]
@ -81,11 +94,41 @@ pub(crate) enum IptError {
// Circuit,... // Circuit,...
ReceiveAck, ReceiveAck,
/// We received an invalid INTRO_ESTABLISHED message.
#[error("Got an invalid INTRO_ESTABLISHED message")]
BadEstablished,
/// We encountered a programming error. /// We encountered a programming error.
#[error("Internal error")] #[error("Internal error")]
Bug(#[from] tor_error::Bug), Bug(#[from] tor_error::Bug),
} }
impl tor_error::HasKind for IptError {
fn kind(&self) -> tor_error::ErrorKind {
use tor_error::ErrorKind as EK;
use IptError as E;
match self {
E::NoNetdir(_) => EK::BootstrapRequired, // TODO HSS maybe not right.
E::NetdirProviderShutdown => EK::ArtiShuttingDown,
E::BuildCircuit(e) => e.kind(),
E::EstablishTimeout => EK::TorNetworkTimeout, // TODO HSS right?
E::SendEstablishIntro(e) => e.kind(),
E::ReceiveAck => EK::RemoteProtocolViolation, // TODO HSS not always right.
E::BadEstablished => EK::RemoteProtocolViolation,
E::CreateEstablishIntro(_) => EK::Internal,
E::Bug(e) => e.kind(),
}
}
}
impl IptError {
/// Return true if this error appears to be the introduction point's fault.
fn is_ipt_failure(&self) -> bool {
// TODO HSS: actually test something here.
true
}
}
impl IptEstablisher { impl IptEstablisher {
/// Try to set up, and maintain, an IPT at `Relay` /// Try to set up, and maintain, an IPT at `Relay`
/// ///
@ -94,14 +137,6 @@ impl IptEstablisher {
circ_pool: Arc<HsCircPool<R>>, circ_pool: Arc<HsCircPool<R>>,
dirprovider: Arc<dyn NetDirProvider>, dirprovider: Arc<dyn NetDirProvider>,
relay: &Relay<'_>, relay: &Relay<'_>,
// Not a postage::watch since we want to count `Good` to `Faulty`
// transitions
//
// (The alternative would be to count them as part of this structure and
// use a postage watch.)
//
// bounded sender with a fixed small bound; OK to stall waiting for manager to catch up
status: mpsc::Sender<IptStatus>,
// TODO HSS: this needs to take some configuration // TODO HSS: this needs to take some configuration
) -> Result<Self, IptError> { ) -> Result<Self, IptError> {
todo!() todo!()
@ -122,7 +157,7 @@ impl IptEstablisher {
/// `hssvc-ipt-algorithms.md`. /// `hssvc-ipt-algorithms.md`.
/// ///
/// TODO HSS Make that file unneeded. /// TODO HSS Make that file unneeded.
#[derive(Clone, Debug)] #[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum IptStatusStatus { pub(crate) enum IptStatusStatus {
/// We are (re)establishing our connection to the IPT /// We are (re)establishing our connection to the IPT
/// ///
@ -139,11 +174,11 @@ pub(crate) enum IptStatusStatus {
/// `Err(IptWantsToRetire)` indicates that the IPT Establisher wants to retire this IPT /// `Err(IptWantsToRetire)` indicates that the IPT Establisher wants to retire this IPT
/// ///
/// This happens when the IPT has had (too) many rendezvous requests. /// This happens when the IPT has had (too) many rendezvous requests.
#[derive(Clone, Debug)] #[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptWantsToRetire; pub(crate) struct IptWantsToRetire;
/// The current status of an introduction point. /// The current status of an introduction point.
#[derive(Clone, Debug)] #[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct IptStatus { pub(crate) struct IptStatus {
/// The current state of this introduction point as defined by /// The current state of this introduction point as defined by
/// `hssvc-ipt-algorithms.md`. /// `hssvc-ipt-algorithms.md`.
@ -151,11 +186,42 @@ pub(crate) struct IptStatus {
/// TODO HSS Make that file unneeded. /// TODO HSS Make that file unneeded.
pub(crate) status: IptStatusStatus, pub(crate) status: IptStatusStatus,
/// How many times have we transitioned into a Faulty state?
///
/// (This is not the same as the total number of failed attempts, since it
/// does not count times we retry from a Faulty state.)
pub(crate) n_faults: u32,
/// The current status of whether this introduction point circuit wants to be /// The current status of whether this introduction point circuit wants to be
/// retired based on having processed too many requests. /// retired based on having processed too many requests.
pub(crate) wants_to_retire: Result<(), IptWantsToRetire>, pub(crate) wants_to_retire: Result<(), IptWantsToRetire>,
} }
impl IptStatus {
/// Record that we have successfully connected to an introduction point.
fn note_open(&mut self) {
self.status = IptStatusStatus::Good;
}
/// Record that we are trying to connect to an introduction point.
fn note_attempt(&mut self) {
use IptStatusStatus::*;
self.status = match self.status {
Establishing | Good => Establishing,
Faulty => Faulty, // We don't change status if we think we're broken.
}
}
/// Record that an error has occurred.
fn note_error(&mut self, err: &IptError) {
use IptStatusStatus::*;
if err.is_ipt_failure() && self.status == Good {
self.n_faults += 1;
self.status = Faulty;
}
}
}
tor_cell::restricted_msg! { tor_cell::restricted_msg! {
/// An acceptable message to receive from an introduction point. /// An acceptable message to receive from an introduction point.
enum IptMsg : RelayMsg { enum IptMsg : RelayMsg {
@ -164,96 +230,191 @@ tor_cell::restricted_msg! {
} }
} }
/// Try, once, to make a circuit to a single relay and establish an introduction /// A set of extensions to send with our `ESTABLISH_INTRO` message.
/// point there.
/// ///
/// Does not retry. Does not time out except via `HsCircPool`. /// NOTE: we eventually might want to support unrecognized extensions. But
async fn establish_intro_once<R, T>( /// that's potentially troublesome, since the set of extensions we sent might
/// have an affect on how we validate the reply.
#[derive(Clone, Debug)]
pub(crate) struct EstIntroExtensionSet {
/// Parameters related to rate-limiting to prevent denial-of-service
/// attacks.
dos_params: Option<est_intro::DosParams>,
}
/// Implementation structure for the task that implements an IptEstablisher.
struct Reactor<R: Runtime> {
/// A copy of our runtime, used for timeouts and sleeping.
runtime: R,
/// A pool used to create circuits to the introduction point.
pool: Arc<HsCircPool<R>>, pool: Arc<HsCircPool<R>>,
/// A provider used to select the other relays in the circuit.
netdir_provider: Arc<dyn NetDirProvider>, netdir_provider: Arc<dyn NetDirProvider>,
target: T, /// The target introduction point.
ipt_sid_keypair: &HsIntroPtSessionIdKeypair, ///
// TODO HSS: Take other extensions. Ideally we would take not only /// TODO: Should this instead be an identity that we look up in the netdir
// DoSParams, but a full IntoIterator<Item=EstablishIntroExt> or ExtList. /// provider?
// But both of those types are private in tor-cell. We should consider target: OwnedCircTarget,
// whether that's what we want. /// The keypair to use when establishing the introduction point.
) -> Result<(), IptError> ///
where /// Knowledge of this private key prevents anybody else from impersonating
R: Runtime, /// us to the introduction point.
T: CircTarget, ipt_sid_keypair: HsIntroPtSessionIdKeypair,
{ /// The extensions to use when establishing the introduction point.
let circuit = { ///
let netdir = /// TODO: Should this be able to change over time if we re-establish this
wait_for_netdir(netdir_provider.as_ref(), tor_netdir::Timeliness::Timely).await?; /// intro point?
let kind = tor_circmgr::hspool::HsCircKind::SvcIntro; extensions: EstIntroExtensionSet,
pool.get_or_launch_specific(netdir.as_ref(), kind, target) }
.await
.map_err(IptError::BuildCircuit)?
// note that netdir is dropped here, to avoid holding on to it any
// longer than necessary.
};
let intro_pt_hop = circuit
.last_hop_num()
.map_err(into_internal!("Somehow built a circuit with no hops!?"))?;
let establish_intro = { /// An open session with a single introduction point.
let ipt_sid_id = ipt_sid_keypair.as_ref().public.into(); //
let details = EstablishIntroDetails::new(ipt_sid_id); // TODO: I've used Ipt and IntroPt in this module; maybe we shouldn't.
let circuit_binding_key = circuit pub(crate) struct IntroPtSession {
.binding_key(intro_pt_hop) /// The circuit to the introduction point, on which we're receiving
.ok_or(internal!("No binding key for introduction point!?"))?; /// Introduce2 messages.
if false { intro_circ: Arc<ClientCirc>,
// TODO HSS: Insert extensions as needed.
/// The stream that will receive Introduce2 messages.
///
/// TODO: we'll likely want to refactor this. @diziet favors having
/// `establish_intro_once` take a Sink as an argument, but I think that we
/// may need to keep this separate so that we can keep the ability to
/// start/stop the stream of Introduce2 messages, and/or detect when it's
/// closed. If we don't need to do that, we can refactor.
introduce_rx: UnboundedReceiver<Introduce2>,
// TODO HSS: How shall we know if the other side has closed the circuit? We
// can either wait for introduce_rx to close, or we can use
// ClientCirc::wait_for_close, if we stabilize it.
}
/// How long to allow for an introduction point to get established?
const ESTABLISH_TIMEOUT: Duration = Duration::new(10, 0); // TODO use a better timeout, taken from circuit estimator.
/// How long to wait after a single failure.
const DELAY_ON_FAILURE: Duration = Duration::new(2, 0); // TODO use stochastic jitter.
impl<R: Runtime> Reactor<R> {
/// Run forever, keeping an introduction point established.
///
/// TODO: If we're running this in its own task, we'll want some way to
/// cancel it.
async fn keep_intro_established(
&self,
mut status_tx: postage::watch::Sender<IptStatus>,
) -> Result<(), IptError> {
loop {
status_tx.borrow_mut().note_attempt();
let outcome = self
.runtime
.timeout(ESTABLISH_TIMEOUT, self.establish_intro_once())
.await
.unwrap_or(Err(IptError::EstablishTimeout));
match self.establish_intro_once().await {
Ok(session) => {
status_tx.borrow_mut().note_open();
debug!(
"Successfully established introduction point with {}",
self.target.display_chan_target()
);
// TODO HSS: let session continue until it dies, actually
// implementing it.
}
Err(e) => {
status_tx.borrow_mut().note_error(&e);
debug_report!(
e,
"Problem establishing introduction point with {}",
self.target.display_chan_target()
);
self.runtime.sleep(DELAY_ON_FAILURE).await;
}
}
} }
let body: Vec<u8> = details }
.sign_and_encode(ipt_sid_keypair.as_ref(), circuit_binding_key.hs_mac())
.map_err(IptError::CreateEstablishIntro)?;
// TODO HSS: This is ugly, but it is the sensible way to munge the above /// Try, once, to make a circuit to a single relay and establish an introduction
// body into a format that AnyRelayCell will accept without doing a /// point there.
// redundant parse step. ///
// /// Does not retry. Does not time out except via `HsCircPool`.
// One alternative would be allowing start_conversation to take an `impl async fn establish_intro_once(&self) -> Result<IntroPtSession, IptError> {
// RelayMsg` rather than an AnyRelayMsg. let circuit = {
// let netdir = wait_for_netdir(
// Or possibly, when we feel like it, we could rename one or more of self.netdir_provider.as_ref(),
// these "Unrecognized"s to Unparsed or Uninterpreted. If we do that, however, we'll tor_netdir::Timeliness::Timely,
// potentially face breaking changes up and down our crate stack. )
AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new( .await?;
tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO, let kind = tor_circmgr::hspool::HsCircKind::SvcIntro;
body, self.pool
)) .get_or_launch_specific(netdir.as_ref(), kind, self.target.clone())
}; .await
.map_err(IptError::BuildCircuit)?
// note that netdir is dropped here, to avoid holding on to it any
// longer than necessary.
};
let intro_pt_hop = circuit
.last_hop_num()
.map_err(into_internal!("Somehow built a circuit with no hops!?"))?;
let (established_tx, established_rx) = oneshot::channel(); let establish_intro = {
let (introduce_tx, introduce_rx) = mpsc::unbounded(); let ipt_sid_id = self.ipt_sid_keypair.as_ref().public.into();
let mut details = EstablishIntroDetails::new(ipt_sid_id);
if let Some(dos_params) = &self.extensions.dos_params {
details.set_extension_dos(dos_params.clone());
}
let circuit_binding_key = circuit
.binding_key(intro_pt_hop)
.ok_or(internal!("No binding key for introduction point!?"))?;
let body: Vec<u8> = details
.sign_and_encode(self.ipt_sid_keypair.as_ref(), circuit_binding_key.hs_mac())
.map_err(IptError::CreateEstablishIntro)?;
let handler = IptMsgHandler { // TODO HSS: This is ugly, but it is the sensible way to munge the above
established_tx: Some(established_tx), // body into a format that AnyRelayCell will accept without doing a
introduce_tx, // redundant parse step.
}; //
let conversation = circuit // One alternative would be allowing start_conversation to take an `impl
.start_conversation(Some(establish_intro), handler, intro_pt_hop) // RelayMsg` rather than an AnyRelayMsg.
.await //
.map_err(IptError::SendEstablishIntro)?; // Or possibly, when we feel like it, we could rename one or more of
// At this point, we have `await`ed for the Conversation to exist, so we know // these "Unrecognized"s to Unparsed or Uninterpreted. If we do that, however, we'll
// that the message was sent. We have to wait for any actual `established` // potentially face breaking changes up and down our crate stack.
// message, though. AnyRelayMsg::Unrecognized(tor_cell::relaycell::msg::Unrecognized::new(
tor_cell::relaycell::RelayCmd::ESTABLISH_INTRO,
body,
))
};
let established = established_rx.await.map_err(|_| IptError::ReceiveAck)?; let (established_tx, established_rx) = oneshot::channel();
let (introduce_tx, introduce_rx) = mpsc::unbounded();
// TODO HSS: handle all the extension data in the established field. let handler = IptMsgHandler {
established_tx: Some(established_tx),
introduce_tx,
};
let conversation = circuit
.start_conversation(Some(establish_intro), handler, intro_pt_hop)
.await
.map_err(IptError::SendEstablishIntro)?;
// At this point, we have `await`ed for the Conversation to exist, so we know
// that the message was sent. We have to wait for any actual `established`
// message, though.
// TODO HSS: Return the introduce_rx stream along with any related types. let established = established_rx.await.map_err(|_| IptError::ReceiveAck)?;
// Or should we have taken introduce_tx as an argument? (@diziet endorses
// the "take it as an argument" idea.)
// TODO HSS: Return the circuit too, of course. if established.iter_extensions().next().is_some() {
// We do not support any extensions from the introduction point; if it
// sent us any, that's a protocol violation.
return Err(IptError::BadEstablished);
}
// TODO HSS: How shall we know if the other side has closed the circuit? We could wait Ok(IntroPtSession {
// for introduce_rx.next() to yield None, but that will only work if we use intro_circ: circuit,
// one mpsc::Sender per circuit... introduce_rx,
todo!() })
}
} }
/// Get a NetDir from `provider`, waiting until one exists. /// Get a NetDir from `provider`, waiting until one exists.