Add a test for running and shutting down reactors.

This took a long time since I needed to learn about futures and
executors, but I think it'll work out okay.
This commit is contained in:
Nick Mathewson 2020-10-25 14:12:51 -04:00
parent e20bfaab0f
commit 0e91d97f76
1 changed files with 80 additions and 1 deletions

View File

@ -48,6 +48,7 @@ pub(super) type CtrlResult = std::result::Result<CtrlMsg, oneshot::Canceled>;
type OneshotStream = stream::SelectAll<stream::Once<oneshot::Receiver<CtrlMsg>>>;
/// Error return value from run_once: indicates an error or a shutdown.
#[derive(Debug)]
enum ReactorError {
/// The reactor should shut down with an abnormal exit condition.
Err(Error),
@ -135,17 +136,21 @@ where
}
debug!("{}: Running reactor", self.logid);
let result: Result<()> = loop {
dbg!("Loop");
match self.run_once().await {
Ok(()) => (),
Err(ReactorError::Shutdown) => break Ok(()),
Err(ReactorError::Err(e)) => break Err(e),
}
};
dbg!("Out");
debug!("{}: Reactor stopped: {:?}", self.logid, result);
if let Some(chan) = self.channel.upgrade() {
let mut chan = chan.lock().await;
chan.closed = true;
}
dbg!("None.");
dbg!(&result);
result
}
@ -364,4 +369,78 @@ where
}
#[cfg(test)]
mod test {}
mod test {
use super::*;
use futures::executor::LocalPool;
use futures::sink::SinkExt;
use futures_await_test::async_test;
type CodecResult = std::result::Result<ChanCell, tor_cell::Error>;
fn new_reactor() -> (
crate::channel::Channel,
Reactor<mpsc::Receiver<CodecResult>>,
mpsc::Receiver<ChanCell>,
mpsc::Sender<CodecResult>,
) {
let link_protocol = 4;
let (send1, recv1) = mpsc::channel(32);
let (send2, recv2) = mpsc::channel(32);
let logid = LogId::new();
let ed_id = [0x1; 32].into();
let rsa_id = [0x2; 20].into();
let send1 = send1.sink_map_err(|_| tor_cell::Error::ChanProto("dummy message".into()));
let (chan, reactor) = crate::channel::Channel::new(
link_protocol,
Box::new(send1),
recv2,
logid,
ed_id,
rsa_id,
);
(chan, reactor, recv1, send2)
}
// Try shutdown from inside run_once..
#[async_test]
async fn shutdown() {
let (chan, mut reactor, _output, _input) = new_reactor();
chan.terminate().await;
let r = reactor.run_once().await;
assert!(matches!(r, Err(ReactorError::Shutdown)));
// This "run" won't even start.
let r = reactor.run().await;
assert!(matches!(r, Err(Error::ChannelClosed)));
}
// Try shutdown while reactor is running.
#[test]
fn shutdown2() {
// TODO: Ask a rust person if this is how to do this.
use futures::future::FutureExt;
use futures::task::LocalSpawnExt;
let mut pool = LocalPool::new();
let spawner = pool.spawner();
let (chan, reactor, _output, _input) = new_reactor();
// Let's get the reactor running...
let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
let run_handle = spawner
.spawn_local_with_handle(run_reactor.clone())
.unwrap();
run_handle.forget();
pool.run_until_stalled();
// Now let's see. The reactor should _still_ be running.
assert!(run_reactor.peek().is_none());
// Now let's try shutting down.
spawner.spawn_local(chan.terminate()).unwrap();
pool.run_until_stalled();
// Now let's see. The reactor should not _still_ be running.
assert_eq!(run_reactor.peek(), Some(&true));
}
}