diff --git a/crates/tor-proto/Cargo.toml b/crates/tor-proto/Cargo.toml index fc17ec97c..997cb78b7 100644 --- a/crates/tor-proto/Cargo.toml +++ b/crates/tor-proto/Cargo.toml @@ -32,13 +32,15 @@ full = [ "tor-units/full", ] -experimental = ["experimental-api", "hs-client", "hs-service", "ntor_v3", "testing"] +experimental = ["experimental-api", "hs-client", "hs-service", "ntor_v3", "stream-ctrl", "testing"] ntor_v3 = ["__is_experimental"] hs-client = ["hs-common", "__is_experimental"] hs-service = ["hs-common", "__is_experimental"] hs-common = ["tor-hscrypto"] experimental-api = ["send-control-msg", "__is_experimental"] -send-control-msg = ["visibility"] # send_control_message etc.; TODO HS should this be in hs-client? +# send_control_message etc.; TODO HS should this be in hs-client? +send-control-msg = ["visibility"] +stream-ctrl = ["__is_experimental"] # Enable testing-only APIs. APIs under this feature are not # covered by semver. testing = ["__is_experimental"] diff --git a/crates/tor-proto/src/stream.rs b/crates/tor-proto/src/stream.rs index c6d6b2f03..3aa0f2503 100644 --- a/crates/tor-proto/src/stream.rs +++ b/crates/tor-proto/src/stream.rs @@ -10,6 +10,8 @@ //! There is no fairness, rate-limiting, or flow control. mod cmdcheck; +#[cfg(feature = "stream-ctrl")] +mod ctrl; mod data; #[cfg(feature = "hs-service")] mod incoming; @@ -28,3 +30,7 @@ pub use resolve::ResolveStream; pub(crate) use {data::DataCmdChecker, resolve::ResolveCmdChecker}; pub use tor_cell::relaycell::msg::IpVersionPreference; + +#[cfg(feature = "stream-ctrl")] +#[cfg_attr(docsrs, doc(cfg(feature = "stream-ctrl")))] +pub use {ctrl::ClientStreamCtrl, data::DataStreamCtrl}; diff --git a/crates/tor-proto/src/stream/ctrl.rs b/crates/tor-proto/src/stream/ctrl.rs new file mode 100644 index 000000000..d9e3ef677 --- /dev/null +++ b/crates/tor-proto/src/stream/ctrl.rs @@ -0,0 +1,25 @@ +//! Common types for `StreamCtrl` traits and objects, used to provide a +//! shareable handle for controlling a string. + +use std::sync::Arc; + +use crate::circuit::ClientCirc; + +/// An object that lets the owner "control" a client stream. +/// +/// In some cases, this may be the stream itself; in others, it will be a handle +/// to the shared parts of the stream. (For data streams, it's not convenient to +/// make the actual `AsyncRead` and `AsyncWrite` types shared, since all the methods +/// on those traits take `&mut self`.) +// +// TODO RPC: Does this also apply to relay-side streams? (I say no-nickm) +// Does it apply to RESOLVE streams? (I say yes; they are streams-nickm) +// Which methods from DataStreamCtrl does it make sense to move here? +pub trait ClientStreamCtrl { + /// Return the circuit that this stream is attached to, if that circuit + /// object is still present. + /// + /// (If the circuit object itself is not present, the stream is necessarily + /// closed.) + fn circuit(&self) -> Option>; +} diff --git a/crates/tor-proto/src/stream/data.rs b/crates/tor-proto/src/stream/data.rs index 4b9196651..7aef914c1 100644 --- a/crates/tor-proto/src/stream/data.rs +++ b/crates/tor-proto/src/stream/data.rs @@ -20,6 +20,8 @@ use tor_cell::restricted_msg; use std::fmt::Debug; use std::io::Result as IoResult; use std::pin::Pin; +#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))] +use std::sync::{Arc, Mutex, Weak}; use educe::Educe; @@ -108,8 +110,44 @@ pub struct DataStream { /// TODO: This is redundant with the reference in `StreamTarget` inside /// DataWriterState, but for now we can't actually access that state all the time, /// since it might be inside a boxed future. + /// + /// TODO: This is also redundant with the reference in `DataStreamCtrl`. #[cfg(feature = "experimental-api")] - circuit: std::sync::Arc, + circuit: Arc, + + /// A control object that can be used to monitor and control this stream + /// without needing to own it. + #[cfg(feature = "stream-ctrl")] + ctrl: std::sync::Arc, +} + +/// An object used to control and monitor a data stream. +/// +/// # Notes +/// +/// This is a separate type from [`DataStream`] because it's useful to have +/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`] +/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to +/// work correctly. +#[cfg(feature = "stream-ctrl")] +#[derive(Debug)] +pub struct DataStreamCtrl { + /// The circuit to which this stream is attached. + /// + /// Note that the stream's reader and writer halves each contain a `StreamTarget`, + /// which in turn has a strong reference to the `ClientCirc`. So as long as any + /// one of those is alive, this reference will be present. + /// + /// We make this a Weak reference so that once the stream itself is closed, + /// we can't leak circuits. + circuit: Weak, + + /// Shared user-visible information about the state of this stream. + /// + /// TODO RPC: This will probably want to be a `postage::Watch` or something + /// similar, if and when it stops moving around. + #[cfg(feature = "stream-ctrl")] + status: Arc>, } /// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`]. @@ -136,6 +174,11 @@ pub struct DataWriter { /// AsyncWrite functions. It might be possible to do better here, /// and we should refactor if so. state: Option, + + /// A control object that can be used to monitor and control this stream + /// without needing to own it. + #[cfg(feature = "stream-ctrl")] + ctrl: std::sync::Arc, } /// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`]. @@ -161,6 +204,68 @@ pub struct DataReader { /// poll_read(). It might be possible to do better here, and we /// should refactor if so. state: Option, + + /// A control object that can be used to monitor and control this stream + /// without needing to own it. + #[cfg(feature = "stream-ctrl")] + ctrl: std::sync::Arc, +} + +/// Shared status flags for tracking the status of as `DataStream`. +/// +/// We expect to refactor this a bit, so it's not exposed at all. +// +// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus +// from various points in this module, we should instead construct +// DataStreamStatus as needed from information available elsewhere. In any +// case, we should really eliminate as much duplicate state here as we can. +// (See discussions at !1198 for some challenges with this.) +#[cfg(feature = "stream-ctrl")] +#[derive(Clone, Debug, Default)] +struct DataStreamStatus { + /// True if we've received a CONNECTED message. + // + // TODO: This is redundant with `connected` in DataReaderImpl and + // `connected_received` in DataCmdChecker. + received_connected: bool, + /// True if we have decided to send an END message. + // + // TODO RPC: There is not an easy way to set this from this module! Really, + // the decision to send an "end" is made when the StreamTarget object is + // dropped, but we don't currently have any way to see when that happens. + // Perhaps we need a different shared StreamStatus object that the + // StreamTarget holds? + sent_end: bool, + /// True if we have received an END message telling us to close the stream. + received_end: bool, + /// True if we have received an error. + /// + /// (This is not a subset or superset of received_end; some errors are END + /// messages but some aren't; some END messages are errors but some aren't.) + received_err: bool, +} + +#[cfg(feature = "stream-ctrl")] +impl DataStreamStatus { + /// Remember that we've received a connected message. + fn record_connected(&mut self) { + self.received_connected = true; + } + + /// Remember that we've received an error of some kind. + fn record_error(&mut self, e: &Error) { + // TODO: Probably we should remember the actual error in a box or + // something. But that means making a redundant copy of the error + // even if nobody will want it. Do we care? + match e { + Error::EndReceived(EndReason::DONE) => self.received_end = true, + Error::EndReceived(_) => { + self.received_end = true; + self.received_err = true; + } + _ => self.received_err = true, + } + } } restricted_msg! { @@ -171,12 +276,44 @@ restricted_msg! { } } +// TODO RPC: Should we also implement this trait for everything that holds a +// DataStreamCtrl? +#[cfg(feature = "stream-ctrl")] +impl super::ctrl::ClientStreamCtrl for DataStreamCtrl { + fn circuit(&self) -> Option> { + self.circuit.upgrade() + } +} + +#[cfg(feature = "stream-ctrl")] +impl DataStreamCtrl { + /// Return true if the underlying stream is open. (That is, if it has + /// received a `CONNECTED` message, and has not been closed.) + // + // TODO RPC: Maybe this method belongs in ClientStreamCtrl; maybe others do + // as well! We need to talk about moving them around. + pub fn is_open(&self) -> bool { + let s = self.status.lock().expect("poisoned lock"); + s.received_connected && !(s.sent_end || s.received_end || s.received_err) + } + + // TODO RPC: Add more functions once we have the desired API more nailed + // down. +} + impl DataStream { /// Wrap raw stream reader and target parts as a DataStream. /// /// For non-optimistic stream, function `wait_for_connection` /// must be called after to make sure CONNECTED is received. pub(crate) fn new(reader: StreamReader, target: StreamTarget) -> Self { + #[cfg(feature = "stream-ctrl")] + let status = Arc::new(Mutex::new(DataStreamStatus::default())); + #[cfg(feature = "stream-ctrl")] + let ctrl = Arc::new(DataStreamCtrl { + circuit: Arc::downgrade(target.circuit()), + status: status.clone(), + }); #[cfg(feature = "experimental-api")] let circuit = target.circuit().clone(); let r = DataReader { @@ -185,20 +322,30 @@ impl DataStream { pending: Vec::new(), offset: 0, connected: false, + #[cfg(feature = "stream-ctrl")] + status: status.clone(), })), + #[cfg(feature = "stream-ctrl")] + ctrl: ctrl.clone(), }; let w = DataWriter { state: Some(DataWriterState::Ready(DataWriterImpl { s: target, buf: Box::new([0; Data::MAXLEN]), n_pending: 0, + #[cfg(feature = "stream-ctrl")] + status, })), + #[cfg(feature = "stream-ctrl")] + ctrl: ctrl.clone(), }; DataStream { w, r, #[cfg(feature = "experimental-api")] circuit, + #[cfg(feature = "stream-ctrl")] + ctrl, } } @@ -242,10 +389,19 @@ impl DataStream { /// /// TODO: Should there be an AttachedToCircuit trait that we use for all /// client stream types? Should this return an Option<&ClientCirc>? + /// + /// TODO RPC: Perhaps we should deprecate this in favor of `stream.ctrl().circuit()`. #[cfg(feature = "experimental-api")] pub fn circuit(&self) -> &ClientCirc { &self.circuit } + + /// Return a [`DataStreamCtrl`] object that can be used to monitor and + /// interact with this stream without holding the stream itself. + #[cfg(feature = "stream-ctrl")] + pub fn ctrl(&self) -> &Arc { + &self.ctrl + } } impl AsyncRead for DataStream { @@ -337,9 +493,20 @@ struct DataWriterImpl { /// Number of unflushed bytes in buf. n_pending: usize, + + /// Shared user-visible information about the state of this stream. + #[cfg(feature = "stream-ctrl")] + status: Arc>, } impl DataWriter { + /// Return a [`DataStreamCtrl`] object that can be used to monitor and + /// interact with this stream without holding the stream itself. + #[cfg(feature = "stream-ctrl")] + pub fn ctrl(&self) -> &Arc { + &self.ctrl + } + /// Helper for poll_flush() and poll_close(): Performs a flush, then /// closes the stream if should_close is true. fn poll_flush_impl( @@ -375,6 +542,13 @@ impl DataWriter { } Poll::Ready((imp, Ok(()))) => { if should_close { + #[cfg(feature = "stream-ctrl")] + { + // TODO RPC: This is not sufficient to track every case + // where we might have sent an End. See note on the + // `sent_end` field. + imp.status.lock().expect("lock poisoned").sent_end = true; + } self.state = Some(DataWriterState::Closed); } else { self.state = Some(DataWriterState::Ready(imp)); @@ -420,6 +594,10 @@ impl AsyncWrite for DataWriter { match future.as_mut().poll(cx) { Poll::Ready((_imp, Err(e))) => { + #[cfg(feature = "stream-ctrl")] + { + _imp.status.lock().expect("lock poisoned").record_error(&e); + } self.state = Some(DataWriterState::Closed); Poll::Ready(Err(e.into())) } @@ -494,6 +672,15 @@ impl DataWriterImpl { } } +impl DataReader { + /// Return a [`DataStreamCtrl`] object that can be used to monitor and + /// interact with this stream without holding the stream itself. + #[cfg(feature = "stream-ctrl")] + pub fn ctrl(&self) -> &Arc { + &self.ctrl + } +} + /// An enumeration for the state of a DataReader. /// /// We have to use an enum here because, when we're waiting for @@ -537,6 +724,10 @@ struct DataReaderImpl { /// If true, we have received a CONNECTED cell on this stream. connected: bool, + + /// Shared user-visible information about the state of this stream. + #[cfg(feature = "stream-ctrl")] + status: Arc>, } impl AsyncRead for DataReader { @@ -577,6 +768,10 @@ impl AsyncRead for DataReader { // There aren't any survivable errors in the current // design. self.state = Some(DataReaderState::Closed); + #[cfg(feature = "stream-ctrl")] + { + _imp.status.lock().expect("lock poisoned").record_error(&e); + } let result = if matches!(e, Error::EndReceived(EndReason::DONE)) { Ok(0) } else { @@ -650,6 +845,13 @@ impl DataReaderImpl { let result = match msg { Connected(_) if !self.connected => { self.connected = true; + #[cfg(feature = "stream-ctrl")] + { + self.status + .lock() + .expect("poisoned lock") + .record_connected(); + } Ok(()) } Connected(_) => {