diff --git a/Cargo.lock b/Cargo.lock index f7fd1467f..c898fa452 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2842,6 +2842,7 @@ dependencies = [ "tor-linkspec", "tor-llcrypto", "tor-protover", + "tor-rtcompat", "tracing", "typenum", "zeroize", diff --git a/crates/tor-proto/Cargo.toml b/crates/tor-proto/Cargo.toml index 3797f94a0..fc7914fae 100644 --- a/crates/tor-proto/Cargo.toml +++ b/crates/tor-proto/Cargo.toml @@ -50,6 +50,6 @@ tokio-util = { version = "0.6", features = ["compat"], optional = true } coarsetime = { version = "0.1.20", optional = true } [dev-dependencies] -tokio-crate = { package = "tokio", version = "1.7.0", features = ["macros", "rt", "time"] } +tor-rtcompat = { path="../tor-rtcompat", version = "0.0.1", features = ["tokio"] } hex-literal = "0.3.1" hex = "0.4.3" diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index 16e9b9a57..2a8057c64 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -397,8 +397,6 @@ pub(crate) mod test { use super::*; use crate::channel::codec::test::MsgBuf; pub(crate) use crate::channel::reactor::test::new_reactor; - use tokio_crate as tokio; - use tokio_crate::test as async_test; use tor_cell::chancell::{msg, ChanCell}; /// Make a new fake reactor-less channel. For testing only, obviously. @@ -414,31 +412,33 @@ pub(crate) mod test { } } - #[async_test] - async fn send_bad() { - let chan = fake_channel(); + #[test] + fn send_bad() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + let chan = fake_channel(); - let cell = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into()); - let e = chan.check_cell(&cell); - assert!(e.is_err()); - assert_eq!( - format!("{}", e.unwrap_err()), - "Internal programming error: Can't send CREATED2 cell on client channel" - ); - let cell = ChanCell::new(0.into(), msg::Certs::new_empty().into()); - let e = chan.check_cell(&cell); - assert!(e.is_err()); - assert_eq!( - format!("{}", e.unwrap_err()), - "Internal programming error: Can't send CERTS cell after handshake is done" - ); + let cell = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into()); + let e = chan.check_cell(&cell); + assert!(e.is_err()); + assert_eq!( + format!("{}", e.unwrap_err()), + "Internal programming error: Can't send CREATED2 cell on client channel" + ); + let cell = ChanCell::new(0.into(), msg::Certs::new_empty().into()); + let e = chan.check_cell(&cell); + assert!(e.is_err()); + assert_eq!( + format!("{}", e.unwrap_err()), + "Internal programming error: Can't send CERTS cell after handshake is done" + ); - let cell = ChanCell::new(5.into(), msg::Create2::new(2, &b"abc"[..]).into()); - let e = chan.check_cell(&cell); - assert!(e.is_ok()); - // FIXME(eta): more difficult to test that sending works now that it has to go via reactor - // let got = output.next().await.unwrap(); - // assert!(matches!(got.msg(), ChanMsg::Create2(_))); + let cell = ChanCell::new(5.into(), msg::Create2::new(2, &b"abc"[..]).into()); + let e = chan.check_cell(&cell); + assert!(e.is_ok()); + // FIXME(eta): more difficult to test that sending works now that it has to go via reactor + // let got = output.next().await.unwrap(); + // assert!(matches!(got.msg(), ChanMsg::Create2(_))); + }) } #[test] diff --git a/crates/tor-proto/src/channel/codec.rs b/crates/tor-proto/src/channel/codec.rs index 7908bc695..f81624c02 100644 --- a/crates/tor-proto/src/channel/codec.rs +++ b/crates/tor-proto/src/channel/codec.rs @@ -47,8 +47,6 @@ pub(crate) mod test { use futures::task::{Context, Poll}; use hex_literal::hex; use std::pin::Pin; - use tokio::test as async_test; - use tokio_crate as tokio; use super::{futures_codec, ChannelCodec}; use tor_cell::chancell::{msg, ChanCell, ChanCmd, CircId}; @@ -111,48 +109,53 @@ pub(crate) mod test { futures_codec::Framed::new(mbuf, ChannelCodec::new(4)) } - #[async_test] - async fn check_encoding() -> std::result::Result<(), tor_cell::Error> { - let mb = MsgBuf::new(&b""[..]); - let mut framed = frame_buf(mb); + #[test] + fn check_encoding() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + let mb = MsgBuf::new(&b""[..]); + let mut framed = frame_buf(mb); - let destroycell = msg::Destroy::new(2.into()); - framed - .send(ChanCell::new(7.into(), destroycell.into())) - .await?; + let destroycell = msg::Destroy::new(2.into()); + framed + .send(ChanCell::new(7.into(), destroycell.into())) + .await + .unwrap(); - let nocerts = msg::Certs::new_empty(); - framed.send(ChanCell::new(0.into(), nocerts.into())).await?; + let nocerts = msg::Certs::new_empty(); + framed + .send(ChanCell::new(0.into(), nocerts.into())) + .await + .unwrap(); - framed.flush().await?; + framed.flush().await.unwrap(); - let data = framed.into_inner().into_response(); + let data = framed.into_inner().into_response(); - assert_eq!(&data[0..10], &hex!("00000007 04 0200000000")[..]); + assert_eq!(&data[0..10], &hex!("00000007 04 0200000000")[..]); - assert_eq!(&data[514..], &hex!("00000000 81 0001 00")[..]); - Ok(()) + assert_eq!(&data[514..], &hex!("00000000 81 0001 00")[..]); + }); } - #[async_test] - async fn check_decoding() -> std::result::Result<(), tor_cell::Error> { - let mut dat = Vec::new(); - dat.extend_from_slice(&hex!("00000007 04 0200000000")[..]); - dat.resize(514, 0); - dat.extend_from_slice(&hex!("00000000 81 0001 00")[..]); - let mb = MsgBuf::new(&dat[..]); - let mut framed = frame_buf(mb); + #[test] + fn check_decoding() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + let mut dat = Vec::new(); + dat.extend_from_slice(&hex!("00000007 04 0200000000")[..]); + dat.resize(514, 0); + dat.extend_from_slice(&hex!("00000000 81 0001 00")[..]); + let mb = MsgBuf::new(&dat[..]); + let mut framed = frame_buf(mb); - let destroy = framed.next().await.unwrap()?; - let nocerts = framed.next().await.unwrap()?; + let destroy = framed.next().await.unwrap().unwrap(); + let nocerts = framed.next().await.unwrap().unwrap(); - assert_eq!(destroy.circid(), CircId::from(7)); - assert_eq!(destroy.msg().cmd(), ChanCmd::DESTROY); - assert_eq!(nocerts.circid(), CircId::from(0)); - assert_eq!(nocerts.msg().cmd(), ChanCmd::CERTS); + assert_eq!(destroy.circid(), CircId::from(7)); + assert_eq!(destroy.msg().cmd(), ChanCmd::DESTROY); + assert_eq!(nocerts.circid(), CircId::from(0)); + assert_eq!(nocerts.msg().cmd(), ChanCmd::CERTS); - assert!(framed.into_inner().all_consumed()); - - Ok(()) + assert!(framed.into_inner().all_consumed()); + }); } } diff --git a/crates/tor-proto/src/channel/handshake.rs b/crates/tor-proto/src/channel/handshake.rs index 956e1f56a..b9ccb3102 100644 --- a/crates/tor-proto/src/channel/handshake.rs +++ b/crates/tor-proto/src/channel/handshake.rs @@ -421,8 +421,6 @@ pub(super) mod test { #![allow(clippy::unwrap_used)] use hex_literal::hex; use std::time::{Duration, SystemTime}; - use tokio::test as async_test; - use tokio_crate as tokio; use super::*; use crate::channel::codec::test::MsgBuf; @@ -456,34 +454,36 @@ pub(super) mod test { add_padded(buf, NETINFO_PREFIX); } - #[async_test] - async fn connect_ok() -> Result<()> { - let mut buf = Vec::new(); - // versions cell - buf.extend_from_slice(VERSIONS); - // certs cell -- no certs in it, but this function doesn't care. - buf.extend_from_slice(NOCERTS); - // netinfo cell -- quite minimal. - add_netinfo(&mut buf); - let mb = MsgBuf::new(&buf[..]); - let handshake = OutboundClientHandshake::new(mb, None); - let unverified = handshake.connect().await?; + #[test] + fn connect_ok() -> Result<()> { + tor_rtcompat::test_with_one_runtime!(|_rt| async move { + let mut buf = Vec::new(); + // versions cell + buf.extend_from_slice(VERSIONS); + // certs cell -- no certs in it, but this function doesn't care. + buf.extend_from_slice(NOCERTS); + // netinfo cell -- quite minimal. + add_netinfo(&mut buf); + let mb = MsgBuf::new(&buf[..]); + let handshake = OutboundClientHandshake::new(mb, None); + let unverified = handshake.connect().await?; - assert_eq!(unverified.link_protocol, 4); + assert_eq!(unverified.link_protocol, 4); - // Try again with an authchallenge cell and some padding. - let mut buf = Vec::new(); - buf.extend_from_slice(VERSIONS); - buf.extend_from_slice(NOCERTS); - buf.extend_from_slice(VPADDING); - buf.extend_from_slice(AUTHCHALLENGE); - buf.extend_from_slice(VPADDING); - add_netinfo(&mut buf); - let mb = MsgBuf::new(&buf[..]); - let handshake = OutboundClientHandshake::new(mb, None); - let _unverified = handshake.connect().await?; + // Try again with an authchallenge cell and some padding. + let mut buf = Vec::new(); + buf.extend_from_slice(VERSIONS); + buf.extend_from_slice(NOCERTS); + buf.extend_from_slice(VPADDING); + buf.extend_from_slice(AUTHCHALLENGE); + buf.extend_from_slice(VPADDING); + add_netinfo(&mut buf); + let mb = MsgBuf::new(&buf[..]); + let handshake = OutboundClientHandshake::new(mb, None); + let _unverified = handshake.connect().await?; - Ok(()) + Ok(()) + }) } async fn connect_err>>(input: T) -> Error { @@ -492,89 +492,99 @@ pub(super) mod test { handshake.connect().await.err().unwrap() } - #[async_test] - async fn connect_badver() { - let err = connect_err(&b"HTTP://"[..]).await; - assert!(matches!(err, Error::ChanProto(_))); - assert_eq!( - format!("{}", err), - "channel protocol violation: Doesn't seem to be a tor relay" - ); + #[test] + fn connect_badver() { + tor_rtcompat::test_with_one_runtime!(|_rt| async move { + let err = connect_err(&b"HTTP://"[..]).await; + assert!(matches!(err, Error::ChanProto(_))); + assert_eq!( + format!("{}", err), + "channel protocol violation: Doesn't seem to be a tor relay" + ); - let err = connect_err(&hex!("0000 07 0004 1234 ffff")[..]).await; - assert!(matches!(err, Error::ChanProto(_))); - assert_eq!( - format!("{}", err), - "channel protocol violation: No shared link protocols" - ); + let err = connect_err(&hex!("0000 07 0004 1234 ffff")[..]).await; + assert!(matches!(err, Error::ChanProto(_))); + assert_eq!( + format!("{}", err), + "channel protocol violation: No shared link protocols" + ); + }) } - #[async_test] - async fn connect_cellparse() { - let mut buf = Vec::new(); - buf.extend_from_slice(VERSIONS); - // Here's a certs cell that will fail. - buf.extend_from_slice(&hex!("00000000 81 0001 01")[..]); - let err = connect_err(buf).await; - assert!(matches!( - err, - Error::CellErr(tor_cell::Error::BytesErr(tor_bytes::Error::Truncated)) - )); + #[test] + fn connect_cellparse() { + tor_rtcompat::test_with_one_runtime!(|_rt| async move { + let mut buf = Vec::new(); + buf.extend_from_slice(VERSIONS); + // Here's a certs cell that will fail. + buf.extend_from_slice(&hex!("00000000 81 0001 01")[..]); + let err = connect_err(buf).await; + assert!(matches!( + err, + Error::CellErr(tor_cell::Error::BytesErr(tor_bytes::Error::Truncated)) + )); + }) } - #[async_test] - async fn connect_duplicates() { - let mut buf = Vec::new(); - buf.extend_from_slice(VERSIONS); - buf.extend_from_slice(NOCERTS); - buf.extend_from_slice(NOCERTS); - add_netinfo(&mut buf); - let err = connect_err(buf).await; - assert!(matches!(err, Error::ChanProto(_))); - assert_eq!( - format!("{}", err), - "channel protocol violation: Duplicate certs cell" - ); + #[test] + fn connect_duplicates() { + tor_rtcompat::test_with_one_runtime!(|_rt| async move { + let mut buf = Vec::new(); + buf.extend_from_slice(VERSIONS); + buf.extend_from_slice(NOCERTS); + buf.extend_from_slice(NOCERTS); + add_netinfo(&mut buf); + let err = connect_err(buf).await; + assert!(matches!(err, Error::ChanProto(_))); + assert_eq!( + format!("{}", err), + "channel protocol violation: Duplicate certs cell" + ); - let mut buf = Vec::new(); - buf.extend_from_slice(VERSIONS); - buf.extend_from_slice(NOCERTS); - buf.extend_from_slice(AUTHCHALLENGE); - buf.extend_from_slice(AUTHCHALLENGE); - add_netinfo(&mut buf); - let err = connect_err(buf).await; - assert!(matches!(err, Error::ChanProto(_))); - assert_eq!( - format!("{}", err), - "channel protocol violation: Duplicate authchallenge cell" - ); + let mut buf = Vec::new(); + buf.extend_from_slice(VERSIONS); + buf.extend_from_slice(NOCERTS); + buf.extend_from_slice(AUTHCHALLENGE); + buf.extend_from_slice(AUTHCHALLENGE); + add_netinfo(&mut buf); + let err = connect_err(buf).await; + assert!(matches!(err, Error::ChanProto(_))); + assert_eq!( + format!("{}", err), + "channel protocol violation: Duplicate authchallenge cell" + ); + }) } - #[async_test] - async fn connect_missing_certs() { - let mut buf = Vec::new(); - buf.extend_from_slice(VERSIONS); - add_netinfo(&mut buf); - let err = connect_err(buf).await; - assert!(matches!(err, Error::ChanProto(_))); - assert_eq!( - format!("{}", err), - "channel protocol violation: Missing certs cell" - ); + #[test] + fn connect_missing_certs() { + tor_rtcompat::test_with_one_runtime!(|_rt| async move { + let mut buf = Vec::new(); + buf.extend_from_slice(VERSIONS); + add_netinfo(&mut buf); + let err = connect_err(buf).await; + assert!(matches!(err, Error::ChanProto(_))); + assert_eq!( + format!("{}", err), + "channel protocol violation: Missing certs cell" + ); + }) } - #[async_test] - async fn connect_misplaced_cell() { - let mut buf = Vec::new(); - buf.extend_from_slice(VERSIONS); - // here's a create cell. - add_padded(&mut buf, &hex!("00000001 01")[..]); - let err = connect_err(buf).await; - assert!(matches!(err, Error::ChanProto(_))); - assert_eq!( - format!("{}", err), - "channel protocol violation: Unexpected cell type CREATE" - ); + #[test] + fn connect_misplaced_cell() { + tor_rtcompat::test_with_one_runtime!(|_rt| async move { + let mut buf = Vec::new(); + buf.extend_from_slice(VERSIONS); + // here's a create cell. + add_padded(&mut buf, &hex!("00000001 01")[..]); + let err = connect_err(buf).await; + assert!(matches!(err, Error::ChanProto(_))); + assert_eq!( + format!("{}", err), + "channel protocol violation: Unexpected cell type CREATE" + ); + }) } fn make_unverified(certs: msg::Certs) -> UnverifiedChannel { @@ -823,22 +833,24 @@ pub(super) mod test { pub(crate) const PEER_RSA: &[u8] = &hex!("2f1fb49bb332a9eec617e41e911c33fb3890aef3"); } - #[async_test] - async fn test_finish() { - let ed25519_id = [3_u8; 32].into(); - let rsa_id = [4_u8; 20].into(); - let peer_addr = "127.1.1.2:443".parse().unwrap(); - let ver = VerifiedChannel { - link_protocol: 4, - tls: futures_codec::Framed::new(MsgBuf::new(&b""[..]), ChannelCodec::new(4)), - unique_id: UniqId::new(), - target_addr: Some(peer_addr), - ed25519_id, - rsa_id, - }; + #[test] + fn test_finish() { + tor_rtcompat::test_with_one_runtime!(|_rt| async move { + let ed25519_id = [3_u8; 32].into(); + let rsa_id = [4_u8; 20].into(); + let peer_addr = "127.1.1.2:443".parse().unwrap(); + let ver = VerifiedChannel { + link_protocol: 4, + tls: futures_codec::Framed::new(MsgBuf::new(&b""[..]), ChannelCodec::new(4)), + unique_id: UniqId::new(), + target_addr: Some(peer_addr), + ed25519_id, + rsa_id, + }; - let (_chan, _reactor) = ver.finish().await.unwrap(); + let (_chan, _reactor) = ver.finish().await.unwrap(); - // TODO: check contents of netinfo cell + // TODO: check contents of netinfo cell + }) } } diff --git a/crates/tor-proto/src/channel/reactor.rs b/crates/tor-proto/src/channel/reactor.rs index 07b2b9f87..52b84580a 100644 --- a/crates/tor-proto/src/channel/reactor.rs +++ b/crates/tor-proto/src/channel/reactor.rs @@ -393,13 +393,10 @@ impl Reactor { pub(crate) mod test { #![allow(clippy::unwrap_used)] use super::*; + use crate::circuit::CircParameters; use futures::sink::SinkExt; use futures::stream::StreamExt; - use tokio::test as async_test; - use tokio_crate as tokio; - use tokio_crate::runtime::Handle; - - use crate::circuit::CircParameters; + use futures::task::SpawnExt; type CodecResult = std::result::Result; @@ -431,318 +428,339 @@ pub(crate) mod test { } // Try shutdown from inside run_once.. - #[async_test] - async fn shutdown() { - let (chan, mut reactor, _output, _input) = new_reactor(); + #[test] + fn shutdown() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + let (chan, mut reactor, _output, _input) = new_reactor(); - chan.terminate(); - let r = reactor.run_once().await; - assert!(matches!(r, Err(ReactorError::Shutdown))); + chan.terminate(); + let r = reactor.run_once().await; + assert!(matches!(r, Err(ReactorError::Shutdown))); + }) } // Try shutdown while reactor is running. - #[async_test] - async fn shutdown2() { - // TODO: Ask a rust person if this is how to do this. - use futures::future::FutureExt; - use futures::join; + #[test] + fn shutdown2() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + // TODO: Ask a rust person if this is how to do this. - let (chan, reactor, _output, _input) = new_reactor(); - // Let's get the reactor running... - let run_reactor = reactor.run().map(|x| x.is_ok()).shared(); + use futures::future::FutureExt; + use futures::join; - let rr = run_reactor.clone(); + let (chan, reactor, _output, _input) = new_reactor(); + // Let's get the reactor running... + let run_reactor = reactor.run().map(|x| x.is_ok()).shared(); - let exit_then_check = async { - assert!(rr.peek().is_none()); - // ... and terminate the channel while that's happening. - chan.terminate(); - }; + let rr = run_reactor.clone(); - let (rr_s, _) = join!(run_reactor, exit_then_check); + let exit_then_check = async { + assert!(rr.peek().is_none()); + // ... and terminate the channel while that's happening. + chan.terminate(); + }; - // Now let's see. The reactor should not _still_ be running. - assert!(rr_s); + let (rr_s, _) = join!(run_reactor, exit_then_check); + + // Now let's see. The reactor should not _still_ be running. + assert!(rr_s); + }) } - #[async_test] - async fn new_circ_closed() { - let (chan, mut reactor, mut output, _input) = new_reactor(); + #[test] + fn new_circ_closed() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let (chan, mut reactor, mut output, _input) = new_reactor(); - let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); - let (pending, circr) = ret.unwrap(); - Handle::current().spawn(circr.run()); - assert!(reac.is_ok()); + let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); + let (pending, circr) = ret.unwrap(); + rt.spawn(async { + let _ignore = circr.run().await; + }) + .unwrap(); + assert!(reac.is_ok()); - let id = pending.peek_circid(); + let id = pending.peek_circid(); - let ent = reactor.circs.get_mut(id); - assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); - // Now drop the circuit; this should tell the reactor to remove - // the circuit from the map. - drop(pending); + let ent = reactor.circs.get_mut(id); + assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); + // Now drop the circuit; this should tell the reactor to remove + // the circuit from the map. + drop(pending); - reactor.run_once().await.unwrap(); - let ent = reactor.circs.get_mut(id); - assert!(matches!(ent, Some(CircEnt::DestroySent(_)))); - let cell = output.next().await.unwrap(); - assert_eq!(cell.circid(), id); - assert!(matches!(cell.msg(), ChanMsg::Destroy(_))); + reactor.run_once().await.unwrap(); + let ent = reactor.circs.get_mut(id); + assert!(matches!(ent, Some(CircEnt::DestroySent(_)))); + let cell = output.next().await.unwrap(); + assert_eq!(cell.circid(), id); + assert!(matches!(cell.msg(), ChanMsg::Destroy(_))); + }) } // Test proper delivery of a created cell that doesn't make a channel - #[async_test] - async fn new_circ_create_failure() { - use tor_cell::chancell::msg; - let (chan, mut reactor, mut output, mut input) = new_reactor(); + #[test] + fn new_circ_create_failure() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + use tor_cell::chancell::msg; + let (chan, mut reactor, mut output, mut input) = new_reactor(); - let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); - let (pending, circr) = ret.unwrap(); - Handle::current().spawn(circr.run()); - assert!(reac.is_ok()); + let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); + let (pending, circr) = ret.unwrap(); + rt.spawn(async { + let _ignore = circr.run().await; + }) + .unwrap(); + assert!(reac.is_ok()); - let circparams = CircParameters::default(); + let circparams = CircParameters::default(); - let id = pending.peek_circid(); + let id = pending.peek_circid(); - let ent = reactor.circs.get_mut(id); - assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); - // We'll get a bad handshake result from this createdfast cell. - let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into()); - input.send(Ok(created_cell)).await.unwrap(); + let ent = reactor.circs.get_mut(id); + assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); + // We'll get a bad handshake result from this createdfast cell. + let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into()); + input.send(Ok(created_cell)).await.unwrap(); - let (circ, reac) = - futures::join!(pending.create_firsthop_fast(circparams), reactor.run_once()); - // Make sure statuses are as expected. - assert!(matches!(circ.err().unwrap(), Error::BadHandshake)); - assert!(reac.is_ok()); + let (circ, reac) = + futures::join!(pending.create_firsthop_fast(circparams), reactor.run_once()); + // Make sure statuses are as expected. + assert!(matches!(circ.err().unwrap(), Error::BadHandshake)); + assert!(reac.is_ok()); - reactor.run_once().await.unwrap(); + reactor.run_once().await.unwrap(); - // Make sure that the createfast cell got sent - let cell_sent = output.next().await.unwrap(); - assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_))); + // Make sure that the createfast cell got sent + let cell_sent = output.next().await.unwrap(); + assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_))); - // The circid now counts as open, since as far as the reactor knows, - // it was accepted. (TODO: is this a bug?) - let ent = reactor.circs.get_mut(id); - assert!(matches!(ent, Some(CircEnt::Open(_)))); + // The circid now counts as open, since as far as the reactor knows, + // it was accepted. (TODO: is this a bug?) + let ent = reactor.circs.get_mut(id); + assert!(matches!(ent, Some(CircEnt::Open(_)))); - // But the next run if the reactor will make the circuit get closed. - reactor.run_once().await.unwrap(); - let ent = reactor.circs.get_mut(id); - assert!(matches!(ent, Some(CircEnt::DestroySent(_)))); + // But the next run if the reactor will make the circuit get closed. + reactor.run_once().await.unwrap(); + let ent = reactor.circs.get_mut(id); + assert!(matches!(ent, Some(CircEnt::DestroySent(_)))); + }) } // Try incoming cells that shouldn't arrive on channels. - #[async_test] - async fn bad_cells() { - use tor_cell::chancell::msg; - let (_chan, mut reactor, _output, mut input) = new_reactor(); + #[test] + fn bad_cells() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + use tor_cell::chancell::msg; + let (_chan, mut reactor, _output, mut input) = new_reactor(); - // We shouldn't get create cells, ever. - let create_cell = msg::Create2::new(4, *b"hihi").into(); - input - .send(Ok(ChanCell::new(9.into(), create_cell))) - .await - .unwrap(); + // We shouldn't get create cells, ever. + let create_cell = msg::Create2::new(4, *b"hihi").into(); + input + .send(Ok(ChanCell::new(9.into(), create_cell))) + .await + .unwrap(); - // shouldn't get created2 cells for nonexistent circuits - let created2_cell = msg::Created2::new(*b"hihi").into(); - input - .send(Ok(ChanCell::new(7.into(), created2_cell))) - .await - .unwrap(); + // shouldn't get created2 cells for nonexistent circuits + let created2_cell = msg::Created2::new(*b"hihi").into(); + input + .send(Ok(ChanCell::new(7.into(), created2_cell))) + .await + .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: CREATE2 cell on client channel" - ); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: CREATE2 cell on client channel" + ); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: Unexpected CREATED* cell not on opening circuit" - ); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: Unexpected CREATED* cell not on opening circuit" + ); - // Can't get a relay cell on a circuit we've never heard of. - let relay_cell = msg::Relay::new(b"abc").into(); - input - .send(Ok(ChanCell::new(4.into(), relay_cell))) - .await - .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: Relay cell on nonexistent circuit" - ); + // Can't get a relay cell on a circuit we've never heard of. + let relay_cell = msg::Relay::new(b"abc").into(); + input + .send(Ok(ChanCell::new(4.into(), relay_cell))) + .await + .unwrap(); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: Relay cell on nonexistent circuit" + ); - // Can't get handshaking cells while channel is open. - let versions_cell = msg::Versions::new([3]).unwrap().into(); - input - .send(Ok(ChanCell::new(0.into(), versions_cell))) - .await - .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: VERSIONS cell after handshake is done" - ); + // Can't get handshaking cells while channel is open. + let versions_cell = msg::Versions::new([3]).unwrap().into(); + input + .send(Ok(ChanCell::new(0.into(), versions_cell))) + .await + .unwrap(); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: VERSIONS cell after handshake is done" + ); - // We don't accept CREATED. - let created_cell = msg::Created::new(&b"xyzzy"[..]).into(); - input - .send(Ok(ChanCell::new(25.into(), created_cell))) - .await - .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: CREATED cell received, but we never send CREATEs" - ); + // We don't accept CREATED. + let created_cell = msg::Created::new(&b"xyzzy"[..]).into(); + input + .send(Ok(ChanCell::new(25.into(), created_cell))) + .await + .unwrap(); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: CREATED cell received, but we never send CREATEs" + ); + }) } - #[async_test] - async fn deliver_relay() { - use crate::circuit::celltypes::ClientCircChanMsg; - use futures::channel::oneshot; - use tor_cell::chancell::msg; + #[test] + fn deliver_relay() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + use crate::circuit::celltypes::ClientCircChanMsg; + use futures::channel::oneshot; + use tor_cell::chancell::msg; - let (_chan, mut reactor, _output, mut input) = new_reactor(); + let (_chan, mut reactor, _output, mut input) = new_reactor(); - let (_circ_stream_7, mut circ_stream_13) = { - let (snd1, _rcv1) = oneshot::channel(); - let (snd2, rcv2) = mpsc::channel(64); - reactor - .circs - .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2)); + let (_circ_stream_7, mut circ_stream_13) = { + let (snd1, _rcv1) = oneshot::channel(); + let (snd2, rcv2) = mpsc::channel(64); + reactor + .circs + .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2)); - let (snd3, rcv3) = mpsc::channel(64); - reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3)); + let (snd3, rcv3) = mpsc::channel(64); + reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3)); - reactor - .circs - .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25))); - (rcv2, rcv3) - }; + reactor + .circs + .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25))); + (rcv2, rcv3) + }; - // If a relay cell is sent on an open channel, the correct circuit - // should get it. - let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into(); - input - .send(Ok(ChanCell::new(13.into(), relaycell.clone()))) - .await - .unwrap(); - reactor.run_once().await.unwrap(); - let got = circ_stream_13.next().await.unwrap(); - assert!(matches!(got, ClientCircChanMsg::Relay(_))); + // If a relay cell is sent on an open channel, the correct circuit + // should get it. + let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into(); + input + .send(Ok(ChanCell::new(13.into(), relaycell.clone()))) + .await + .unwrap(); + reactor.run_once().await.unwrap(); + let got = circ_stream_13.next().await.unwrap(); + assert!(matches!(got, ClientCircChanMsg::Relay(_))); - // If a relay cell is sent on an opening channel, that's an error. - input - .send(Ok(ChanCell::new(7.into(), relaycell.clone()))) - .await - .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( + // If a relay cell is sent on an opening channel, that's an error. + input + .send(Ok(ChanCell::new(7.into(), relaycell.clone()))) + .await + .unwrap(); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( format!("{}", e), "channel protocol violation: Relay cell on pending circuit before CREATED* received" ); - // If a relay cell is sent on a non-existent channel, that's an error. - input - .send(Ok(ChanCell::new(101.into(), relaycell.clone()))) - .await - .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: Relay cell on nonexistent circuit" - ); + // If a relay cell is sent on a non-existent channel, that's an error. + input + .send(Ok(ChanCell::new(101.into(), relaycell.clone()))) + .await + .unwrap(); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: Relay cell on nonexistent circuit" + ); - // It's fine to get a relay cell on a DestroySent channel: that happens - // when the other side hasn't noticed the Destroy yet. + // It's fine to get a relay cell on a DestroySent channel: that happens + // when the other side hasn't noticed the Destroy yet. - // We can do this 25 more times according to our setup: - for _ in 0..25 { + // We can do this 25 more times according to our setup: + for _ in 0..25 { + input + .send(Ok(ChanCell::new(23.into(), relaycell.clone()))) + .await + .unwrap(); + reactor.run_once().await.unwrap(); // should be fine. + } + + // This one will fail. input .send(Ok(ChanCell::new(23.into(), relaycell.clone()))) .await .unwrap(); - reactor.run_once().await.unwrap(); // should be fine. - } - - // This one will fail. - input - .send(Ok(ChanCell::new(23.into(), relaycell.clone()))) - .await - .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: Too many cells received on destroyed circuit" - ); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: Too many cells received on destroyed circuit" + ); + }) } - #[async_test] - async fn deliver_destroy() { - use crate::circuit::celltypes::*; - use futures::channel::oneshot; - use tor_cell::chancell::msg; + #[test] + fn deliver_destroy() { + tor_rtcompat::test_with_all_runtimes!(|_rt| async move { + use crate::circuit::celltypes::*; + use futures::channel::oneshot; + use tor_cell::chancell::msg; - let (_chan, mut reactor, _output, mut input) = new_reactor(); + let (_chan, mut reactor, _output, mut input) = new_reactor(); - let (circ_oneshot_7, mut circ_stream_13) = { - let (snd1, rcv1) = oneshot::channel(); - let (snd2, _rcv2) = mpsc::channel(64); - reactor - .circs - .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2)); + let (circ_oneshot_7, mut circ_stream_13) = { + let (snd1, rcv1) = oneshot::channel(); + let (snd2, _rcv2) = mpsc::channel(64); + reactor + .circs + .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2)); - let (snd3, rcv3) = mpsc::channel(64); - reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3)); + let (snd3, rcv3) = mpsc::channel(64); + reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3)); - reactor - .circs - .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25))); - (rcv1, rcv3) - }; + reactor + .circs + .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25))); + (rcv1, rcv3) + }; - // Destroying an opening circuit is fine. - let destroycell: ChanMsg = msg::Destroy::new(0.into()).into(); - input - .send(Ok(ChanCell::new(7.into(), destroycell.clone()))) - .await - .unwrap(); - reactor.run_once().await.unwrap(); - let msg = circ_oneshot_7.await; - assert!(matches!(msg, Ok(CreateResponse::Destroy(_)))); + // Destroying an opening circuit is fine. + let destroycell: ChanMsg = msg::Destroy::new(0.into()).into(); + input + .send(Ok(ChanCell::new(7.into(), destroycell.clone()))) + .await + .unwrap(); + reactor.run_once().await.unwrap(); + let msg = circ_oneshot_7.await; + assert!(matches!(msg, Ok(CreateResponse::Destroy(_)))); - // Destroying an open circuit is fine. - input - .send(Ok(ChanCell::new(13.into(), destroycell.clone()))) - .await - .unwrap(); - reactor.run_once().await.unwrap(); - let msg = circ_stream_13.next().await.unwrap(); - assert!(matches!(msg, ClientCircChanMsg::Destroy(_))); + // Destroying an open circuit is fine. + input + .send(Ok(ChanCell::new(13.into(), destroycell.clone()))) + .await + .unwrap(); + reactor.run_once().await.unwrap(); + let msg = circ_stream_13.next().await.unwrap(); + assert!(matches!(msg, ClientCircChanMsg::Destroy(_))); - // Destroying a DestroySent circuit is fine. - input - .send(Ok(ChanCell::new(23.into(), destroycell.clone()))) - .await - .unwrap(); - reactor.run_once().await.unwrap(); + // Destroying a DestroySent circuit is fine. + input + .send(Ok(ChanCell::new(23.into(), destroycell.clone()))) + .await + .unwrap(); + reactor.run_once().await.unwrap(); - // Destroying a nonexistent circuit is an error. - input - .send(Ok(ChanCell::new(101.into(), destroycell.clone()))) - .await - .unwrap(); - let e = reactor.run_once().await.unwrap_err().unwrap_err(); - assert_eq!( - format!("{}", e), - "channel protocol violation: Destroy for nonexistent circuit" - ); + // Destroying a nonexistent circuit is an error. + input + .send(Ok(ChanCell::new(101.into(), destroycell.clone()))) + .await + .unwrap(); + let e = reactor.run_once().await.unwrap_err().unwrap_err(); + assert_eq!( + format!("{}", e), + "channel protocol violation: Destroy for nonexistent circuit" + ); + }) } } diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 046a2be3a..17e244208 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -614,15 +614,14 @@ mod test { use futures::io::{AsyncReadExt, AsyncWriteExt}; use futures::sink::SinkExt; use futures::stream::StreamExt; + use futures::task::SpawnExt; use hex_literal::hex; use rand::thread_rng; use std::time::Duration; - use tokio::runtime::Handle; - use tokio_crate as tokio; - use tokio_crate::test as async_test; use tor_cell::chancell::{msg as chanmsg, ChanCell}; use tor_cell::relaycell::{msg as relaymsg, RelayCell, StreamId}; use tor_llcrypto::pk; + use tor_rtcompat::{Runtime, SleepProvider}; use tracing::trace; fn rmsg_to_ccmsg(id: ID, msg: relaymsg::RelayMsg) -> ClientCircChanMsg @@ -680,23 +679,28 @@ mod test { ) } - fn working_fake_channel() -> ( + fn working_fake_channel( + rt: &R, + ) -> ( Channel, Receiver, Sender>, ) { let (channel, chan_reactor, rx, tx) = new_reactor(); - Handle::current().spawn(chan_reactor.run()); + rt.spawn(async { + let _ignore = chan_reactor.run().await; + }) + .unwrap(); (channel, rx, tx) } - async fn test_create(fast: bool) { + async fn test_create(rt: &R, fast: bool) { // We want to try progressing from a pending circuit to a circuit // via a crate_fast handshake. use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake}; - let (chan, mut rx, _sink) = working_fake_channel(); + let (chan, mut rx, _sink) = working_fake_channel(rt); let circid = 128.into(); let (created_send, created_recv) = oneshot::channel(); let (_circmsg_send, circmsg_recv) = mpsc::channel(64); @@ -705,7 +709,10 @@ mod test { let (pending, reactor) = PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id); - Handle::current().spawn(reactor.run()); + rt.spawn(async { + let _ignore = reactor.run().await; + }) + .unwrap(); // Future to pretend to be a relay on the other end of the circuit. let simulate_relay_fut = async move { @@ -756,13 +763,17 @@ mod test { */ } - #[async_test] - async fn test_create_fast() { - test_create(true).await + #[test] + fn test_create_fast() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + test_create(&rt, true).await; + }) } - #[async_test] - async fn test_create_ntor() { - test_create(false).await + #[test] + fn test_create_ntor() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + test_create(&rt, false).await; + }) } // An encryption layer that doesn't do any crypto. Can be used @@ -811,7 +822,8 @@ mod test { // Helper: set up a 3-hop circuit with no encryption, where the // next inbound message seems to come from hop next_msg_from - async fn newcirc_ext( + async fn newcirc_ext( + rt: &R, chan: Channel, next_msg_from: HopNum, ) -> (ClientCirc, mpsc::Sender) { @@ -823,7 +835,10 @@ mod test { let (pending, reactor) = PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id); - Handle::current().spawn(reactor.run()); + rt.spawn(async { + let _ignore = reactor.run().await; + }) + .unwrap(); let PendingClientCirc { circ, @@ -850,33 +865,38 @@ mod test { // Helper: set up a 3-hop circuit with no encryption, where the // next inbound message seems to come from hop next_msg_from - async fn newcirc(chan: Channel) -> (ClientCirc, mpsc::Sender) { - newcirc_ext(chan, 2.into()).await + async fn newcirc( + rt: &R, + chan: Channel, + ) -> (ClientCirc, mpsc::Sender) { + newcirc_ext(rt, chan, 2.into()).await } // Try sending a cell via send_relay_cell - #[async_test] - async fn send_simple() { - let (chan, mut rx, _sink) = working_fake_channel(); - let (circ, _send) = newcirc(chan).await; - let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir); - circ.control - .unbounded_send(CtrlMsg::SendRelayCell { - hop: 2.into(), - early: false, - cell: begindir, - }) - .unwrap(); + #[test] + fn send_simple() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let (chan, mut rx, _sink) = working_fake_channel(&rt); + let (circ, _send) = newcirc(&rt, chan).await; + let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir); + circ.control + .unbounded_send(CtrlMsg::SendRelayCell { + hop: 2.into(), + early: false, + cell: begindir, + }) + .unwrap(); - // Here's what we tried to put on the TLS channel. Note that - // we're using dummy relay crypto for testing convenience. - let rcvd = rx.next().await.unwrap(); - assert_eq!(rcvd.circid(), 128.into()); - let m = match rcvd.into_circid_and_msg().1 { - ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(), - _ => panic!(), - }; - assert!(matches!(m.msg(), RelayMsg::BeginDir)); + // Here's what we tried to put on the TLS channel. Note that + // we're using dummy relay crypto for testing convenience. + let rcvd = rx.next().await.unwrap(); + assert_eq!(rcvd.circid(), 128.into()); + let m = match rcvd.into_circid_and_msg().1 { + ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(), + _ => panic!(), + }; + assert!(matches!(m.msg(), RelayMsg::BeginDir)); + }) } // NOTE(eta): this test is commented out because it basically tested implementation details @@ -945,49 +965,55 @@ mod test { } */ - #[async_test] - async fn extend() { - use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake}; + #[test] + fn extend() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake}; - let (chan, mut rx, _sink) = working_fake_channel(); - let (circ, mut sink) = newcirc(chan).await; - let params = CircParameters::default(); + let (chan, mut rx, _sink) = working_fake_channel(&rt); + let (circ, mut sink) = newcirc(&rt, chan).await; + let params = CircParameters::default(); - let extend_fut = async move { - let target = example_target(); - circ.extend_ntor(&target, ¶ms).await.unwrap(); - circ // gotta keep the circ alive, or the reactor would exit. - }; - let reply_fut = async move { - // We've disabled encryption on this circuit, so we can just - // read the extend2 cell. - let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg(); - assert_eq!(id, 128.into()); - let rmsg = match chmsg { - ChanMsg::RelayEarly(r) => RelayCell::decode(r.into_relay_body()).unwrap(), - _ => panic!(), + let extend_fut = async move { + let target = example_target(); + circ.extend_ntor(&target, ¶ms).await.unwrap(); + circ // gotta keep the circ alive, or the reactor would exit. }; - let e2 = match rmsg.msg() { - RelayMsg::Extend2(e2) => e2, - _ => panic!(), + let reply_fut = async move { + // We've disabled encryption on this circuit, so we can just + // read the extend2 cell. + let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg(); + assert_eq!(id, 128.into()); + let rmsg = match chmsg { + ChanMsg::RelayEarly(r) => RelayCell::decode(r.into_relay_body()).unwrap(), + _ => panic!(), + }; + let e2 = match rmsg.msg() { + RelayMsg::Extend2(e2) => e2, + _ => panic!(), + }; + let mut rng = thread_rng(); + let (_, reply) = + NtorServer::server(&mut rng, &[example_ntor_key()], e2.handshake()).unwrap(); + let extended2 = relaymsg::Extended2::new(reply).into(); + sink.send(rmsg_to_ccmsg(0, extended2)).await.unwrap(); + sink // gotta keep the sink alive, or the reactor will exit. }; - let mut rng = thread_rng(); - let (_, reply) = - NtorServer::server(&mut rng, &[example_ntor_key()], e2.handshake()).unwrap(); - let extended2 = relaymsg::Extended2::new(reply).into(); - sink.send(rmsg_to_ccmsg(0, extended2)).await.unwrap(); - sink // gotta keep the sink alive, or the reactor will exit. - }; - let (circ, _) = futures::join!(extend_fut, reply_fut); + let (circ, _) = futures::join!(extend_fut, reply_fut); - // Did we really add another hop? - assert_eq!(circ.n_hops(), 4); + // Did we really add another hop? + assert_eq!(circ.n_hops(), 4); + }) } - async fn bad_extend_test_impl(reply_hop: HopNum, bad_reply: ClientCircChanMsg) -> Error { - let (chan, _rx, _sink) = working_fake_channel(); - let (circ, mut sink) = newcirc_ext(chan, reply_hop).await; + async fn bad_extend_test_impl( + rt: &R, + reply_hop: HopNum, + bad_reply: ClientCircChanMsg, + ) -> Error { + let (chan, _rx, _sink) = working_fake_channel(rt); + let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await; let params = CircParameters::default(); let extend_fut = async move { @@ -1006,121 +1032,132 @@ mod test { outcome.unwrap_err() } - #[async_test] - async fn bad_extend_wronghop() { - let extended2 = relaymsg::Extended2::new(vec![]).into(); - let cc = rmsg_to_ccmsg(0, extended2); + #[test] + fn bad_extend_wronghop() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let extended2 = relaymsg::Extended2::new(vec![]).into(); + let cc = rmsg_to_ccmsg(0, extended2); - let error = bad_extend_test_impl(1.into(), cc).await; - // This case shows up as a CircDestroy, since a message sent - // from the wrong hop won't even be delivered to the extend - // code's meta-handler. Instead the unexpected message will cause - // the circuit to get torn down. - match error { - Error::CircuitClosed => {} - x => panic!("got other error: {}", x), - } - } - - #[async_test] - async fn bad_extend_wrongtype() { - let extended = relaymsg::Extended::new(vec![7; 200]).into(); - let cc = rmsg_to_ccmsg(0, extended); - - let error = bad_extend_test_impl(2.into(), cc).await; - match error { - Error::CircProto(s) => { - assert_eq!(s, "wanted EXTENDED2; got EXTENDED") + let error = bad_extend_test_impl(&rt, 1.into(), cc).await; + // This case shows up as a CircDestroy, since a message sent + // from the wrong hop won't even be delivered to the extend + // code's meta-handler. Instead the unexpected message will cause + // the circuit to get torn down. + match error { + Error::CircuitClosed => {} + x => panic!("got other error: {}", x), } - _ => panic!(), - } + }) } - #[async_test] - async fn bad_extend_destroy() { - let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into())); - let error = bad_extend_test_impl(2.into(), cc).await; - match error { - Error::CircuitClosed => {} - _ => panic!(), - } - } + #[test] + fn bad_extend_wrongtype() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let extended = relaymsg::Extended::new(vec![7; 200]).into(); + let cc = rmsg_to_ccmsg(0, extended); - #[async_test] - async fn bad_extend_crypto() { - let extended2 = relaymsg::Extended2::new(vec![99; 256]).into(); - let cc = rmsg_to_ccmsg(0, extended2); - let error = bad_extend_test_impl(2.into(), cc).await; - assert!(matches!(error, Error::BadHandshake)); - } - - #[async_test] - async fn begindir() { - let (chan, mut rx, _sink) = working_fake_channel(); - let (circ, mut sink) = newcirc(chan).await; - - let begin_and_send_fut = async move { - // Here we'll say we've got a circuit, and we want to - // make a simple BEGINDIR request with it. - let mut stream = Arc::new(circ).begin_dir_stream().await.unwrap(); - stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap(); - stream.flush().await.unwrap(); - let mut buf = [0_u8; 1024]; - let n = stream.read(&mut buf).await.unwrap(); - assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n"); - let n = stream.read(&mut buf).await.unwrap(); - assert_eq!(n, 0); - stream - }; - let reply_fut = async move { - // We've disabled encryption on this circuit, so we can just - // read the begindir cell. - let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg(); - assert_eq!(id, 128.into()); // hardcoded circid. - let rmsg = match chmsg { - ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(), + let error = bad_extend_test_impl(&rt, 2.into(), cc).await; + match error { + Error::CircProto(s) => { + assert_eq!(s, "wanted EXTENDED2; got EXTENDED") + } _ => panic!(), - }; - let (streamid, rmsg) = rmsg.into_streamid_and_msg(); - assert!(matches!(rmsg, RelayMsg::BeginDir)); - - // Reply with a Connected cell to indicate success. - let connected = relaymsg::Connected::new_empty().into(); - sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap(); - - // Now read a DATA cell... - let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg(); - assert_eq!(id, 128.into()); - let rmsg = match chmsg { - ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(), - _ => panic!(), - }; - let (streamid_2, rmsg) = rmsg.into_streamid_and_msg(); - assert_eq!(streamid_2, streamid); - if let RelayMsg::Data(d) = rmsg { - assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]); - } else { - panic!(); } + }) + } - // Write another data cell in reply! - let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n") - .unwrap() - .into(); - sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap(); + #[test] + fn bad_extend_destroy() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into())); + let error = bad_extend_test_impl(&rt, 2.into(), cc).await; + match error { + Error::CircuitClosed => {} + _ => panic!(), + } + }) + } - // Send an END cell to say that the conversation is over. - let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into(); - sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap(); + #[test] + fn bad_extend_crypto() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let extended2 = relaymsg::Extended2::new(vec![99; 256]).into(); + let cc = rmsg_to_ccmsg(0, extended2); + let error = bad_extend_test_impl(&rt, 2.into(), cc).await; + assert!(matches!(error, Error::BadHandshake)); + }) + } - sink // gotta keep the sink alive, or the reactor will exit. - }; + #[test] + fn begindir() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let (chan, mut rx, _sink) = working_fake_channel(&rt); + let (circ, mut sink) = newcirc(&rt, chan).await; - let (_stream, _) = futures::join!(begin_and_send_fut, reply_fut); + let begin_and_send_fut = async move { + // Here we'll say we've got a circuit, and we want to + // make a simple BEGINDIR request with it. + let mut stream = Arc::new(circ).begin_dir_stream().await.unwrap(); + stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap(); + stream.flush().await.unwrap(); + let mut buf = [0_u8; 1024]; + let n = stream.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n"); + let n = stream.read(&mut buf).await.unwrap(); + assert_eq!(n, 0); + stream + }; + let reply_fut = async move { + // We've disabled encryption on this circuit, so we can just + // read the begindir cell. + let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg(); + assert_eq!(id, 128.into()); // hardcoded circid. + let rmsg = match chmsg { + ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(), + _ => panic!(), + }; + let (streamid, rmsg) = rmsg.into_streamid_and_msg(); + assert!(matches!(rmsg, RelayMsg::BeginDir)); + + // Reply with a Connected cell to indicate success. + let connected = relaymsg::Connected::new_empty().into(); + sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap(); + + // Now read a DATA cell... + let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg(); + assert_eq!(id, 128.into()); + let rmsg = match chmsg { + ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(), + _ => panic!(), + }; + let (streamid_2, rmsg) = rmsg.into_streamid_and_msg(); + assert_eq!(streamid_2, streamid); + if let RelayMsg::Data(d) = rmsg { + assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]); + } else { + panic!(); + } + + // Write another data cell in reply! + let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n") + .unwrap() + .into(); + sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap(); + + // Send an END cell to say that the conversation is over. + let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into(); + sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap(); + + sink // gotta keep the sink alive, or the reactor will exit. + }; + + let (_stream, _) = futures::join!(begin_and_send_fut, reply_fut); + }); } // Set up a circuit and stream that expects some incoming SENDMEs. - async fn setup_incoming_sendme_case( + async fn setup_incoming_sendme_case( + rt: &R, n_to_send: usize, ) -> ( ClientCirc, @@ -1131,8 +1168,8 @@ mod test { Receiver, Sender>, ) { - let (chan, mut rx, sink2) = working_fake_channel(); - let (circ, mut sink) = newcirc(chan).await; + let (chan, mut rx, sink2) = working_fake_channel(rt); + let (circ, mut sink) = newcirc(rt, chan).await; let circ_clone = Arc::new(circ.clone()); let begin_and_send_fut = async move { @@ -1195,96 +1232,103 @@ mod test { (circ, stream, sink, streamid, cells_received, rx, sink2) } - #[async_test] - async fn accept_valid_sendme() { - let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) = - setup_incoming_sendme_case(300 * 498 + 3).await; + #[test] + fn accept_valid_sendme() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) = + setup_incoming_sendme_case(&rt, 300 * 498 + 3).await; - assert_eq!(cells_received, 301); + assert_eq!(cells_received, 301); - // Make sure that the circuit is indeed expecting the right sendmes - { - let (tx, rx) = oneshot::channel(); - circ.control - .unbounded_send(CtrlMsg::QuerySendWindow { - hop: 2.into(), - done: tx, - }) - .unwrap(); - let (window, tags) = rx.await.unwrap().unwrap(); - assert_eq!(window, 1000 - 301); - assert_eq!(tags.len(), 3); - // 100 - assert_eq!(tags[0], hex!("6400000000000000000000000000000000000000")); - // 200 - assert_eq!(tags[1], hex!("c800000000000000000000000000000000000000")); - // 300 - assert_eq!(tags[2], hex!("2c01000000000000000000000000000000000000")); - } + // Make sure that the circuit is indeed expecting the right sendmes + { + let (tx, rx) = oneshot::channel(); + circ.control + .unbounded_send(CtrlMsg::QuerySendWindow { + hop: 2.into(), + done: tx, + }) + .unwrap(); + let (window, tags) = rx.await.unwrap().unwrap(); + assert_eq!(window, 1000 - 301); + assert_eq!(tags.len(), 3); + // 100 + assert_eq!(tags[0], hex!("6400000000000000000000000000000000000000")); + // 200 + assert_eq!(tags[1], hex!("c800000000000000000000000000000000000000")); + // 300 + assert_eq!(tags[2], hex!("2c01000000000000000000000000000000000000")); + } - let reply_with_sendme_fut = async move { - // make and send a circuit-level sendme. - let c_sendme = - relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000")).into(); - sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap(); + let reply_with_sendme_fut = async move { + // make and send a circuit-level sendme. + let c_sendme = + relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000")) + .into(); + sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap(); - // Make and send a stream-level sendme. - let s_sendme = relaymsg::Sendme::new_empty().into(); - sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap(); + // Make and send a stream-level sendme. + let s_sendme = relaymsg::Sendme::new_empty().into(); + sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap(); - sink - }; + sink + }; - let _sink = reply_with_sendme_fut.await; + let _sink = reply_with_sendme_fut.await; - // FIXME(eta): this is a hacky way of waiting for the reactor to run before doing the below - // query; should find some way to properly synchronize to avoid flakiness - tokio::time::sleep(Duration::from_millis(100)).await; - // Now make sure that the circuit is still happy, and its - // window is updated. - { - let (tx, rx) = oneshot::channel(); - circ.control - .unbounded_send(CtrlMsg::QuerySendWindow { - hop: 2.into(), - done: tx, - }) - .unwrap(); - let (window, _tags) = rx.await.unwrap().unwrap(); - assert_eq!(window, 1000 - 201); - } + // FIXME(eta): this is a hacky way of waiting for the reactor to run before doing the below + // query; should find some way to properly synchronize to avoid flakiness + rt.sleep(Duration::from_millis(100)).await; + // Now make sure that the circuit is still happy, and its + // window is updated. + { + let (tx, rx) = oneshot::channel(); + circ.control + .unbounded_send(CtrlMsg::QuerySendWindow { + hop: 2.into(), + done: tx, + }) + .unwrap(); + let (window, _tags) = rx.await.unwrap().unwrap(); + assert_eq!(window, 1000 - 201); + } + }) } - #[async_test] - async fn invalid_circ_sendme() { - // Same setup as accept_valid_sendme() test above but try giving - // a sendme with the wrong tag. + #[test] + fn invalid_circ_sendme() { + tor_rtcompat::test_with_all_runtimes!(|rt| async move { + // Same setup as accept_valid_sendme() test above but try giving + // a sendme with the wrong tag. - let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) = - setup_incoming_sendme_case(300 * 498 + 3).await; + let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) = + setup_incoming_sendme_case(&rt, 300 * 498 + 3).await; - let reply_with_sendme_fut = async move { - // make and send a circuit-level sendme with a bad tag. - let c_sendme = - relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF")).into(); - sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap(); - sink - }; + let reply_with_sendme_fut = async move { + // make and send a circuit-level sendme with a bad tag. + let c_sendme = + relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF")) + .into(); + sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap(); + sink + }; - let _sink = reply_with_sendme_fut.await; + let _sink = reply_with_sendme_fut.await; - let mut tries = 0; - // FIXME(eta): we aren't testing the error message like we used to; however, we can at least - // check whether the reactor dies as a result of receiving invalid data. - while !circ.control.is_closed() { - tokio::time::sleep(Duration::from_millis(100)).await; - tries += 1; - if tries > 10 { - panic!("reactor continued running after invalid sendme"); + let mut tries = 0; + // FIXME(eta): we aren't testing the error message like we used to; however, we can at least + // check whether the reactor dies as a result of receiving invalid data. + while !circ.control.is_closed() { + // TODO: Don't sleep in tests. + rt.sleep(Duration::from_millis(100)).await; + tries += 1; + if tries > 10 { + panic!("reactor continued running after invalid sendme"); + } } - } - // TODO: check that the circuit is shut down too + // TODO: check that the circuit is shut down too + }) } #[test]