tor-proto: Use tor-rtcompat macros for testing, not tokio.
Closes #222.
This commit is contained in:
parent
787a995458
commit
f92ad644c9
|
@ -2842,6 +2842,7 @@ dependencies = [
|
||||||
"tor-linkspec",
|
"tor-linkspec",
|
||||||
"tor-llcrypto",
|
"tor-llcrypto",
|
||||||
"tor-protover",
|
"tor-protover",
|
||||||
|
"tor-rtcompat",
|
||||||
"tracing",
|
"tracing",
|
||||||
"typenum",
|
"typenum",
|
||||||
"zeroize",
|
"zeroize",
|
||||||
|
|
|
@ -50,6 +50,6 @@ tokio-util = { version = "0.6", features = ["compat"], optional = true }
|
||||||
coarsetime = { version = "0.1.20", optional = true }
|
coarsetime = { version = "0.1.20", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio-crate = { package = "tokio", version = "1.7.0", features = ["macros", "rt", "time"] }
|
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.1", features = ["tokio"] }
|
||||||
hex-literal = "0.3.1"
|
hex-literal = "0.3.1"
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
|
|
|
@ -397,8 +397,6 @@ pub(crate) mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::channel::codec::test::MsgBuf;
|
use crate::channel::codec::test::MsgBuf;
|
||||||
pub(crate) use crate::channel::reactor::test::new_reactor;
|
pub(crate) use crate::channel::reactor::test::new_reactor;
|
||||||
use tokio_crate as tokio;
|
|
||||||
use tokio_crate::test as async_test;
|
|
||||||
use tor_cell::chancell::{msg, ChanCell};
|
use tor_cell::chancell::{msg, ChanCell};
|
||||||
|
|
||||||
/// Make a new fake reactor-less channel. For testing only, obviously.
|
/// Make a new fake reactor-less channel. For testing only, obviously.
|
||||||
|
@ -414,31 +412,33 @@ pub(crate) mod test {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn send_bad() {
|
fn send_bad() {
|
||||||
let chan = fake_channel();
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
|
let chan = fake_channel();
|
||||||
|
|
||||||
let cell = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into());
|
let cell = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into());
|
||||||
let e = chan.check_cell(&cell);
|
let e = chan.check_cell(&cell);
|
||||||
assert!(e.is_err());
|
assert!(e.is_err());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e.unwrap_err()),
|
format!("{}", e.unwrap_err()),
|
||||||
"Internal programming error: Can't send CREATED2 cell on client channel"
|
"Internal programming error: Can't send CREATED2 cell on client channel"
|
||||||
);
|
);
|
||||||
let cell = ChanCell::new(0.into(), msg::Certs::new_empty().into());
|
let cell = ChanCell::new(0.into(), msg::Certs::new_empty().into());
|
||||||
let e = chan.check_cell(&cell);
|
let e = chan.check_cell(&cell);
|
||||||
assert!(e.is_err());
|
assert!(e.is_err());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e.unwrap_err()),
|
format!("{}", e.unwrap_err()),
|
||||||
"Internal programming error: Can't send CERTS cell after handshake is done"
|
"Internal programming error: Can't send CERTS cell after handshake is done"
|
||||||
);
|
);
|
||||||
|
|
||||||
let cell = ChanCell::new(5.into(), msg::Create2::new(2, &b"abc"[..]).into());
|
let cell = ChanCell::new(5.into(), msg::Create2::new(2, &b"abc"[..]).into());
|
||||||
let e = chan.check_cell(&cell);
|
let e = chan.check_cell(&cell);
|
||||||
assert!(e.is_ok());
|
assert!(e.is_ok());
|
||||||
// FIXME(eta): more difficult to test that sending works now that it has to go via reactor
|
// FIXME(eta): more difficult to test that sending works now that it has to go via reactor
|
||||||
// let got = output.next().await.unwrap();
|
// let got = output.next().await.unwrap();
|
||||||
// assert!(matches!(got.msg(), ChanMsg::Create2(_)));
|
// assert!(matches!(got.msg(), ChanMsg::Create2(_)));
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
@ -47,8 +47,6 @@ pub(crate) mod test {
|
||||||
use futures::task::{Context, Poll};
|
use futures::task::{Context, Poll};
|
||||||
use hex_literal::hex;
|
use hex_literal::hex;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use tokio::test as async_test;
|
|
||||||
use tokio_crate as tokio;
|
|
||||||
|
|
||||||
use super::{futures_codec, ChannelCodec};
|
use super::{futures_codec, ChannelCodec};
|
||||||
use tor_cell::chancell::{msg, ChanCell, ChanCmd, CircId};
|
use tor_cell::chancell::{msg, ChanCell, ChanCmd, CircId};
|
||||||
|
@ -111,48 +109,53 @@ pub(crate) mod test {
|
||||||
futures_codec::Framed::new(mbuf, ChannelCodec::new(4))
|
futures_codec::Framed::new(mbuf, ChannelCodec::new(4))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn check_encoding() -> std::result::Result<(), tor_cell::Error> {
|
fn check_encoding() {
|
||||||
let mb = MsgBuf::new(&b""[..]);
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
let mut framed = frame_buf(mb);
|
let mb = MsgBuf::new(&b""[..]);
|
||||||
|
let mut framed = frame_buf(mb);
|
||||||
|
|
||||||
let destroycell = msg::Destroy::new(2.into());
|
let destroycell = msg::Destroy::new(2.into());
|
||||||
framed
|
framed
|
||||||
.send(ChanCell::new(7.into(), destroycell.into()))
|
.send(ChanCell::new(7.into(), destroycell.into()))
|
||||||
.await?;
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let nocerts = msg::Certs::new_empty();
|
let nocerts = msg::Certs::new_empty();
|
||||||
framed.send(ChanCell::new(0.into(), nocerts.into())).await?;
|
framed
|
||||||
|
.send(ChanCell::new(0.into(), nocerts.into()))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
framed.flush().await?;
|
framed.flush().await.unwrap();
|
||||||
|
|
||||||
let data = framed.into_inner().into_response();
|
let data = framed.into_inner().into_response();
|
||||||
|
|
||||||
assert_eq!(&data[0..10], &hex!("00000007 04 0200000000")[..]);
|
assert_eq!(&data[0..10], &hex!("00000007 04 0200000000")[..]);
|
||||||
|
|
||||||
assert_eq!(&data[514..], &hex!("00000000 81 0001 00")[..]);
|
assert_eq!(&data[514..], &hex!("00000000 81 0001 00")[..]);
|
||||||
Ok(())
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn check_decoding() -> std::result::Result<(), tor_cell::Error> {
|
fn check_decoding() {
|
||||||
let mut dat = Vec::new();
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
dat.extend_from_slice(&hex!("00000007 04 0200000000")[..]);
|
let mut dat = Vec::new();
|
||||||
dat.resize(514, 0);
|
dat.extend_from_slice(&hex!("00000007 04 0200000000")[..]);
|
||||||
dat.extend_from_slice(&hex!("00000000 81 0001 00")[..]);
|
dat.resize(514, 0);
|
||||||
let mb = MsgBuf::new(&dat[..]);
|
dat.extend_from_slice(&hex!("00000000 81 0001 00")[..]);
|
||||||
let mut framed = frame_buf(mb);
|
let mb = MsgBuf::new(&dat[..]);
|
||||||
|
let mut framed = frame_buf(mb);
|
||||||
|
|
||||||
let destroy = framed.next().await.unwrap()?;
|
let destroy = framed.next().await.unwrap().unwrap();
|
||||||
let nocerts = framed.next().await.unwrap()?;
|
let nocerts = framed.next().await.unwrap().unwrap();
|
||||||
|
|
||||||
assert_eq!(destroy.circid(), CircId::from(7));
|
assert_eq!(destroy.circid(), CircId::from(7));
|
||||||
assert_eq!(destroy.msg().cmd(), ChanCmd::DESTROY);
|
assert_eq!(destroy.msg().cmd(), ChanCmd::DESTROY);
|
||||||
assert_eq!(nocerts.circid(), CircId::from(0));
|
assert_eq!(nocerts.circid(), CircId::from(0));
|
||||||
assert_eq!(nocerts.msg().cmd(), ChanCmd::CERTS);
|
assert_eq!(nocerts.msg().cmd(), ChanCmd::CERTS);
|
||||||
|
|
||||||
assert!(framed.into_inner().all_consumed());
|
assert!(framed.into_inner().all_consumed());
|
||||||
|
});
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -421,8 +421,6 @@ pub(super) mod test {
|
||||||
#![allow(clippy::unwrap_used)]
|
#![allow(clippy::unwrap_used)]
|
||||||
use hex_literal::hex;
|
use hex_literal::hex;
|
||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
use tokio::test as async_test;
|
|
||||||
use tokio_crate as tokio;
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::channel::codec::test::MsgBuf;
|
use crate::channel::codec::test::MsgBuf;
|
||||||
|
@ -456,34 +454,36 @@ pub(super) mod test {
|
||||||
add_padded(buf, NETINFO_PREFIX);
|
add_padded(buf, NETINFO_PREFIX);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn connect_ok() -> Result<()> {
|
fn connect_ok() -> Result<()> {
|
||||||
let mut buf = Vec::new();
|
tor_rtcompat::test_with_one_runtime!(|_rt| async move {
|
||||||
// versions cell
|
let mut buf = Vec::new();
|
||||||
buf.extend_from_slice(VERSIONS);
|
// versions cell
|
||||||
// certs cell -- no certs in it, but this function doesn't care.
|
buf.extend_from_slice(VERSIONS);
|
||||||
buf.extend_from_slice(NOCERTS);
|
// certs cell -- no certs in it, but this function doesn't care.
|
||||||
// netinfo cell -- quite minimal.
|
buf.extend_from_slice(NOCERTS);
|
||||||
add_netinfo(&mut buf);
|
// netinfo cell -- quite minimal.
|
||||||
let mb = MsgBuf::new(&buf[..]);
|
add_netinfo(&mut buf);
|
||||||
let handshake = OutboundClientHandshake::new(mb, None);
|
let mb = MsgBuf::new(&buf[..]);
|
||||||
let unverified = handshake.connect().await?;
|
let handshake = OutboundClientHandshake::new(mb, None);
|
||||||
|
let unverified = handshake.connect().await?;
|
||||||
|
|
||||||
assert_eq!(unverified.link_protocol, 4);
|
assert_eq!(unverified.link_protocol, 4);
|
||||||
|
|
||||||
// Try again with an authchallenge cell and some padding.
|
// Try again with an authchallenge cell and some padding.
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
buf.extend_from_slice(VERSIONS);
|
buf.extend_from_slice(VERSIONS);
|
||||||
buf.extend_from_slice(NOCERTS);
|
buf.extend_from_slice(NOCERTS);
|
||||||
buf.extend_from_slice(VPADDING);
|
buf.extend_from_slice(VPADDING);
|
||||||
buf.extend_from_slice(AUTHCHALLENGE);
|
buf.extend_from_slice(AUTHCHALLENGE);
|
||||||
buf.extend_from_slice(VPADDING);
|
buf.extend_from_slice(VPADDING);
|
||||||
add_netinfo(&mut buf);
|
add_netinfo(&mut buf);
|
||||||
let mb = MsgBuf::new(&buf[..]);
|
let mb = MsgBuf::new(&buf[..]);
|
||||||
let handshake = OutboundClientHandshake::new(mb, None);
|
let handshake = OutboundClientHandshake::new(mb, None);
|
||||||
let _unverified = handshake.connect().await?;
|
let _unverified = handshake.connect().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connect_err<T: Into<Vec<u8>>>(input: T) -> Error {
|
async fn connect_err<T: Into<Vec<u8>>>(input: T) -> Error {
|
||||||
|
@ -492,89 +492,99 @@ pub(super) mod test {
|
||||||
handshake.connect().await.err().unwrap()
|
handshake.connect().await.err().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn connect_badver() {
|
fn connect_badver() {
|
||||||
let err = connect_err(&b"HTTP://"[..]).await;
|
tor_rtcompat::test_with_one_runtime!(|_rt| async move {
|
||||||
assert!(matches!(err, Error::ChanProto(_)));
|
let err = connect_err(&b"HTTP://"[..]).await;
|
||||||
assert_eq!(
|
assert!(matches!(err, Error::ChanProto(_)));
|
||||||
format!("{}", err),
|
assert_eq!(
|
||||||
"channel protocol violation: Doesn't seem to be a tor relay"
|
format!("{}", err),
|
||||||
);
|
"channel protocol violation: Doesn't seem to be a tor relay"
|
||||||
|
);
|
||||||
|
|
||||||
let err = connect_err(&hex!("0000 07 0004 1234 ffff")[..]).await;
|
let err = connect_err(&hex!("0000 07 0004 1234 ffff")[..]).await;
|
||||||
assert!(matches!(err, Error::ChanProto(_)));
|
assert!(matches!(err, Error::ChanProto(_)));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", err),
|
format!("{}", err),
|
||||||
"channel protocol violation: No shared link protocols"
|
"channel protocol violation: No shared link protocols"
|
||||||
);
|
);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn connect_cellparse() {
|
fn connect_cellparse() {
|
||||||
let mut buf = Vec::new();
|
tor_rtcompat::test_with_one_runtime!(|_rt| async move {
|
||||||
buf.extend_from_slice(VERSIONS);
|
let mut buf = Vec::new();
|
||||||
// Here's a certs cell that will fail.
|
buf.extend_from_slice(VERSIONS);
|
||||||
buf.extend_from_slice(&hex!("00000000 81 0001 01")[..]);
|
// Here's a certs cell that will fail.
|
||||||
let err = connect_err(buf).await;
|
buf.extend_from_slice(&hex!("00000000 81 0001 01")[..]);
|
||||||
assert!(matches!(
|
let err = connect_err(buf).await;
|
||||||
err,
|
assert!(matches!(
|
||||||
Error::CellErr(tor_cell::Error::BytesErr(tor_bytes::Error::Truncated))
|
err,
|
||||||
));
|
Error::CellErr(tor_cell::Error::BytesErr(tor_bytes::Error::Truncated))
|
||||||
|
));
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn connect_duplicates() {
|
fn connect_duplicates() {
|
||||||
let mut buf = Vec::new();
|
tor_rtcompat::test_with_one_runtime!(|_rt| async move {
|
||||||
buf.extend_from_slice(VERSIONS);
|
let mut buf = Vec::new();
|
||||||
buf.extend_from_slice(NOCERTS);
|
buf.extend_from_slice(VERSIONS);
|
||||||
buf.extend_from_slice(NOCERTS);
|
buf.extend_from_slice(NOCERTS);
|
||||||
add_netinfo(&mut buf);
|
buf.extend_from_slice(NOCERTS);
|
||||||
let err = connect_err(buf).await;
|
add_netinfo(&mut buf);
|
||||||
assert!(matches!(err, Error::ChanProto(_)));
|
let err = connect_err(buf).await;
|
||||||
assert_eq!(
|
assert!(matches!(err, Error::ChanProto(_)));
|
||||||
format!("{}", err),
|
assert_eq!(
|
||||||
"channel protocol violation: Duplicate certs cell"
|
format!("{}", err),
|
||||||
);
|
"channel protocol violation: Duplicate certs cell"
|
||||||
|
);
|
||||||
|
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
buf.extend_from_slice(VERSIONS);
|
buf.extend_from_slice(VERSIONS);
|
||||||
buf.extend_from_slice(NOCERTS);
|
buf.extend_from_slice(NOCERTS);
|
||||||
buf.extend_from_slice(AUTHCHALLENGE);
|
buf.extend_from_slice(AUTHCHALLENGE);
|
||||||
buf.extend_from_slice(AUTHCHALLENGE);
|
buf.extend_from_slice(AUTHCHALLENGE);
|
||||||
add_netinfo(&mut buf);
|
add_netinfo(&mut buf);
|
||||||
let err = connect_err(buf).await;
|
let err = connect_err(buf).await;
|
||||||
assert!(matches!(err, Error::ChanProto(_)));
|
assert!(matches!(err, Error::ChanProto(_)));
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", err),
|
format!("{}", err),
|
||||||
"channel protocol violation: Duplicate authchallenge cell"
|
"channel protocol violation: Duplicate authchallenge cell"
|
||||||
);
|
);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn connect_missing_certs() {
|
fn connect_missing_certs() {
|
||||||
let mut buf = Vec::new();
|
tor_rtcompat::test_with_one_runtime!(|_rt| async move {
|
||||||
buf.extend_from_slice(VERSIONS);
|
let mut buf = Vec::new();
|
||||||
add_netinfo(&mut buf);
|
buf.extend_from_slice(VERSIONS);
|
||||||
let err = connect_err(buf).await;
|
add_netinfo(&mut buf);
|
||||||
assert!(matches!(err, Error::ChanProto(_)));
|
let err = connect_err(buf).await;
|
||||||
assert_eq!(
|
assert!(matches!(err, Error::ChanProto(_)));
|
||||||
format!("{}", err),
|
assert_eq!(
|
||||||
"channel protocol violation: Missing certs cell"
|
format!("{}", err),
|
||||||
);
|
"channel protocol violation: Missing certs cell"
|
||||||
|
);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn connect_misplaced_cell() {
|
fn connect_misplaced_cell() {
|
||||||
let mut buf = Vec::new();
|
tor_rtcompat::test_with_one_runtime!(|_rt| async move {
|
||||||
buf.extend_from_slice(VERSIONS);
|
let mut buf = Vec::new();
|
||||||
// here's a create cell.
|
buf.extend_from_slice(VERSIONS);
|
||||||
add_padded(&mut buf, &hex!("00000001 01")[..]);
|
// here's a create cell.
|
||||||
let err = connect_err(buf).await;
|
add_padded(&mut buf, &hex!("00000001 01")[..]);
|
||||||
assert!(matches!(err, Error::ChanProto(_)));
|
let err = connect_err(buf).await;
|
||||||
assert_eq!(
|
assert!(matches!(err, Error::ChanProto(_)));
|
||||||
format!("{}", err),
|
assert_eq!(
|
||||||
"channel protocol violation: Unexpected cell type CREATE"
|
format!("{}", err),
|
||||||
);
|
"channel protocol violation: Unexpected cell type CREATE"
|
||||||
|
);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_unverified(certs: msg::Certs) -> UnverifiedChannel<MsgBuf> {
|
fn make_unverified(certs: msg::Certs) -> UnverifiedChannel<MsgBuf> {
|
||||||
|
@ -823,22 +833,24 @@ pub(super) mod test {
|
||||||
pub(crate) const PEER_RSA: &[u8] = &hex!("2f1fb49bb332a9eec617e41e911c33fb3890aef3");
|
pub(crate) const PEER_RSA: &[u8] = &hex!("2f1fb49bb332a9eec617e41e911c33fb3890aef3");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn test_finish() {
|
fn test_finish() {
|
||||||
let ed25519_id = [3_u8; 32].into();
|
tor_rtcompat::test_with_one_runtime!(|_rt| async move {
|
||||||
let rsa_id = [4_u8; 20].into();
|
let ed25519_id = [3_u8; 32].into();
|
||||||
let peer_addr = "127.1.1.2:443".parse().unwrap();
|
let rsa_id = [4_u8; 20].into();
|
||||||
let ver = VerifiedChannel {
|
let peer_addr = "127.1.1.2:443".parse().unwrap();
|
||||||
link_protocol: 4,
|
let ver = VerifiedChannel {
|
||||||
tls: futures_codec::Framed::new(MsgBuf::new(&b""[..]), ChannelCodec::new(4)),
|
link_protocol: 4,
|
||||||
unique_id: UniqId::new(),
|
tls: futures_codec::Framed::new(MsgBuf::new(&b""[..]), ChannelCodec::new(4)),
|
||||||
target_addr: Some(peer_addr),
|
unique_id: UniqId::new(),
|
||||||
ed25519_id,
|
target_addr: Some(peer_addr),
|
||||||
rsa_id,
|
ed25519_id,
|
||||||
};
|
rsa_id,
|
||||||
|
};
|
||||||
|
|
||||||
let (_chan, _reactor) = ver.finish().await.unwrap();
|
let (_chan, _reactor) = ver.finish().await.unwrap();
|
||||||
|
|
||||||
// TODO: check contents of netinfo cell
|
// TODO: check contents of netinfo cell
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -393,13 +393,10 @@ impl Reactor {
|
||||||
pub(crate) mod test {
|
pub(crate) mod test {
|
||||||
#![allow(clippy::unwrap_used)]
|
#![allow(clippy::unwrap_used)]
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::circuit::CircParameters;
|
||||||
use futures::sink::SinkExt;
|
use futures::sink::SinkExt;
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
use tokio::test as async_test;
|
use futures::task::SpawnExt;
|
||||||
use tokio_crate as tokio;
|
|
||||||
use tokio_crate::runtime::Handle;
|
|
||||||
|
|
||||||
use crate::circuit::CircParameters;
|
|
||||||
|
|
||||||
type CodecResult = std::result::Result<ChanCell, tor_cell::Error>;
|
type CodecResult = std::result::Result<ChanCell, tor_cell::Error>;
|
||||||
|
|
||||||
|
@ -431,318 +428,339 @@ pub(crate) mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try shutdown from inside run_once..
|
// Try shutdown from inside run_once..
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn shutdown() {
|
fn shutdown() {
|
||||||
let (chan, mut reactor, _output, _input) = new_reactor();
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
|
let (chan, mut reactor, _output, _input) = new_reactor();
|
||||||
|
|
||||||
chan.terminate();
|
chan.terminate();
|
||||||
let r = reactor.run_once().await;
|
let r = reactor.run_once().await;
|
||||||
assert!(matches!(r, Err(ReactorError::Shutdown)));
|
assert!(matches!(r, Err(ReactorError::Shutdown)));
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try shutdown while reactor is running.
|
// Try shutdown while reactor is running.
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn shutdown2() {
|
fn shutdown2() {
|
||||||
// TODO: Ask a rust person if this is how to do this.
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
use futures::future::FutureExt;
|
// TODO: Ask a rust person if this is how to do this.
|
||||||
use futures::join;
|
|
||||||
|
|
||||||
let (chan, reactor, _output, _input) = new_reactor();
|
use futures::future::FutureExt;
|
||||||
// Let's get the reactor running...
|
use futures::join;
|
||||||
let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
|
|
||||||
|
|
||||||
let rr = run_reactor.clone();
|
let (chan, reactor, _output, _input) = new_reactor();
|
||||||
|
// Let's get the reactor running...
|
||||||
|
let run_reactor = reactor.run().map(|x| x.is_ok()).shared();
|
||||||
|
|
||||||
let exit_then_check = async {
|
let rr = run_reactor.clone();
|
||||||
assert!(rr.peek().is_none());
|
|
||||||
// ... and terminate the channel while that's happening.
|
|
||||||
chan.terminate();
|
|
||||||
};
|
|
||||||
|
|
||||||
let (rr_s, _) = join!(run_reactor, exit_then_check);
|
let exit_then_check = async {
|
||||||
|
assert!(rr.peek().is_none());
|
||||||
|
// ... and terminate the channel while that's happening.
|
||||||
|
chan.terminate();
|
||||||
|
};
|
||||||
|
|
||||||
// Now let's see. The reactor should not _still_ be running.
|
let (rr_s, _) = join!(run_reactor, exit_then_check);
|
||||||
assert!(rr_s);
|
|
||||||
|
// Now let's see. The reactor should not _still_ be running.
|
||||||
|
assert!(rr_s);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn new_circ_closed() {
|
fn new_circ_closed() {
|
||||||
let (chan, mut reactor, mut output, _input) = new_reactor();
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
|
let (chan, mut reactor, mut output, _input) = new_reactor();
|
||||||
|
|
||||||
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
|
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
|
||||||
let (pending, circr) = ret.unwrap();
|
let (pending, circr) = ret.unwrap();
|
||||||
Handle::current().spawn(circr.run());
|
rt.spawn(async {
|
||||||
assert!(reac.is_ok());
|
let _ignore = circr.run().await;
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
assert!(reac.is_ok());
|
||||||
|
|
||||||
let id = pending.peek_circid();
|
let id = pending.peek_circid();
|
||||||
|
|
||||||
let ent = reactor.circs.get_mut(id);
|
let ent = reactor.circs.get_mut(id);
|
||||||
assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
|
assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
|
||||||
// Now drop the circuit; this should tell the reactor to remove
|
// Now drop the circuit; this should tell the reactor to remove
|
||||||
// the circuit from the map.
|
// the circuit from the map.
|
||||||
drop(pending);
|
drop(pending);
|
||||||
|
|
||||||
reactor.run_once().await.unwrap();
|
reactor.run_once().await.unwrap();
|
||||||
let ent = reactor.circs.get_mut(id);
|
let ent = reactor.circs.get_mut(id);
|
||||||
assert!(matches!(ent, Some(CircEnt::DestroySent(_))));
|
assert!(matches!(ent, Some(CircEnt::DestroySent(_))));
|
||||||
let cell = output.next().await.unwrap();
|
let cell = output.next().await.unwrap();
|
||||||
assert_eq!(cell.circid(), id);
|
assert_eq!(cell.circid(), id);
|
||||||
assert!(matches!(cell.msg(), ChanMsg::Destroy(_)));
|
assert!(matches!(cell.msg(), ChanMsg::Destroy(_)));
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test proper delivery of a created cell that doesn't make a channel
|
// Test proper delivery of a created cell that doesn't make a channel
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn new_circ_create_failure() {
|
fn new_circ_create_failure() {
|
||||||
use tor_cell::chancell::msg;
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
let (chan, mut reactor, mut output, mut input) = new_reactor();
|
use tor_cell::chancell::msg;
|
||||||
|
let (chan, mut reactor, mut output, mut input) = new_reactor();
|
||||||
|
|
||||||
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
|
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
|
||||||
let (pending, circr) = ret.unwrap();
|
let (pending, circr) = ret.unwrap();
|
||||||
Handle::current().spawn(circr.run());
|
rt.spawn(async {
|
||||||
assert!(reac.is_ok());
|
let _ignore = circr.run().await;
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
assert!(reac.is_ok());
|
||||||
|
|
||||||
let circparams = CircParameters::default();
|
let circparams = CircParameters::default();
|
||||||
|
|
||||||
let id = pending.peek_circid();
|
let id = pending.peek_circid();
|
||||||
|
|
||||||
let ent = reactor.circs.get_mut(id);
|
let ent = reactor.circs.get_mut(id);
|
||||||
assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
|
assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
|
||||||
// We'll get a bad handshake result from this createdfast cell.
|
// We'll get a bad handshake result from this createdfast cell.
|
||||||
let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into());
|
let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into());
|
||||||
input.send(Ok(created_cell)).await.unwrap();
|
input.send(Ok(created_cell)).await.unwrap();
|
||||||
|
|
||||||
let (circ, reac) =
|
let (circ, reac) =
|
||||||
futures::join!(pending.create_firsthop_fast(circparams), reactor.run_once());
|
futures::join!(pending.create_firsthop_fast(circparams), reactor.run_once());
|
||||||
// Make sure statuses are as expected.
|
// Make sure statuses are as expected.
|
||||||
assert!(matches!(circ.err().unwrap(), Error::BadHandshake));
|
assert!(matches!(circ.err().unwrap(), Error::BadHandshake));
|
||||||
assert!(reac.is_ok());
|
assert!(reac.is_ok());
|
||||||
|
|
||||||
reactor.run_once().await.unwrap();
|
reactor.run_once().await.unwrap();
|
||||||
|
|
||||||
// Make sure that the createfast cell got sent
|
// Make sure that the createfast cell got sent
|
||||||
let cell_sent = output.next().await.unwrap();
|
let cell_sent = output.next().await.unwrap();
|
||||||
assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_)));
|
assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_)));
|
||||||
|
|
||||||
// The circid now counts as open, since as far as the reactor knows,
|
// The circid now counts as open, since as far as the reactor knows,
|
||||||
// it was accepted. (TODO: is this a bug?)
|
// it was accepted. (TODO: is this a bug?)
|
||||||
let ent = reactor.circs.get_mut(id);
|
let ent = reactor.circs.get_mut(id);
|
||||||
assert!(matches!(ent, Some(CircEnt::Open(_))));
|
assert!(matches!(ent, Some(CircEnt::Open(_))));
|
||||||
|
|
||||||
// But the next run if the reactor will make the circuit get closed.
|
// But the next run if the reactor will make the circuit get closed.
|
||||||
reactor.run_once().await.unwrap();
|
reactor.run_once().await.unwrap();
|
||||||
let ent = reactor.circs.get_mut(id);
|
let ent = reactor.circs.get_mut(id);
|
||||||
assert!(matches!(ent, Some(CircEnt::DestroySent(_))));
|
assert!(matches!(ent, Some(CircEnt::DestroySent(_))));
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try incoming cells that shouldn't arrive on channels.
|
// Try incoming cells that shouldn't arrive on channels.
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn bad_cells() {
|
fn bad_cells() {
|
||||||
use tor_cell::chancell::msg;
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
let (_chan, mut reactor, _output, mut input) = new_reactor();
|
use tor_cell::chancell::msg;
|
||||||
|
let (_chan, mut reactor, _output, mut input) = new_reactor();
|
||||||
|
|
||||||
// We shouldn't get create cells, ever.
|
// We shouldn't get create cells, ever.
|
||||||
let create_cell = msg::Create2::new(4, *b"hihi").into();
|
let create_cell = msg::Create2::new(4, *b"hihi").into();
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(9.into(), create_cell)))
|
.send(Ok(ChanCell::new(9.into(), create_cell)))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// shouldn't get created2 cells for nonexistent circuits
|
// shouldn't get created2 cells for nonexistent circuits
|
||||||
let created2_cell = msg::Created2::new(*b"hihi").into();
|
let created2_cell = msg::Created2::new(*b"hihi").into();
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(7.into(), created2_cell)))
|
.send(Ok(ChanCell::new(7.into(), created2_cell)))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: CREATE2 cell on client channel"
|
"channel protocol violation: CREATE2 cell on client channel"
|
||||||
);
|
);
|
||||||
|
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: Unexpected CREATED* cell not on opening circuit"
|
"channel protocol violation: Unexpected CREATED* cell not on opening circuit"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Can't get a relay cell on a circuit we've never heard of.
|
// Can't get a relay cell on a circuit we've never heard of.
|
||||||
let relay_cell = msg::Relay::new(b"abc").into();
|
let relay_cell = msg::Relay::new(b"abc").into();
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(4.into(), relay_cell)))
|
.send(Ok(ChanCell::new(4.into(), relay_cell)))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: Relay cell on nonexistent circuit"
|
"channel protocol violation: Relay cell on nonexistent circuit"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Can't get handshaking cells while channel is open.
|
// Can't get handshaking cells while channel is open.
|
||||||
let versions_cell = msg::Versions::new([3]).unwrap().into();
|
let versions_cell = msg::Versions::new([3]).unwrap().into();
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(0.into(), versions_cell)))
|
.send(Ok(ChanCell::new(0.into(), versions_cell)))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: VERSIONS cell after handshake is done"
|
"channel protocol violation: VERSIONS cell after handshake is done"
|
||||||
);
|
);
|
||||||
|
|
||||||
// We don't accept CREATED.
|
// We don't accept CREATED.
|
||||||
let created_cell = msg::Created::new(&b"xyzzy"[..]).into();
|
let created_cell = msg::Created::new(&b"xyzzy"[..]).into();
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(25.into(), created_cell)))
|
.send(Ok(ChanCell::new(25.into(), created_cell)))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: CREATED cell received, but we never send CREATEs"
|
"channel protocol violation: CREATED cell received, but we never send CREATEs"
|
||||||
);
|
);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn deliver_relay() {
|
fn deliver_relay() {
|
||||||
use crate::circuit::celltypes::ClientCircChanMsg;
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
use futures::channel::oneshot;
|
use crate::circuit::celltypes::ClientCircChanMsg;
|
||||||
use tor_cell::chancell::msg;
|
use futures::channel::oneshot;
|
||||||
|
use tor_cell::chancell::msg;
|
||||||
|
|
||||||
let (_chan, mut reactor, _output, mut input) = new_reactor();
|
let (_chan, mut reactor, _output, mut input) = new_reactor();
|
||||||
|
|
||||||
let (_circ_stream_7, mut circ_stream_13) = {
|
let (_circ_stream_7, mut circ_stream_13) = {
|
||||||
let (snd1, _rcv1) = oneshot::channel();
|
let (snd1, _rcv1) = oneshot::channel();
|
||||||
let (snd2, rcv2) = mpsc::channel(64);
|
let (snd2, rcv2) = mpsc::channel(64);
|
||||||
reactor
|
reactor
|
||||||
.circs
|
.circs
|
||||||
.put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
|
.put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
|
||||||
|
|
||||||
let (snd3, rcv3) = mpsc::channel(64);
|
let (snd3, rcv3) = mpsc::channel(64);
|
||||||
reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
|
reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
|
||||||
|
|
||||||
reactor
|
reactor
|
||||||
.circs
|
.circs
|
||||||
.put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
|
.put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
|
||||||
(rcv2, rcv3)
|
(rcv2, rcv3)
|
||||||
};
|
};
|
||||||
|
|
||||||
// If a relay cell is sent on an open channel, the correct circuit
|
// If a relay cell is sent on an open channel, the correct circuit
|
||||||
// should get it.
|
// should get it.
|
||||||
let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into();
|
let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into();
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(13.into(), relaycell.clone())))
|
.send(Ok(ChanCell::new(13.into(), relaycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
reactor.run_once().await.unwrap();
|
reactor.run_once().await.unwrap();
|
||||||
let got = circ_stream_13.next().await.unwrap();
|
let got = circ_stream_13.next().await.unwrap();
|
||||||
assert!(matches!(got, ClientCircChanMsg::Relay(_)));
|
assert!(matches!(got, ClientCircChanMsg::Relay(_)));
|
||||||
|
|
||||||
// If a relay cell is sent on an opening channel, that's an error.
|
// If a relay cell is sent on an opening channel, that's an error.
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(7.into(), relaycell.clone())))
|
.send(Ok(ChanCell::new(7.into(), relaycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: Relay cell on pending circuit before CREATED* received"
|
"channel protocol violation: Relay cell on pending circuit before CREATED* received"
|
||||||
);
|
);
|
||||||
|
|
||||||
// If a relay cell is sent on a non-existent channel, that's an error.
|
// If a relay cell is sent on a non-existent channel, that's an error.
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(101.into(), relaycell.clone())))
|
.send(Ok(ChanCell::new(101.into(), relaycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: Relay cell on nonexistent circuit"
|
"channel protocol violation: Relay cell on nonexistent circuit"
|
||||||
);
|
);
|
||||||
|
|
||||||
// It's fine to get a relay cell on a DestroySent channel: that happens
|
// It's fine to get a relay cell on a DestroySent channel: that happens
|
||||||
// when the other side hasn't noticed the Destroy yet.
|
// when the other side hasn't noticed the Destroy yet.
|
||||||
|
|
||||||
// We can do this 25 more times according to our setup:
|
// We can do this 25 more times according to our setup:
|
||||||
for _ in 0..25 {
|
for _ in 0..25 {
|
||||||
|
input
|
||||||
|
.send(Ok(ChanCell::new(23.into(), relaycell.clone())))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
reactor.run_once().await.unwrap(); // should be fine.
|
||||||
|
}
|
||||||
|
|
||||||
|
// This one will fail.
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(23.into(), relaycell.clone())))
|
.send(Ok(ChanCell::new(23.into(), relaycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
reactor.run_once().await.unwrap(); // should be fine.
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
}
|
assert_eq!(
|
||||||
|
format!("{}", e),
|
||||||
// This one will fail.
|
"channel protocol violation: Too many cells received on destroyed circuit"
|
||||||
input
|
);
|
||||||
.send(Ok(ChanCell::new(23.into(), relaycell.clone())))
|
})
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
|
||||||
assert_eq!(
|
|
||||||
format!("{}", e),
|
|
||||||
"channel protocol violation: Too many cells received on destroyed circuit"
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn deliver_destroy() {
|
fn deliver_destroy() {
|
||||||
use crate::circuit::celltypes::*;
|
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
|
||||||
use futures::channel::oneshot;
|
use crate::circuit::celltypes::*;
|
||||||
use tor_cell::chancell::msg;
|
use futures::channel::oneshot;
|
||||||
|
use tor_cell::chancell::msg;
|
||||||
|
|
||||||
let (_chan, mut reactor, _output, mut input) = new_reactor();
|
let (_chan, mut reactor, _output, mut input) = new_reactor();
|
||||||
|
|
||||||
let (circ_oneshot_7, mut circ_stream_13) = {
|
let (circ_oneshot_7, mut circ_stream_13) = {
|
||||||
let (snd1, rcv1) = oneshot::channel();
|
let (snd1, rcv1) = oneshot::channel();
|
||||||
let (snd2, _rcv2) = mpsc::channel(64);
|
let (snd2, _rcv2) = mpsc::channel(64);
|
||||||
reactor
|
reactor
|
||||||
.circs
|
.circs
|
||||||
.put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
|
.put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));
|
||||||
|
|
||||||
let (snd3, rcv3) = mpsc::channel(64);
|
let (snd3, rcv3) = mpsc::channel(64);
|
||||||
reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
|
reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));
|
||||||
|
|
||||||
reactor
|
reactor
|
||||||
.circs
|
.circs
|
||||||
.put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
|
.put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
|
||||||
(rcv1, rcv3)
|
(rcv1, rcv3)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Destroying an opening circuit is fine.
|
// Destroying an opening circuit is fine.
|
||||||
let destroycell: ChanMsg = msg::Destroy::new(0.into()).into();
|
let destroycell: ChanMsg = msg::Destroy::new(0.into()).into();
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(7.into(), destroycell.clone())))
|
.send(Ok(ChanCell::new(7.into(), destroycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
reactor.run_once().await.unwrap();
|
reactor.run_once().await.unwrap();
|
||||||
let msg = circ_oneshot_7.await;
|
let msg = circ_oneshot_7.await;
|
||||||
assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
|
assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));
|
||||||
|
|
||||||
// Destroying an open circuit is fine.
|
// Destroying an open circuit is fine.
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(13.into(), destroycell.clone())))
|
.send(Ok(ChanCell::new(13.into(), destroycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
reactor.run_once().await.unwrap();
|
reactor.run_once().await.unwrap();
|
||||||
let msg = circ_stream_13.next().await.unwrap();
|
let msg = circ_stream_13.next().await.unwrap();
|
||||||
assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
|
assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));
|
||||||
|
|
||||||
// Destroying a DestroySent circuit is fine.
|
// Destroying a DestroySent circuit is fine.
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(23.into(), destroycell.clone())))
|
.send(Ok(ChanCell::new(23.into(), destroycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
reactor.run_once().await.unwrap();
|
reactor.run_once().await.unwrap();
|
||||||
|
|
||||||
// Destroying a nonexistent circuit is an error.
|
// Destroying a nonexistent circuit is an error.
|
||||||
input
|
input
|
||||||
.send(Ok(ChanCell::new(101.into(), destroycell.clone())))
|
.send(Ok(ChanCell::new(101.into(), destroycell.clone())))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
let e = reactor.run_once().await.unwrap_err().unwrap_err();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{}", e),
|
format!("{}", e),
|
||||||
"channel protocol violation: Destroy for nonexistent circuit"
|
"channel protocol violation: Destroy for nonexistent circuit"
|
||||||
);
|
);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -614,15 +614,14 @@ mod test {
|
||||||
use futures::io::{AsyncReadExt, AsyncWriteExt};
|
use futures::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use futures::sink::SinkExt;
|
use futures::sink::SinkExt;
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
|
use futures::task::SpawnExt;
|
||||||
use hex_literal::hex;
|
use hex_literal::hex;
|
||||||
use rand::thread_rng;
|
use rand::thread_rng;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::runtime::Handle;
|
|
||||||
use tokio_crate as tokio;
|
|
||||||
use tokio_crate::test as async_test;
|
|
||||||
use tor_cell::chancell::{msg as chanmsg, ChanCell};
|
use tor_cell::chancell::{msg as chanmsg, ChanCell};
|
||||||
use tor_cell::relaycell::{msg as relaymsg, RelayCell, StreamId};
|
use tor_cell::relaycell::{msg as relaymsg, RelayCell, StreamId};
|
||||||
use tor_llcrypto::pk;
|
use tor_llcrypto::pk;
|
||||||
|
use tor_rtcompat::{Runtime, SleepProvider};
|
||||||
use tracing::trace;
|
use tracing::trace;
|
||||||
|
|
||||||
fn rmsg_to_ccmsg<ID>(id: ID, msg: relaymsg::RelayMsg) -> ClientCircChanMsg
|
fn rmsg_to_ccmsg<ID>(id: ID, msg: relaymsg::RelayMsg) -> ClientCircChanMsg
|
||||||
|
@ -680,23 +679,28 @@ mod test {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn working_fake_channel() -> (
|
fn working_fake_channel<R: Runtime>(
|
||||||
|
rt: &R,
|
||||||
|
) -> (
|
||||||
Channel,
|
Channel,
|
||||||
Receiver<ChanCell>,
|
Receiver<ChanCell>,
|
||||||
Sender<std::result::Result<ChanCell, tor_cell::Error>>,
|
Sender<std::result::Result<ChanCell, tor_cell::Error>>,
|
||||||
) {
|
) {
|
||||||
let (channel, chan_reactor, rx, tx) = new_reactor();
|
let (channel, chan_reactor, rx, tx) = new_reactor();
|
||||||
Handle::current().spawn(chan_reactor.run());
|
rt.spawn(async {
|
||||||
|
let _ignore = chan_reactor.run().await;
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
(channel, rx, tx)
|
(channel, rx, tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn test_create(fast: bool) {
|
async fn test_create<R: Runtime>(rt: &R, fast: bool) {
|
||||||
// We want to try progressing from a pending circuit to a circuit
|
// We want to try progressing from a pending circuit to a circuit
|
||||||
// via a crate_fast handshake.
|
// via a crate_fast handshake.
|
||||||
|
|
||||||
use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
|
use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
|
||||||
|
|
||||||
let (chan, mut rx, _sink) = working_fake_channel();
|
let (chan, mut rx, _sink) = working_fake_channel(rt);
|
||||||
let circid = 128.into();
|
let circid = 128.into();
|
||||||
let (created_send, created_recv) = oneshot::channel();
|
let (created_send, created_recv) = oneshot::channel();
|
||||||
let (_circmsg_send, circmsg_recv) = mpsc::channel(64);
|
let (_circmsg_send, circmsg_recv) = mpsc::channel(64);
|
||||||
|
@ -705,7 +709,10 @@ mod test {
|
||||||
let (pending, reactor) =
|
let (pending, reactor) =
|
||||||
PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
|
PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
|
||||||
|
|
||||||
Handle::current().spawn(reactor.run());
|
rt.spawn(async {
|
||||||
|
let _ignore = reactor.run().await;
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// Future to pretend to be a relay on the other end of the circuit.
|
// Future to pretend to be a relay on the other end of the circuit.
|
||||||
let simulate_relay_fut = async move {
|
let simulate_relay_fut = async move {
|
||||||
|
@ -756,13 +763,17 @@ mod test {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn test_create_fast() {
|
fn test_create_fast() {
|
||||||
test_create(true).await
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
|
test_create(&rt, true).await;
|
||||||
|
})
|
||||||
}
|
}
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn test_create_ntor() {
|
fn test_create_ntor() {
|
||||||
test_create(false).await
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
|
test_create(&rt, false).await;
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// An encryption layer that doesn't do any crypto. Can be used
|
// An encryption layer that doesn't do any crypto. Can be used
|
||||||
|
@ -811,7 +822,8 @@ mod test {
|
||||||
|
|
||||||
// Helper: set up a 3-hop circuit with no encryption, where the
|
// Helper: set up a 3-hop circuit with no encryption, where the
|
||||||
// next inbound message seems to come from hop next_msg_from
|
// next inbound message seems to come from hop next_msg_from
|
||||||
async fn newcirc_ext(
|
async fn newcirc_ext<R: Runtime>(
|
||||||
|
rt: &R,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
next_msg_from: HopNum,
|
next_msg_from: HopNum,
|
||||||
) -> (ClientCirc, mpsc::Sender<ClientCircChanMsg>) {
|
) -> (ClientCirc, mpsc::Sender<ClientCircChanMsg>) {
|
||||||
|
@ -823,7 +835,10 @@ mod test {
|
||||||
let (pending, reactor) =
|
let (pending, reactor) =
|
||||||
PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
|
PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
|
||||||
|
|
||||||
Handle::current().spawn(reactor.run());
|
rt.spawn(async {
|
||||||
|
let _ignore = reactor.run().await;
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let PendingClientCirc {
|
let PendingClientCirc {
|
||||||
circ,
|
circ,
|
||||||
|
@ -850,33 +865,38 @@ mod test {
|
||||||
|
|
||||||
// Helper: set up a 3-hop circuit with no encryption, where the
|
// Helper: set up a 3-hop circuit with no encryption, where the
|
||||||
// next inbound message seems to come from hop next_msg_from
|
// next inbound message seems to come from hop next_msg_from
|
||||||
async fn newcirc(chan: Channel) -> (ClientCirc, mpsc::Sender<ClientCircChanMsg>) {
|
async fn newcirc<R: Runtime>(
|
||||||
newcirc_ext(chan, 2.into()).await
|
rt: &R,
|
||||||
|
chan: Channel,
|
||||||
|
) -> (ClientCirc, mpsc::Sender<ClientCircChanMsg>) {
|
||||||
|
newcirc_ext(rt, chan, 2.into()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try sending a cell via send_relay_cell
|
// Try sending a cell via send_relay_cell
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn send_simple() {
|
fn send_simple() {
|
||||||
let (chan, mut rx, _sink) = working_fake_channel();
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
let (circ, _send) = newcirc(chan).await;
|
let (chan, mut rx, _sink) = working_fake_channel(&rt);
|
||||||
let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir);
|
let (circ, _send) = newcirc(&rt, chan).await;
|
||||||
circ.control
|
let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir);
|
||||||
.unbounded_send(CtrlMsg::SendRelayCell {
|
circ.control
|
||||||
hop: 2.into(),
|
.unbounded_send(CtrlMsg::SendRelayCell {
|
||||||
early: false,
|
hop: 2.into(),
|
||||||
cell: begindir,
|
early: false,
|
||||||
})
|
cell: begindir,
|
||||||
.unwrap();
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
// Here's what we tried to put on the TLS channel. Note that
|
// Here's what we tried to put on the TLS channel. Note that
|
||||||
// we're using dummy relay crypto for testing convenience.
|
// we're using dummy relay crypto for testing convenience.
|
||||||
let rcvd = rx.next().await.unwrap();
|
let rcvd = rx.next().await.unwrap();
|
||||||
assert_eq!(rcvd.circid(), 128.into());
|
assert_eq!(rcvd.circid(), 128.into());
|
||||||
let m = match rcvd.into_circid_and_msg().1 {
|
let m = match rcvd.into_circid_and_msg().1 {
|
||||||
ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
||||||
_ => panic!(),
|
_ => panic!(),
|
||||||
};
|
};
|
||||||
assert!(matches!(m.msg(), RelayMsg::BeginDir));
|
assert!(matches!(m.msg(), RelayMsg::BeginDir));
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE(eta): this test is commented out because it basically tested implementation details
|
// NOTE(eta): this test is commented out because it basically tested implementation details
|
||||||
|
@ -945,49 +965,55 @@ mod test {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn extend() {
|
fn extend() {
|
||||||
use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
|
use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};
|
||||||
|
|
||||||
let (chan, mut rx, _sink) = working_fake_channel();
|
let (chan, mut rx, _sink) = working_fake_channel(&rt);
|
||||||
let (circ, mut sink) = newcirc(chan).await;
|
let (circ, mut sink) = newcirc(&rt, chan).await;
|
||||||
let params = CircParameters::default();
|
let params = CircParameters::default();
|
||||||
|
|
||||||
let extend_fut = async move {
|
let extend_fut = async move {
|
||||||
let target = example_target();
|
let target = example_target();
|
||||||
circ.extend_ntor(&target, ¶ms).await.unwrap();
|
circ.extend_ntor(&target, ¶ms).await.unwrap();
|
||||||
circ // gotta keep the circ alive, or the reactor would exit.
|
circ // gotta keep the circ alive, or the reactor would exit.
|
||||||
};
|
|
||||||
let reply_fut = async move {
|
|
||||||
// We've disabled encryption on this circuit, so we can just
|
|
||||||
// read the extend2 cell.
|
|
||||||
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
|
|
||||||
assert_eq!(id, 128.into());
|
|
||||||
let rmsg = match chmsg {
|
|
||||||
ChanMsg::RelayEarly(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
|
||||||
_ => panic!(),
|
|
||||||
};
|
};
|
||||||
let e2 = match rmsg.msg() {
|
let reply_fut = async move {
|
||||||
RelayMsg::Extend2(e2) => e2,
|
// We've disabled encryption on this circuit, so we can just
|
||||||
_ => panic!(),
|
// read the extend2 cell.
|
||||||
|
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
|
||||||
|
assert_eq!(id, 128.into());
|
||||||
|
let rmsg = match chmsg {
|
||||||
|
ChanMsg::RelayEarly(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
||||||
|
_ => panic!(),
|
||||||
|
};
|
||||||
|
let e2 = match rmsg.msg() {
|
||||||
|
RelayMsg::Extend2(e2) => e2,
|
||||||
|
_ => panic!(),
|
||||||
|
};
|
||||||
|
let mut rng = thread_rng();
|
||||||
|
let (_, reply) =
|
||||||
|
NtorServer::server(&mut rng, &[example_ntor_key()], e2.handshake()).unwrap();
|
||||||
|
let extended2 = relaymsg::Extended2::new(reply).into();
|
||||||
|
sink.send(rmsg_to_ccmsg(0, extended2)).await.unwrap();
|
||||||
|
sink // gotta keep the sink alive, or the reactor will exit.
|
||||||
};
|
};
|
||||||
let mut rng = thread_rng();
|
|
||||||
let (_, reply) =
|
|
||||||
NtorServer::server(&mut rng, &[example_ntor_key()], e2.handshake()).unwrap();
|
|
||||||
let extended2 = relaymsg::Extended2::new(reply).into();
|
|
||||||
sink.send(rmsg_to_ccmsg(0, extended2)).await.unwrap();
|
|
||||||
sink // gotta keep the sink alive, or the reactor will exit.
|
|
||||||
};
|
|
||||||
|
|
||||||
let (circ, _) = futures::join!(extend_fut, reply_fut);
|
let (circ, _) = futures::join!(extend_fut, reply_fut);
|
||||||
|
|
||||||
// Did we really add another hop?
|
// Did we really add another hop?
|
||||||
assert_eq!(circ.n_hops(), 4);
|
assert_eq!(circ.n_hops(), 4);
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn bad_extend_test_impl(reply_hop: HopNum, bad_reply: ClientCircChanMsg) -> Error {
|
async fn bad_extend_test_impl<R: Runtime>(
|
||||||
let (chan, _rx, _sink) = working_fake_channel();
|
rt: &R,
|
||||||
let (circ, mut sink) = newcirc_ext(chan, reply_hop).await;
|
reply_hop: HopNum,
|
||||||
|
bad_reply: ClientCircChanMsg,
|
||||||
|
) -> Error {
|
||||||
|
let (chan, _rx, _sink) = working_fake_channel(rt);
|
||||||
|
let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
|
||||||
let params = CircParameters::default();
|
let params = CircParameters::default();
|
||||||
|
|
||||||
let extend_fut = async move {
|
let extend_fut = async move {
|
||||||
|
@ -1006,121 +1032,132 @@ mod test {
|
||||||
outcome.unwrap_err()
|
outcome.unwrap_err()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn bad_extend_wronghop() {
|
fn bad_extend_wronghop() {
|
||||||
let extended2 = relaymsg::Extended2::new(vec![]).into();
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
let cc = rmsg_to_ccmsg(0, extended2);
|
let extended2 = relaymsg::Extended2::new(vec![]).into();
|
||||||
|
let cc = rmsg_to_ccmsg(0, extended2);
|
||||||
|
|
||||||
let error = bad_extend_test_impl(1.into(), cc).await;
|
let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
|
||||||
// This case shows up as a CircDestroy, since a message sent
|
// This case shows up as a CircDestroy, since a message sent
|
||||||
// from the wrong hop won't even be delivered to the extend
|
// from the wrong hop won't even be delivered to the extend
|
||||||
// code's meta-handler. Instead the unexpected message will cause
|
// code's meta-handler. Instead the unexpected message will cause
|
||||||
// the circuit to get torn down.
|
// the circuit to get torn down.
|
||||||
match error {
|
match error {
|
||||||
Error::CircuitClosed => {}
|
Error::CircuitClosed => {}
|
||||||
x => panic!("got other error: {}", x),
|
x => panic!("got other error: {}", x),
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_test]
|
|
||||||
async fn bad_extend_wrongtype() {
|
|
||||||
let extended = relaymsg::Extended::new(vec![7; 200]).into();
|
|
||||||
let cc = rmsg_to_ccmsg(0, extended);
|
|
||||||
|
|
||||||
let error = bad_extend_test_impl(2.into(), cc).await;
|
|
||||||
match error {
|
|
||||||
Error::CircProto(s) => {
|
|
||||||
assert_eq!(s, "wanted EXTENDED2; got EXTENDED")
|
|
||||||
}
|
}
|
||||||
_ => panic!(),
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn bad_extend_destroy() {
|
fn bad_extend_wrongtype() {
|
||||||
let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
let error = bad_extend_test_impl(2.into(), cc).await;
|
let extended = relaymsg::Extended::new(vec![7; 200]).into();
|
||||||
match error {
|
let cc = rmsg_to_ccmsg(0, extended);
|
||||||
Error::CircuitClosed => {}
|
|
||||||
_ => panic!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_test]
|
let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
|
||||||
async fn bad_extend_crypto() {
|
match error {
|
||||||
let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
|
Error::CircProto(s) => {
|
||||||
let cc = rmsg_to_ccmsg(0, extended2);
|
assert_eq!(s, "wanted EXTENDED2; got EXTENDED")
|
||||||
let error = bad_extend_test_impl(2.into(), cc).await;
|
}
|
||||||
assert!(matches!(error, Error::BadHandshake));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_test]
|
|
||||||
async fn begindir() {
|
|
||||||
let (chan, mut rx, _sink) = working_fake_channel();
|
|
||||||
let (circ, mut sink) = newcirc(chan).await;
|
|
||||||
|
|
||||||
let begin_and_send_fut = async move {
|
|
||||||
// Here we'll say we've got a circuit, and we want to
|
|
||||||
// make a simple BEGINDIR request with it.
|
|
||||||
let mut stream = Arc::new(circ).begin_dir_stream().await.unwrap();
|
|
||||||
stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
|
|
||||||
stream.flush().await.unwrap();
|
|
||||||
let mut buf = [0_u8; 1024];
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
assert_eq!(n, 0);
|
|
||||||
stream
|
|
||||||
};
|
|
||||||
let reply_fut = async move {
|
|
||||||
// We've disabled encryption on this circuit, so we can just
|
|
||||||
// read the begindir cell.
|
|
||||||
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
|
|
||||||
assert_eq!(id, 128.into()); // hardcoded circid.
|
|
||||||
let rmsg = match chmsg {
|
|
||||||
ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
|
||||||
_ => panic!(),
|
_ => panic!(),
|
||||||
};
|
|
||||||
let (streamid, rmsg) = rmsg.into_streamid_and_msg();
|
|
||||||
assert!(matches!(rmsg, RelayMsg::BeginDir));
|
|
||||||
|
|
||||||
// Reply with a Connected cell to indicate success.
|
|
||||||
let connected = relaymsg::Connected::new_empty().into();
|
|
||||||
sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
|
|
||||||
|
|
||||||
// Now read a DATA cell...
|
|
||||||
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
|
|
||||||
assert_eq!(id, 128.into());
|
|
||||||
let rmsg = match chmsg {
|
|
||||||
ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
|
||||||
_ => panic!(),
|
|
||||||
};
|
|
||||||
let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
|
|
||||||
assert_eq!(streamid_2, streamid);
|
|
||||||
if let RelayMsg::Data(d) = rmsg {
|
|
||||||
assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
|
|
||||||
} else {
|
|
||||||
panic!();
|
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Write another data cell in reply!
|
#[test]
|
||||||
let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
|
fn bad_extend_destroy() {
|
||||||
.unwrap()
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
.into();
|
let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
|
||||||
sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
|
let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
|
||||||
|
match error {
|
||||||
|
Error::CircuitClosed => {}
|
||||||
|
_ => panic!(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Send an END cell to say that the conversation is over.
|
#[test]
|
||||||
let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
|
fn bad_extend_crypto() {
|
||||||
sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
|
let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
|
||||||
|
let cc = rmsg_to_ccmsg(0, extended2);
|
||||||
|
let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
|
||||||
|
assert!(matches!(error, Error::BadHandshake));
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
sink // gotta keep the sink alive, or the reactor will exit.
|
#[test]
|
||||||
};
|
fn begindir() {
|
||||||
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
|
let (chan, mut rx, _sink) = working_fake_channel(&rt);
|
||||||
|
let (circ, mut sink) = newcirc(&rt, chan).await;
|
||||||
|
|
||||||
let (_stream, _) = futures::join!(begin_and_send_fut, reply_fut);
|
let begin_and_send_fut = async move {
|
||||||
|
// Here we'll say we've got a circuit, and we want to
|
||||||
|
// make a simple BEGINDIR request with it.
|
||||||
|
let mut stream = Arc::new(circ).begin_dir_stream().await.unwrap();
|
||||||
|
stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
|
||||||
|
stream.flush().await.unwrap();
|
||||||
|
let mut buf = [0_u8; 1024];
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
assert_eq!(n, 0);
|
||||||
|
stream
|
||||||
|
};
|
||||||
|
let reply_fut = async move {
|
||||||
|
// We've disabled encryption on this circuit, so we can just
|
||||||
|
// read the begindir cell.
|
||||||
|
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
|
||||||
|
assert_eq!(id, 128.into()); // hardcoded circid.
|
||||||
|
let rmsg = match chmsg {
|
||||||
|
ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
||||||
|
_ => panic!(),
|
||||||
|
};
|
||||||
|
let (streamid, rmsg) = rmsg.into_streamid_and_msg();
|
||||||
|
assert!(matches!(rmsg, RelayMsg::BeginDir));
|
||||||
|
|
||||||
|
// Reply with a Connected cell to indicate success.
|
||||||
|
let connected = relaymsg::Connected::new_empty().into();
|
||||||
|
sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
|
||||||
|
|
||||||
|
// Now read a DATA cell...
|
||||||
|
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
|
||||||
|
assert_eq!(id, 128.into());
|
||||||
|
let rmsg = match chmsg {
|
||||||
|
ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
|
||||||
|
_ => panic!(),
|
||||||
|
};
|
||||||
|
let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
|
||||||
|
assert_eq!(streamid_2, streamid);
|
||||||
|
if let RelayMsg::Data(d) = rmsg {
|
||||||
|
assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
|
||||||
|
} else {
|
||||||
|
panic!();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write another data cell in reply!
|
||||||
|
let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
|
||||||
|
.unwrap()
|
||||||
|
.into();
|
||||||
|
sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
|
||||||
|
|
||||||
|
// Send an END cell to say that the conversation is over.
|
||||||
|
let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
|
||||||
|
sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
|
||||||
|
|
||||||
|
sink // gotta keep the sink alive, or the reactor will exit.
|
||||||
|
};
|
||||||
|
|
||||||
|
let (_stream, _) = futures::join!(begin_and_send_fut, reply_fut);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up a circuit and stream that expects some incoming SENDMEs.
|
// Set up a circuit and stream that expects some incoming SENDMEs.
|
||||||
async fn setup_incoming_sendme_case(
|
async fn setup_incoming_sendme_case<R: Runtime>(
|
||||||
|
rt: &R,
|
||||||
n_to_send: usize,
|
n_to_send: usize,
|
||||||
) -> (
|
) -> (
|
||||||
ClientCirc,
|
ClientCirc,
|
||||||
|
@ -1131,8 +1168,8 @@ mod test {
|
||||||
Receiver<ChanCell>,
|
Receiver<ChanCell>,
|
||||||
Sender<std::result::Result<ChanCell, tor_cell::Error>>,
|
Sender<std::result::Result<ChanCell, tor_cell::Error>>,
|
||||||
) {
|
) {
|
||||||
let (chan, mut rx, sink2) = working_fake_channel();
|
let (chan, mut rx, sink2) = working_fake_channel(rt);
|
||||||
let (circ, mut sink) = newcirc(chan).await;
|
let (circ, mut sink) = newcirc(rt, chan).await;
|
||||||
|
|
||||||
let circ_clone = Arc::new(circ.clone());
|
let circ_clone = Arc::new(circ.clone());
|
||||||
let begin_and_send_fut = async move {
|
let begin_and_send_fut = async move {
|
||||||
|
@ -1195,96 +1232,103 @@ mod test {
|
||||||
(circ, stream, sink, streamid, cells_received, rx, sink2)
|
(circ, stream, sink, streamid, cells_received, rx, sink2)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn accept_valid_sendme() {
|
fn accept_valid_sendme() {
|
||||||
let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
setup_incoming_sendme_case(300 * 498 + 3).await;
|
let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
|
||||||
|
setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
|
||||||
|
|
||||||
assert_eq!(cells_received, 301);
|
assert_eq!(cells_received, 301);
|
||||||
|
|
||||||
// Make sure that the circuit is indeed expecting the right sendmes
|
// Make sure that the circuit is indeed expecting the right sendmes
|
||||||
{
|
{
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
circ.control
|
circ.control
|
||||||
.unbounded_send(CtrlMsg::QuerySendWindow {
|
.unbounded_send(CtrlMsg::QuerySendWindow {
|
||||||
hop: 2.into(),
|
hop: 2.into(),
|
||||||
done: tx,
|
done: tx,
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let (window, tags) = rx.await.unwrap().unwrap();
|
let (window, tags) = rx.await.unwrap().unwrap();
|
||||||
assert_eq!(window, 1000 - 301);
|
assert_eq!(window, 1000 - 301);
|
||||||
assert_eq!(tags.len(), 3);
|
assert_eq!(tags.len(), 3);
|
||||||
// 100
|
// 100
|
||||||
assert_eq!(tags[0], hex!("6400000000000000000000000000000000000000"));
|
assert_eq!(tags[0], hex!("6400000000000000000000000000000000000000"));
|
||||||
// 200
|
// 200
|
||||||
assert_eq!(tags[1], hex!("c800000000000000000000000000000000000000"));
|
assert_eq!(tags[1], hex!("c800000000000000000000000000000000000000"));
|
||||||
// 300
|
// 300
|
||||||
assert_eq!(tags[2], hex!("2c01000000000000000000000000000000000000"));
|
assert_eq!(tags[2], hex!("2c01000000000000000000000000000000000000"));
|
||||||
}
|
}
|
||||||
|
|
||||||
let reply_with_sendme_fut = async move {
|
let reply_with_sendme_fut = async move {
|
||||||
// make and send a circuit-level sendme.
|
// make and send a circuit-level sendme.
|
||||||
let c_sendme =
|
let c_sendme =
|
||||||
relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000")).into();
|
relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
|
||||||
sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap();
|
.into();
|
||||||
|
sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap();
|
||||||
|
|
||||||
// Make and send a stream-level sendme.
|
// Make and send a stream-level sendme.
|
||||||
let s_sendme = relaymsg::Sendme::new_empty().into();
|
let s_sendme = relaymsg::Sendme::new_empty().into();
|
||||||
sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
|
sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
|
||||||
|
|
||||||
sink
|
sink
|
||||||
};
|
};
|
||||||
|
|
||||||
let _sink = reply_with_sendme_fut.await;
|
let _sink = reply_with_sendme_fut.await;
|
||||||
|
|
||||||
// FIXME(eta): this is a hacky way of waiting for the reactor to run before doing the below
|
// FIXME(eta): this is a hacky way of waiting for the reactor to run before doing the below
|
||||||
// query; should find some way to properly synchronize to avoid flakiness
|
// query; should find some way to properly synchronize to avoid flakiness
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
rt.sleep(Duration::from_millis(100)).await;
|
||||||
// Now make sure that the circuit is still happy, and its
|
// Now make sure that the circuit is still happy, and its
|
||||||
// window is updated.
|
// window is updated.
|
||||||
{
|
{
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
circ.control
|
circ.control
|
||||||
.unbounded_send(CtrlMsg::QuerySendWindow {
|
.unbounded_send(CtrlMsg::QuerySendWindow {
|
||||||
hop: 2.into(),
|
hop: 2.into(),
|
||||||
done: tx,
|
done: tx,
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let (window, _tags) = rx.await.unwrap().unwrap();
|
let (window, _tags) = rx.await.unwrap().unwrap();
|
||||||
assert_eq!(window, 1000 - 201);
|
assert_eq!(window, 1000 - 201);
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_test]
|
#[test]
|
||||||
async fn invalid_circ_sendme() {
|
fn invalid_circ_sendme() {
|
||||||
// Same setup as accept_valid_sendme() test above but try giving
|
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
|
||||||
// a sendme with the wrong tag.
|
// Same setup as accept_valid_sendme() test above but try giving
|
||||||
|
// a sendme with the wrong tag.
|
||||||
|
|
||||||
let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
|
let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
|
||||||
setup_incoming_sendme_case(300 * 498 + 3).await;
|
setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
|
||||||
|
|
||||||
let reply_with_sendme_fut = async move {
|
let reply_with_sendme_fut = async move {
|
||||||
// make and send a circuit-level sendme with a bad tag.
|
// make and send a circuit-level sendme with a bad tag.
|
||||||
let c_sendme =
|
let c_sendme =
|
||||||
relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF")).into();
|
relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
|
||||||
sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap();
|
.into();
|
||||||
sink
|
sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap();
|
||||||
};
|
sink
|
||||||
|
};
|
||||||
|
|
||||||
let _sink = reply_with_sendme_fut.await;
|
let _sink = reply_with_sendme_fut.await;
|
||||||
|
|
||||||
let mut tries = 0;
|
let mut tries = 0;
|
||||||
// FIXME(eta): we aren't testing the error message like we used to; however, we can at least
|
// FIXME(eta): we aren't testing the error message like we used to; however, we can at least
|
||||||
// check whether the reactor dies as a result of receiving invalid data.
|
// check whether the reactor dies as a result of receiving invalid data.
|
||||||
while !circ.control.is_closed() {
|
while !circ.control.is_closed() {
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
// TODO: Don't sleep in tests.
|
||||||
tries += 1;
|
rt.sleep(Duration::from_millis(100)).await;
|
||||||
if tries > 10 {
|
tries += 1;
|
||||||
panic!("reactor continued running after invalid sendme");
|
if tries > 10 {
|
||||||
|
panic!("reactor continued running after invalid sendme");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: check that the circuit is shut down too
|
// TODO: check that the circuit is shut down too
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue