diff --git a/crates/tor-basic-utils/src/futures.rs b/crates/tor-basic-utils/src/futures.rs index 40f568e78..1b1c44f22 100644 --- a/crates/tor-basic-utils/src/futures.rs +++ b/crates/tor-basic-utils/src/futures.rs @@ -290,3 +290,125 @@ where r } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] // why is this not the default in tests +mod test { + use super::*; + use futures::channel::mpsc; + use futures::future::poll_fn; + use futures::select_biased; + use futures::SinkExt as _; + use futures_await_test::async_test; + use std::convert::Infallible; + use std::sync::Arc; + use std::sync::Mutex; + + #[derive(Debug, Eq, PartialEq)] + struct TestError(char); + + #[async_test] + async fn prepare_send() { + // Early versions of this used unfold quite a lot more, but it is not really + // convenient for testing. It buffers one item internally, and is also buggy: + // https://github.com/rust-lang/futures-rs/issues/2600 + // So we use mpsc channels, which (perhaps with buffering) are quite controllable. + + // The eprintln!("FOR ...") calls correspond go the dprintln1() calls in the impl, + // and can check that each code path in the impementation is used, + // by turning on the dbug and using `--nocapture`. + { + eprintln!("-- disconnected ---"); + eprintln!("FOR poll: output poll = IF.Err SO IF.Err"); + let (mut w, r) = mpsc::unbounded::(); + drop(r); + let ret = w.prepare_send_from(async { Ok::<_, Infallible>(12) }).await; + assert!(ret.map(|_| ()).unwrap_err().is_disconnected()); + } + + { + eprintln!("-- buffered late disconnect --"); + eprintln!("FOR poll: output poll = IF.Ok calling generator"); + eprintln!("FOR poll: output flush = IF.Err SO IF.Err"); + let (w, r) = mpsc::unbounded::(); + let mut w = w.buffer(10); + let mut r = Some(r); + w.feed(66).await.unwrap(); + let ret = w + .prepare_send_from(poll_fn(move |_cx| { + drop(r.take()); + Poll::Pending:: + })) + .await; + assert!(ret.map(|_| ()).unwrap_err().is_disconnected()); + } + + { + eprintln!("-- flushing before wait --"); + eprintln!("FOR poll: output flush = IF.Ok SO Pending"); + let (mut w, _r) = mpsc::unbounded::(); + let () = select_biased! { + _ = w.prepare_send_from(poll_fn( + move |_cx| { + Poll::Pending:: + } + )) => panic!(), + _ = futures::future::ready(()) => { }, + }; + } + + { + eprintln!("-- flush before wait is pending --"); + eprintln!("FOR poll: output flush = Pending SO Pending"); + let (mut w, _r) = mpsc::channel::(0); + let () = w.feed(77).await.unwrap(); + let mut w = w.buffer(10); + let () = select_biased! { + _ = w.prepare_send_from(poll_fn( + move |_cx| { + Poll::Pending:: + } + )) => panic!(), + _ = futures::future::ready(()) => { }, + }; + } + + { + eprintln!("-- flush before wait is pending --"); + eprintln!("FOR poll: generator = Ready SO IF.Ok"); + eprintln!("FOR send ..."); + eprintln!("ALSO check that bufferinrg works as expected"); + + let sunk = Arc::new(Mutex::new(vec![])); + let unfold = futures::sink::unfold((), |(), v| { + let sunk = sunk.clone(); + async move { + dbg!(); + sunk.lock().unwrap().push(v); + Ok::<_, Infallible>(()) + } + }); + let mut unfold = Box::pin(unfold.buffer(10)); + for v in [42, 43] { + // We can only do two here because that's how many we can actually buffer in Buffer + // and Unfold. Because our closure is always ready, the buffering isn't actually + // as copious as all that. This is fine, because the point of this test is to test + // *flushing*. + dbg!(v); + let ret = unfold + .prepare_send_from(async move { Ok::<_, Infallible>(v) }) + .await; + let (msg, sendable) = ret.unwrap(); + let msg = msg.unwrap(); + assert_eq!(msg, v); + let () = sendable.send(msg).unwrap(); + assert_eq!(*sunk.lock().unwrap(), &[]); // It's still buffered + } + select_biased! { + _ = unfold.prepare_send_from(futures::future::pending::<()>()) => panic!(), + _ = futures::future::ready(()) => { }, + }; + assert_eq!(*sunk.lock().unwrap(), &[42, 43]); + } + } +}