tor-cell: Rename RelayMsg and RelayCell-related types.

Thanks to rust-analyzer for making this simple.
This commit is contained in:
Nick Mathewson 2023-02-07 09:18:17 -05:00
parent d99c130679
commit d63d7926bd
16 changed files with 174 additions and 164 deletions

View File

@ -5,3 +5,4 @@ BREAKING: Renamed VPadding to Vpadding, for consistent snake case.
BREAKING: Moved ChanMsg methods into a trait.
BREAKING: Moved RelayMsg methods into a trait.
BREAKING: Renamed ChanCell->AnyChanCell, ChanMsg->AnyChanMsg.
BREAKING: Renamed RelayCell->AnyRelayCell, RelayMsg->AnyRelayMsg.

View File

@ -178,13 +178,13 @@ impl StreamId {
}
/// A decoded and parsed relay cell of unrestricted type.
pub type RelayCell = RestrictedRelayCell<msg::RelayMsg>;
pub type AnyRelayCell = RelayCell<msg::AnyRelayMsg>;
/// Trait implemented by anything that can serve as a relay message.
///
/// Typically, this will be [`RelayMsg`] (to represent an unrestricted relay
/// message), or a restricted subset of `RelayMsg`.
pub trait RelayMsgClass {
pub trait RelayMsg {
/// Return the stream command associated with this message.
fn cmd(&self) -> RelayCmd;
/// Encode the body of this message, not including command or length
@ -201,17 +201,17 @@ pub trait RelayMsgClass {
/// circuit, along with the ID for an associated stream that the
/// message is meant for.
#[derive(Debug)]
pub struct RestrictedRelayCell<M> {
pub struct RelayCell<M> {
/// The stream ID for the stream that this cell corresponds to.
streamid: StreamId,
/// The relay message for this cell.
msg: M,
}
impl<M: RelayMsgClass> RestrictedRelayCell<M> {
impl<M: RelayMsg> RelayCell<M> {
/// Construct a new relay cell.
pub fn new(streamid: StreamId, msg: M) -> Self {
RestrictedRelayCell { streamid, msg }
RelayCell { streamid, msg }
}
/// Consume this cell and return its components.
pub fn into_streamid_and_msg(self) -> (StreamId, M) {

View File

@ -29,7 +29,7 @@ crate::restrict::restricted_msg! {
/// A single parsed relay message, sent or received along a circuit
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum RelayMsg : RelayMsg {
pub enum AnyRelayMsg : RelayMsg {
/// Create a stream
Begin,
/// Send data on a stream
@ -106,15 +106,15 @@ pub enum RelayMsg : RelayMsg {
/// Internal: traits in common different cell bodies.
pub trait Body: Sized {
/// Convert this type into a RelayMsg, wrapped appropriate.
fn into_message(self) -> RelayMsg;
fn into_message(self) -> AnyRelayMsg;
/// Decode a relay cell body from a provided reader.
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self>;
/// Encode the body of this cell into the end of a writer.
fn encode_onto<W: Writer + ?Sized>(self, w: &mut W) -> EncodeResult<()>;
}
impl<B: Body> From<B> for RelayMsg {
fn from(b: B) -> RelayMsg {
impl<B: Body> From<B> for AnyRelayMsg {
fn from(b: B) -> AnyRelayMsg {
b.into_message()
}
}
@ -207,8 +207,8 @@ impl Begin {
}
impl Body for Begin {
fn into_message(self) -> RelayMsg {
RelayMsg::Begin(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Begin(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let addr = {
@ -322,8 +322,8 @@ impl AsRef<[u8]> for Data {
}
impl Body for Data {
fn into_message(self) -> RelayMsg {
RelayMsg::Data(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Data(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
Ok(Data {
@ -435,8 +435,8 @@ impl End {
}
}
impl Body for End {
fn into_message(self) -> RelayMsg {
RelayMsg::End(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::End(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
if r.remaining() == 0 {
@ -526,8 +526,8 @@ impl Connected {
}
}
impl Body for Connected {
fn into_message(self) -> RelayMsg {
RelayMsg::Connected(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Connected(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
if r.remaining() == 0 {
@ -605,8 +605,8 @@ impl Sendme {
}
}
impl Body for Sendme {
fn into_message(self) -> RelayMsg {
RelayMsg::Sendme(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Sendme(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let digest = if r.remaining() == 0 {
@ -670,8 +670,8 @@ impl Extend {
}
}
impl Body for Extend {
fn into_message(self) -> RelayMsg {
RelayMsg::Extend(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Extend(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let addr = r.extract()?;
@ -711,8 +711,8 @@ impl Extended {
}
}
impl Body for Extended {
fn into_message(self) -> RelayMsg {
RelayMsg::Extended(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Extended(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let handshake = r.take(TAP_S_HANDSHAKE_LEN)?.into();
@ -775,8 +775,8 @@ impl Extend2 {
}
impl Body for Extend2 {
fn into_message(self) -> RelayMsg {
RelayMsg::Extend2(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Extend2(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let n = r.take_u8()?;
@ -834,8 +834,8 @@ impl Extended2 {
}
}
impl Body for Extended2 {
fn into_message(self) -> RelayMsg {
RelayMsg::Extended2(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Extended2(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let hlen = r.take_u16()?;
@ -879,8 +879,8 @@ impl Truncated {
}
}
impl Body for Truncated {
fn into_message(self) -> RelayMsg {
RelayMsg::Truncated(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Truncated(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
Ok(Truncated {
@ -935,8 +935,8 @@ impl Resolve {
}
}
impl Body for Resolve {
fn into_message(self) -> RelayMsg {
RelayMsg::Resolve(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Resolve(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let query = r.take_until(0)?;
@ -1104,8 +1104,8 @@ impl Resolved {
}
}
impl Body for Resolved {
fn into_message(self) -> RelayMsg {
RelayMsg::Resolved(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Resolved(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let mut answers = Vec::new();
@ -1159,8 +1159,8 @@ impl Unrecognized {
}
impl Body for Unrecognized {
fn into_message(self) -> RelayMsg {
RelayMsg::Unrecognized(self)
fn into_message(self) -> AnyRelayMsg {
AnyRelayMsg::Unrecognized(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
Ok(Unrecognized {
@ -1185,8 +1185,8 @@ macro_rules! empty_body {
#[non_exhaustive]
pub struct $name {}
impl $crate::relaycell::msg::Body for $name {
fn into_message(self) -> $crate::relaycell::msg::RelayMsg {
$crate::relaycell::msg::RelayMsg::$name(self)
fn into_message(self) -> $crate::relaycell::msg::AnyRelayMsg {
$crate::relaycell::msg::AnyRelayMsg::$name(self)
}
fn decode_from_reader(_r: &mut Reader<'_>) -> Result<Self> {
Ok(Self::default())

View File

@ -157,8 +157,8 @@ pub struct EstablishIntro {
}
impl msg::Body for EstablishIntro {
fn into_message(self) -> msg::RelayMsg {
msg::RelayMsg::EstablishIntro(self)
fn into_message(self) -> msg::AnyRelayMsg {
msg::AnyRelayMsg::EstablishIntro(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let auth_key_type = r.take_u8()?.into();
@ -259,8 +259,8 @@ impl EstablishRendezvous {
}
}
impl msg::Body for EstablishRendezvous {
fn into_message(self) -> msg::RelayMsg {
msg::RelayMsg::EstablishRendezvous(self)
fn into_message(self) -> msg::AnyRelayMsg {
msg::AnyRelayMsg::EstablishRendezvous(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
let cookie = r.extract()?;
@ -277,8 +277,8 @@ impl msg::Body for EstablishRendezvous {
pub struct Introduce1(Introduce);
impl msg::Body for Introduce1 {
fn into_message(self) -> msg::RelayMsg {
msg::RelayMsg::Introduce1(self)
fn into_message(self) -> msg::AnyRelayMsg {
msg::AnyRelayMsg::Introduce1(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
Ok(Self(Introduce::decode_from_reader(r)?))
@ -300,8 +300,8 @@ impl Introduce1 {
pub struct Introduce2(Introduce);
impl msg::Body for Introduce2 {
fn into_message(self) -> msg::RelayMsg {
msg::RelayMsg::Introduce2(self)
fn into_message(self) -> msg::AnyRelayMsg {
msg::AnyRelayMsg::Introduce2(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
Ok(Self(Introduce::decode_from_reader(r)?))
@ -387,7 +387,7 @@ pub struct Rendezvous1 {
}
impl Body for Rendezvous1 {
fn into_message(self) -> msg::RelayMsg {
fn into_message(self) -> msg::AnyRelayMsg {
todo!()
}
@ -409,7 +409,7 @@ pub struct Rendezvous2 {
}
impl Body for Rendezvous2 {
fn into_message(self) -> msg::RelayMsg {
fn into_message(self) -> msg::AnyRelayMsg {
todo!()
}
@ -434,7 +434,7 @@ pub struct IntroEstablished {
}
impl Body for IntroEstablished {
fn into_message(self) -> msg::RelayMsg {
fn into_message(self) -> msg::AnyRelayMsg {
todo!()
}
@ -466,7 +466,7 @@ pub struct IntroduceAck {
}
impl Body for IntroduceAck {
fn into_message(self) -> msg::RelayMsg {
fn into_message(self) -> msg::AnyRelayMsg {
todo!()
}

View File

@ -188,8 +188,8 @@ impl ConnectUdp {
}
impl msg::Body for ConnectUdp {
fn into_message(self) -> msg::RelayMsg {
msg::RelayMsg::ConnectUdp(self)
fn into_message(self) -> msg::AnyRelayMsg {
msg::AnyRelayMsg::ConnectUdp(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
@ -230,8 +230,8 @@ impl ConnectedUdp {
}
impl msg::Body for ConnectedUdp {
fn into_message(self) -> msg::RelayMsg {
msg::RelayMsg::ConnectedUdp(self)
fn into_message(self) -> msg::AnyRelayMsg {
msg::AnyRelayMsg::ConnectedUdp(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {
@ -305,8 +305,8 @@ impl AsRef<[u8]> for Datagram {
}
impl msg::Body for Datagram {
fn into_message(self) -> msg::RelayMsg {
msg::RelayMsg::Datagram(self)
fn into_message(self) -> msg::AnyRelayMsg {
msg::AnyRelayMsg::Datagram(self)
}
fn decode_from_reader(r: &mut Reader<'_>) -> Result<Self> {

View File

@ -14,7 +14,7 @@ pub use tor_bytes;
///
/// The restricted message type is an enum, and is declared with a syntax as follows:
/// ```
/// use tor_cell::{restrict::restricted_msg, relaycell::RestrictedRelayCell};
/// use tor_cell::{restrict::restricted_msg, relaycell::RelayCell};
///
/// restricted_msg! {
/// enum OpenStreamMsg : RelayMsg {
@ -25,7 +25,7 @@ pub use tor_bytes;
/// }
/// }
///
/// type OpenStreamCell = RestrictedRelayCell<OpenStreamMsg>;
/// type OpenStreamCell = RelayCell<OpenStreamMsg>;
/// ```
///
/// Instead of `RelayMsg`, you can say `ChanMsg` to get a restricted channel
@ -46,12 +46,12 @@ macro_rules! restricted_msg {
} => {
$crate::restrict::restricted_msg!{
[
base_type: $crate::relaycell::msg::RelayMsg,
base_type: $crate::relaycell::msg::AnyRelayMsg,
msg_mod: $crate::relaycell::msg,
cmd_type: $crate::relaycell::RelayCmd,
unrecognized: $crate::relaycell::msg::Unrecognized,
body_trait: $crate::relaycell::msg::Body,
msg_trait: $crate::relaycell::RelayMsgClass
msg_trait: $crate::relaycell::RelayMsg
]
$(#[$meta])*
$v enum $name { $($tt)*}

View File

@ -2,7 +2,7 @@
#![allow(clippy::uninlined_format_args)]
use tor_bytes::Error;
use tor_cell::relaycell::{msg, msg::RelayMsg, RelayCell, RelayCmd, RelayMsgClass, StreamId};
use tor_cell::relaycell::{msg, msg::AnyRelayMsg, AnyRelayCell, RelayCmd, RelayMsg, StreamId};
#[cfg(feature = "experimental-udp")]
use std::{
@ -45,13 +45,13 @@ fn decode(body: &str) -> [u8; CELL_BODY_LEN] {
result
}
fn cell(body: &str, id: StreamId, msg: RelayMsg) {
fn cell(body: &str, id: StreamId, msg: AnyRelayMsg) {
let body = decode(body);
let mut bad_rng = BadRng;
let expected = RelayCell::new(id, msg);
let expected = AnyRelayCell::new(id, msg);
let decoded = RelayCell::decode(body).unwrap();
let decoded = AnyRelayCell::decode(body).unwrap();
assert_eq!(format!("{:?}", expected), format!("{:?}", decoded));
@ -90,13 +90,13 @@ fn test_cells() {
// length too big: 0x1f3 is one byte too many.
let m = decode("02 0000 9999 12345678 01f3 6e6565642d746f2d6b6e6f77 00000000");
assert_eq!(
RelayCell::decode(m).err(),
AnyRelayCell::decode(m).err(),
Some(Error::BadMessage("Insufficient data in relay cell"))
);
// check accessors.
let m = decode("02 0000 9999 12345678 01f2 6e6565642d746f2d6b6e6f77 00000000");
let c = RelayCell::decode(m).unwrap();
let c = AnyRelayCell::decode(m).unwrap();
assert_eq!(c.cmd(), RelayCmd::from(2));
assert_eq!(c.msg().cmd(), RelayCmd::from(2));
let (s, _) = c.into_streamid_and_msg();

View File

@ -7,7 +7,7 @@ use tor_bytes::Error as BytesError;
/// Except where noted, these were taken by instrumenting Tor
/// 0.4.5.0-alpha-dev to dump all of its cells to the logs, and
/// running in a chutney network with "test-network-all".
use tor_cell::relaycell::{msg, RelayCmd, RelayMsgClass};
use tor_cell::relaycell::{msg, RelayCmd, RelayMsg};
use tor_llcrypto::pk::rsa::RsaIdentity;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -28,15 +28,15 @@ fn unhex(s: &str) -> Vec<u8> {
hex::decode(s).unwrap()
}
fn decode(cmd: RelayCmd, body: &[u8]) -> Result<msg::RelayMsg, BytesError> {
fn decode(cmd: RelayCmd, body: &[u8]) -> Result<msg::AnyRelayMsg, BytesError> {
let mut r = tor_bytes::Reader::from_slice(body);
msg::RelayMsg::decode_from_reader(cmd, &mut r)
msg::AnyRelayMsg::decode_from_reader(cmd, &mut r)
}
/// Assert that, when treated as a cell of type `cmd`, the hexadecimal
/// body `s` decodes into the message `msg`, and then re-encodes into
/// `s2`.
fn msg_noncanonical(cmd: RelayCmd, s: &str, s2: &str, msg: &msg::RelayMsg) {
fn msg_noncanonical(cmd: RelayCmd, s: &str, s2: &str, msg: &msg::AnyRelayMsg) {
assert_eq!(msg.cmd(), cmd);
let body = unhex(s);
let body2 = unhex(s2);
@ -60,7 +60,7 @@ fn msg_noncanonical(cmd: RelayCmd, s: &str, s2: &str, msg: &msg::RelayMsg) {
/// Assert that, when treated as a cell of type `cmd`, the hexadecimal
/// body `s` decodes into the message `msg`, and then re-encodes into
/// `s`.
fn msg(cmd: RelayCmd, s: &str, msg: &msg::RelayMsg) {
fn msg(cmd: RelayCmd, s: &str, msg: &msg::AnyRelayMsg) {
msg_noncanonical(cmd, s, s, msg);
}
@ -123,7 +123,7 @@ fn test_begindir() {
let cmd = RelayCmd::BEGIN_DIR;
assert_eq!(Into::<u8>::into(cmd), 13_u8);
msg(cmd, "", &msg::RelayMsg::BeginDir(Default::default()));
msg(cmd, "", &msg::AnyRelayMsg::BeginDir(Default::default()));
}
#[test]
@ -160,7 +160,7 @@ fn test_drop() {
let cmd = RelayCmd::DROP;
assert_eq!(Into::<u8>::into(cmd), 10_u8);
msg(cmd, "", &msg::RelayMsg::Drop(Default::default()));
msg(cmd, "", &msg::AnyRelayMsg::Drop(Default::default()));
}
#[test]
@ -225,7 +225,7 @@ fn test_extend2() {
);
let message = decode(cmd, &unhex(body)[..]).unwrap();
if let msg::RelayMsg::Extend2(message) = message {
if let msg::AnyRelayMsg::Extend2(message) = message {
assert_eq!(message.handshake_type(), 2);
assert_eq!(message.handshake(), &handshake[..]);
} else {
@ -385,7 +385,7 @@ fn test_resolved() {
&unhex("06 10 12340000000000000000000000005678 00000080")[..],
)
.unwrap();
if let msg::RelayMsg::Resolved(res) = message {
if let msg::AnyRelayMsg::Resolved(res) = message {
assert_eq!(
res.into_answers(),
vec![(msg::ResolvedVal::Ip("1234::5678".parse().unwrap()), 128_u32)]
@ -443,7 +443,7 @@ fn test_truncate() {
let cmd = RelayCmd::TRUNCATE;
assert_eq!(Into::<u8>::into(cmd), 8_u8);
msg(cmd, "", &msg::RelayMsg::Truncate(Default::default()));
msg(cmd, "", &msg::AnyRelayMsg::Truncate(Default::default()));
}
#[test]
@ -659,7 +659,7 @@ fn test_establish_rendezvous() {
#[test]
fn test_establish_intro() {
use tor_cell::relaycell::{
msg::RelayMsg,
msg::AnyRelayMsg,
onion_service::{AuthKeyType, EstIntroExtDoS, EstablishIntro},
};
@ -715,7 +715,7 @@ fn test_establish_intro() {
.expect("Encode msg onto byte vector");
let mut es_intro = EstablishIntro::new(auth_key_type, auth_key, handshake_auth, sig);
es_intro.set_extension_dos(extension_dos);
let expected_msg: RelayMsg = es_intro.into();
let expected_msg: AnyRelayMsg = es_intro.into();
expected_msg
.encode_onto(&mut expect_bytes)
.expect("Encode msg onto byte vector");
@ -726,7 +726,7 @@ fn test_establish_intro() {
#[test]
fn test_introduce() {
use tor_cell::relaycell::{
msg::RelayMsg,
msg::AnyRelayMsg,
onion_service::{AuthKeyType, Introduce1},
};
@ -760,7 +760,7 @@ fn test_introduce() {
actual_msg
.encode_onto(&mut actual_bytes)
.expect("Encode msg onto byte vector");
let expected_msg: RelayMsg = intro1.into();
let expected_msg: AnyRelayMsg = intro1.into();
expected_msg
.encode_onto(&mut expect_bytes)
.expect("Encode msg onto byte vector");

View File

@ -60,7 +60,7 @@ use crate::stream::{DataStream, ResolveStream, StreamParameters, StreamReader};
use crate::{Error, ResolveError, Result};
use tor_cell::{
chancell::{self, msg::AnyChanMsg, CircId},
relaycell::msg::{Begin, RelayMsg, Resolve, Resolved, ResolvedVal},
relaycell::msg::{AnyRelayMsg, Begin, Resolve, Resolved, ResolvedVal},
};
use tor_error::{bad_api_usage, internal, into_internal};
@ -197,7 +197,7 @@ pub(crate) struct StreamTarget {
/// Reactor ID for this stream.
stream_id: StreamId,
/// Channel to send cells down.
tx: mpsc::Sender<RelayMsg>,
tx: mpsc::Sender<AnyRelayMsg>,
/// Reference to the circuit that this stream is on.
circ: ClientCirc,
}
@ -243,7 +243,7 @@ impl ClientCirc {
// reactor code, but "meta messages" just sounds odd.
#[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove
#[cfg(feature = "experimental-api")]
pub async fn send_control_message(&self, msg: RelayMsg) -> Result<()> {
pub async fn send_control_message(&self, msg: AnyRelayMsg) -> Result<()> {
todo!() // TODO hs
}
@ -391,7 +391,10 @@ impl ClientCirc {
///
/// The caller will typically want to see the first cell in response,
/// to see whether it is e.g. an END or a CONNECTED.
async fn begin_stream_impl(&self, begin_msg: RelayMsg) -> Result<(StreamReader, StreamTarget)> {
async fn begin_stream_impl(
&self,
begin_msg: AnyRelayMsg,
) -> Result<(StreamReader, StreamTarget)> {
// TODO: Possibly this should take a hop, rather than just
// assuming it's the last hop.
@ -439,7 +442,7 @@ impl ClientCirc {
/// Start a DataStream (anonymized connection) to the given
/// address and port, using a BEGIN cell.
async fn begin_data_stream(&self, msg: RelayMsg, optimistic: bool) -> Result<DataStream> {
async fn begin_data_stream(&self, msg: AnyRelayMsg, optimistic: bool) -> Result<DataStream> {
let (reader, target) = self.begin_stream_impl(msg).await?;
let mut stream = DataStream::new(reader, target);
if !optimistic {
@ -474,7 +477,7 @@ impl ClientCirc {
// Since they are local to a relay that we've already authenticated
// with and built a circuit to, there should be no additional checks
// we need to perform to see whether the BEGINDIR will succeed.
self.begin_data_stream(RelayMsg::BeginDir(Default::default()), true)
self.begin_data_stream(AnyRelayMsg::BeginDir(Default::default()), true)
.await
}
@ -751,7 +754,7 @@ impl StreamTarget {
/// The StreamTarget will set the correct stream ID and pick the
/// right hop, but will not validate that the message is well-formed
/// or meaningful in context.
pub(crate) async fn send(&mut self, msg: RelayMsg) -> Result<()> {
pub(crate) async fn send(&mut self, msg: AnyRelayMsg) -> Result<()> {
self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
Ok(())
}
@ -812,16 +815,16 @@ mod test {
use std::time::Duration;
use tor_basic_utils::test_rng::testing_rng;
use tor_cell::chancell::{msg as chanmsg, AnyChanCell};
use tor_cell::relaycell::{msg as relaymsg, RelayCell, StreamId};
use tor_cell::relaycell::{msg as relaymsg, AnyRelayCell, StreamId};
use tor_linkspec::OwnedCircTarget;
use tor_rtcompat::{Runtime, SleepProvider};
use tracing::trace;
fn rmsg_to_ccmsg<ID>(id: ID, msg: relaymsg::RelayMsg) -> ClientCircChanMsg
fn rmsg_to_ccmsg<ID>(id: ID, msg: relaymsg::AnyRelayMsg) -> ClientCircChanMsg
where
ID: Into<StreamId>,
{
let body: RelayCellBody = RelayCell::new(id.into(), msg)
let body: RelayCellBody = AnyRelayCell::new(id.into(), msg)
.encode(&mut testing_rng())
.unwrap()
.into();
@ -1051,7 +1054,7 @@ mod test {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
let (chan, mut rx, _sink) = working_fake_channel(&rt);
let (circ, _send) = newcirc(&rt, chan).await;
let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir(Default::default()));
let begindir = AnyRelayCell::new(0.into(), AnyRelayMsg::BeginDir(Default::default()));
circ.control
.unbounded_send(CtrlMsg::SendRelayCell {
hop: 2.into(),
@ -1065,10 +1068,10 @@ mod test {
let rcvd = rx.next().await.unwrap();
assert_eq!(rcvd.circid(), 128.into());
let m = match rcvd.into_circid_and_msg().1 {
AnyChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
AnyChanMsg::Relay(r) => AnyRelayCell::decode(r.into_relay_body()).unwrap(),
_ => panic!(),
};
assert!(matches!(m.msg(), RelayMsg::BeginDir(_)));
assert!(matches!(m.msg(), AnyRelayMsg::BeginDir(_)));
});
}
@ -1158,11 +1161,11 @@ mod test {
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
assert_eq!(id, 128.into());
let rmsg = match chmsg {
AnyChanMsg::RelayEarly(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
AnyChanMsg::RelayEarly(r) => AnyRelayCell::decode(r.into_relay_body()).unwrap(),
_ => panic!(),
};
let e2 = match rmsg.msg() {
RelayMsg::Extend2(e2) => e2,
AnyRelayMsg::Extend2(e2) => e2,
_ => panic!(),
};
let mut rng = testing_rng();
@ -1295,11 +1298,11 @@ mod test {
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
assert_eq!(id, 128.into()); // hardcoded circid.
let rmsg = match chmsg {
AnyChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
AnyChanMsg::Relay(r) => AnyRelayCell::decode(r.into_relay_body()).unwrap(),
_ => panic!(),
};
let (streamid, rmsg) = rmsg.into_streamid_and_msg();
assert!(matches!(rmsg, RelayMsg::BeginDir(_)));
assert!(matches!(rmsg, AnyRelayMsg::BeginDir(_)));
// Reply with a Connected cell to indicate success.
let connected = relaymsg::Connected::new_empty().into();
@ -1309,12 +1312,12 @@ mod test {
let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
assert_eq!(id, 128.into());
let rmsg = match chmsg {
AnyChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
AnyChanMsg::Relay(r) => AnyRelayCell::decode(r.into_relay_body()).unwrap(),
_ => panic!(),
};
let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
assert_eq!(streamid_2, streamid);
if let RelayMsg::Data(d) = rmsg {
if let AnyRelayMsg::Data(d) = rmsg {
assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
} else {
panic!();
@ -1375,11 +1378,11 @@ mod test {
// Read the begindir cell.
let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
let rmsg = match chmsg {
AnyChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
AnyChanMsg::Relay(r) => AnyRelayCell::decode(r.into_relay_body()).unwrap(),
_ => panic!(),
};
let (streamid, rmsg) = rmsg.into_streamid_and_msg();
assert!(matches!(rmsg, RelayMsg::Begin(_)));
assert!(matches!(rmsg, AnyRelayMsg::Begin(_)));
// Reply with a connected cell...
let connected = relaymsg::Connected::new_empty().into();
sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
@ -1392,12 +1395,12 @@ mod test {
assert_eq!(id, 128.into());
let rmsg = match chmsg {
AnyChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
AnyChanMsg::Relay(r) => AnyRelayCell::decode(r.into_relay_body()).unwrap(),
_ => panic!(),
};
let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
assert_eq!(streamid2, streamid);
if let RelayMsg::Data(dat) = rmsg {
if let AnyRelayMsg::Data(dat) = rmsg {
cells_received += 1;
bytes_received += dat.as_ref().len();
} else {

View File

@ -5,7 +5,7 @@
use crate::circuit::sendme::{StreamRecvWindow, StreamSendWindow};
use crate::{Error, Result};
use tor_cell::relaycell::{msg::RelayMsg, RelayMsgClass};
use tor_cell::relaycell::{msg::AnyRelayMsg, RelayMsg};
use tor_error::internal;
/// Type to track state of half-closed streams.
@ -47,17 +47,17 @@ impl HalfStream {
/// The caller must handle END cells; it is an internal error to pass
/// END cells to this method.
/// no ends here.
pub(super) fn handle_msg(&mut self, msg: &RelayMsg) -> Result<()> {
pub(super) fn handle_msg(&mut self, msg: &AnyRelayMsg) -> Result<()> {
match msg {
RelayMsg::Sendme(_) => {
AnyRelayMsg::Sendme(_) => {
self.sendw.put(Some(()))?;
Ok(())
}
RelayMsg::Data(_) => {
AnyRelayMsg::Data(_) => {
self.recvw.take()?;
Ok(())
}
RelayMsg::Connected(_) => {
AnyRelayMsg::Connected(_) => {
if self.connected_ok {
self.connected_ok = false;
Ok(())
@ -67,7 +67,7 @@ impl HalfStream {
))
}
}
RelayMsg::End(_) => Err(Error::from(internal!(
AnyRelayMsg::End(_) => Err(Error::from(internal!(
"END cell in HalfStream::handle_msg()"
))),
_ => Err(Error::CircProto(format!(

View File

@ -15,8 +15,8 @@ use std::collections::VecDeque;
use std::marker::PhantomData;
use std::pin::Pin;
use tor_cell::chancell::msg::{AnyChanMsg, Relay};
use tor_cell::relaycell::msg::{End, RelayMsg, Sendme};
use tor_cell::relaycell::{RelayCell, RelayCmd, RelayMsgClass, StreamId};
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
use tor_cell::relaycell::{AnyRelayCell, RelayCmd, RelayMsg, StreamId};
use futures::channel::{mpsc, oneshot};
use futures::Sink;
@ -111,16 +111,16 @@ pub(super) enum CtrlMsg {
/// The hop number to begin the stream with.
hop_num: HopNum,
/// The message to send.
message: RelayMsg,
message: AnyRelayMsg,
/// A channel to send messages on this stream down.
///
/// This sender shouldn't ever block, because we use congestion control and only send
/// SENDME cells once we've read enough out of the other end. If it *does* block, we
/// can assume someone is trying to send us more cells than they should, and abort
/// the stream.
sender: mpsc::Sender<RelayMsg>,
sender: mpsc::Sender<AnyRelayMsg>,
/// A channel to receive messages to send on this stream from.
rx: mpsc::Receiver<RelayMsg>,
rx: mpsc::Receiver<AnyRelayMsg>,
/// Oneshot channel to notify on completion, with the allocated stream ID.
done: ReactorResultChannel<StreamId>,
},
@ -153,7 +153,7 @@ pub(super) enum CtrlMsg {
SendRelayCell {
hop: HopNum,
early: bool,
cell: RelayCell,
cell: AnyRelayCell,
},
}
/// Represents the reactor's view of a single hop.
@ -182,7 +182,7 @@ pub(super) struct CircHop {
///
/// NOTE: Control messages could potentially add unboundedly to this, although that's
/// not likely to happen (and isn't triggereable from the network, either).
outbound: VecDeque<(bool, RelayCell)>,
outbound: VecDeque<(bool, AnyRelayCell)>,
}
/// Enumeration to determine whether we require circuit-level SENDME cells to be
@ -260,7 +260,7 @@ pub(super) trait MetaCellHandler: Send {
/// Called when the message we were waiting for arrives.
///
/// Gets a copy of the `Reactor` in order to do anything it likes there.
fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>;
fn finish(&mut self, msg: AnyRelayMsg, reactor: &mut Reactor) -> Result<()>;
}
/// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait.
@ -339,7 +339,7 @@ where
);
let extend_msg = Extend2::new(linkspecs, handshake_id, msg);
let cell = RelayCell::new(0.into(), extend_msg.into_message());
let cell = AnyRelayCell::new(0.into(), extend_msg.into_message());
// Send the message to the last hop...
reactor.send_relay_cell(
@ -373,7 +373,7 @@ where
fn expected_hop(&self) -> HopNum {
self.expected_hop
}
fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()> {
fn finish(&mut self, msg: AnyRelayMsg, reactor: &mut Reactor) -> Result<()> {
// Did we get the right response?
if msg.cmd() != RelayCmd::EXTENDED2 {
return Err(Error::CircProto(format!(
@ -386,7 +386,7 @@ where
// ???? cases in this function?
let msg = match msg {
RelayMsg::Extended2(e) => e,
AnyRelayMsg::Extended2(e) => e,
_ => {
return Err(Error::from(internal!(
"Message body {:?} didn't match cmd {:?}",
@ -602,7 +602,7 @@ impl Reactor {
match Pin::new(rx).poll_next(cx) {
Poll::Ready(Some(m)) => {
stream_relaycells
.push((hop_num, RelayCell::new(*id, m)));
.push((hop_num, AnyRelayCell::new(*id, m)));
}
Poll::Ready(None) => {
// Stream receiver was dropped; close the stream.
@ -822,12 +822,12 @@ impl Reactor {
}
/// Handle a RELAY cell on this circuit with stream ID 0.
fn handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<CellStatus> {
fn handle_meta_cell(&mut self, hopnum: HopNum, msg: AnyRelayMsg) -> Result<CellStatus> {
// SENDME cells and TRUNCATED get handled internally by the circuit.
if let RelayMsg::Sendme(s) = msg {
if let AnyRelayMsg::Sendme(s) = msg {
return self.handle_sendme(hopnum, s);
}
if let RelayMsg::Truncated(t) = msg {
if let AnyRelayMsg::Truncated(t) = msg {
let reason = t.reason();
debug!(
"{}: Truncated from hop {}. Reason: {} [{}]",
@ -974,7 +974,7 @@ impl Reactor {
cx: &mut Context<'_>,
hop: HopNum,
early: bool,
cell: RelayCell,
cell: AnyRelayCell,
) -> Result<()> {
let c_t_w = sendme::cell_counts_towards_windows(&cell);
let stream_id = cell.stream_id();
@ -1102,7 +1102,7 @@ impl Reactor {
}
CtrlMsg::SendSendme { stream_id, hop_num } => {
let sendme = Sendme::new_empty();
let cell = RelayCell::new(stream_id, sendme.into());
let cell = AnyRelayCell::new(stream_id, sendme.into());
self.send_relay_cell(cx, hop_num, false, cell)?;
}
#[cfg(test)]
@ -1158,16 +1158,16 @@ impl Reactor {
&mut self,
cx: &mut Context<'_>,
hopnum: HopNum,
message: RelayMsg,
sender: mpsc::Sender<RelayMsg>,
rx: mpsc::Receiver<RelayMsg>,
message: AnyRelayMsg,
sender: mpsc::Sender<AnyRelayMsg>,
rx: mpsc::Receiver<AnyRelayMsg>,
) -> Result<StreamId> {
let hop = self
.hop_mut(hopnum)
.ok_or_else(|| Error::from(internal!("No such hop {:?}", hopnum)))?;
let send_window = StreamSendWindow::new(SEND_WINDOW_INIT);
let r = hop.map.add_ent(sender, rx, send_window)?;
let cell = RelayCell::new(r, message);
let cell = AnyRelayCell::new(r, message);
self.send_relay_cell(cx, hopnum, false, cell)?;
Ok(r)
}
@ -1195,7 +1195,7 @@ impl Reactor {
// TODO: I am about 80% sure that we only send an END cell if
// we didn't already get an END cell. But I should double-check!
if should_send_end == ShouldSendEnd::Send {
let end_cell = RelayCell::new(id, End::new_misc().into());
let end_cell = AnyRelayCell::new(id, End::new_misc().into());
self.send_relay_cell(cx, hopnum, false, end_cell)?;
}
Ok(())
@ -1242,8 +1242,8 @@ impl Reactor {
tag_copy
};
// Decode the cell.
let msg =
RelayCell::decode(body.into()).map_err(|e| Error::from_bytes_err(e, "relay cell"))?;
let msg = AnyRelayCell::decode(body.into())
.map_err(|e| Error::from_bytes_err(e, "relay cell"))?;
let c_t_w = sendme::cell_counts_towards_windows(&msg);
@ -1264,7 +1264,7 @@ impl Reactor {
// every increase that parameter to a higher number, this will
// become incorrect. (Higher numbers are not currently defined.)
let sendme = Sendme::new_tag(tag);
let cell = RelayCell::new(0.into(), sendme.into());
let cell = AnyRelayCell::new(0.into(), sendme.into());
self.send_relay_cell(cx, hopnum, false, cell)?;
self.hop_mut(hopnum)
.ok_or_else(|| {
@ -1309,7 +1309,7 @@ impl Reactor {
}) => {
// The stream for this message exists, and is open.
if let RelayMsg::Sendme(_) = msg {
if let AnyRelayMsg::Sendme(_) = msg {
// We need to handle sendmes here, not in the stream's
// recv() method, or else we'd never notice them if the
// stream isn't reading.
@ -1317,7 +1317,7 @@ impl Reactor {
return Ok(CellStatus::Continue);
}
if matches!(msg, RelayMsg::Connected(_)) {
if matches!(msg, AnyRelayMsg::Connected(_)) {
// Remember that we've received a Connected cell, and can't get another,
// even if we become a HalfStream. (This rule is enforced separately at
// DataStreamReader.)
@ -1326,7 +1326,7 @@ impl Reactor {
// Remember whether this was an end cell: if so we should
// close the stream.
let is_end_cell = matches!(msg, RelayMsg::End(_));
let is_end_cell = matches!(msg, AnyRelayMsg::End(_));
// TODO: Add a wrapper type here to reject cells that should
// never go to a client, like BEGIN.
@ -1354,7 +1354,7 @@ impl Reactor {
Some(StreamEnt::EndSent(halfstream)) => {
// We sent an end but maybe the other side hasn't heard.
if matches!(msg, RelayMsg::End(_)) {
if matches!(msg, AnyRelayMsg::End(_)) {
hop.map.end_received(streamid)?;
} else {
halfstream.handle_msg(&msg)?;

View File

@ -12,8 +12,8 @@
use std::collections::VecDeque;
use tor_cell::relaycell::msg::RelayMsg;
use tor_cell::relaycell::RelayCell;
use tor_cell::relaycell::msg::AnyRelayMsg;
use tor_cell::relaycell::AnyRelayCell;
use tor_error::internal;
use crate::{Error, Result};
@ -268,12 +268,12 @@ impl<P: WindowParams> RecvWindow<P> {
}
/// Return true if this message is counted by flow-control windows.
pub(crate) fn msg_counts_towards_windows(msg: &RelayMsg) -> bool {
matches!(msg, RelayMsg::Data(_))
pub(crate) fn msg_counts_towards_windows(msg: &AnyRelayMsg) -> bool {
matches!(msg, AnyRelayMsg::Data(_))
}
/// Return true if this message is counted by flow-control windows.
pub(crate) fn cell_counts_towards_windows(cell: &RelayCell) -> bool {
pub(crate) fn cell_counts_towards_windows(cell: &AnyRelayCell) -> bool {
msg_counts_towards_windows(cell.msg())
}
@ -290,7 +290,7 @@ mod test {
#![allow(clippy::unchecked_duration_subtraction)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use tor_cell::relaycell::{msg, RelayCell};
use tor_cell::relaycell::{msg, AnyRelayCell};
#[test]
fn what_counts() {
@ -298,11 +298,17 @@ mod test {
.unwrap()
.into();
assert!(!msg_counts_towards_windows(&m));
assert!(!cell_counts_towards_windows(&RelayCell::new(77.into(), m)));
assert!(!cell_counts_towards_windows(&AnyRelayCell::new(
77.into(),
m
)));
let m = msg::Data::new(&b"Education is not a prerequisite to political control-political control is the cause of popular education."[..]).unwrap().into(); // Du Bois
assert!(msg_counts_towards_windows(&m));
assert!(cell_counts_towards_windows(&RelayCell::new(128.into(), m)));
assert!(cell_counts_towards_windows(&AnyRelayCell::new(
128.into(),
m
)));
}
#[test]

View File

@ -6,7 +6,7 @@ use crate::{Error, Result};
/// Mapping from stream ID to streams.
// NOTE: This is a work in progress and I bet I'll refactor it a lot;
// it needs to stay opaque!
use tor_cell::relaycell::{msg::RelayMsg, StreamId};
use tor_cell::relaycell::{msg::AnyRelayMsg, StreamId};
use futures::channel::mpsc;
use std::collections::hash_map::Entry;
@ -24,9 +24,9 @@ pub(super) enum StreamEnt {
/// An open stream.
Open {
/// Sink to send relay cells tagged for this stream into.
sink: mpsc::Sender<RelayMsg>,
sink: mpsc::Sender<AnyRelayMsg>,
/// Stream for cells that should be sent down this stream.
rx: mpsc::Receiver<RelayMsg>,
rx: mpsc::Receiver<AnyRelayMsg>,
/// Send window, for congestion control purposes.
send_window: sendme::StreamSendWindow,
/// Number of cells dropped due to the stream disappearing before we can
@ -105,8 +105,8 @@ impl StreamMap {
/// Add an entry to this map; return the newly allocated StreamId.
pub(super) fn add_ent(
&mut self,
sink: mpsc::Sender<RelayMsg>,
rx: mpsc::Receiver<RelayMsg>,
sink: mpsc::Sender<AnyRelayMsg>,
rx: mpsc::Receiver<AnyRelayMsg>,
send_window: sendme::StreamSendWindow,
) -> Result<StreamId> {
let stream_ent = StreamEnt::Open {

View File

@ -3,7 +3,7 @@
use crate::{Error, Result};
use tor_cell::relaycell::msg::EndReason;
use tor_cell::relaycell::RelayMsgClass;
use tor_cell::relaycell::RelayMsg;
use futures::io::{AsyncRead, AsyncWrite};
use futures::task::{Context, Poll};
@ -25,7 +25,7 @@ use educe::Educe;
use crate::circuit::StreamTarget;
use crate::stream::StreamReader;
use tor_basic_utils::skip_fmt;
use tor_cell::relaycell::msg::{Data, RelayMsg};
use tor_cell::relaycell::msg::{AnyRelayMsg, Data};
use tor_error::internal;
/// An anonymized stream over the Tor network.
@ -595,15 +595,15 @@ impl DataReaderImpl {
let cell = self.s.recv().await;
let result = match cell {
Ok(RelayMsg::Connected(_)) if !self.connected => {
Ok(AnyRelayMsg::Connected(_)) if !self.connected => {
self.connected = true;
Ok(())
}
Ok(RelayMsg::Data(d)) if self.connected => {
Ok(AnyRelayMsg::Data(d)) if self.connected => {
self.add_data(d.into());
Ok(())
}
Ok(RelayMsg::End(e)) => Err(Error::EndReceived(e.reason())),
Ok(AnyRelayMsg::End(e)) => Err(Error::EndReceived(e.reason())),
Err(e) => Err(e),
Ok(m) => {
self.s.protocol_error();

View File

@ -3,7 +3,7 @@
use crate::circuit::{sendme, StreamTarget};
use crate::{Error, Result};
use tor_cell::relaycell::msg::RelayMsg;
use tor_cell::relaycell::msg::AnyRelayMsg;
use crate::circuit::sendme::StreamRecvWindow;
use futures::channel::mpsc;
@ -15,7 +15,7 @@ pub struct StreamReader {
/// The underlying `StreamTarget` for this stream.
pub(crate) target: StreamTarget,
/// Channel to receive stream messages from the reactor.
pub(crate) receiver: mpsc::Receiver<RelayMsg>,
pub(crate) receiver: mpsc::Receiver<AnyRelayMsg>,
/// Congestion control receive window for this stream.
///
/// Having this here means we're only going to update it when the end consumer of this stream
@ -29,7 +29,7 @@ pub struct StreamReader {
impl StreamReader {
/// Try to read the next relay message from this stream.
async fn recv_raw(&mut self) -> Result<RelayMsg> {
async fn recv_raw(&mut self) -> Result<AnyRelayMsg> {
if self.ended {
// Prevent reading from streams after they've ended.
return Err(Error::NotConnected);
@ -54,10 +54,10 @@ impl StreamReader {
/// As recv_raw, but if there is an error or an end cell, note that this
/// stream has ended.
pub async fn recv(&mut self) -> Result<RelayMsg> {
pub async fn recv(&mut self) -> Result<AnyRelayMsg> {
let val = self.recv_raw().await;
match val {
Err(_) | Ok(RelayMsg::End(_)) => {
Err(_) | Ok(AnyRelayMsg::End(_)) => {
self.ended = true;
}
_ => {}

View File

@ -2,8 +2,8 @@
use crate::stream::StreamReader;
use crate::{Error, Result};
use tor_cell::relaycell::msg::{RelayMsg, Resolved};
use tor_cell::relaycell::RelayMsgClass;
use tor_cell::relaycell::msg::{AnyRelayMsg, Resolved};
use tor_cell::relaycell::RelayMsg;
/// A ResolveStream represents a pending DNS request made with a RESOLVE
/// cell.
@ -26,8 +26,8 @@ impl ResolveStream {
pub async fn read_msg(&mut self) -> Result<Resolved> {
let cell = self.s.recv().await?;
match cell {
RelayMsg::End(e) => Err(Error::EndReceived(e.reason())),
RelayMsg::Resolved(r) => Ok(r),
AnyRelayMsg::End(e) => Err(Error::EndReceived(e.reason())),
AnyRelayMsg::Resolved(r) => Ok(r),
m => {
self.s.protocol_error();
Err(Error::StreamProto(format!(