Merge branch 'stream_ctrl' into 'main'

Experimental new stream-ctrl feature

Closes #847

See merge request tpo/core/arti!1198
This commit is contained in:
Nick Mathewson 2023-05-24 20:01:40 +00:00
commit 6703f3d52a
4 changed files with 238 additions and 3 deletions

View File

@ -32,13 +32,15 @@ full = [
"tor-units/full",
]
experimental = ["experimental-api", "hs-client", "hs-service", "ntor_v3", "testing"]
experimental = ["experimental-api", "hs-client", "hs-service", "ntor_v3", "stream-ctrl", "testing"]
ntor_v3 = ["__is_experimental"]
hs-client = ["hs-common", "__is_experimental"]
hs-service = ["hs-common", "__is_experimental"]
hs-common = ["tor-hscrypto"]
experimental-api = ["send-control-msg", "__is_experimental"]
send-control-msg = ["visibility"] # send_control_message etc.; TODO HS should this be in hs-client?
# send_control_message etc.; TODO HS should this be in hs-client?
send-control-msg = ["visibility"]
stream-ctrl = ["__is_experimental"]
# Enable testing-only APIs. APIs under this feature are not
# covered by semver.
testing = ["__is_experimental"]

View File

@ -10,6 +10,8 @@
//! There is no fairness, rate-limiting, or flow control.
mod cmdcheck;
#[cfg(feature = "stream-ctrl")]
mod ctrl;
mod data;
#[cfg(feature = "hs-service")]
mod incoming;
@ -28,3 +30,7 @@ pub use resolve::ResolveStream;
pub(crate) use {data::DataCmdChecker, resolve::ResolveCmdChecker};
pub use tor_cell::relaycell::msg::IpVersionPreference;
#[cfg(feature = "stream-ctrl")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream-ctrl")))]
pub use {ctrl::ClientStreamCtrl, data::DataStreamCtrl};

View File

@ -0,0 +1,25 @@
//! Common types for `StreamCtrl` traits and objects, used to provide a
//! shareable handle for controlling a string.
use std::sync::Arc;
use crate::circuit::ClientCirc;
/// An object that lets the owner "control" a client stream.
///
/// In some cases, this may be the stream itself; in others, it will be a handle
/// to the shared parts of the stream. (For data streams, it's not convenient to
/// make the actual `AsyncRead` and `AsyncWrite` types shared, since all the methods
/// on those traits take `&mut self`.)
//
// TODO RPC: Does this also apply to relay-side streams? (I say no-nickm)
// Does it apply to RESOLVE streams? (I say yes; they are streams-nickm)
// Which methods from DataStreamCtrl does it make sense to move here?
pub trait ClientStreamCtrl {
/// Return the circuit that this stream is attached to, if that circuit
/// object is still present.
///
/// (If the circuit object itself is not present, the stream is necessarily
/// closed.)
fn circuit(&self) -> Option<Arc<ClientCirc>>;
}

View File

