prepare_send_from: Add tests
When I added these tests, they didn't find any bugs in my own implementation, but I did find a bug in futures::future::unfold. See the in-code comment.
This commit is contained in:
parent
793782acc8
commit
426ff28b73
|
@ -290,3 +290,125 @@ where
|
|||
r
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::unwrap_used)] // why is this not the default in tests
|
||||
mod test {
|
||||
use super::*;
|
||||
use futures::channel::mpsc;
|
||||
use futures::future::poll_fn;
|
||||
use futures::select_biased;
|
||||
use futures::SinkExt as _;
|
||||
use futures_await_test::async_test;
|
||||
use std::convert::Infallible;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
struct TestError(char);
|
||||
|
||||
#[async_test]
|
||||
async fn prepare_send() {
|
||||
// Early versions of this used unfold quite a lot more, but it is not really
|
||||
// convenient for testing. It buffers one item internally, and is also buggy:
|
||||
// https://github.com/rust-lang/futures-rs/issues/2600
|
||||
// So we use mpsc channels, which (perhaps with buffering) are quite controllable.
|
||||
|
||||
// The eprintln!("FOR ...") calls correspond go the dprintln1() calls in the impl,
|
||||
// and can check that each code path in the impementation is used,
|
||||
// by turning on the dbug and using `--nocapture`.
|
||||
{
|
||||
eprintln!("-- disconnected ---");
|
||||
eprintln!("FOR poll: output poll = IF.Err SO IF.Err");
|
||||
let (mut w, r) = mpsc::unbounded::<usize>();
|
||||
drop(r);
|
||||
let ret = w.prepare_send_from(async { Ok::<_, Infallible>(12) }).await;
|
||||
assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
|
||||
}
|
||||
|
||||
{
|
||||
eprintln!("-- buffered late disconnect --");
|
||||
eprintln!("FOR poll: output poll = IF.Ok calling generator");
|
||||
eprintln!("FOR poll: output flush = IF.Err SO IF.Err");
|
||||
let (w, r) = mpsc::unbounded::<usize>();
|
||||
let mut w = w.buffer(10);
|
||||
let mut r = Some(r);
|
||||
w.feed(66).await.unwrap();
|
||||
let ret = w
|
||||
.prepare_send_from(poll_fn(move |_cx| {
|
||||
drop(r.take());
|
||||
Poll::Pending::<usize>
|
||||
}))
|
||||
.await;
|
||||
assert!(ret.map(|_| ()).unwrap_err().is_disconnected());
|
||||
}
|
||||
|
||||
{
|
||||
eprintln!("-- flushing before wait --");
|
||||
eprintln!("FOR poll: output flush = IF.Ok SO Pending");
|
||||
let (mut w, _r) = mpsc::unbounded::<usize>();
|
||||
let () = select_biased! {
|
||||
_ = w.prepare_send_from(poll_fn(
|
||||
move |_cx| {
|
||||
Poll::Pending::<usize>
|
||||
}
|
||||
)) => panic!(),
|
||||
_ = futures::future::ready(()) => { },
|
||||
};
|
||||
}
|
||||
|
||||
{
|
||||
eprintln!("-- flush before wait is pending --");
|
||||
eprintln!("FOR poll: output flush = Pending SO Pending");
|
||||
let (mut w, _r) = mpsc::channel::<usize>(0);
|
||||
let () = w.feed(77).await.unwrap();
|
||||
let mut w = w.buffer(10);
|
||||
let () = select_biased! {
|
||||
_ = w.prepare_send_from(poll_fn(
|
||||
move |_cx| {
|
||||
Poll::Pending::<usize>
|
||||
}
|
||||
)) => panic!(),
|
||||
_ = futures::future::ready(()) => { },
|
||||
};
|
||||
}
|
||||
|
||||
{
|
||||
eprintln!("-- flush before wait is pending --");
|
||||
eprintln!("FOR poll: generator = Ready SO IF.Ok");
|
||||
eprintln!("FOR send ...");
|
||||
eprintln!("ALSO check that bufferinrg works as expected");
|
||||
|
||||
let sunk = Arc::new(Mutex::new(vec![]));
|
||||
let unfold = futures::sink::unfold((), |(), v| {
|
||||
let sunk = sunk.clone();
|
||||
async move {
|
||||
dbg!();
|
||||
sunk.lock().unwrap().push(v);
|
||||
Ok::<_, Infallible>(())
|
||||
}
|
||||
});
|
||||
let mut unfold = Box::pin(unfold.buffer(10));
|
||||
for v in [42, 43] {
|
||||
// We can only do two here because that's how many we can actually buffer in Buffer
|
||||
// and Unfold. Because our closure is always ready, the buffering isn't actually
|
||||
// as copious as all that. This is fine, because the point of this test is to test
|
||||
// *flushing*.
|
||||
dbg!(v);
|
||||
let ret = unfold
|
||||
.prepare_send_from(async move { Ok::<_, Infallible>(v) })
|
||||
.await;
|
||||
let (msg, sendable) = ret.unwrap();
|
||||
let msg = msg.unwrap();
|
||||
assert_eq!(msg, v);
|
||||
let () = sendable.send(msg).unwrap();
|
||||
assert_eq!(*sunk.lock().unwrap(), &[]); // It's still buffered
|
||||
}
|
||||
select_biased! {
|
||||
_ = unfold.prepare_send_from(futures::future::pending::<()>()) => panic!(),
|
||||
_ = futures::future::ready(()) => { },
|
||||
};
|
||||
assert_eq!(*sunk.lock().unwrap(), &[42, 43]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue