channel: Provide and use Sink::prepare_send_from

This is a general-purpose implementation of the ad-hoc approach
currently taken in (eg) crates/tor-proto/src/channel/reactor.rs,
with an API intended to defned against the more obvious mistakes.

This allows us to separate the two concerns: the channel reactor can
focus on handling channel cells and control messages and is over 2.5x
shorter.

The complexity of the manual sink implementation, and the machinery
needed to avoid having to suspend while holding an item, are dealt
with separately.  That separate implemenation now has proper
documentation.  (Tests are in the nest commit to avoid this one being
even more unwieldy.)

We use `extend` to define this as an extension trait.  A competitor is
`ext` but in my personal projects I have found `extend` slightly
better.
This commit is contained in:
Ian Jackson 2022-05-18 16:02:52 +01:00
parent b137d64e18
commit 793782acc8
5 changed files with 326 additions and 75 deletions

4
Cargo.lock generated
View File

@ -3278,7 +3278,11 @@ version = "0.3.0"
dependencies = [
"derive_more",
"educe",
"futures",
"futures-await-test",
"pin-project",
"rand 0.8.5",
"tokio",
]
[[package]]

View File

@ -13,8 +13,12 @@ categories = ["rust-patterns"]
repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
[dependencies]
futures = "0.3.14"
rand = "0.8"
pin-project = "1"
[dev-dependencies]
derive_more = "0.99"
educe = "0.4.6"
futures-await-test = "0.3.0"
tokio = { version = "1.7", features = ["macros", "rt", "rt-multi-thread", "time"] }

View File

@ -0,0 +1,292 @@
//! Futures helpers
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::future::FusedFuture;
use futures::ready;
use futures::Sink;
use pin_project::pin_project;
/// Switch to the nontrivial version of this, to get debugging output on stderr
macro_rules! dprintln { { $f:literal $($a:tt)* } => { } }
//macro_rules! dprintln { { $f:literal $($a:tt)* } => { eprintln!(concat!(" ",$f) $($a)*) } }
/// Extension trait for [`Sink`]
pub trait SinkExt<'w, OS, OM>
where
OS: Sink<OM>,
{
/// For processing an item obtained from a future, avoiding async cancel lossage
///
/// Prepares to send a output message `OM` to an input stream `OS` (`self`),
/// where the `OM` is made from an input message `IM`,
/// and the `IM` is obtained from a future, `generator: IF`.
// This slightly inconsistent terminology, "item" vs "message",
// avoids having to have the generic parameters named `OI` and `II`
// where `I` is sometimes "item" and sometimes "input".
///
/// When successfully run, `prepare_send_from` gives `(IM, SinkSendable)`.
///
/// After processing `IM` into `OM`,
/// use the [`SinkSendable`] to [`send`](SinkSendable::send) the `OM` to `OS`.
///
/// # Why use this
///
/// This avoids the following async cancellation hazaard
/// which exists with naive use of `select!`
/// followed by `OS.send().await`:
///
/// If the input is ready, the corresponding `select!` branch
/// will trigger, yielding the next input item. Then, if the output is *not* ready, awaiting
/// will have that arm return `Pending`, disscarding the item.
///
/// # Example
///
/// This comprehensive example demonstrates how to read from possibly multiple sources
/// and also be able to process other events:
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use futures::select;
/// use futures::{SinkExt as _, StreamExt as _};
/// use tor_basic_utils::futures::SinkExt as _;
///
/// let (mut input_w, mut input_r) = futures::channel::mpsc::unbounded::<usize>();
/// let (mut output_w, mut output_r) = futures::channel::mpsc::unbounded::<String>();
/// input_w.send(42).await;
/// select!{
/// ret = output_w.prepare_send_from(async {
/// select!{
/// got_input = input_r.next() => got_input.expect("input stream ended!"),
/// () = futures::future::pending() => panic!(), // other branches are OK here
/// }
/// }) => {
/// let (input_msg, sendable) = ret.unwrap();
/// let output_msg = input_msg.to_string();
/// let () = sendable.send(output_msg).unwrap();
/// },
/// () = futures::future::pending() => panic!(), // other branches are OK here
/// }
///
/// assert_eq!(output_r.next().await.unwrap(), "42");
/// # }
/// ```
///
/// # Formally
///
/// [`prepare_send_from`](SinkExt::prepare_send_from)
///
/// * Waits for `OS` to be ready to receive an item.
/// * Runs `message_generator` to obtain a `IM`.
/// * Returns the `IM` (for processing), and a [`SinkSendable`].
///
/// The caller should then:
///
/// * Check the error from `prepare_send_from`
/// (which came from the *output* stream).
/// * Process the `IM`, making an `OM` out of it.
/// * Call [`sendable.send()`](SinkSendable::send) (and check its error).
///
/// # Flushing
///
/// `prepare_send_from` will [`flush`](futures::SinkExt::flush) the output sink
/// when it finds the input is not ready yet.
/// Until then items may be buffered
/// (as if they had been written with [`feed`](futures::SinkExt::feed)).
///
/// # Errors
///
/// ## Output sink errors
///
/// The call site can experience output sink errors in two places,
/// [`prepare_send_from()`](SinkExt::prepare_send_from) and [`SinkSendable::send()`].
/// The caller should typically handle them the same way regardless of when they occurred.
///
/// If the error happens at [`SinkSendable::send()`],
/// the call site will usually be forced to discard the item being processed.
/// This will only occur if the sink is actually broken.
///
/// ## Errors specific to the call site: faillible input, and fallible processing
///
/// At some call sites, the input future may yield errors
/// (perhaps it is reading from a `Stream` of [`Result`]s).
/// in that case the value from the input future will be a [`Result`].
/// Then `IM` is a `Result`, and is provided in the `.0` element
/// of the "successful" return from `prepare_send_from`.
///
/// And, at some call sites, the processing of an `IM` into an `OM` is fallible.
///
/// Handling these latter two error caess is up to the caller,
/// in the code which processes `IM`.
/// The call site will often want to deal with such an error
/// without sending anything into the output sink,
/// and can then just drop the [`SinkSendable`].
///
/// # Implementations
///
/// This is an extension trait and you are not expected to need to implement it.
///
/// There are provided implementations for `Pin<&mut impl Sink>`
/// and `&mut impl Sink + Unpin`, for your convenience.
fn prepare_send_from<IF, IM>(
self,
message_generator: IF,
) -> SinkPrepareSendFuture<'w, IF, OS, OM>
where
IF: Future<Output = IM>;
}
impl<'w, OS, OM> SinkExt<'w, OS, OM> for Pin<&'w mut OS>
where
OS: Sink<OM>,
{
fn prepare_send_from<'r, IF, IM>(
self,
message_generator: IF,
) -> SinkPrepareSendFuture<'w, IF, OS, OM>
where
IF: Future<Output = IM>,
{
SinkPrepareSendFuture {
output: Some(self),
generator: message_generator,
tw: PhantomData,
}
}
}
impl<'w, OS, OM> SinkExt<'w, OS, OM> for &'w mut OS
where
OS: Sink<OM> + Unpin,
{
fn prepare_send_from<'r, IF, IM>(
self,
message_generator: IF,
) -> SinkPrepareSendFuture<'w, IF, OS, OM>
where
IF: Future<Output = IM>,
{
Pin::new(self).prepare_send_from(message_generator)
}
}
/// Future for `SinkExt::prepare_send_from`
#[pin_project]
#[must_use]
pub struct SinkPrepareSendFuture<'w, IF, OS, OM> {
#[pin]
generator: IF,
// This Option exists because otherwise SinkPrepareSendFuture::poll()
// can't move `output` out of this struct to put it into the `SinkSendable`.
// (The poll() impl cannot borrow from SinkPrepareSendFuture.)
output: Option<Pin<&'w mut OS>>,
tw: PhantomData<fn(OM)>,
}
/// A [`Sink`] which is ready to receive an item
///
/// Produced by [`SinkExt::prepare_send_from`]. See there for the overview docs.
///
/// This references an output sink `OS`.
/// It offers the ability to write into the sink without blocking,
/// (and constitutes a proof token that the sink has declared itself ready for that).
///
/// The only useful method is [`send`](SinkSendable::send).
#[must_use]
pub struct SinkSendable<'w, OS, OM> {
output: Pin<&'w mut OS>,
tw: PhantomData<fn(OM)>,
}
impl<'w, IF, OS, IM, OM> Future for SinkPrepareSendFuture<'w, IF, OS, OM>
where
IF: Future<Output = IM>,
OS: Sink<OM>,
{
type Output = Result<(IM, SinkSendable<'w, OS, OM>), OS::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut self_ = self.project();
let () = match ready!(self_.output.as_mut().unwrap().as_mut().poll_ready(cx)) {
Err(e) => {
dprintln!("poll: output poll = IF.Err SO IF.Err");
// Deliberately don't fuse by taking output
return Poll::Ready(Err(e));
}
Ok(x) => {
dprintln!("poll: output poll = IF.Ok calling generator");
x
}
};
let value = match self_.generator.as_mut().poll(cx) {
Poll::Pending => {
// We defer flushing the output until the input stops yielding.
// Or to put it another way, we do not return `Pending` without flushing.
dprintln!("poll: generator = Pending calling output flush");
let flushed = self_.output.as_mut().unwrap().as_mut().poll_flush(cx);
return match flushed {
Poll::Ready(Err(e)) => {
dprintln!("poll: output flush = IF.Err SO IF.Err");
Poll::Ready(Err(e))
}
Poll::Ready(Ok(())) => {
dprintln!("poll: output flush = IF.Ok SO Pending");
Poll::Pending
}
Poll::Pending => {
dprintln!("poll: output flush = Pending SO Pending");
Poll::Pending
}
};
}
Poll::Ready(v) => {
dprintln!("poll: generator = Ready SO IF.Ok");
v
}
};
let sendable = SinkSendable {
output: self_.output.take().unwrap(),
tw: PhantomData,
};
Poll::Ready(Ok((value, sendable)))
}
}
impl<'w, IF, OS, IM, OM> FusedFuture for SinkPrepareSendFuture<'w, IF, OS, OM>
where
IF: Future<Output = IM>,
OS: Sink<OM>,
{
fn is_terminated(&self) -> bool {
let r = self.output.is_none();
dprintln!("is_terminated = {}", r);
r
}
}
impl<'w, OS, OM> SinkSendable<'w, OS, OM>
where
OS: Sink<OM>,
{
/// Synchronously send an item into `OS`, which is a [`Sink`]
///
/// Can fail if the sink `OS` reports an error.
///
/// (However, the existence of the `SinkSendable` demonstrates that
/// the sink reported itself ready for sending,
/// so this call is synchronous, avoding cancellation hazards.)
pub fn send(self, item: OM) -> Result<(), OS::Error> {
dprintln!("send ...");
let r = self.output.start_send(item);
dprintln!("send: {:?}", r.as_ref().map_err(|_| (())));
r
}
}

View File

@ -41,6 +41,7 @@
use std::fmt;
pub mod futures;
pub mod retry;
// ----------------------------------------------------------------------

View File

@ -10,21 +10,23 @@ use super::circmap::{CircEnt, CircMap};
use crate::circuit::halfcirc::HalfCirc;
use crate::util::err::ReactorError;
use crate::{Error, Result};
use tor_basic_utils::futures::SinkExt as _;
use tor_cell::chancell::msg::{Destroy, DestroyReason};
use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId};
use futures::channel::{mpsc, oneshot};
use futures::select;
use futures::sink::SinkExt;
use futures::stream::Stream;
use futures::Sink;
use futures::StreamExt as _;
use tor_error::internal;
use std::fmt;
use std::pin::Pin;
//use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Poll;
use crate::channel::{codec::CodecError, unique_id, ChannelDetails};
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
@ -134,87 +136,35 @@ impl Reactor {
/// Helper for run(): handles only one action, and doesn't mark
/// the channel closed on finish.
async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
// This is written this way (manually calling poll) for a bunch of reasons:
//
// - We can only send things onto self.output if poll_ready has returned Ready, so
// we need some custom logic to implement that.
// - We probably want to call poll_flush on every reactor iteration, to ensure it continues
// to make progress flushing.
// - We also need to do the equivalent of select! between self.cells, self.control, and
// self.input, but with the extra logic bits added above.
//
// In Rust 2021, it would theoretically be possible to do this with a hybrid mix of select!
// and manually implemented poll_fn, but we aren't using that yet. (also, arguably doing
// it this way is both less confusing and more flexible).
let fut = futures::future::poll_fn(|cx| -> Poll<std::result::Result<_, ReactorError>> {
// We've potentially got three types of thing to deal with in this reactor iteration:
let mut cell_to_send = None;
let mut control_message = None;
let mut input = None;
select! {
// See if the output sink can have cells written to it yet.
if let Poll::Ready(ret) = Pin::new(&mut self.output).poll_ready(cx) {
let () = ret.map_err(codec_err_to_chan)?;
// If it can, check whether we have any cells to send it from `Channel` senders.
if let Poll::Ready(msg) = Pin::new(&mut self.cells).poll_next(cx) {
match msg {
x @ Some(..) => cell_to_send = x,
None => {
// cells sender dropped, shut down the reactor!
return Poll::Ready(Err(ReactorError::Shutdown));
}
}
}
// If so, see if we have to-be-transmitted cells.
ret = self.output.prepare_send_from(
// This runs if we will be able to write, so do the read:
self.cells.next()
) => {
let (msg, sendable) = ret.map_err(codec_err_to_chan)?;
let msg = msg.ok_or_else(|| ReactorError::Shutdown)?;
sendable.send(msg).map_err(codec_err_to_chan)?;
}
// Check whether we've got a control message pending.
if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) {
match ret {
None | Some(CtrlMsg::Shutdown) => {
return Poll::Ready(Err(ReactorError::Shutdown))
}
x @ Some(..) => control_message = x,
}
ret = self.control.next() => {
let ctrl = match ret {
None | Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
Some(x) => x,
};
self.handle_control(ctrl).await?;
}
// Check whether we've got any incoming cells.
if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) {
match ret {
None => return Poll::Ready(Err(ReactorError::Shutdown)),
Some(r) => input = Some(r.map_err(codec_err_to_chan)?),
}
ret = self.input.next() => {
let item = ret
.ok_or_else(|| ReactorError::Shutdown)?
.map_err(codec_err_to_chan)?;
crate::note_incoming_traffic();
self.handle_cell(item).await?;
}
// Flush the output sink. We don't actually care about whether it's ready or not;
// we just want to keep flushing it (hence the _).
let _ = Pin::new(&mut self.output)
.poll_flush(cx)
.map_err(codec_err_to_chan)?;
// If all three values aren't present, return Pending and wait to get polled again
// so that one of them is present.
if cell_to_send.is_none() && control_message.is_none() && input.is_none() {
return Poll::Pending;
}
// Otherwise, return the three Options, one of which is going to be Some.
Poll::Ready(Ok((cell_to_send, control_message, input)))
});
let (cell_to_send, control_message, input) = fut.await?;
if let Some(ctrl) = control_message {
self.handle_control(ctrl).await?;
}
if let Some(item) = input {
crate::note_incoming_traffic();
self.handle_cell(item).await?;
}
if let Some(cts) = cell_to_send {
Pin::new(&mut self.output)
.start_send(cts)
.map_err(codec_err_to_chan)?;
// Give the sink a little flush, to make sure it actually starts doing things.
futures::future::poll_fn(|cx| Pin::new(&mut self.output).poll_flush(cx))
.await
.map_err(codec_err_to_chan)?;
}
Ok(()) // Run again.
}