From bc83d1e1de6b1d0a0c5bec7cc2c6f3cf5db89d98 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 3 Mar 2023 12:11:34 -0500 Subject: [PATCH] tor-proto: Implement functionality to send a message and accept replies This new function combines "sending a message" and "accepting replies in a stream" into a single call, so that there is no gap between when the message is sent and the replies are available. There are a number of compromises here, in order to avoid API proliferation. I've tried to contain them as best I can. See comments for additional design discussion. --- Cargo.lock | 1 + crates/tor-proto/Cargo.toml | 8 +- crates/tor-proto/src/circuit.rs | 91 +++++++++++------------ crates/tor-proto/src/circuit/msgfilter.rs | 91 +++++++++++++++++++++++ crates/tor-proto/src/circuit/reactor.rs | 29 ++++++-- 5 files changed, 166 insertions(+), 54 deletions(-) create mode 100644 crates/tor-proto/src/circuit/msgfilter.rs diff --git a/Cargo.lock b/Cargo.lock index 4601a55e4..9c5023087 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4233,6 +4233,7 @@ dependencies = [ "tor-units", "tracing", "typenum", + "visibility", "zeroize", ] diff --git a/crates/tor-proto/Cargo.toml b/crates/tor-proto/Cargo.toml index 432d92d3c..75159c7dc 100644 --- a/crates/tor-proto/Cargo.toml +++ b/crates/tor-proto/Cargo.toml @@ -20,7 +20,7 @@ ntor_v3 = [] hs-client = ["hs-common"] hs-service = ["hs-common"] hs-common = [] -experimental-api = [] +experimental-api = ["visibility"] # Enable testing-only APIs. APIs under this feature are not # covered by semver. testing = [] @@ -63,6 +63,7 @@ tor-rtmock = { path = "../tor-rtmock", version = "0.7.0" } tor-units = { path = "../tor-units", version = "0.5.0" } tracing = "0.1.18" typenum = "1.12" +visibility = { version = "0.0.1", optional = true } zeroize = "1" [dev-dependencies] @@ -73,7 +74,10 @@ itertools = "0.10.1" regex = { version = "1", default-features = false, features = ["std"] } statrs = "0.16.0" tokio-crate = { package = "tokio", version = "1.7", features = ["full"] } -tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = ["tokio", "native-tls"] } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = [ + "tokio", + "native-tls", +] } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index af51db01b..63d6068f2 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -43,6 +43,7 @@ pub(crate) mod halfcirc; mod halfstream; #[cfg(feature = "hs-common")] pub mod handshake; +mod msgfilter; mod path; pub(crate) mod reactor; pub(crate) mod sendme; @@ -85,6 +86,9 @@ use self::reactor::RequireSendmeAuth; /// The size of the buffer for communication between `ClientCirc` and its reactor. pub const CIRCUIT_BUFFER_SIZE: usize = 128; +#[cfg(feature = "experimental-api")] +pub use {msgfilter::MsgFilter, reactor::MetaCellDisposition}; + #[derive(Clone, Debug)] /// A circuit that we have constructed over the Tor network. /// @@ -233,74 +237,67 @@ impl ClientCirc { &self.channel } - /// Send a control message to the final hop on this circuit. + /// Send a control message to the final hop on this circuit, and wait for + /// one or more messages in reply. + /// + /// (These steps are performed atomically, so that incoming messages can be + /// accepted immediately after the outbound message is sent.) /// /// Note that it is quite possible to use this function to violate the tor /// protocol; most users of this API will not need to call it. It is used /// to implement most of the onion service handshake. /// - /// (This function is not yet implemented. Right now it will always panic.) // // TODO hs: rename this. "control_messages" is kind of ambiguous; we use // "control" for a lot of other things. We say "meta" elsewhere in the // reactor code, but "meta messages" just sounds odd. // - // TODO hs: possibly this should take a more encoded message type. - // // TODO hs: it might be nice to avoid exposing tor-cell APIs in the // tor-proto interface. - #[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove - #[cfg(feature = "experimental-api")] - pub async fn send_control_message(&self, msg: AnyRelayMsg) -> Result<()> { - todo!() // TODO hs - } - - /// Begin accepting 'control' messages from the final hop on this circuit, - /// and return an asynchronous stream of any such messages that arrive. - /// - /// A "control" message is a message without a stream ID that `tor-proto` - /// does not handle on its own. (The messages that `tor-proto` can handle - /// are DESTROY, DATA, SENDME, ...) Ordinarily, any unexpected control - /// message will cause the circuit to exit with an error. - /// - /// There can only be one stream of this type created on a given circuit at - /// a time. If a such a stream already exists, this method will return an - /// error. - /// - /// The caller should be sure to close the circuit if a command that _it_ - /// doesn't recognize shows up. - /// - /// (This function is not yet implemented; right now, it will always panic.) // - // TODO hs: Possibly this function (and send_control_message) should use + // TODO hs: I'm not sure this API is the right shape... + // + // It's a little overkill for ESTABLISH_RENDEZVOUS where we expect a single + // RENDEZVOUS_ESTABLISHED, then eventually a single RENDEZVOUS2. It's also a + // little overkill for INTRODUCE1 where we expect an INTRODUCE_ACK. + // + // It will work for it's good for ESTABLISH_INTRO where we expect an + // INTRO_ESTABLISHED followed by a large number of INTRODUCE2-- though we + // might regret an unbounded circuit? + // + // It isn't quite right for RENDEZVOUS1, where we expect no reply, and want + // to send the message to the second-to-last hop (having added a virtual hop + // to the circuit.) + // + // TODO hs: Possibly this function should use // HopNum or similar to indicate which hop we're talking to, rather than // just doing "the last hop". - // - // TODO hs: There is possibly some kind of type trickery we could do here so - // that the stream would return a chosen type that implements - // `TryFrom` or something like that. Not sure whether that's a - // good idea. - // + // TODO hs: Perhaps the stream here should yield a different type. Ian // thinks maybe we should store a callback instead. // - // TODO hs: rename this. "control_messages" is kind of ambiguous; we use - // "control" for a lot of other things. We say "meta" elsewhere in the - // reactor code, but "meta messages" just sounds odd. - // - // TODO hs: This should return a stream of UnparsedRelayCell. - // - // TODO hs: it might be nice to avoid exposing tor-cell APIs in the - // tor-proto interface. #[cfg(feature = "experimental-api")] - #[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove - pub fn receive_control_messages( + pub async fn send_control_message( &self, - ) -> Result>> { - if false { - return Ok(futures::stream::empty()); // TODO hs remove; this is just here for type inference. + msg: tor_cell::relaycell::AnyRelayCell, + reply_filter: impl MsgFilter + Send + 'static, + ) -> Result>> { + if msg.stream_id() != 0.into() { + return Err(bad_api_usage!("Not a control message.").into()); } - todo!() // TODO hs implement. + let last_hop = self + .path + .last_hop_num() + .ok_or_else(|| internal!("no last hop index"))?; + let (send, recv) = futures::channel::mpsc::unbounded(); + let handler = Box::new(msgfilter::UserMsgHandler::new(last_hop, send, reply_filter)); + + let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler { msg, handler }; + self.control + .unbounded_send(ctrl_msg) + .map_err(|_| Error::CircuitClosed)?; + + Ok(recv) } /// Tell this circuit to begin allowing the final hop of the circuit to try diff --git a/crates/tor-proto/src/circuit/msgfilter.rs b/crates/tor-proto/src/circuit/msgfilter.rs new file mode 100644 index 000000000..eabd7cdf8 --- /dev/null +++ b/crates/tor-proto/src/circuit/msgfilter.rs @@ -0,0 +1,91 @@ +//! Declare a message filter, for use with +//! [`Circuit::send_control_message`](super::Circuit::send_control_message). +//! +//! Although this is similar to `stream::cmdcheck`, I am deliberately leaving +//! them separate. Conceivably they should be unified at some point down the +//! road? +use futures::channel::mpsc; +use tor_cell::relaycell::UnparsedRelayCell; + +use crate::crypto::cell::HopNum; +use crate::Result; + +use super::MetaCellDisposition; + +/// An object that checks whether incoming control messages are acceptable on a +/// circuit. +/// +/// The filter is supplied to +/// [`Circuit::end_control_message`](super::Circuit::send_control_message). It +/// is used to check any incoming message whose stream ID is 0, and which would +/// otherwise not be accepted on a given circuit. + +/// (The messages that `tor-proto` will handle on its own, and _not_ deliver, are +/// are DESTROY, DATA, SENDME, ...) Ordinarily, any unexpected control +/// message will cause the circuit to exit with an error. +/// +/// There can only be one stream of this type created on a given circuit at +/// a time. If a such a stream already exists, this method will return an +/// error. +/// +/// The caller should be sure to close the circuit if a command that _it_ +/// doesn't recognize shows up. +/// +/// (This function is not yet implemented; right now, it will always panic.) +#[cfg_attr(feature = "experimental-api", visibility::make(pub))] +pub(super) trait MsgFilter { + /// Check whether this message is an acceptable one to receive in reply to + /// our command. + fn check_msg(&mut self, msg: &UnparsedRelayCell) -> Result; +} + +/// Wrapper for `MsgFilter` to implement `MetaCellHandler` +pub(super) struct UserMsgHandler { + /// From which hop to we expect to get messages? + hop: HopNum, + /// An unbounded sender that we use for reporting messages that match the + /// filter. + sender: mpsc::UnboundedSender>, + /// The filter itself. + filter: T, +} + +impl UserMsgHandler { + /// Create a new UserMsgHandler to be the MetaCellHandler for a user request. + pub(super) fn new( + hop: HopNum, + sender: mpsc::UnboundedSender>, + filter: T, + ) -> Self { + Self { + hop, + sender, + filter, + } + } +} + +impl super::reactor::MetaCellHandler for UserMsgHandler { + fn expected_hop(&self) -> HopNum { + self.hop + } + + fn handle_msg( + &mut self, + msg: UnparsedRelayCell, + _reactor: &mut super::reactor::Reactor, + ) -> Result { + match self.filter.check_msg(&msg) { + Ok(status) => match self.sender.unbounded_send(Ok(msg)) { + Ok(_) => Ok(status), + Err(_) => Ok(MetaCellDisposition::UninstallHandler), + }, + Err(e) => { + // (It's okay to ignore send errors here, since we are already + // closing this circuit.) + let _ignore = self.sender.unbounded_send(Err(e.clone())); + Err(e) + } + } + } +} diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 63947e1a6..3965078f9 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -87,7 +87,8 @@ pub(super) enum CircuitHandshake { } /// A message telling the reactor to do something. -#[derive(Debug)] +#[derive(educe::Educe)] +#[educe(Debug)] pub(super) enum CtrlMsg { /// Create the first hop of this circuit. Create { @@ -143,6 +144,16 @@ pub(super) enum CtrlMsg { /// A `CmdChecker` to keep track of which message types are acceptable. cmd_checker: AnyCmdChecker, }, + /// Send a given control message on this circuit, and install a control-message handler to + /// receive responses. + // TODO hs naming. + SendMsgAndInstallHandler { + /// The message to send + msg: AnyRelayCell, + /// A message handler to install. + #[educe(Debug(ignore))] + handler: Box, + }, /// Send a SENDME cell (used to ask for more data to be sent) on the given stream. SendSendme { /// The stream ID to send a SENDME for. @@ -288,9 +299,13 @@ pub(super) trait MetaCellHandler: Send { ) -> Result; } -/// A possible successful outcome of giving a message to a [`MetaCellHandler`]. -#[derive(Debug, Copy, Clone)] -#[allow(dead_code)] // TODO HS remove +/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler). +/// +/// (This deliberately does _not_ implement `Clone`, in case we want it to include +/// a the cell itself later on.) +#[derive(Debug)] +#[cfg_attr(feature = "experimental-api", visibility::make(pub))] +#[non_exhaustive] pub(super) enum MetaCellDisposition { /// The message was consumed; the handler should remain installed. Consumed, @@ -481,7 +496,7 @@ where if let Some(done) = self.operation_finished.take() { // ignore it if the receiving channel went away. - let _ = done.send(status.clone().map(|_| ())); + let _ = done.send(status.as_ref().map(|_| ()).map_err(Clone::clone)); status } else { Err(Error::from(internal!( @@ -1220,6 +1235,10 @@ impl Reactor { let cell = AnyRelayCell::new(stream_id, sendme.into()); self.send_relay_cell(cx, hop_num, false, cell)?; } + CtrlMsg::SendMsgAndInstallHandler { msg, handler } => { + self.send_relay_cell(cx, handler.expected_hop(), false, msg)?; + self.set_meta_handler(handler)?; + } #[cfg(test)] CtrlMsg::AddFakeHop { supports_flowctrl_1,