From 9e2b6f3aed648e52f1ee18f3741f5380fa20a379 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 10 Feb 2023 12:09:24 -0500 Subject: [PATCH 01/17] tor-bytes: Add a new writer implementation for fixed-size objects Because the API assumes that many writes are infallible, this writer takes ownership of the backing object, and will only return it to you if you didn't run over the end. I'm going to use this to save some allocations in relay cell bodies. --- crates/tor-bytes/semver.md | 1 + crates/tor-bytes/src/lib.rs | 2 + crates/tor-bytes/src/slicewriter.rs | 122 ++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+) create mode 100644 crates/tor-bytes/src/slicewriter.rs diff --git a/crates/tor-bytes/semver.md b/crates/tor-bytes/semver.md index 2d979ebc8..57673c94e 100644 --- a/crates/tor-bytes/semver.md +++ b/crates/tor-bytes/semver.md @@ -1 +1,2 @@ MODIFIED: Error::BadMessage is deprecated, Error::InvalidMessage is new. +MODIFIED: New SliceWriter type. diff --git a/crates/tor-bytes/src/lib.rs b/crates/tor-bytes/src/lib.rs index 51d116ec3..a76f527a9 100644 --- a/crates/tor-bytes/src/lib.rs +++ b/crates/tor-bytes/src/lib.rs @@ -42,11 +42,13 @@ mod err; mod impls; mod reader; mod secretbuf; +mod slicewriter; mod writer; pub use err::{EncodeError, Error}; pub use reader::Reader; pub use secretbuf::SecretBuf; +pub use slicewriter::{SliceWriter, SliceWriterError}; pub use writer::Writer; use arrayref::array_ref; diff --git a/crates/tor-bytes/src/slicewriter.rs b/crates/tor-bytes/src/slicewriter.rs new file mode 100644 index 000000000..b3928a8e9 --- /dev/null +++ b/crates/tor-bytes/src/slicewriter.rs @@ -0,0 +1,122 @@ +//! A Writer that can put its results into a buffer of known byte size without +//! changing that size. + +use thiserror::Error; + +use crate::Writer; + +/// An error that occurred while trying to unwrap a SliceWriter. 
+#[non_exhaustive] +#[derive(Clone, Debug, Error)] +pub enum SliceWriterError { + /// We tried to write more data than would fit into the fixed-size object. + #[error("Tried to write more than would fit into a fixed-size slice.")] + Truncated, +} + +/// An object that supports writing into a byte-slice of fixed size. +/// +/// Since the writer API does not allow all `write_*` functions to report errors, +/// this type defers any truncated-data errors until you try to retrieve the +/// inner data. +/// +// +// TODO: in theory we could have a version of this that used MaybeUninit, but I +// don't think that would be worth it. +pub struct SliceWriter { + /// The object we're writing into. Must have fewer than usize::MAX bytes. + data: T, + /// Our current write position within that object. + offset: usize, +} + +impl Writer for SliceWriter +where + T: AsMut<[u8]>, +{ + fn write_all(&mut self, b: &[u8]) { + let new_len = self.offset.saturating_add(b.len()); + if new_len <= self.data.as_mut().len() { + // Note that if we reach this case, the addition was not saturating. + self.data.as_mut()[self.offset..new_len].copy_from_slice(b); + self.offset = new_len; + } else { + self.offset = usize::MAX; + } + } +} +impl SliceWriter { + /// Construct a new SliceWriter + /// + /// Typically, you would want to use this on a type that implements + /// `AsMut<[u8]>`, or else it won't be very useful. + /// + /// Preexisting bytes in the `data` object will be unchanged, unless you use + /// the [`Writer`] API to write to them. + pub fn new(data: T) -> Self { + Self { data, offset: 0 } + } + + /// Try to extract the data from this `SliceWriter`. + /// + /// On success (if we did not write "off the end" of the underlying object), + /// return the object and the number of bytes we wrote into it. (Bytes + /// after that position are unchanged.) + /// + /// On failure (if we tried to write too much), return an error. 
+ pub fn try_unwrap(self) -> Result<(T, usize), SliceWriterError> { + if self.offset != usize::MAX { + Ok((self.data, self.offset)) + } else { + Err(SliceWriterError::Truncated) + } + } +} + +#[cfg(test)] +mod test { + // @@ begin test lint list maintained by maint/add_warning @@ + #![allow(clippy::bool_assert_comparison)] + #![allow(clippy::clone_on_copy)] + #![allow(clippy::dbg_macro)] + #![allow(clippy::print_stderr)] + #![allow(clippy::print_stdout)] + #![allow(clippy::single_char_pattern)] + #![allow(clippy::unwrap_used)] + #![allow(clippy::unchecked_duration_subtraction)] + //! + use super::*; + + #[test] + fn basics() { + let mut w = SliceWriter::new([0_u8; 16]); + w.write_u8(b'h'); + w.write_u16(0x656c); + w.write_u32(0x6c6f2077); + w.write_all(b"orld!"); + let (a, len) = w.try_unwrap().unwrap(); + + assert_eq!(a.as_ref(), b"hello world!\0\0\0\0"); + assert_eq!(len, 12); + } + + #[test] + fn full_is_ok() { + let mut w = SliceWriter::new([0_u8; 4]); + w.write_u8(1); + w.write_u16(0x0203); + w.write_u8(4); + let (a, len) = w.try_unwrap().unwrap(); + + assert_eq!(a.as_ref(), [1, 2, 3, 4]); + assert_eq!(len, 4); + } + + #[test] + fn too_full_is_not_ok() { + let mut w = SliceWriter::new([0_u8; 5]); + w.write_u32(12); + w.write_u32(12); + assert!(matches!(w.try_unwrap(), Err(SliceWriterError::Truncated))); + } +} From ca3b33a1afc58b84cc7a39ea3845a82f17cee0da Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Sun, 12 Feb 2023 21:19:28 -0500 Subject: [PATCH 02/17] tor-cell: Refactor relay cells to copy much less We now manipulate raw relay cell bodies as (an alias for) `Box<[u8;509]>` rather than as (an alias for) `[u8;509]`. This enables us to do much less copying. It will become more important soon, as we defer parsing relay cell bodies even longer. Related to #7. We also use SliceWriter to avoid allocating a Vec<> for every relay message we want to encode, and instead encode directly into the cell. 
--- Cargo.lock | 1 + crates/tor-bytes/src/slicewriter.rs | 9 ++- crates/tor-cell/semver.md | 2 + crates/tor-cell/src/chancell.rs | 5 ++ crates/tor-cell/src/chancell/msg.rs | 19 +++--- crates/tor-cell/src/relaycell.rs | 82 ++++++++++++++++--------- crates/tor-cell/tests/test_relaycell.rs | 4 +- crates/tor-proto/Cargo.toml | 1 + crates/tor-proto/src/circuit.rs | 9 ++- crates/tor-proto/src/circuit/reactor.rs | 4 +- crates/tor-proto/src/crypto/cell.rs | 26 +++----- 11 files changed, 94 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c8c0e64c..5abf0c3b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4136,6 +4136,7 @@ dependencies = [ "cipher", "coarsetime", "derive_builder_fork_arti", + "derive_more", "digest 0.10.6", "educe", "futures", diff --git a/crates/tor-bytes/src/slicewriter.rs b/crates/tor-bytes/src/slicewriter.rs index b3928a8e9..762f32ab9 100644 --- a/crates/tor-bytes/src/slicewriter.rs +++ b/crates/tor-bytes/src/slicewriter.rs @@ -65,8 +65,15 @@ impl SliceWriter { /// /// On failure (if we tried to write too much), return an error. pub fn try_unwrap(self) -> Result<(T, usize), SliceWriterError> { + let offset = self.offset()?; + Ok((self.data, offset)) + } + + /// Return the number of bytes written into this `SliceWriter` so far, + /// or an error if it has overflowed. + pub fn offset(&self) -> Result { if self.offset != usize::MAX { - Ok((self.data, self.offset)) + Ok(self.offset) } else { Err(SliceWriterError::Truncated) } diff --git a/crates/tor-cell/semver.md b/crates/tor-cell/semver.md index 63d94a85b..4e508fe42 100644 --- a/crates/tor-cell/semver.md +++ b/crates/tor-cell/semver.md @@ -8,3 +8,5 @@ BREAKING: Renamed ChanCell->AnyChanCell, ChanMsg->AnyChanMsg. BREAKING: Renamed RelayCell->AnyRelayCell, RelayMsg->AnyRelayMsg. BREAKING: Make ChannelCodec::decode() parameterized. BREAKING: RelayEarly is now a real type. 
+BREAKING: RelayCell encoding and decoding functions now expect a Box; + /// Channel-local identifier for a circuit. /// /// A circuit ID can be 2 or 4 bytes long; since version 4 of the Tor diff --git a/crates/tor-cell/src/chancell/msg.rs b/crates/tor-cell/src/chancell/msg.rs index e4d10f7b8..d428a8e62 100644 --- a/crates/tor-cell/src/chancell/msg.rs +++ b/crates/tor-cell/src/chancell/msg.rs @@ -1,6 +1,6 @@ //! Different kinds of messages that can be encoded in channel cells. -use super::{ChanCmd, RawCellBody, CELL_DATA_LEN}; +use super::{BoxedCellBody, ChanCmd, RawCellBody, CELL_DATA_LEN}; use std::net::{IpAddr, Ipv4Addr}; use tor_basic_utils::skip_fmt; use tor_bytes::{self, EncodeError, EncodeResult, Error, Readable, Reader, Result, Writer}; @@ -353,7 +353,7 @@ impl Readable for Created2 { /// /// A different protocol is defined over the relay cells; it is implemented /// in the [crate::relaycell] module. -#[derive(Clone, Educe)] +#[derive(Clone, Educe, derive_more::From)] #[educe(Debug)] pub struct Relay { /// The contents of the relay cell as encoded for transfer. @@ -364,7 +364,7 @@ pub struct Relay { /// necessary happen. We should refactor our data handling until we're mostly /// moving around pointers rather than copying data; see ticket #7. #[educe(Debug(method = "skip_fmt"))] - body: Box, + body: BoxedCellBody, } impl Relay { /// Construct a Relay message from a slice containing its contents. @@ -385,11 +385,10 @@ impl Relay { body: Box::new(body), } } - - /// Consume this Relay message and return a RelayCellBody for + /// Consume this Relay message and return a BoxedCellBody for /// encryption/decryption. - pub fn into_relay_body(self) -> RawCellBody { - *self.body + pub fn into_relay_body(self) -> BoxedCellBody { + self.body } /// Wrap this Relay message into a RelayMsg as a RELAY_EARLY cell. 
pub fn into_early(self) -> AnyChanMsg { @@ -426,13 +425,13 @@ impl Body for RelayEarly { } } impl RelayEarly { - /// Consume this RelayEarly message and return a RelayCellBody for + /// Consume this RelayEarly message and return a BoxedCellBody for /// encryption/decryption. // // (Since this method takes `self` by value, we can't take advantage of // Deref.) - pub fn into_relay_body(self) -> RawCellBody { - *self.0.body + pub fn into_relay_body(self) -> BoxedCellBody { + self.0.body } } diff --git a/crates/tor-cell/src/relaycell.rs b/crates/tor-cell/src/relaycell.rs index d6d53cd7e..9e0cb1705 100644 --- a/crates/tor-cell/src/relaycell.rs +++ b/crates/tor-cell/src/relaycell.rs @@ -1,7 +1,7 @@ //! Implementation for parsing and encoding relay cells -use crate::chancell::{RawCellBody, CELL_DATA_LEN}; -use tor_bytes::{EncodeResult, Error, Result}; +use crate::chancell::{BoxedCellBody, CELL_DATA_LEN}; +use tor_bytes::{EncodeError, EncodeResult, Error, Result}; use tor_bytes::{Reader, Writer}; use tor_error::internal; @@ -231,54 +231,76 @@ impl RelayCell { } /// Consume this relay message and encode it as a 509-byte padded cell /// body. - pub fn encode(self, rng: &mut R) -> crate::Result { + pub fn encode(self, rng: &mut R) -> crate::Result { /// We skip this much space before adding any random padding to the /// end of the cell const MIN_SPACE_BEFORE_PADDING: usize = 4; - // TODO: This implementation is inefficient; it copies too much. 
- let encoded = self.encode_to_vec()?; - let enc_len = encoded.len(); - if enc_len > CELL_DATA_LEN { - return Err(crate::Error::Internal(internal!( - "too many bytes in relay cell" - ))); - } - let mut raw = [0_u8; CELL_DATA_LEN]; - raw[0..enc_len].copy_from_slice(&encoded); - + let (mut body, enc_len) = self.encode_to_cell()?; + debug_assert!(enc_len <= CELL_DATA_LEN); if enc_len < CELL_DATA_LEN - MIN_SPACE_BEFORE_PADDING { - rng.fill_bytes(&mut raw[enc_len + MIN_SPACE_BEFORE_PADDING..]); + rng.fill_bytes(&mut body[enc_len + MIN_SPACE_BEFORE_PADDING..]); } - Ok(raw) + Ok(body) } /// Consume a relay cell and return its contents, encoded for use - /// in a RELAY or RELAY_EARLY cell - /// - /// TODO: not the best interface, as this requires copying into a cell. - fn encode_to_vec(self) -> EncodeResult> { - let mut w = Vec::new(); + /// in a RELAY or RELAY_EARLY cell. + fn encode_to_cell(self) -> EncodeResult<(BoxedCellBody, usize)> { + // NOTE: This implementation is a bit optimized, since it happens to + // literally every relay cell that we produce. + + // TODO -NM: Add a specialized implementation for making a DATA cell from + // a body? + + /// Wrap a BoxedCellBody and implement AsMut<[u8]> + struct BodyWrapper(BoxedCellBody); + impl AsMut<[u8]> for BodyWrapper { + fn as_mut(&mut self) -> &mut [u8] { + self.0.as_mut() + } + } + /// The position of the length field within a relay cell. + const LEN_POS: usize = 9; + /// The position of the body within a relay cell. + const BODY_POS: usize = 11; + + let body = BodyWrapper(Box::new([0_u8; 509])); + + let mut w = tor_bytes::SliceWriter::new(body); w.write_u8(self.msg.cmd().into()); w.write_u16(0); // "Recognized" w.write_u16(self.streamid.0); w.write_u32(0); // Digest - let len_pos = w.len(); + // (It would be simpler to use NestedWriter at this point, but it uses an internal Vec that we are trying to avoid.) 
+ debug_assert_eq!( + w.offset().expect("Overflowed a cell with just the header!"), + LEN_POS + ); w.write_u16(0); // Length. - let body_pos = w.len(); - self.msg.encode_onto(&mut w)?; - assert!(w.len() >= body_pos); // nothing was removed - let payload_len = w.len() - body_pos; - assert!(payload_len <= std::u16::MAX as usize); - *(array_mut_ref![w, len_pos, 2]) = (payload_len as u16).to_be_bytes(); - Ok(w) + debug_assert_eq!( + w.offset().expect("Overflowed a cell with just the header!"), + BODY_POS + ); + self.msg.encode_onto(&mut w)?; // body + let (mut body, written) = w.try_unwrap().map_err(|_| { + EncodeError::Bug(internal!( + "Encoding of relay message was too long to fit into a cell!" + )) + })?; + let payload_len = written - BODY_POS; + debug_assert!(payload_len < std::u16::MAX as usize); + *(array_mut_ref![body.0, LEN_POS, 2]) = (payload_len as u16).to_be_bytes(); + Ok((body.0, written)) } + /// Parse a RELAY or RELAY_EARLY cell body into a RelayCell. /// /// Requires that the cryptographic checks on the message have already been /// performed - pub fn decode(body: RawCellBody) -> Result { + #[allow(clippy::needless_pass_by_value)] // TODO this will go away soon. + pub fn decode(body: BoxedCellBody) -> Result { let mut reader = Reader::from_slice(body.as_ref()); Self::decode_from_reader(&mut reader) } diff --git a/crates/tor-cell/tests/test_relaycell.rs b/crates/tor-cell/tests/test_relaycell.rs index 404056a72..08496c261 100644 --- a/crates/tor-cell/tests/test_relaycell.rs +++ b/crates/tor-cell/tests/test_relaycell.rs @@ -34,7 +34,7 @@ impl rand::RngCore for BadRng { // I won't tell if you don't. 
impl rand::CryptoRng for BadRng {} -fn decode(body: &str) -> [u8; CELL_BODY_LEN] { +fn decode(body: &str) -> Box<[u8; CELL_BODY_LEN]> { let mut body = body.to_string(); body.retain(|c| !c.is_whitespace()); let mut body = hex::decode(body).unwrap(); @@ -42,7 +42,7 @@ fn decode(body: &str) -> [u8; CELL_BODY_LEN] { let mut result = [0; CELL_BODY_LEN]; result.copy_from_slice(&body[..]); - result + Box::new(result) } fn cell(body: &str, id: StreamId, msg: AnyRelayMsg) { diff --git a/crates/tor-proto/Cargo.toml b/crates/tor-proto/Cargo.toml index 573903c3c..20df80d8d 100644 --- a/crates/tor-proto/Cargo.toml +++ b/crates/tor-proto/Cargo.toml @@ -33,6 +33,7 @@ bytes = "1" cipher = { version = "0.4.1", features = ["zeroize"] } coarsetime = "0.1.20" derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" } +derive_more = "0.99.3" digest = "0.10.0" educe = "0.4.6" futures = "0.3.14" diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 366a3508c..1fc756442 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -815,7 +815,7 @@ mod test { use hex_literal::hex; use std::time::Duration; use tor_basic_utils::test_rng::testing_rng; - use tor_cell::chancell::{msg as chanmsg, AnyChanCell}; + use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody}; use tor_cell::relaycell::{msg as relaymsg, AnyRelayCell, StreamId}; use tor_linkspec::OwnedCircTarget; use tor_rtcompat::{Runtime, SleepProvider}; @@ -825,11 +825,10 @@ mod test { where ID: Into, { - let body: RelayCellBody = AnyRelayCell::new(id.into(), msg) + let body: BoxedCellBody = AnyRelayCell::new(id.into(), msg) .encode(&mut testing_rng()) - .unwrap() - .into(); - let chanmsg = chanmsg::Relay::from_raw(body.into()); + .unwrap(); + let chanmsg = chanmsg::Relay::from(body); ClientCircChanMsg::Relay(chanmsg) } diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 0f457691e..1f84ed05a 100644 --- 
a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -34,7 +34,7 @@ use crate::circuit::sendme::StreamSendWindow; use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey}; use crate::crypto::handshake::{ClientHandshake, KeyGenerator}; use safelog::sensitive as sv; -use tor_cell::chancell::{self, ChanMsg}; +use tor_cell::chancell::{self, BoxedCellBody, ChanMsg}; use tor_cell::chancell::{AnyChanCell, CircId}; use tor_linkspec::{LinkSpec, OwnedChanTarget, RelayIds}; use tor_llcrypto::pk; @@ -1005,7 +1005,7 @@ impl Reactor { let tag = self.crypto_out.encrypt(&mut body, hop)?; // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort // the whole circuit (e.g. by returning an error). - let msg = chancell::msg::Relay::from_raw(body.into()); + let msg = chancell::msg::Relay::from(BoxedCellBody::from(body)); let msg = if early { AnyChanMsg::RelayEarly(msg.into()) } else { diff --git a/crates/tor-proto/src/crypto/cell.rs b/crates/tor-proto/src/crypto/cell.rs index acdc2c13b..82764685c 100644 --- a/crates/tor-proto/src/crypto/cell.rs +++ b/crates/tor-proto/src/crypto/cell.rs @@ -8,25 +8,15 @@ //! use crate::{Error, Result}; -use tor_cell::chancell::RawCellBody; +use tor_cell::chancell::BoxedCellBody; use tor_error::internal; use generic_array::GenericArray; /// Type for the body of a relay cell. -#[derive(Clone)] -pub(crate) struct RelayCellBody(RawCellBody); +#[derive(Clone, derive_more::From, derive_more::Into)] +pub(crate) struct RelayCellBody(BoxedCellBody); -impl From for RelayCellBody { - fn from(body: RawCellBody) -> Self { - RelayCellBody(body) - } -} -impl From for RawCellBody { - fn from(cell: RelayCellBody) -> Self { - cell.0 - } -} impl AsRef<[u8]> for RelayCellBody { fn as_ref(&self) -> &[u8] { &self.0[..] 
@@ -447,7 +437,7 @@ mod test { let mut rng = testing_rng(); for _ in 1..300 { // outbound cell - let mut cell = [0_u8; 509]; + let mut cell = Box::new([0_u8; 509]); let mut cell_orig = [0_u8; 509]; rng.fill_bytes(&mut cell_orig); cell.copy_from_slice(&cell_orig); @@ -461,7 +451,7 @@ mod test { assert_eq!(&cell.as_ref()[9..], &cell_orig.as_ref()[9..]); // inbound cell - let mut cell = [0_u8; 509]; + let mut cell = Box::new([0_u8; 509]); let mut cell_orig = [0_u8; 509]; rng.fill_bytes(&mut cell_orig); cell.copy_from_slice(&cell_orig); @@ -480,14 +470,14 @@ mod test { // Try a failure: sending a cell to a nonexistent hop. { - let mut cell = [0_u8; 509].into(); + let mut cell = Box::new([0_u8; 509]).into(); let err = cc_out.encrypt(&mut cell, 10.into()); assert!(matches!(err, Err(Error::NoSuchHop))); } // Try a failure: A junk cell with no correct auth from any layer. { - let mut cell = [0_u8; 509].into(); + let mut cell = Box::new([0_u8; 509]).into(); let err = cc_in.decrypt(&mut cell); assert!(matches!(err, Err(Error::BadCellAuth))); } @@ -527,7 +517,7 @@ mod test { let mut j = 0; for cellno in 0..51 { - let mut body = [0_u8; 509]; + let mut body = Box::new([0_u8; 509]); body[0] = 2; // command: data. body[4] = 1; // streamid: 1. body[9] = 1; // length: 498 From a809a809ba22a91387a4f49c4a22164369830270 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 12:16:05 -0500 Subject: [PATCH 03/17] tor-cell: Add a new UnparsedRelayCell We'll use this to route relay messages on a circuit to the appropriate stream, and hand them to that stream, without parsing the message until the stream has been determined. 
--- crates/tor-cell/src/relaycell.rs | 46 +++++++++++++++++++++++++ crates/tor-cell/tests/test_relaycell.rs | 16 +++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/crates/tor-cell/src/relaycell.rs b/crates/tor-cell/src/relaycell.rs index 9e0cb1705..aa2fd56fd 100644 --- a/crates/tor-cell/src/relaycell.rs +++ b/crates/tor-cell/src/relaycell.rs @@ -177,6 +177,52 @@ impl StreamId { } } +/// A relay cell that has not yet been fully parsed, but where we have access to +/// the command and stream ID, for dispatching purposes. +// +// TODO HS: Settle on some names here. I would prefer "UnparsedRelayMsg" here so +// it can eventually be compatible with proposal 340. But that would make our +// RelayCell and RelayMsg types below kind of illogical. Perhaps we should rename... +// this -> UnparsedRelayMsg +// RelayCell -> ParsedRelayMsg +// RelayMsg -> RelayMsgBody? +// Ideas appreciated -NM +#[derive(Clone, Debug)] +pub struct UnparsedRelayCell { + /// The body of the cell. + body: BoxedCellBody, + // NOTE: we could also have a separate command and stream ID field here, but + // we expect to be working with a TON of these, so we will be mildly + // over-optimized and just peek into the body. + // + // It *is* a bit ugly to have to encode so much knowledge about the format in + // different functions here, but that information shouldn't leak out of this module. +} + +impl UnparsedRelayCell { + /// Wrap a BoxedCellBody as an UnparsedRelayCell. + pub fn from_body(body: BoxedCellBody) -> Self { + Self { body } + } + /// Return the command for this cell. + pub fn cmd(&self) -> RelayCmd { + /// Position of the command within the cell body. + const CMD_OFFSET: usize = 0; + self.body[CMD_OFFSET].into() + } + /// Return the stream ID for the stream that this cell corresponds to. + pub fn stream_id(&self) -> StreamId { + /// Position of the stream ID within the cell body. 
+ const STREAM_ID_OFFSET: usize = 3; + + u16::from_be_bytes(*arrayref::array_ref![self.body, STREAM_ID_OFFSET, 2]).into() + } + /// Decode this unparsed cell into a given cell type. + pub fn decode(self) -> Result> { + RelayCell::decode(self.body) + } +} + /// A decoded and parsed relay cell of unrestricted type. pub type AnyRelayCell = RelayCell; diff --git a/crates/tor-cell/tests/test_relaycell.rs b/crates/tor-cell/tests/test_relaycell.rs index 08496c261..a0480c2ca 100644 --- a/crates/tor-cell/tests/test_relaycell.rs +++ b/crates/tor-cell/tests/test_relaycell.rs @@ -2,7 +2,9 @@ #![allow(clippy::uninlined_format_args)] use tor_bytes::Error; -use tor_cell::relaycell::{msg, msg::AnyRelayMsg, AnyRelayCell, RelayCmd, RelayMsg, StreamId}; +use tor_cell::relaycell::{ + msg, msg::AnyRelayMsg, AnyRelayCell, RelayCmd, RelayMsg, StreamId, UnparsedRelayCell, +}; #[cfg(feature = "experimental-udp")] use std::{ @@ -51,9 +53,19 @@ fn cell(body: &str, id: StreamId, msg: AnyRelayMsg) { let expected = AnyRelayCell::new(id, msg); - let decoded = AnyRelayCell::decode(body).unwrap(); + let decoded = AnyRelayCell::decode(body.clone()).unwrap(); + + let decoded_from_partial = UnparsedRelayCell::from_body(body) + .decode::() + .unwrap(); + assert_eq!(decoded_from_partial.stream_id(), decoded.stream_id()); + assert_eq!(decoded_from_partial.cmd(), decoded.cmd()); assert_eq!(format!("{:?}", expected), format!("{:?}", decoded)); + assert_eq!( + format!("{:?}", expected), + format!("{:?}", decoded_from_partial) + ); let encoded1 = decoded.encode(&mut bad_rng).unwrap(); let encoded2 = expected.encode(&mut bad_rng).unwrap(); From 65cc7d09743250ca98ea237bd7fd9eae788491fa Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 14:33:17 -0500 Subject: [PATCH 04/17] tor-cell: Note an opportunity for future optimization --- crates/tor-cell/src/relaycell/msg.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/tor-cell/src/relaycell/msg.rs 
b/crates/tor-cell/src/relaycell/msg.rs index f81bcf0d8..a33afbae4 100644 --- a/crates/tor-cell/src/relaycell/msg.rs +++ b/crates/tor-cell/src/relaycell/msg.rs @@ -268,6 +268,11 @@ impl Body for Begin { #[derive(Debug, Clone)] pub struct Data { /// Contents of the cell, to be sent on a specific stream + // + // TODO: There's a good case to be made that this should be a BoxedCellBody + // instead, to avoid allocations and copies. But first probably we should + // figure out how proposal 340 will work with this. Possibly, we will wind + // up using `bytes` or something. body: Vec, } impl Data { From 3f1457ea046b724a1df33dee6c91c1e51a773dd5 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 15:53:59 -0500 Subject: [PATCH 05/17] tor-cell: Implement {Relay,Chan}Msg for every body type This will make it ergonomic to decode a single body type without having to declare a variant that accepts only a single message. --- crates/tor-cell/src/chancell/msg.rs | 48 ++++++++++++++++++++++++++++ crates/tor-cell/src/relaycell/msg.rs | 46 ++++++++++++++++++++++++++ crates/tor-cell/src/restrict.rs | 8 ++--- 3 files changed, 97 insertions(+), 5 deletions(-) diff --git a/crates/tor-cell/src/chancell/msg.rs b/crates/tor-cell/src/chancell/msg.rs index d428a8e62..49571e99e 100644 --- a/crates/tor-cell/src/chancell/msg.rs +++ b/crates/tor-cell/src/chancell/msg.rs @@ -1170,6 +1170,54 @@ msg_into_cell!(AuthChallenge); msg_into_cell!(Authenticate); msg_into_cell!(Authorize); +/// Helper: declare a ChanMsg implementation for a message type that has a +/// fixed command. +// +// TODO: It might be better to merge Body with ChanMsg, but that is complex, +// since their needs are _slightly_ different. +macro_rules! 
msg_impl_chanmsg { + ($($body:ident,)*) => + {paste::paste!{ + $(impl crate::chancell::ChanMsg for $body { + fn cmd(&self) -> crate::chancell::ChanCmd { crate::chancell::ChanCmd::[< $body:snake:upper >] } + fn encode_onto(self, w: &mut W) -> tor_bytes::EncodeResult<()> { + crate::chancell::msg::Body::encode_onto(self, w) + } + fn decode_from_reader(cmd: ChanCmd, r: &mut tor_bytes::Reader<'_>) -> tor_bytes::Result { + if cmd != crate::chancell::ChanCmd::[< $body:snake:upper >] { + return Err(tor_bytes::Error::InvalidMessage( + format!("Expected {} command; got {cmd}", stringify!([< $body:snake:upper >])).into() + )); + } + crate::chancell::msg::Body::decode_from_reader(r) + } + })* + }} +} + +// We implement ChanMsg for every body type, so that you can write code that does +// e.g. ChanCell. +msg_impl_chanmsg!( + Padding, + Vpadding, + Create, + CreateFast, + Create2, + Created, + CreatedFast, + Created2, + Relay, + RelayEarly, + Destroy, + Netinfo, + Versions, + PaddingNegotiate, + Certs, + AuthChallenge, + Authenticate, + Authorize, +); + #[cfg(test)] mod test { // @@ begin test lint list maintained by maint/add_warning @@ diff --git a/crates/tor-cell/src/relaycell/msg.rs b/crates/tor-cell/src/relaycell/msg.rs index a33afbae4..32beaec34 100644 --- a/crates/tor-cell/src/relaycell/msg.rs +++ b/crates/tor-cell/src/relaycell/msg.rs @@ -1175,3 +1175,49 @@ empty_body! { /// Opens a new stream on a directory cache. pub struct BeginDir {} } + +/// Helper: declare a RelayMsg implementation for a message type that has a +/// fixed command. +// +// TODO: It might be better to merge Body with RelayMsg, but that is complex, +// since their needs are _slightly_ different. +macro_rules! msg_impl_relaymsg { + ($($body:ident),* $(,)?) 
=> + {paste::paste!{ + $(impl crate::relaycell::RelayMsg for $body { + fn cmd(&self) -> crate::relaycell::RelayCmd { crate::relaycell::RelayCmd::[< $body:snake:upper >] } + fn encode_onto(self, w: &mut W) -> tor_bytes::EncodeResult<()> { + crate::relaycell::msg::Body::encode_onto(self, w) + } + fn decode_from_reader(cmd: RelayCmd, r: &mut tor_bytes::Reader<'_>) -> tor_bytes::Result { + if cmd != crate::relaycell::RelayCmd::[< $body:snake:upper >] { + return Err(tor_bytes::Error::InvalidMessage( + format!("Expected {} command; got {cmd}", stringify!([< $body:snake:upper >])).into() + )); + } + crate::relaycell::msg::Body::decode_from_reader(r) + } + })* + }} +} + +msg_impl_relaymsg!( + Begin, Data, End, Connected, Sendme, Extend, Extended, Extend2, Extended2, Truncate, Truncated, + Drop, Resolve, Resolved, BeginDir, +); + +#[cfg(feature = "experimental-udp")] +msg_impl_relaymsg!(ConnectUdp, ConnectedUdp, Datagram); + +#[cfg(feature = "onion-service")] +msg_impl_relaymsg!( + EstablishIntro, + EstablishRendezvous, + Introduce1, + Introduce2, + Rendezvous1, + Rendezvous2, + IntroEstablished, + RendezvousEstablished, + IntroduceAck, +); diff --git a/crates/tor-cell/src/restrict.rs b/crates/tor-cell/src/restrict.rs index c975080e4..f39f9b05c 100644 --- a/crates/tor-cell/src/restrict.rs +++ b/crates/tor-cell/src/restrict.rs @@ -134,24 +134,22 @@ macro_rules! restricted_msg { where W: $crate::restrict::tor_bytes::Writer + ?Sized { - use $body_type; match self { $( $( #[cfg(feature=$feat)] )? - Self::$case(m) => m.encode_onto(w), + Self::$case(m) => $body_type::encode_onto(m, w), )* $( - Self::$unrecognized(u) => u.encode_onto(w), + Self::$unrecognized(u) => $body_type::encode_onto(u, w), )? } } fn decode_from_reader(cmd: $cmd_type, r: &mut $crate::restrict::tor_bytes::Reader<'_>) -> $crate::restrict::tor_bytes::Result { - use $body_type; Ok(match cmd { $( $( #[cfg(feature=$feat)] )? 
- $cmd_type:: [<$case:snake:upper>] => Self::$case( $msg_mod :: $case :: decode_from_reader(r)? ), + $cmd_type:: [<$case:snake:upper>] => Self::$case( <$msg_mod :: $case as $body_type> :: decode_from_reader(r)? ), )* $( _ => Self::$unrecognized($unrec_type::decode_with_cmd(cmd, r)?), From e4bc7ef57b4fdd5298f3ff9a7c87c08852c6e2bf Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 16:44:25 -0500 Subject: [PATCH 06/17] tor-cell: Add RelayCell::into_msg. --- crates/tor-cell/src/relaycell.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/tor-cell/src/relaycell.rs b/crates/tor-cell/src/relaycell.rs index aa2fd56fd..7cb88d696 100644 --- a/crates/tor-cell/src/relaycell.rs +++ b/crates/tor-cell/src/relaycell.rs @@ -275,6 +275,10 @@ impl RelayCell { pub fn msg(&self) -> &M { &self.msg } + /// Consume this cell and return the underlying message. + pub fn into_msg(self) -> M { + self.msg + } /// Consume this relay message and encode it as a 509-byte padded cell /// body. pub fn encode(self, rng: &mut R) -> crate::Result { From 0765243f5e7f6073866e8f624757665fa6e3303b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 14:50:26 -0500 Subject: [PATCH 07/17] tor-proto: Use UnparsedRelayCell to start deferring cell processing. In general, we want to avoid parsing these cells until we are fairly sure that they are something we would accept. 
--- crates/tor-proto/src/circuit/reactor.rs | 38 ++++++++++++++++++------- crates/tor-proto/src/circuit/sendme.rs | 31 +++++++++++--------- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 1f84ed05a..181a3c3d4 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -16,7 +16,7 @@ use std::marker::PhantomData; use std::pin::Pin; use tor_cell::chancell::msg::{AnyChanMsg, Relay}; use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme}; -use tor_cell::relaycell::{AnyRelayCell, RelayCmd, RelayMsg, StreamId}; +use tor_cell::relaycell::{AnyRelayCell, RelayCmd, RelayMsg, StreamId, UnparsedRelayCell}; use futures::channel::{mpsc, oneshot}; use futures::Sink; @@ -822,7 +822,13 @@ impl Reactor { } /// Handle a RELAY cell on this circuit with stream ID 0. - fn handle_meta_cell(&mut self, hopnum: HopNum, msg: AnyRelayMsg) -> Result { + fn handle_meta_cell(&mut self, hopnum: HopNum, msg: UnparsedRelayCell) -> Result { + // XXXX: Defer this even further. + let (_, msg) = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "relay meta cell"))? + .into_streamid_and_msg(); + // SENDME cells and TRUNCATED get handled internally by the circuit. if let AnyRelayMsg::Sendme(s) = msg { return self.handle_sendme(hopnum, s); @@ -976,7 +982,7 @@ impl Reactor { early: bool, cell: AnyRelayCell, ) -> Result<()> { - let c_t_w = sendme::cell_counts_towards_windows(&cell); + let c_t_w = sendme::cmd_counts_towards_windows(cell.cmd()); let stream_id = cell.stream_id(); // Check whether the hop send window is empty, if this cell counts towards windows. // NOTE(eta): It is imperative this happens *before* calling encrypt() below, otherwise @@ -1241,9 +1247,8 @@ impl Reactor { tag_copy.copy_from_slice(tag); tag_copy }; - // Decode the cell. 
- let msg = AnyRelayCell::decode(body.into()) - .map_err(|e| Error::from_bytes_err(e, "relay cell"))?; + // Put the cell into a format where we can make sense of it. + let msg = UnparsedRelayCell::from_body(body.into()); let c_t_w = sendme::cell_counts_towards_windows(&msg); @@ -1277,12 +1282,11 @@ impl Reactor { .put(); } - // Break the message apart into its streamID and message. - let (streamid, msg) = msg.into_streamid_and_msg(); - // If this cell wants/refuses to have a Stream ID, does it // have/not have one? - if !msg.cmd().accepts_streamid_val(streamid) { + let cmd = msg.cmd(); + let streamid = msg.stream_id(); + if !cmd.accepts_streamid_val(streamid) { return Err(Error::CircProto(format!( "Invalid stream ID {} for relay command {}", sv(streamid), @@ -1309,6 +1313,12 @@ impl Reactor { }) => { // The stream for this message exists, and is open. + // XXXX: Defer this decoding even further. + let (_, msg) = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "relay stream cell"))? + .into_streamid_and_msg(); + if let AnyRelayMsg::Sendme(_) = msg { // We need to handle sendmes here, not in the stream's // recv() method, or else we'd never notice them if the @@ -1354,6 +1364,12 @@ impl Reactor { Some(StreamEnt::EndSent(halfstream)) => { // We sent an end but maybe the other side hasn't heard. + // XXXX: Defer this decoding even further. + let (_, msg) = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "relay half-stream cell"))? + .into_streamid_and_msg(); + if matches!(msg, AnyRelayMsg::End(_)) { hop.map.end_received(streamid)?; } else { @@ -1361,7 +1377,7 @@ impl Reactor { } } _ => { - // No stream wants this message. + // No stream wants this message, or ever did. 
return Err(Error::CircProto( "Cell received on nonexistent stream!?".into(), )); diff --git a/crates/tor-proto/src/circuit/sendme.rs b/crates/tor-proto/src/circuit/sendme.rs index 455242879..9365e8b41 100644 --- a/crates/tor-proto/src/circuit/sendme.rs +++ b/crates/tor-proto/src/circuit/sendme.rs @@ -12,8 +12,8 @@ use std::collections::VecDeque; -use tor_cell::relaycell::msg::AnyRelayMsg; -use tor_cell::relaycell::AnyRelayCell; +use tor_cell::relaycell::{msg::AnyRelayMsg, UnparsedRelayCell}; +use tor_cell::relaycell::{RelayCmd, RelayMsg}; use tor_error::internal; use crate::{Error, Result}; @@ -267,14 +267,19 @@ impl RecvWindow

{ } } -/// Return true if this message is counted by flow-control windows. -pub(crate) fn msg_counts_towards_windows(msg: &AnyRelayMsg) -> bool { - matches!(msg, AnyRelayMsg::Data(_)) +/// Return true if this message type is counted by flow-control windows. +pub(crate) fn cmd_counts_towards_windows(cmd: RelayCmd) -> bool { + cmd == RelayCmd::DATA } /// Return true if this message is counted by flow-control windows. -pub(crate) fn cell_counts_towards_windows(cell: &AnyRelayCell) -> bool { - msg_counts_towards_windows(cell.msg()) +pub(crate) fn msg_counts_towards_windows(msg: &AnyRelayMsg) -> bool { + cmd_counts_towards_windows(msg.cmd()) +} + +/// Return true if this message is counted by flow-control windows. +pub(crate) fn cell_counts_towards_windows(cell: &UnparsedRelayCell) -> bool { + cmd_counts_towards_windows(cell.cmd()) } #[cfg(test)] @@ -290,24 +295,24 @@ mod test { #![allow(clippy::unchecked_duration_subtraction)] //! use super::*; + use tor_basic_utils::test_rng::testing_rng; use tor_cell::relaycell::{msg, AnyRelayCell}; #[test] fn what_counts() { + let mut rng = testing_rng(); let m = msg::Begin::new("www.torproject.org", 443, 0) .unwrap() .into(); assert!(!msg_counts_towards_windows(&m)); - assert!(!cell_counts_towards_windows(&AnyRelayCell::new( - 77.into(), - m + assert!(!cell_counts_towards_windows(&UnparsedRelayCell::from_body( + AnyRelayCell::new(77.into(), m).encode(&mut rng).unwrap() ))); let m = msg::Data::new(&b"Education is not a prerequisite to political control-political control is the cause of popular education."[..]).unwrap().into(); // Du Bois assert!(msg_counts_towards_windows(&m)); - assert!(cell_counts_towards_windows(&AnyRelayCell::new( - 128.into(), - m + assert!(cell_counts_towards_windows(&UnparsedRelayCell::from_body( + AnyRelayCell::new(128.into(), m).encode(&mut rng).unwrap() ))); } From 2e483124cbff320cfabe1dcf4e8ad2398804f2ac Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 16:46:56 -0500 Subject: [PATCH 
08/17] tor-proto: defer meta-cell parsing to the last moment. --- crates/tor-proto/src/circuit.rs | 7 ++-- crates/tor-proto/src/circuit/reactor.rs | 56 ++++++++++--------------- 2 files changed, 27 insertions(+), 36 deletions(-) diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 1fc756442..f051f9d14 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -1243,9 +1243,10 @@ mod test { let error = bad_extend_test_impl(&rt, 2.into(), cc).await; match error { - Error::CircProto(s) => { - assert_eq!(s, "wanted EXTENDED2; got EXTENDED"); - } + Error::BytesErr { + err: tor_bytes::Error::InvalidMessage(_), + object: "extended2 message", + } => {} _ => panic!(), } }); diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 181a3c3d4..c74ecef04 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -16,7 +16,7 @@ use std::marker::PhantomData; use std::pin::Pin; use tor_cell::chancell::msg::{AnyChanMsg, Relay}; use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme}; -use tor_cell::relaycell::{AnyRelayCell, RelayCmd, RelayMsg, StreamId, UnparsedRelayCell}; +use tor_cell::relaycell::{AnyRelayCell, RelayCmd, StreamId, UnparsedRelayCell}; use futures::channel::{mpsc, oneshot}; use futures::Sink; @@ -260,7 +260,7 @@ pub(super) trait MetaCellHandler: Send { /// Called when the message we were waiting for arrives. /// /// Gets a copy of the `Reactor` in order to do anything it likes there. - fn finish(&mut self, msg: AnyRelayMsg, reactor: &mut Reactor) -> Result<()>; + fn finish(&mut self, msg: UnparsedRelayCell, reactor: &mut Reactor) -> Result<()>; } /// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait. 
@@ -373,28 +373,12 @@ where fn expected_hop(&self) -> HopNum { self.expected_hop } - fn finish(&mut self, msg: AnyRelayMsg, reactor: &mut Reactor) -> Result<()> { - // Did we get the right response? - if msg.cmd() != RelayCmd::EXTENDED2 { - return Err(Error::CircProto(format!( - "wanted EXTENDED2; got {}", - msg.cmd(), - ))); - } + fn finish(&mut self, msg: UnparsedRelayCell, reactor: &mut Reactor) -> Result<()> { + let msg = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "extended2 message"))? + .into_msg(); - // ???? Do we need to shutdown the circuit for the remaining error - // ???? cases in this function? - - let msg = match msg { - AnyRelayMsg::Extended2(e) => e, - _ => { - return Err(Error::from(internal!( - "Message body {:?} didn't match cmd {:?}", - msg, - msg.cmd() - ))) - } - }; let relay_handshake = msg.into_body(); trace!( @@ -823,18 +807,21 @@ impl Reactor { /// Handle a RELAY cell on this circuit with stream ID 0. fn handle_meta_cell(&mut self, hopnum: HopNum, msg: UnparsedRelayCell) -> Result { - // XXXX: Defer this even further. - let (_, msg) = msg - .decode::() - .map_err(|e| Error::from_bytes_err(e, "relay meta cell"))? - .into_streamid_and_msg(); - // SENDME cells and TRUNCATED get handled internally by the circuit. - if let AnyRelayMsg::Sendme(s) = msg { - return self.handle_sendme(hopnum, s); + if msg.cmd() == RelayCmd::SENDME { + let sendme = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "sendme message"))? + .into_msg(); + + return self.handle_sendme(hopnum, sendme); } - if let AnyRelayMsg::Truncated(t) = msg { - let reason = t.reason(); + if msg.cmd() == RelayCmd::TRUNCATED { + let truncated = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "truncated message"))? + .into_msg(); + let reason = truncated.reason(); debug!( "{}: Truncated from hop {}. 
Reason: {} [{}]", self.unique_id, @@ -863,6 +850,9 @@ impl Reactor { self.unique_id, ret ); + // TODO: If the meta handler rejects the cell, we should + // probably kill the circuit depending on its type. + // (See #773.) let _ = done.send(ret); // don't care if sender goes away Ok(CellStatus::Continue) } else { From bd0f6f5adf79b68621277c8fc13db1b021d493c5 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 16:48:58 -0500 Subject: [PATCH 09/17] tor-proto: stop reactor (and kill circuit) if meta handler fails If the meta handler reports an error, then the circuit has violated its protocol, and needs to be shut down. Fixes #773. --- crates/tor-proto/src/circuit/reactor.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index c74ecef04..6846215d2 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -260,6 +260,8 @@ pub(super) trait MetaCellHandler: Send { /// Called when the message we were waiting for arrives. /// /// Gets a copy of the `Reactor` in order to do anything it likes there. + /// + /// If this function returns an error, the reactor will shut down. fn finish(&mut self, msg: UnparsedRelayCell, reactor: &mut Reactor) -> Result<()>; } @@ -850,11 +852,12 @@ impl Reactor { self.unique_id, ret ); - // TODO: If the meta handler rejects the cell, we should - // probably kill the circuit depending on its type. - // (See #773.) + let status = match &ret { + Ok(()) => Ok(CellStatus::Continue), + Err(e) => Err(e.clone()), + }; let _ = done.send(ret); // don't care if sender goes away - Ok(CellStatus::Continue) + status } else { // Somebody wanted a message from a different hop! Put this // one back. 
From 58c3b8276c9c5dc4f2b51089ad0a45cf54d82476 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 17:19:07 -0500 Subject: [PATCH 10/17] tor-proto: Defer parsing of messages sent to half-closed streams This includes a partial solution for #769, but also turned up another bug (#774) while I was working on it. I'll close them both once I have a real solution. --- crates/tor-proto/src/circuit/halfstream.rs | 118 +++++++++++++++------ crates/tor-proto/src/circuit/reactor.rs | 14 +-- crates/tor-proto/src/circuit/streammap.rs | 4 + 3 files changed, 94 insertions(+), 42 deletions(-) diff --git a/crates/tor-proto/src/circuit/halfstream.rs b/crates/tor-proto/src/circuit/halfstream.rs index b97c4746a..f6204a171 100644 --- a/crates/tor-proto/src/circuit/halfstream.rs +++ b/crates/tor-proto/src/circuit/halfstream.rs @@ -5,8 +5,8 @@ use crate::circuit::sendme::{StreamRecvWindow, StreamSendWindow}; use crate::{Error, Result}; -use tor_cell::relaycell::{msg::AnyRelayMsg, RelayMsg}; -use tor_error::internal; +use tor_cell::relaycell::UnparsedRelayCell; +use tor_cell::restricted_msg; /// Type to track state of half-closed streams. /// @@ -27,6 +27,21 @@ pub(super) struct HalfStream { connected_ok: bool, } +restricted_msg! { + enum HalfStreamMsg : RelayMsg { + Sendme, Data, Connected, End, Resolved + } +} + +/// A status value returned by [`HalfStream::handle_msg`]. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub(crate) enum HalfStreamStatus { + /// The stream has been closed successfully and can now be dropped. + Closed, + /// The stream is still half,open, and must still be tracked. + Open, +} + impl HalfStream { /// Create a new half-closed stream. pub(super) fn new( @@ -47,33 +62,36 @@ impl HalfStream { /// The caller must handle END cells; it is an internal error to pass /// END cells to this method. /// no ends here. 
- pub(super) fn handle_msg(&mut self, msg: &AnyRelayMsg) -> Result<()> { + pub(super) fn handle_msg(&mut self, msg: UnparsedRelayCell) -> Result { + use HalfStreamMsg::*; + use HalfStreamStatus::*; + let msg = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "message on half-closed stream"))? + .into_msg(); match msg { - AnyRelayMsg::Sendme(_) => { + Sendme(_) => { self.sendw.put(Some(()))?; - Ok(()) + Ok(Open) } - AnyRelayMsg::Data(_) => { + Data(_) => { self.recvw.take()?; - Ok(()) + Ok(Open) } - AnyRelayMsg::Connected(_) => { + Connected(_) => { if self.connected_ok { self.connected_ok = false; - Ok(()) + Ok(Open) } else { Err(Error::CircProto( "Bad CONNECTED cell on a closed stream!".into(), )) } } - AnyRelayMsg::End(_) => Err(Error::from(internal!( - "END cell in HalfStream::handle_msg()" - ))), - _ => Err(Error::CircProto(format!( - "Bad {} cell on a closed stream!", - msg.cmd() - ))), + End(_) => Ok(Closed), + // TODO XXXX: We should only allow a Resolved() on streams where we sent + // Resolve. My intended solution for #774 will fix this too. + Resolved(_) => Ok(Closed), } } } @@ -92,20 +110,40 @@ mod test { //! use super::*; use crate::circuit::sendme::{StreamRecvWindow, StreamSendWindow}; - use tor_cell::relaycell::msg; + use rand::{CryptoRng, Rng}; + use tor_basic_utils::test_rng::testing_rng; + use tor_cell::relaycell::{ + msg::{self, AnyRelayMsg}, + AnyRelayCell, + }; + + fn to_unparsed(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayCell { + UnparsedRelayCell::from_body( + AnyRelayCell::new(77.into(), val) + .encode(rng) + .expect("encoding failed"), + ) + } #[test] fn halfstream_sendme() -> Result<()> { + let mut rng = testing_rng(); + let mut sendw = StreamSendWindow::new(101); sendw.take(&())?; // Make sure that it will accept one sendme. 
let mut hs = HalfStream::new(sendw, StreamRecvWindow::new(20), true); // one sendme is fine - let m = msg::Sendme::new_empty().into(); - assert!(hs.handle_msg(&m).is_ok()); + let m = msg::Sendme::new_empty(); + assert!(hs + .handle_msg(to_unparsed(&mut rng, m.clone().into())) + .is_ok()); // but no more were expected! - let e = hs.handle_msg(&m).err().unwrap(); + let e = hs + .handle_msg(to_unparsed(&mut rng, m.into())) + .err() + .unwrap(); assert_eq!( format!("{}", e), "Circuit protocol violation: Received a SENDME when none was expected" @@ -120,17 +158,21 @@ mod test { #[test] fn halfstream_data() { let mut hs = hs_new(); + let mut rng = testing_rng(); // 20 data cells are okay. - let m = msg::Data::new(&b"this offer is unrepeatable"[..]) - .unwrap() - .into(); + let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap(); for _ in 0_u8..20 { - assert!(hs.handle_msg(&m).is_ok()); + assert!(hs + .handle_msg(to_unparsed(&mut rng, m.clone().into())) + .is_ok()); } // But one more is a protocol violation. - let e = hs.handle_msg(&m).err().unwrap(); + let e = hs + .handle_msg(to_unparsed(&mut rng, m.into())) + .err() + .unwrap(); assert_eq!( format!("{}", e), "Circuit protocol violation: Received a data cell in violation of a window" @@ -140,16 +182,24 @@ mod test { #[test] fn halfstream_connected() { let mut hs = hs_new(); + let mut rng = testing_rng(); // We were told to accept a connected, so we'll accept one // and no more. - let m = msg::Connected::new_empty().into(); - assert!(hs.handle_msg(&m).is_ok()); - assert!(hs.handle_msg(&m).is_err()); + let m = msg::Connected::new_empty(); + assert!(hs + .handle_msg(to_unparsed(&mut rng, m.clone().into())) + .is_ok()); + assert!(hs + .handle_msg(to_unparsed(&mut rng, m.clone().into())) + .is_err()); // If we try that again with connected_ok == false, we won't // accept any. 
let mut hs = HalfStream::new(StreamSendWindow::new(20), StreamRecvWindow::new(20), false); - let e = hs.handle_msg(&m).err().unwrap(); + let e = hs + .handle_msg(to_unparsed(&mut rng, m.into())) + .err() + .unwrap(); assert_eq!( format!("{}", e), "Circuit protocol violation: Bad CONNECTED cell on a closed stream!" @@ -159,11 +209,15 @@ mod test { #[test] fn halfstream_other() { let mut hs = hs_new(); - let m = msg::Extended2::new(Vec::new()).into(); - let e = hs.handle_msg(&m).err().unwrap(); + let mut rng = testing_rng(); + let m = msg::Extended2::new(Vec::new()); + let e = hs + .handle_msg(to_unparsed(&mut rng, m.into())) + .err() + .unwrap(); assert_eq!( format!("{}", e), - "Circuit protocol violation: Bad EXTENDED2 cell on a closed stream!" + "Unable to parse message on half-closed stream" ); } } diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 6846215d2..9918c4db8 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -1,4 +1,5 @@ //! Code to handle incoming cells on a circuit. +use super::halfstream::HalfStreamStatus; use super::streammap::{ShouldSendEnd, StreamEnt}; use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse}; use crate::circuit::unique_id::UniqId; @@ -1357,16 +1358,9 @@ impl Reactor { Some(StreamEnt::EndSent(halfstream)) => { // We sent an end but maybe the other side hasn't heard. - // XXXX: Defer this decoding even further. - let (_, msg) = msg - .decode::() - .map_err(|e| Error::from_bytes_err(e, "relay half-stream cell"))? - .into_streamid_and_msg(); - - if matches!(msg, AnyRelayMsg::End(_)) { - hop.map.end_received(streamid)?; - } else { - halfstream.handle_msg(&msg)?; + match halfstream.handle_msg(msg)? 
{ + HalfStreamStatus::Open => {} + HalfStreamStatus::Closed => hop.map.end_received(streamid)?, } } _ => { diff --git a/crates/tor-proto/src/circuit/streammap.rs b/crates/tor-proto/src/circuit/streammap.rs index ca8f81bb8..ff9bd574c 100644 --- a/crates/tor-proto/src/circuit/streammap.rs +++ b/crates/tor-proto/src/circuit/streammap.rs @@ -114,6 +114,10 @@ impl StreamMap { rx, send_window, dropped: 0, + // TODO XXX: This is true for all streams at this point, but it is + // problematic to accept even one CONNECTED for RESOLVE/RESOLVED + // streams, and will become more so once UDP streams are + // implemented. received_connected: false, }; // This "65536" seems too aggressive, but it's what tor does. From 41b50b6c56debc1eeb644d65b2738645bf98fce7 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 17:48:33 -0500 Subject: [PATCH 11/17] tor-proto: Push stream message parsing into the stream objects. This closes #525, and ensures, at last, that we don't parse any message that we wouldn't accept. --- crates/tor-proto/src/circuit/reactor.rs | 24 +++++------ crates/tor-proto/src/circuit/sendme.rs | 8 ++-- crates/tor-proto/src/circuit/streammap.rs | 9 ++-- crates/tor-proto/src/stream/data.rs | 51 +++++++++++++++++------ crates/tor-proto/src/stream/raw.rs | 17 ++++---- crates/tor-proto/src/stream/resolve.rs | 30 ++++++++----- 6 files changed, 88 insertions(+), 51 deletions(-) diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 9918c4db8..3575d5acd 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -119,7 +119,7 @@ pub(super) enum CtrlMsg { /// SENDME cells once we've read enough out of the other end. If it *does* block, we /// can assume someone is trying to send us more cells than they should, and abort /// the stream. - sender: mpsc::Sender, + sender: mpsc::Sender, /// A channel to receive messages to send on this stream from. 
rx: mpsc::Receiver, /// Oneshot channel to notify on completion, with the allocated stream ID. @@ -1159,7 +1159,7 @@ impl Reactor { cx: &mut Context<'_>, hopnum: HopNum, message: AnyRelayMsg, - sender: mpsc::Sender, + sender: mpsc::Sender, rx: mpsc::Receiver, ) -> Result { let hop = self @@ -1307,13 +1307,11 @@ impl Reactor { }) => { // The stream for this message exists, and is open. - // XXXX: Defer this decoding even further. - let (_, msg) = msg - .decode::() - .map_err(|e| Error::from_bytes_err(e, "relay stream cell"))? - .into_streamid_and_msg(); - - if let AnyRelayMsg::Sendme(_) = msg { + if msg.cmd() == RelayCmd::SENDME { + let _sendme = msg + .decode::() + .map_err(|e| Error::from_bytes_err(e, "Sendme message on stream"))? + .into_msg(); // We need to handle sendmes here, not in the stream's // recv() method, or else we'd never notice them if the // stream isn't reading. @@ -1321,19 +1319,19 @@ impl Reactor { return Ok(CellStatus::Continue); } - if matches!(msg, AnyRelayMsg::Connected(_)) { + if msg.cmd() == RelayCmd::CONNECTED { // Remember that we've received a Connected cell, and can't get another, // even if we become a HalfStream. (This rule is enforced separately at // DataStreamReader.) + + // TODO: This is problematic; see #774. *received_connected = true; } // Remember whether this was an end cell: if so we should // close the stream. - let is_end_cell = matches!(msg, AnyRelayMsg::End(_)); + let is_end_cell = msg.cmd() == RelayCmd::END; - // TODO: Add a wrapper type here to reject cells that should - // never go to a client, like BEGIN. 
if let Err(e) = sink.try_send(msg) { if e.is_full() { // If we get here, we either have a logic bug (!), or an attacker diff --git a/crates/tor-proto/src/circuit/sendme.rs b/crates/tor-proto/src/circuit/sendme.rs index 9365e8b41..7c6232bd9 100644 --- a/crates/tor-proto/src/circuit/sendme.rs +++ b/crates/tor-proto/src/circuit/sendme.rs @@ -12,8 +12,8 @@ use std::collections::VecDeque; -use tor_cell::relaycell::{msg::AnyRelayMsg, UnparsedRelayCell}; -use tor_cell::relaycell::{RelayCmd, RelayMsg}; +use tor_cell::relaycell::RelayCmd; +use tor_cell::relaycell::UnparsedRelayCell; use tor_error::internal; use crate::{Error, Result}; @@ -273,7 +273,9 @@ pub(crate) fn cmd_counts_towards_windows(cmd: RelayCmd) -> bool { } /// Return true if this message is counted by flow-control windows. -pub(crate) fn msg_counts_towards_windows(msg: &AnyRelayMsg) -> bool { +#[cfg(test)] +pub(crate) fn msg_counts_towards_windows(msg: &tor_cell::relaycell::msg::AnyRelayMsg) -> bool { + use tor_cell::relaycell::RelayMsg; cmd_counts_towards_windows(msg.cmd()) } diff --git a/crates/tor-proto/src/circuit/streammap.rs b/crates/tor-proto/src/circuit/streammap.rs index ff9bd574c..87245e41a 100644 --- a/crates/tor-proto/src/circuit/streammap.rs +++ b/crates/tor-proto/src/circuit/streammap.rs @@ -3,6 +3,7 @@ use crate::circuit::halfstream::HalfStream; use crate::circuit::sendme; use crate::{Error, Result}; +use tor_cell::relaycell::UnparsedRelayCell; /// Mapping from stream ID to streams. // NOTE: This is a work in progress and I bet I'll refactor it a lot; // it needs to stay opaque! @@ -24,7 +25,7 @@ pub(super) enum StreamEnt { /// An open stream. Open { /// Sink to send relay cells tagged for this stream into. - sink: mpsc::Sender, + sink: mpsc::Sender, /// Stream for cells that should be sent down this stream. rx: mpsc::Receiver, /// Send window, for congestion control purposes. @@ -105,7 +106,7 @@ impl StreamMap { /// Add an entry to this map; return the newly allocated StreamId. 
pub(super) fn add_ent( &mut self, - sink: mpsc::Sender, + sink: mpsc::Sender, rx: mpsc::Receiver, send_window: sendme::StreamSendWindow, ) -> Result { @@ -114,10 +115,10 @@ impl StreamMap { rx, send_window, dropped: 0, - // TODO XXX: This is true for all streams at this point, but it is + // TODO: This is true for all streams at this point, but it is // problematic to accept even one CONNECTED for RESOLVE/RESOLVED // streams, and will become more so once UDP streams are - // implemented. + // implemented. This is #774. received_connected: false, }; // This "65536" seems too aggressive, but it's what tor does. diff --git a/crates/tor-proto/src/stream/data.rs b/crates/tor-proto/src/stream/data.rs index d45f766ba..1d460e801 100644 --- a/crates/tor-proto/src/stream/data.rs +++ b/crates/tor-proto/src/stream/data.rs @@ -3,7 +3,6 @@ use crate::{Error, Result}; use tor_cell::relaycell::msg::EndReason; -use tor_cell::relaycell::RelayMsg; use futures::io::{AsyncRead, AsyncWrite}; use futures::task::{Context, Poll}; @@ -15,6 +14,7 @@ use tokio_crate::io::ReadBuf; use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite}; #[cfg(feature = "tokio")] use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; +use tor_cell::restricted_msg; use std::fmt::Debug; use std::io::Result as IoResult; @@ -25,7 +25,7 @@ use educe::Educe; use crate::circuit::StreamTarget; use crate::stream::StreamReader; use tor_basic_utils::skip_fmt; -use tor_cell::relaycell::msg::{AnyRelayMsg, Data}; +use tor_cell::relaycell::msg::Data; use tor_error::internal; /// An anonymized stream over the Tor network. @@ -150,6 +150,14 @@ pub struct DataReader { state: Option, } +restricted_msg! { + /// An allowable incoming message on a data stream. + enum DataStreamMsg:RelayMsg { + // SENDME is handled by the reactor. + Data, End, Connected, + } +} + impl DataStream { /// Wrap raw stream reader and target parts as a DataStream. 
/// @@ -592,26 +600,43 @@ impl DataReaderImpl { /// This function takes ownership of self so that we can avoid /// self-referential lifetimes. async fn read_cell(mut self) -> (Self, Result<()>) { - let cell = self.s.recv().await; + use DataStreamMsg::*; + let msg = match self.s.recv().await { + Ok(unparsed) => match unparsed.decode::() { + Ok(cell) => cell.into_msg(), + Err(e) => { + self.s.protocol_error(); + return ( + self, + Err(Error::from_bytes_err(e, "message on a data stream")), + ); + } + }, + Err(e) => return (self, Err(e)), + }; - let result = match cell { - Ok(AnyRelayMsg::Connected(_)) if !self.connected => { + let result = match msg { + Connected(_) if !self.connected => { self.connected = true; Ok(()) } - Ok(AnyRelayMsg::Data(d)) if self.connected => { + Connected(_) => { + self.s.protocol_error(); + Err(Error::StreamProto( + "Received a second connect cell on a data stream".to_string(), + )) + } + Data(d) if self.connected => { self.add_data(d.into()); Ok(()) } - Ok(AnyRelayMsg::End(e)) => Err(Error::EndReceived(e.reason())), - Err(e) => Err(e), - Ok(m) => { + Data(_) => { self.s.protocol_error(); - Err(Error::StreamProto(format!( - "Unexpected {} cell on stream", - m.cmd() - ))) + Err(Error::StreamProto( + "Received a data cell an unconnected stream".to_string(), + )) } + End(e) => Err(Error::EndReceived(e.reason())), }; (self, result) diff --git a/crates/tor-proto/src/stream/raw.rs b/crates/tor-proto/src/stream/raw.rs index 4609c2efa..66554f465 100644 --- a/crates/tor-proto/src/stream/raw.rs +++ b/crates/tor-proto/src/stream/raw.rs @@ -3,7 +3,7 @@ use crate::circuit::{sendme, StreamTarget}; use crate::{Error, Result}; -use tor_cell::relaycell::msg::AnyRelayMsg; +use tor_cell::relaycell::{RelayCmd, UnparsedRelayCell}; use crate::circuit::sendme::StreamRecvWindow; use futures::channel::mpsc; @@ -15,7 +15,7 @@ pub struct StreamReader { /// The underlying `StreamTarget` for this stream. 
pub(crate) target: StreamTarget, /// Channel to receive stream messages from the reactor. - pub(crate) receiver: mpsc::Receiver, + pub(crate) receiver: mpsc::Receiver, /// Congestion control receive window for this stream. /// /// Having this here means we're only going to update it when the end consumer of this stream @@ -29,7 +29,7 @@ pub struct StreamReader { impl StreamReader { /// Try to read the next relay message from this stream. - async fn recv_raw(&mut self) -> Result { + async fn recv_raw(&mut self) -> Result { if self.ended { // Prevent reading from streams after they've ended. return Err(Error::NotConnected); @@ -44,7 +44,7 @@ impl StreamReader { Error::StreamProto("stream channel disappeared without END cell?".into()) })?; - if sendme::msg_counts_towards_windows(&msg) && self.recv_window.take()? { + if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? { self.target.send_sendme()?; self.recv_window.put(); } @@ -54,10 +54,13 @@ impl StreamReader { /// As recv_raw, but if there is an error or an end cell, note that this /// stream has ended. - pub async fn recv(&mut self) -> Result { + pub async fn recv(&mut self) -> Result { let val = self.recv_raw().await; - match val { - Err(_) | Ok(AnyRelayMsg::End(_)) => { + match &val { + Err(_) => { + self.ended = true; + } + Ok(m) if m.cmd() == RelayCmd::END => { self.ended = true; } _ => {} diff --git a/crates/tor-proto/src/stream/resolve.rs b/crates/tor-proto/src/stream/resolve.rs index 2c2d47cba..e81823e3d 100644 --- a/crates/tor-proto/src/stream/resolve.rs +++ b/crates/tor-proto/src/stream/resolve.rs @@ -2,8 +2,8 @@ use crate::stream::StreamReader; use crate::{Error, Result}; -use tor_cell::relaycell::msg::{AnyRelayMsg, Resolved}; -use tor_cell::relaycell::RelayMsg; +use tor_cell::relaycell::msg::Resolved; +use tor_cell::restricted_msg; /// A ResolveStream represents a pending DNS request made with a RESOLVE /// cell. 
@@ -12,11 +12,18 @@ pub struct ResolveStream { s: StreamReader, } +restricted_msg! { + /// An allowable reply for a RESOLVE message. + enum ResolveResponseMsg : RelayMsg { + End, + Resolved, + } +} + impl ResolveStream { /// Wrap a RawCellStream into a ResolveStream. /// /// Call only after sending a RESOLVE cell. - #[allow(dead_code)] // need to implement a caller for this. pub(crate) fn new(s: StreamReader) -> Self { ResolveStream { s } } @@ -24,17 +31,18 @@ impl ResolveStream { /// Read a message from this stream telling us the answer to our /// name lookup request. pub async fn read_msg(&mut self) -> Result { + use ResolveResponseMsg::*; let cell = self.s.recv().await?; - match cell { - AnyRelayMsg::End(e) => Err(Error::EndReceived(e.reason())), - AnyRelayMsg::Resolved(r) => Ok(r), - m => { + let msg = match cell.decode::() { + Ok(cell) => cell.into_msg(), + Err(e) => { self.s.protocol_error(); - Err(Error::StreamProto(format!( - "Unexpected {} on resolve stream", - m.cmd() - ))) + return Err(Error::from_bytes_err(e, "response on a resolve stream")); } + }; + match msg { + End(e) => Err(Error::EndReceived(e.reason())), + Resolved(r) => Ok(r), } } } From 1ee6bfa59cf17adaae2f1a545241e8e73b38c166 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 13 Feb 2023 18:41:12 -0500 Subject: [PATCH 12/17] tor-proto: note implications for future HS work --- crates/tor-proto/src/circuit.rs | 10 ++++++++++ crates/tor-proto/src/stream/incoming.rs | 2 ++ 2 files changed, 12 insertions(+) diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index f051f9d14..f3dae893e 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -241,6 +241,11 @@ impl ClientCirc { // TODO hs: rename this. "control_messages" is kind of ambiguous; we use // "control" for a lot of other things. We say "meta" elsewhere in the // reactor code, but "meta messages" just sounds odd. 
+ // + // TODO hs: possibly this should take a more encoded message type. + // + // TODO hs: it might be nice to avoid exposing tor-cell APIs in the + // tor-proto interface. #[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove #[cfg(feature = "experimental-api")] pub async fn send_control_message(&self, msg: AnyRelayMsg) -> Result<()> { @@ -279,6 +284,11 @@ impl ClientCirc { // TODO hs: rename this. "control_messages" is kind of ambiguous; we use // "control" for a lot of other things. We say "meta" elsewhere in the // reactor code, but "meta messages" just sounds odd. + // + // TODO hs: This should return a stream of UnparsedRelayCell. + // + // TODO hs: it might be nice to avoid exposing tor-cell APIs in the + // tor-proto interface. #[cfg(feature = "experimental-api")] #[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove pub fn receive_control_messages( diff --git a/crates/tor-proto/src/stream/incoming.rs b/crates/tor-proto/src/stream/incoming.rs index 55ca0831b..3dd732a7a 100644 --- a/crates/tor-proto/src/stream/incoming.rs +++ b/crates/tor-proto/src/stream/incoming.rs @@ -27,6 +27,8 @@ pub struct IncomingStream { } /// A message that can be sent to begin a stream. +// +// TODO hs perhaps this should be made with restricted_msg!() #[derive(Debug, Clone)] #[non_exhaustive] pub enum IncomingStreamRequest { From 19c9593acfa2eef256179f909211ee83565624ad Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 14 Feb 2023 14:18:25 -0500 Subject: [PATCH 13/17] Move slicewriter to tor-cell and make it private. Also, add some comments about how it is likely to change. 
--- crates/tor-bytes/semver.md | 1 - crates/tor-bytes/src/lib.rs | 2 -- crates/tor-cell/src/lib.rs | 1 + crates/tor-cell/src/relaycell.rs | 2 +- .../{tor-bytes => tor-cell}/src/slicewriter.rs | 18 ++++++++++++------ 5 files changed, 14 insertions(+), 10 deletions(-) rename crates/{tor-bytes => tor-cell}/src/slicewriter.rs (87%) diff --git a/crates/tor-bytes/semver.md b/crates/tor-bytes/semver.md index 57673c94e..2d979ebc8 100644 --- a/crates/tor-bytes/semver.md +++ b/crates/tor-bytes/semver.md @@ -1,2 +1 @@ MODIFIED: Error::BadMessage is deprecated, Error::InvalidMessage is new. -MODIFIED: New SliceWriter type. diff --git a/crates/tor-bytes/src/lib.rs b/crates/tor-bytes/src/lib.rs index a76f527a9..51d116ec3 100644 --- a/crates/tor-bytes/src/lib.rs +++ b/crates/tor-bytes/src/lib.rs @@ -42,13 +42,11 @@ mod err; mod impls; mod reader; mod secretbuf; -mod slicewriter; mod writer; pub use err::{EncodeError, Error}; pub use reader::Reader; pub use secretbuf::SecretBuf; -pub use slicewriter::{SliceWriter, SliceWriterError}; pub use writer::Writer; use arrayref::array_ref; diff --git a/crates/tor-cell/src/lib.rs b/crates/tor-cell/src/lib.rs index 3f81ec5a7..8da1a2c9a 100644 --- a/crates/tor-cell/src/lib.rs +++ b/crates/tor-cell/src/lib.rs @@ -42,6 +42,7 @@ pub mod chancell; mod err; pub mod relaycell; pub mod restrict; +mod slicewriter; pub use err::Error; diff --git a/crates/tor-cell/src/relaycell.rs b/crates/tor-cell/src/relaycell.rs index 7cb88d696..aa41d37af 100644 --- a/crates/tor-cell/src/relaycell.rs +++ b/crates/tor-cell/src/relaycell.rs @@ -318,7 +318,7 @@ impl RelayCell { let body = BodyWrapper(Box::new([0_u8; 509])); - let mut w = tor_bytes::SliceWriter::new(body); + let mut w = crate::slicewriter::SliceWriter::new(body); w.write_u8(self.msg.cmd().into()); w.write_u16(0); // "Recognized" w.write_u16(self.streamid.0); diff --git a/crates/tor-bytes/src/slicewriter.rs b/crates/tor-cell/src/slicewriter.rs similarity index 87% rename from 
crates/tor-bytes/src/slicewriter.rs rename to crates/tor-cell/src/slicewriter.rs index 762f32ab9..31f57e9bc 100644 --- a/crates/tor-bytes/src/slicewriter.rs +++ b/crates/tor-cell/src/slicewriter.rs @@ -1,14 +1,20 @@ //! A Writer that can put its results into an buffer of known byte size without //! changing that size. +//! +//! TODO: This API is crate-private in tor-cell because tor-cell is the only one +//! to use it -- and only uses it in one place. Its existence may be an argument +//! for refactoring the Writer API entirely. +//! +//! NOTE: This will likely change or go away in the future. use thiserror::Error; -use crate::Writer; +use tor_bytes::Writer; /// An error that occurred while trying to unwrap a SliceWriter. #[non_exhaustive] #[derive(Clone, Debug, Error)] -pub enum SliceWriterError { +pub(crate) enum SliceWriterError { /// We've #[error("Tried to write more than would fit into a fixed-size slice.")] Truncated, @@ -23,7 +29,7 @@ pub enum SliceWriterError { // // TODO: in theory we could have a version of this that used MaybeUninit, but I // don't think that would be worth it. -pub struct SliceWriter { +pub(crate) struct SliceWriter { /// The object we're writing into. Must have fewer than usize::LEN bytes. data: T, /// Our current write position within that object. @@ -53,7 +59,7 @@ impl SliceWriter { /// /// Preexisting bytes in the `data` object will be unchanged, unless you use /// the [`Writer`] API to write to them. - pub fn new(data: T) -> Self { + pub(crate) fn new(data: T) -> Self { Self { data, offset: 0 } } @@ -64,14 +70,14 @@ impl SliceWriter { /// after that position are unchanged.) /// /// On failure (if we tried to write too much), return an error. 
- pub fn try_unwrap(self) -> Result<(T, usize), SliceWriterError> { + pub(crate) fn try_unwrap(self) -> Result<(T, usize), SliceWriterError> { let offset = self.offset()?; Ok((self.data, offset)) } /// Return the number of bytes written into this `SliceWriter` so far, /// or an error if it has overflowed. - pub fn offset(&self) -> Result { + pub(crate) fn offset(&self) -> Result { if self.offset != usize::MAX { Ok(self.offset) } else { From 5d28e9e7d0c67eace157767b77f084022035997b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 14 Feb 2023 14:22:34 -0500 Subject: [PATCH 14/17] slicewriter: rename a local variable. --- crates/tor-cell/src/slicewriter.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/tor-cell/src/slicewriter.rs b/crates/tor-cell/src/slicewriter.rs index 31f57e9bc..c647a86c8 100644 --- a/crates/tor-cell/src/slicewriter.rs +++ b/crates/tor-cell/src/slicewriter.rs @@ -41,11 +41,11 @@ where T: AsMut<[u8]>, { fn write_all(&mut self, b: &[u8]) { - let new_len = self.offset.saturating_add(b.len()); - if new_len <= self.data.as_mut().len() { + let new_offset = self.offset.saturating_add(b.len()); + if new_offset <= self.data.as_mut().len() { // Note that if we reach this case, the addition was not saturating. 
- self.data.as_mut()[self.offset..new_len].copy_from_slice(b); - self.offset = new_len; + self.data.as_mut()[self.offset..new_offset].copy_from_slice(b); + self.offset = new_offset; } else { self.offset = usize::MAX; } From fce1c83f2e73ede24d876216b8a1ea13fe296650 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 14 Feb 2023 14:27:01 -0500 Subject: [PATCH 15/17] tor-cell: Add another debug_assert to relay cell encoding --- crates/tor-cell/src/relaycell.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/tor-cell/src/relaycell.rs b/crates/tor-cell/src/relaycell.rs index aa41d37af..a432bb209 100644 --- a/crates/tor-cell/src/relaycell.rs +++ b/crates/tor-cell/src/relaycell.rs @@ -198,6 +198,8 @@ pub struct UnparsedRelayCell { // It *is* a bit ugly to have to encode so much knowledge about the format in // different functions here, but that information shouldn't leak out of this module. } +/// Position of the stream ID within the cell body. +const STREAM_ID_OFFSET: usize = 3; impl UnparsedRelayCell { /// Wrap a BoxedCellBody as an UnparsedRelayCell. @@ -212,9 +214,6 @@ impl UnparsedRelayCell { } /// Return the stream ID for the stream that this cell corresponds to. pub fn stream_id(&self) -> StreamId { - /// Position of the stream ID within the cell body. - const STREAM_ID_OFFSET: usize = 3; - u16::from_be_bytes(*arrayref::array_ref![self.body, STREAM_ID_OFFSET, 2]).into() } /// Decode this unparsed cell into a given cell type. @@ -321,6 +320,10 @@ impl RelayCell { let mut w = crate::slicewriter::SliceWriter::new(body); w.write_u8(self.msg.cmd().into()); w.write_u16(0); // "Recognized" + debug_assert_eq!( + w.offset().expect("Overflowed a cell with just the header!"), + STREAM_ID_OFFSET + ); w.write_u16(self.streamid.0); w.write_u32(0); // Digest // (It would be simpler to use NestedWriter at this point, but it uses an internal Vec that we are trying to avoid.) 
From 21db73f182ad822e1693eb0fca1a619066c25376 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 14 Feb 2023 14:30:26 -0500 Subject: [PATCH 16/17] tor-cell: add a TODO comment about simplifying Body away. --- crates/tor-cell/src/chancell/msg.rs | 4 ++++ crates/tor-cell/src/relaycell/msg.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/crates/tor-cell/src/chancell/msg.rs b/crates/tor-cell/src/chancell/msg.rs index 49571e99e..62acf7af3 100644 --- a/crates/tor-cell/src/chancell/msg.rs +++ b/crates/tor-cell/src/chancell/msg.rs @@ -1175,6 +1175,10 @@ msg_into_cell!(Authorize); // // TODO: It might be better to merge Body with ChanMsg, but that is complex, // since their needs are _slightly_ different. +// +// TODO: If we *do* make the change above, then perhaps we should also implement +// our restricted enums in terms of this, so that there is only one instance of +// [<$body:snake:upper>] macro_rules! msg_impl_chanmsg { ($($body:ident,)*) => {paste::paste!{ diff --git a/crates/tor-cell/src/relaycell/msg.rs b/crates/tor-cell/src/relaycell/msg.rs index 32beaec34..86c561c59 100644 --- a/crates/tor-cell/src/relaycell/msg.rs +++ b/crates/tor-cell/src/relaycell/msg.rs @@ -1181,6 +1181,10 @@ empty_body! { // // TODO: It might be better to merge Body with RelayMsg, but that is complex, // since their needs are _slightly_ different. +// +// TODO: If we *do* make the change above, then perhaps we should also implement +// our restricted enums in terms of this, so that there is only one instance of +// [<$body:snake:upper>] macro_rules! msg_impl_relaymsg { ($($body:ident),* $(,)?) => {paste::paste!{ From 19c0dd153ad957fc108c59f578facbfc3e562d2a Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Wed, 15 Feb 2023 10:51:03 -0500 Subject: [PATCH 17/17] tor-proto: Add a TODO about simplifying a common pattern. 
--- crates/tor-proto/src/circuit/reactor.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 3575d5acd..6ece08ed5 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -811,6 +811,14 @@ impl Reactor { /// Handle a RELAY cell on this circuit with stream ID 0. fn handle_meta_cell(&mut self, hopnum: HopNum, msg: UnparsedRelayCell) -> Result { // SENDME cells and TRUNCATED get handled internally by the circuit. + + // TODO: This pattern (Check command, try to decode, map error) occurs + // several times, and would be good to extract and simplify. Such + // simplification is obstructed by a couple of factors: First, that + // there is not currently a good way to get the RelayCmd from the _type_ of + // a RelayMsg. Second, that decode() [correctly] consumes the + // UnparsedRelayCell. I tried a macro-based approach, and didn't care + // for it. -nickm if msg.cmd() == RelayCmd::SENDME { let sendme = msg .decode::()