@ -20,6 +20,8 @@ use tor_cell::restricted_msg;
use std::fmt::Debug;
use std::io::Result as IoResult;
use std::pin::Pin;
#[cfg(any(feature = "stream-ctrl", feature = "experimental-api"))]
use std::sync::{Arc, Mutex, Weak};
use educe::Educe;
@ -108,8 +110,44 @@ pub struct DataStream {
/// TODO: This is redundant with the reference in `StreamTarget` inside
/// DataWriterState, but for now we can't actually access that state all the time,
/// since it might be inside a boxed future.
///
/// TODO: This is also redundant with the reference in `DataStreamCtrl`.
#[cfg(feature = "experimental-api")]
circuit: std::sync::Arc<ClientCirc>,
circuit: Arc<ClientCirc>,
/// A control object that can be used to monitor and control this stream
/// without needing to own it.
#[cfg(feature = "stream-ctrl")]
ctrl: std::sync::Arc<DataStreamCtrl>,
}
/// An object used to control and monitor a data stream.
///
/// # Notes
///
/// This is a separate type from [`DataStream`] because it's useful to have
/// multiple references to this object, whereas a [`DataReader`] and [`DataWriter`]
/// need to have a single owner for the `AsyncRead` and `AsyncWrite` APIs to
/// work correctly.
#[cfg(feature = "stream-ctrl")]
#[derive(Debug)]
pub struct DataStreamCtrl {
/// The circuit to which this stream is attached.
///
/// Note that the stream's reader and writer halves each contain a `StreamTarget`,
/// which in turn has a strong reference to the `ClientCirc`. So as long as any
/// one of those is alive, this reference will be present.
///
/// We make this a Weak reference so that once the stream itself is closed,
/// we can't leak circuits.
circuit: Weak<ClientCirc>,
/// Shared user-visible information about the state of this stream.
///
/// TODO RPC: This will probably want to be a `postage::Watch` or something
/// similar, if and when it stops moving around.
#[cfg(feature = "stream-ctrl")]
status: Arc<Mutex<DataStreamStatus>>,
}
/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
@ -136,6 +174,11 @@ pub struct DataWriter {
/// AsyncWrite functions. It might be possible to do better here,
/// and we should refactor if so.
state: Option<DataWriterState>,
/// A control object that can be used to monitor and control this stream
/// without needing to own it.
#[cfg(feature = "stream-ctrl")]
ctrl: std::sync::Arc<DataStreamCtrl>,
}
/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
@ -161,6 +204,68 @@ pub struct DataReader {
/// poll_read(). It might be possible to do better here, and we
/// should refactor if so.
state: Option<DataReaderState>,
/// A control object that can be used to monitor and control this stream
/// without needing to own it.
#[cfg(feature = "stream-ctrl")]
ctrl: std::sync::Arc<DataStreamCtrl>,
}
/// Shared status flags for tracking the status of as `DataStream`.
///
/// We expect to refactor this a bit, so it's not exposed at all.
//
// TODO RPC: Possibly instead of manipulating the fields of DataStreamStatus
// from various points in this module, we should instead construct
// DataStreamStatus as needed from information available elsewhere. In any
// case, we should really eliminate as much duplicate state here as we can.
// (See discussions at !1198 for some challenges with this.)
#[cfg(feature = "stream-ctrl")]
#[derive(Clone, Debug, Default)]
struct DataStreamStatus {
/// True if we've received a CONNECTED message.
//
// TODO: This is redundant with `connected` in DataReaderImpl and
// `connected_received` in DataCmdChecker.
received_connected: bool,
/// True if we have decided to send an END message.
//
// TODO RPC: There is not an easy way to set this from this module! Really,
// the decision to send an "end" is made when the StreamTarget object is
// dropped, but we don't currently have any way to see when that happens.
// Perhaps we need a different shared StreamStatus object that the
// StreamTarget holds?
sent_end: bool,
/// True if we have received an END message telling us to close the stream.
received_end: bool,
/// True if we have received an error.
///
/// (This is not a subset or superset of received_end; some errors are END
/// messages but some aren't; some END messages are errors but some aren't.)
received_err: bool,
}
#[cfg(feature = "stream-ctrl")]
impl DataStreamStatus {
/// Remember that we've received a connected message.
fn record_connected(&mut self) {
self.received_connected = true;
}
/// Remember that we've received an error of some kind.
fn record_error(&mut self, e: &Error) {
// TODO: Probably we should remember the actual error in a box or
// something. But that means making a redundant copy of the error
// even if nobody will want it. Do we care?
match e {
Error::EndReceived(EndReason::DONE) => self.received_end = true,
Error::EndReceived(_) => {
self.received_end = true;
self.received_err = true;
}
_ => self.received_err = true,
}
}
}
restricted_msg! {
@ -171,12 +276,44 @@ restricted_msg! {
}
}
// TODO RPC: Should we also implement this trait for everything that holds a
// DataStreamCtrl?
#[cfg(feature = "stream-ctrl")]
impl super::ctrl::ClientStreamCtrl for DataStreamCtrl {
fn circuit(&self) -> Option<Arc<ClientCirc>> {
self.circuit.upgrade()
}
}
#[cfg(feature = "stream-ctrl")]
impl DataStreamCtrl {
/// Return true if the underlying stream is open. (That is, if it has
/// received a `CONNECTED` message, and has not been closed.)
//
// TODO RPC: Maybe this method belongs in ClientStreamCtrl; maybe others do
// as well! We need to talk about moving them around.
pub fn is_open(&self) -> bool {
let s = self.status.lock().expect("poisoned lock");
s.received_connected && !(s.sent_end || s.received_end || s.received_err)
}
// TODO RPC: Add more functions once we have the desired API more nailed
// down.
}
impl DataStream {
/// Wrap raw stream reader and target parts as a DataStream.
///
/// For non-optimistic stream, function `wait_for_connection`
/// must be called after to make sure CONNECTED is received.
pub(crate) fn new(reader: StreamReader, target: StreamTarget) -> Self {
#[cfg(feature = "stream-ctrl")]
let status = Arc::new(Mutex::new(DataStreamStatus::default()));
#[cfg(feature = "stream-ctrl")]
let ctrl = Arc::new(DataStreamCtrl {
circuit: Arc::downgrade(target.circuit()),
status: status.clone(),
});
#[cfg(feature = "experimental-api")]
let circuit = target.circuit().clone();
let r = DataReader {
@ -185,20 +322,30 @@ impl DataStream {
pending: Vec::new(),
offset: 0,
connected: false,
#[cfg(feature = "stream-ctrl")]
status: status.clone(),
})),
#[cfg(feature = "stream-ctrl")]
ctrl: ctrl.clone(),
};
let w = DataWriter {
state: Some(DataWriterState::Ready(DataWriterImpl {
s: target,
buf: Box::new([0; Data::MAXLEN]),
n_pending: 0,
#[cfg(feature = "stream-ctrl")]
status,
})),
#[cfg(feature = "stream-ctrl")]
ctrl: ctrl.clone(),
};
DataStream {
w,
r,
#[cfg(feature = "experimental-api")]
circuit,
#[cfg(feature = "stream-ctrl")]
ctrl,
}
}
@ -242,10 +389,19 @@ impl DataStream {
///
/// TODO: Should there be an AttachedToCircuit trait that we use for all
/// client stream types? Should this return an Option<&ClientCirc>?
///
/// TODO RPC: Perhaps we should deprecate this in favor of `stream.ctrl().circuit()`.
#[cfg(feature = "experimental-api")]
pub fn circuit(&self) -> &ClientCirc {
&self.circuit
}
/// Return a [`DataStreamCtrl`] object that can be used to monitor and
/// interact with this stream without holding the stream itself.
#[cfg(feature = "stream-ctrl")]
pub fn ctrl(&self) -> &Arc<DataStreamCtrl> {
&self.ctrl
}
}
impl AsyncRead for DataStream {
@ -337,9 +493,20 @@ struct DataWriterImpl {
/// Number of unflushed bytes in buf.
n_pending: usize,
/// Shared user-visible information about the state of this stream.
#[cfg(feature = "stream-ctrl")]
status: Arc<Mutex<DataStreamStatus>>,
}
impl DataWriter {
/// Return a [`DataStreamCtrl`] object that can be used to monitor and
/// interact with this stream without holding the stream itself.
#[cfg(feature = "stream-ctrl")]
pub fn ctrl(&self) -> &Arc<DataStreamCtrl> {
&self.ctrl
}
/// Helper for poll_flush() and poll_close(): Performs a flush, then
/// closes the stream if should_close is true.
fn poll_flush_impl(
@ -375,6 +542,13 @@ impl DataWriter {
}
Poll::Ready((imp, Ok(()))) => {
if should_close {
#[cfg(feature = "stream-ctrl")]
{
// TODO RPC: This is not sufficient to track every case
// where we might have sent an End. See note on the
// `sent_end` field.
imp.status.lock().expect("lock poisoned").sent_end = true;
}
self.state = Some(DataWriterState::Closed);
} else {
self.state = Some(DataWriterState::Ready(imp));
@ -420,6 +594,10 @@ impl AsyncWrite for DataWriter {
match future.as_mut().poll(cx) {
Poll::Ready((_imp, Err(e))) => {
#[cfg(feature = "stream-ctrl")]
{
_imp.status.lock().expect("lock poisoned").record_error(&e);
}
self.state = Some(DataWriterState::Closed);
Poll::Ready(Err(e.into()))
}
@ -494,6 +672,15 @@ impl DataWriterImpl {
}
}
impl DataReader {
/// Return a [`DataStreamCtrl`] object that can be used to monitor and
/// interact with this stream without holding the stream itself.
#[cfg(feature = "stream-ctrl")]
pub fn ctrl(&self) -> &Arc<DataStreamCtrl> {
&self.ctrl
}
}
/// An enumeration for the state of a DataReader.
///
/// We have to use an enum here because, when we're waiting for
@ -537,6 +724,10 @@ struct DataReaderImpl {
/// If true, we have received a CONNECTED cell on this stream.
connected: bool,
/// Shared user-visible information about the state of this stream.
#[cfg(feature = "stream-ctrl")]
status: Arc<Mutex<DataStreamStatus>>,
}
impl AsyncRead for DataReader {
@ -577,6 +768,10 @@ impl AsyncRead for DataReader {
// There aren't any survivable errors in the current
// design.
self.state = Some(DataReaderState::Closed);
#[cfg(feature = "stream-ctrl")]
{
_imp.status.lock().expect("lock poisoned").record_error(&e);
}
let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
Ok(0)
} else {
@ -650,6 +845,13 @@ impl DataReaderImpl {
let result = match msg {
Connected(_) if !self.connected => {
self.connected = true;
#[cfg(feature = "stream-ctrl")]
{
self.status
.lock()
.expect("poisoned lock")
.record_connected();
}
Ok(())
}
Connected(_) => {