diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 209a62fbf..986d39424 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -71,6 +71,9 @@ use tor_cell::{ use tor_error::{bad_api_usage, internal, into_internal}; use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType}; +#[cfg(feature = "hs-service")] +use tor_cell::relaycell::msg as relaymsg; + use futures::channel::{mpsc, oneshot}; use crate::circuit::sendme::StreamRecvWindow; @@ -946,6 +949,49 @@ impl StreamTarget { Ok(()) } + /// Close the pending stream that owns this StreamTarget, delivering the specified + /// END message. + /// + /// The StreamTarget will set the correct stream ID and pick the + /// right hop, but will not validate that the message is well-formed + /// or meaningful in context. + /// + /// Note that in many cases, the actual contents of an END message can leak unwanted + /// information. Please consider carefully before sending anything but an + /// [`End::new_misc()`](relaymsg::End::new_misc) message over a `ClientCirc`. + /// + /// In addition to sending the END message, this function also ensures + /// the state of the stream map entry of this stream is updated + /// accordingly. + /// + /// Normally, you shouldn't need to call this function, as streams are implicitly closed by the + /// reactor when their corresponding `StreamTarget` is droppped. The only valid use of this + /// function is for closing pending incoming streams (a stream is said to be pending if we have + /// received the message initiating the stream but have not responded to it yet). + /// + /// **NOTE**: This function should be called at most once per request. Calling it twice will + /// cause the reactor to panic. + // + // TODO HSS: do not panic the reactor if this function is called twice. We have 2 options: + // + // * make StreamMap::terminate() not panic if the stream entry is already StreamEnt::EndSent + // * keep the panicky StreamMap::terminate() behaviour and make this function only send the + // ClosePendingStream control message if it hasn't previously sent it (we'll need to add a + // sent_close_pending_stream boolean flag in StreamTarget to remember if close() has been + // called before) + #[cfg(feature = "hs-service")] + pub(crate) async fn close(&self, msg: relaymsg::End) -> Result<()> { + self.circ + .control + .unbounded_send(CtrlMsg::ClosePendingStream { + stream_id: self.stream_id, + hop_num: self.hop_num, + message: msg, + }) + .map_err(|_| Error::CircuitClosed)?; + Ok(()) + } + /// Called when a circuit-level protocol error has occurred and the /// circuit needs to shut down. pub(crate) fn protocol_error(&mut self) {