tor-proto: Implement functionality to send a message and accept replies

This new function combines "sending a message" and "accepting
replies in a stream" into a single call, so that there is no gap
between when the message is sent and the replies are available.

There are a number of compromises here, in order to avoid API
proliferation. I've tried to contain them as best I can.

See comments for additional design discussion.
This commit is contained in:
Nick Mathewson 2023-03-03 12:11:34 -05:00
parent 094fdc0d8d
commit bc83d1e1de
5 changed files with 166 additions and 54 deletions

1
Cargo.lock generated
View File

@ -4233,6 +4233,7 @@ dependencies = [
"tor-units",
"tracing",
"typenum",
"visibility",
"zeroize",
]

View File

@ -20,7 +20,7 @@ ntor_v3 = []
hs-client = ["hs-common"]
hs-service = ["hs-common"]
hs-common = []
experimental-api = []
experimental-api = ["visibility"]
# Enable testing-only APIs. APIs under this feature are not
# covered by semver.
testing = []
@ -63,6 +63,7 @@ tor-rtmock = { path = "../tor-rtmock", version = "0.7.0" }
tor-units = { path = "../tor-units", version = "0.5.0" }
tracing = "0.1.18"
typenum = "1.12"
visibility = { version = "0.0.1", optional = true }
zeroize = "1"
[dev-dependencies]
@ -73,7 +74,10 @@ itertools = "0.10.1"
regex = { version = "1", default-features = false, features = ["std"] }
statrs = "0.16.0"
tokio-crate = { package = "tokio", version = "1.7", features = ["full"] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = ["tokio", "native-tls"] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = [
"tokio",
"native-tls",
] }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

View File

@ -43,6 +43,7 @@ pub(crate) mod halfcirc;
mod halfstream;
#[cfg(feature = "hs-common")]
pub mod handshake;
mod msgfilter;
mod path;
pub(crate) mod reactor;
pub(crate) mod sendme;
@ -85,6 +86,9 @@ use self::reactor::RequireSendmeAuth;
/// The size of the buffer for communication between `ClientCirc` and its reactor.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
#[cfg(feature = "experimental-api")]
pub use {msgfilter::MsgFilter, reactor::MetaCellDisposition};
#[derive(Clone, Debug)]
/// A circuit that we have constructed over the Tor network.
///
@ -233,74 +237,67 @@ impl ClientCirc {
&self.channel
}
/// Send a control message to the final hop on this circuit.
/// Send a control message to the final hop on this circuit, and wait for
/// one or more messages in reply.
///
/// (These steps are performed atomically, so that incoming messages can be
/// accepted immediately after the outbound message is sent.)
///
/// Note that it is quite possible to use this function to violate the tor
/// protocol; most users of this API will not need to call it. It is used
/// to implement most of the onion service handshake.
///
/// (This function is not yet implemented. Right now it will always panic.)
//
// TODO hs: rename this. "control_messages" is kind of ambiguous; we use
// "control" for a lot of other things. We say "meta" elsewhere in the
// reactor code, but "meta messages" just sounds odd.
//
// TODO hs: possibly this should take a more encoded message type.
//
// TODO hs: it might be nice to avoid exposing tor-cell APIs in the
// tor-proto interface.
#[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove
#[cfg(feature = "experimental-api")]
pub async fn send_control_message(&self, msg: AnyRelayMsg) -> Result<()> {
todo!() // TODO hs
}
/// Begin accepting 'control' messages from the final hop on this circuit,
/// and return an asynchronous stream of any such messages that arrive.
///
/// A "control" message is a message without a stream ID that `tor-proto`
/// does not handle on its own. (The messages that `tor-proto` can handle
/// are DESTROY, DATA, SENDME, ...) Ordinarily, any unexpected control
/// message will cause the circuit to exit with an error.
///
/// There can only be one stream of this type created on a given circuit at
/// a time. If a such a stream already exists, this method will return an
/// error.
///
/// The caller should be sure to close the circuit if a command that _it_
/// doesn't recognize shows up.
///
/// (This function is not yet implemented; right now, it will always panic.)
//
// TODO hs: Possibly this function (and send_control_message) should use
// TODO hs: I'm not sure this API is the right shape...
//
// It's a little overkill for ESTABLISH_RENDEZVOUS where we expect a single
// RENDEZVOUS_ESTABLISHED, then eventually a single RENDEZVOUS2. It's also a
// little overkill for INTRODUCE1 where we expect an INTRODUCE_ACK.
//
// It will work for it's good for ESTABLISH_INTRO where we expect an
// INTRO_ESTABLISHED followed by a large number of INTRODUCE2-- though we
// might regret an unbounded circuit?
//
// It isn't quite right for RENDEZVOUS1, where we expect no reply, and want
// to send the message to the second-to-last hop (having added a virtual hop
// to the circuit.)
//
// TODO hs: Possibly this function should use
// HopNum or similar to indicate which hop we're talking to, rather than
// just doing "the last hop".
//
// TODO hs: There is possibly some kind of type trickery we could do here so
// that the stream would return a chosen type that implements
// `TryFrom<RelayMsg>` or something like that. Not sure whether that's a
// good idea.
//
// TODO hs: Perhaps the stream here should yield a different type. Ian
// thinks maybe we should store a callback instead.
//
// TODO hs: rename this. "control_messages" is kind of ambiguous; we use
// "control" for a lot of other things. We say "meta" elsewhere in the
// reactor code, but "meta messages" just sounds odd.
//
// TODO hs: This should return a stream of UnparsedRelayCell.
//
// TODO hs: it might be nice to avoid exposing tor-cell APIs in the
// tor-proto interface.
#[cfg(feature = "experimental-api")]
#[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove
pub fn receive_control_messages(
pub async fn send_control_message(
&self,
) -> Result<impl futures::Stream<Item = Box<chancell::RawCellBody>>> {
if false {
return Ok(futures::stream::empty()); // TODO hs remove; this is just here for type inference.
msg: tor_cell::relaycell::AnyRelayCell,
reply_filter: impl MsgFilter + Send + 'static,
) -> Result<impl futures::Stream<Item = Result<tor_cell::relaycell::UnparsedRelayCell>>> {
if msg.stream_id() != 0.into() {
return Err(bad_api_usage!("Not a control message.").into());
}
todo!() // TODO hs implement.
let last_hop = self
.path
.last_hop_num()
.ok_or_else(|| internal!("no last hop index"))?;
let (send, recv) = futures::channel::mpsc::unbounded();
let handler = Box::new(msgfilter::UserMsgHandler::new(last_hop, send, reply_filter));
let ctrl_msg = CtrlMsg::SendMsgAndInstallHandler { msg, handler };
self.control
.unbounded_send(ctrl_msg)
.map_err(|_| Error::CircuitClosed)?;
Ok(recv)
}
/// Tell this circuit to begin allowing the final hop of the circuit to try

View File

@ -0,0 +1,91 @@
//! Declare a message filter, for use with
//! [`Circuit::send_control_message`](super::Circuit::send_control_message).
//!
//! Although this is similar to `stream::cmdcheck`, I am deliberately leaving
//! them separate. Conceivably they should be unified at some point down the
//! road?
use futures::channel::mpsc;
use tor_cell::relaycell::UnparsedRelayCell;
use crate::crypto::cell::HopNum;
use crate::Result;
use super::MetaCellDisposition;
/// An object that checks whether incoming control messages are acceptable on a
/// circuit.
///
/// The filter is supplied to
/// [`Circuit::end_control_message`](super::Circuit::send_control_message). It
/// is used to check any incoming message whose stream ID is 0, and which would
/// otherwise not be accepted on a given circuit.
/// (The messages that `tor-proto` will handle on its own, and _not_ deliver, are
/// are DESTROY, DATA, SENDME, ...) Ordinarily, any unexpected control
/// message will cause the circuit to exit with an error.
///
/// There can only be one stream of this type created on a given circuit at
/// a time. If a such a stream already exists, this method will return an
/// error.
///
/// The caller should be sure to close the circuit if a command that _it_
/// doesn't recognize shows up.
///
/// (This function is not yet implemented; right now, it will always panic.)
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(super) trait MsgFilter {
/// Check whether this message is an acceptable one to receive in reply to
/// our command.
fn check_msg(&mut self, msg: &UnparsedRelayCell) -> Result<MetaCellDisposition>;
}
/// Wrapper for `MsgFilter` to implement `MetaCellHandler`
pub(super) struct UserMsgHandler<T> {
/// From which hop to we expect to get messages?
hop: HopNum,
/// An unbounded sender that we use for reporting messages that match the
/// filter.
sender: mpsc::UnboundedSender<Result<UnparsedRelayCell>>,
/// The filter itself.
filter: T,
}
impl<T> UserMsgHandler<T> {
/// Create a new UserMsgHandler to be the MetaCellHandler for a user request.
pub(super) fn new(
hop: HopNum,
sender: mpsc::UnboundedSender<Result<UnparsedRelayCell>>,
filter: T,
) -> Self {
Self {
hop,
sender,
filter,
}
}
}
impl<T: MsgFilter + Send> super::reactor::MetaCellHandler for UserMsgHandler<T> {
fn expected_hop(&self) -> HopNum {
self.hop
}
fn handle_msg(
&mut self,
msg: UnparsedRelayCell,
_reactor: &mut super::reactor::Reactor,
) -> Result<MetaCellDisposition> {
match self.filter.check_msg(&msg) {
Ok(status) => match self.sender.unbounded_send(Ok(msg)) {
Ok(_) => Ok(status),
Err(_) => Ok(MetaCellDisposition::UninstallHandler),
},
Err(e) => {
// (It's okay to ignore send errors here, since we are already
// closing this circuit.)
let _ignore = self.sender.unbounded_send(Err(e.clone()));
Err(e)
}
}
}
}

View File

@ -87,7 +87,8 @@ pub(super) enum CircuitHandshake {
}
/// A message telling the reactor to do something.
#[derive(Debug)]
#[derive(educe::Educe)]
#[educe(Debug)]
pub(super) enum CtrlMsg {
/// Create the first hop of this circuit.
Create {
@ -143,6 +144,16 @@ pub(super) enum CtrlMsg {
/// A `CmdChecker` to keep track of which message types are acceptable.
cmd_checker: AnyCmdChecker,
},
/// Send a given control message on this circuit, and install a control-message handler to
/// receive responses.
// TODO hs naming.
SendMsgAndInstallHandler {
/// The message to send
msg: AnyRelayCell,
/// A message handler to install.
#[educe(Debug(ignore))]
handler: Box<dyn MetaCellHandler + Send + 'static>,
},
/// Send a SENDME cell (used to ask for more data to be sent) on the given stream.
SendSendme {
/// The stream ID to send a SENDME for.
@ -288,9 +299,13 @@ pub(super) trait MetaCellHandler: Send {
) -> Result<MetaCellDisposition>;
}
/// A possible successful outcome of giving a message to a [`MetaCellHandler`].
#[derive(Debug, Copy, Clone)]
#[allow(dead_code)] // TODO HS remove
/// A possible successful outcome of giving a message to a [`MsgHandler`](super::msghandler::MsgHandler).
///
/// (This deliberately does _not_ implement `Clone`, in case we want it to include
/// a the cell itself later on.)
#[derive(Debug)]
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
#[non_exhaustive]
pub(super) enum MetaCellDisposition {
/// The message was consumed; the handler should remain installed.
Consumed,
@ -481,7 +496,7 @@ where
if let Some(done) = self.operation_finished.take() {
// ignore it if the receiving channel went away.
let _ = done.send(status.clone().map(|_| ()));
let _ = done.send(status.as_ref().map(|_| ()).map_err(Clone::clone));
status
} else {
Err(Error::from(internal!(
@ -1220,6 +1235,10 @@ impl Reactor {
let cell = AnyRelayCell::new(stream_id, sendme.into());
self.send_relay_cell(cx, hop_num, false, cell)?;
}
CtrlMsg::SendMsgAndInstallHandler { msg, handler } => {
self.send_relay_cell(cx, handler.expected_hop(), false, msg)?;
self.set_meta_handler(handler)?;
}
#[cfg(test)]
CtrlMsg::AddFakeHop {
supports_flowctrl_1,