Merge branch 'ticket_525_part3_take2' into 'main'

Finish #525 for relay messages: Only parse messages at the last instant.

Closes #773 and #525

See merge request tpo/core/arti!1017
This commit is contained in:
Nick Mathewson 2023-02-15 16:35:27 +00:00
commit 0f2218f4d3
21 changed files with 653 additions and 204 deletions

1
Cargo.lock generated
View File

@ -4153,6 +4153,7 @@ dependencies = [
"cipher",
"coarsetime",
"derive_builder_fork_arti",
"derive_more",
"digest 0.10.6",
"educe",
"futures",

View File

@ -8,3 +8,5 @@ BREAKING: Renamed ChanCell->AnyChanCell, ChanMsg->AnyChanMsg.
BREAKING: Renamed RelayCell->AnyRelayCell, RelayMsg->AnyRelayMsg.
BREAKING: Make ChannelCodec::decode() parameterized.
BREAKING: RelayEarly is now a real type.
BREAKING: RelayCell encoding and decoding functions now expect a Box<Body.

View File

@ -22,6 +22,11 @@ pub const CELL_DATA_LEN: usize = 509;
/// A cell body considered as a raw array of bytes
pub type RawCellBody = [u8; CELL_DATA_LEN];
/// A [`RawCellBody`] stored on the heap.
///
/// We use this often to avoid copying cell bodies around.
pub type BoxedCellBody = Box<RawCellBody>;
/// Channel-local identifier for a circuit.
///
/// A circuit ID can be 2 or 4 bytes long; since version 4 of the Tor

View File

@ -1,6 +1,6 @@
//! Different kinds of messages that can be encoded in channel cells.
use super::{ChanCmd, RawCellBody, CELL_DATA_LEN};
use super::{BoxedCellBody, ChanCmd, RawCellBody, CELL_DATA_LEN};
use std::net::{IpAddr, Ipv4Addr};
use tor_basic_utils::skip_fmt;
use tor_bytes::{self, EncodeError, EncodeResult, Error, Readable, Reader, Result, Writer};
@ -353,7 +353,7 @@ impl Readable for Created2 {
///
/// A different protocol is defined over the relay cells; it is implemented
/// in the [crate::relaycell] module.
#[derive(Clone, Educe)]
#[derive(Clone, Educe, derive_more::From)]
#[educe(Debug)]
pub struct Relay {
/// The contents of the relay cell as encoded for transfer.
@ -364,7 +364,7 @@ pub struct Relay {
/// necessary happen. We should refactor our data handling until we're mostly
/// moving around pointers rather than copying data; see ticket #7.
#[educe(Debug(method = "skip_fmt"))]
body: Box<RawCellBody>,
body: BoxedCellBody,
}
impl Relay {
/// Construct a Relay message from a slice containing its contents.
@ -385,11 +385,10 @@ impl Relay {
body: Box::new(body),
}
}
/// Consume this Relay message and return a RelayCellBody for
/// Consume this Relay message and return a BoxedCellBody for
/// encryption/decryption.
pub fn into_relay_body(self) -> RawCellBody {
*self.body
pub fn into_relay_body(self) -> BoxedCellBody {
self.body
}
/// Wrap this Relay message into a RelayMsg as a RELAY_EARLY cell.
pub fn into_early(self) -> AnyChanMsg {
@ -426,13 +425,13 @@ impl Body for RelayEarly {
}
}
impl RelayEarly {
/// Consume this RelayEarly message and return a RelayCellBody for
/// Consume this RelayEarly message and return a BoxedCellBody for
/// encryption/decryption.
//
// (Since this method takes `self` by value, we can't take advantage of
// Deref.)
pub fn into_relay_body(self) -> RawCellBody {
*self.0.body
pub fn into_relay_body(self) -> BoxedCellBody {
self.0.body
}
}
@ -1171,6 +1170,58 @@ msg_into_cell!(AuthChallenge);
msg_into_cell!(Authenticate);
msg_into_cell!(Authorize);
/// Helper: declare a ChanMsg implementation for a message type that has a
/// fixed command.
//
// TODO: It might be better to merge Body with ChanMsg, but that is complex,
// since their needs are _slightly_ different.
//
// TODO: If we *do* make the change above, then perhaps we should also implement
// our restricted enums in terms of this, so that there is only one instance of
// [<$body:snake:upper>]
macro_rules! msg_impl_chanmsg {
($($body:ident,)*) =>
{paste::paste!{
$(impl crate::chancell::ChanMsg for $body {
fn cmd(&self) -> crate::chancell::ChanCmd { crate::chancell::ChanCmd::[< $body:snake:upper >] }
fn encode_onto<W: tor_bytes::Writer + ?Sized>(self, w: &mut W) -> tor_bytes::EncodeResult<()> {
crate::chancell::msg::Body::encode_onto(self, w)
}
fn decode_from_reader(cmd: ChanCmd, r: &mut tor_bytes::Reader<'_>) -> tor_bytes::Result<Self> {
if cmd != crate::chancell::ChanCmd::[< $body:snake:upper >] {
return Err(tor_bytes::Error::InvalidMessage(
format!("Expected {} command; got {cmd}", stringify!([< $body:snake:upper >])).into()
));
}
crate::chancell::msg::Body::decode_from_reader(r)
}
})*
}}
}
// We implement ChanMsg for every body type, so that you can write code that does
// e.g. ChanCell<Relay>.
msg_impl_chanmsg!(
Padding,
Vpadding,
Create,
CreateFast,
Create2,
Created,
CreatedFast,
Created2,
Relay,
RelayEarly,
Destroy,
Netinfo,
Versions,
PaddingNegotiate,
Certs,
AuthChallenge,
Authenticate,
Authorize,
);
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@

View File

@ -42,6 +42,7 @@ pub mod chancell;
mod err;
pub mod relaycell;
pub mod restrict;
mod slicewriter;
pub use err::Error;

View File

@ -1,7 +1,7 @@
//! Implementation for parsing and encoding relay cells
use crate::chancell::{RawCellBody, CELL_DATA_LEN};
use tor_bytes::{EncodeResult, Error, Result};
use crate::chancell::{BoxedCellBody, CELL_DATA_LEN};
use tor_bytes::{EncodeError, EncodeResult, Error, Result};
use tor_bytes::{Reader, Writer};
use tor_error::internal;
@ -177,6 +177,51 @@ impl StreamId {
}
}
/// A relay cell that has not yet been fully parsed, but where we have access to
/// the command and stream ID, for dispatching purposes.
//
// TODO HS: Settle on some names here. I would prefer "UnparsedRelayMsg" here so
// it can eventually be compatible with proposal 340. But that would make our
// RelayCell and RelayMsg types below kind of illogical. Perhaps we should rename...
// this -> UnparsedRelayMsg
// RelayCell -> ParsedRelayMsg
// RelayMsg -> RelayMsgBody?
// Ideas appreciated -NM
#[derive(Clone, Debug)]
pub struct UnparsedRelayCell {
/// The body of the cell.
body: BoxedCellBody,
// NOTE: we could also have a separate command and stream ID field here, but
// we expect to be working with a TON of these, so we will be mildly
// over-optimized and just peek into the body.
//
// It *is* a bit ugly to have to encode so much knowledge about the format in
// different functions here, but that information shouldn't leak out of this module.
}
/// Position of the stream ID within the cell body.
const STREAM_ID_OFFSET: usize = 3;
impl UnparsedRelayCell {
/// Wrap a BoxedCellBody as an UnparsedRelayCell.
pub fn from_body(body: BoxedCellBody) -> Self {
Self { body }
}
/// Return the command for this cell.
pub fn cmd(&self) -> RelayCmd {
/// Position of the command within the cell body.
const CMD_OFFSET: usize = 0;
self.body[CMD_OFFSET].into()
}
/// Return the stream ID for the stream that this cell corresponds to.
pub fn stream_id(&self) -> StreamId {
u16::from_be_bytes(*arrayref::array_ref![self.body, STREAM_ID_OFFSET, 2]).into()
}
/// Decode this unparsed cell into a given cell type.
pub fn decode<M: RelayMsg>(self) -> Result<RelayCell<M>> {
RelayCell::decode(self.body)
}
}
/// A decoded and parsed relay cell of unrestricted type.
pub type AnyRelayCell = RelayCell<msg::AnyRelayMsg>;
@ -229,56 +274,86 @@ impl<M: RelayMsg> RelayCell<M> {
pub fn msg(&self) -> &M {
&self.msg
}
/// Consume this cell and return the underlying message.
pub fn into_msg(self) -> M {
self.msg
}
/// Consume this relay message and encode it as a 509-byte padded cell
/// body.
pub fn encode<R: Rng + CryptoRng>(self, rng: &mut R) -> crate::Result<RawCellBody> {
pub fn encode<R: Rng + CryptoRng>(self, rng: &mut R) -> crate::Result<BoxedCellBody> {
/// We skip this much space before adding any random padding to the
/// end of the cell
const MIN_SPACE_BEFORE_PADDING: usize = 4;
// TODO: This implementation is inefficient; it copies too much.
let encoded = self.encode_to_vec()?;
let enc_len = encoded.len();
if enc_len > CELL_DATA_LEN {
return Err(crate::Error::Internal(internal!(
"too many bytes in relay cell"
)));
}
let mut raw = [0_u8; CELL_DATA_LEN];
raw[0..enc_len].copy_from_slice(&encoded);
let (mut body, enc_len) = self.encode_to_cell()?;
debug_assert!(enc_len <= CELL_DATA_LEN);
if enc_len < CELL_DATA_LEN - MIN_SPACE_BEFORE_PADDING {
rng.fill_bytes(&mut raw[enc_len + MIN_SPACE_BEFORE_PADDING..]);
rng.fill_bytes(&mut body[enc_len + MIN_SPACE_BEFORE_PADDING..]);
}
Ok(raw)
Ok(body)
}
/// Consume a relay cell and return its contents, encoded for use
/// in a RELAY or RELAY_EARLY cell
///
/// TODO: not the best interface, as this requires copying into a cell.
fn encode_to_vec(self) -> EncodeResult<Vec<u8>> {
let mut w = Vec::new();
/// in a RELAY or RELAY_EARLY cell.
fn encode_to_cell(self) -> EncodeResult<(BoxedCellBody, usize)> {
// NOTE: This implementation is a bit optimized, since it happens to
// literally every relay cell that we produce.
// TODO -NM: Add a specialized implementation for making a DATA cell from
// a body?
/// Wrap a BoxedCellBody and implement AsMut<[u8]>
struct BodyWrapper(BoxedCellBody);
impl AsMut<[u8]> for BodyWrapper {
fn as_mut(&mut self) -> &mut [u8] {
self.0.as_mut()
}
}
/// The position of the length field within a relay cell.
const LEN_POS: usize = 9;
/// The position of the body a relay cell.
const BODY_POS: usize = 11;
let body = BodyWrapper(Box::new([0_u8; 509]));
let mut w = crate::slicewriter::SliceWriter::new(body);
w.write_u8(self.msg.cmd().into());
w.write_u16(0); // "Recognized"
debug_assert_eq!(
w.offset().expect("Overflowed a cell with just the header!"),
STREAM_ID_OFFSET
);
w.write_u16(self.streamid.0);
w.write_u32(0); // Digest
let len_pos = w.len();
// (It would be simpler to use NestedWriter at this point, but it uses an internal Vec that we are trying to avoid.)
debug_assert_eq!(
w.offset().expect("Overflowed a cell with just the header!"),
LEN_POS
);
w.write_u16(0); // Length.
let body_pos = w.len();
self.msg.encode_onto(&mut w)?;
assert!(w.len() >= body_pos); // nothing was removed
let payload_len = w.len() - body_pos;
assert!(payload_len <= std::u16::MAX as usize);
*(array_mut_ref![w, len_pos, 2]) = (payload_len as u16).to_be_bytes();
Ok(w)
debug_assert_eq!(
w.offset().expect("Overflowed a cell with just the header!"),
BODY_POS
);
self.msg.encode_onto(&mut w)?; // body
let (mut body, written) = w.try_unwrap().map_err(|_| {
EncodeError::Bug(internal!(
"Encoding of relay message was too long to fit into a cell!"
))
})?;
let payload_len = written - BODY_POS;
debug_assert!(payload_len < std::u16::MAX as usize);
*(array_mut_ref![body.0, LEN_POS, 2]) = (payload_len as u16).to_be_bytes();
Ok((body.0, written))
}
/// Parse a RELAY or RELAY_EARLY cell body into a RelayCell.
///
/// Requires that the cryptographic checks on the message have already been
/// performed
pub fn decode(body: RawCellBody) -> Result<Self> {
#[allow(clippy::needless_pass_by_value)] // TODO this will go away soon.
pub fn decode(body: BoxedCellBody) -> Result<Self> {
let mut reader = Reader::from_slice(body.as_ref());
Self::decode_from_reader(&mut reader)
}

View File

@ -268,6 +268,11 @@ impl Body for Begin {
#[derive(Debug, Clone)]
pub struct Data {
/// Contents of the cell, to be sent on a specific stream
//
// TODO: There's a good case to be made that this should be a BoxedCellBody
// instead, to avoid allocations and copies. But first probably we should
// figure out how proposal 340 will work with this. Possibly, we will wind
// up using `bytes` or something.
body: Vec<u8>,
}
impl Data {
@ -1170,3 +1175,53 @@ empty_body! {
/// Opens a new stream on a directory cache.
pub struct BeginDir {}
}
/// Helper: declare a RelayMsg implementation for a message type that has a
/// fixed command.
//
// TODO: It might be better to merge Body with RelayMsg, but that is complex,
// since their needs are _slightly_ different.
//
// TODO: If we *do* make the change above, then perhaps we should also implement
// our restricted enums in terms of this, so that there is only one instance of
// [<$body:snake:upper>]
macro_rules! msg_impl_relaymsg {
($($body:ident),* $(,)?) =>
{paste::paste!{
$(impl crate::relaycell::RelayMsg for $body {
fn cmd(&self) -> crate::relaycell::RelayCmd { crate::relaycell::RelayCmd::[< $body:snake:upper >] }
fn encode_onto<W: tor_bytes::Writer + ?Sized>(self, w: &mut W) -> tor_bytes::EncodeResult<()> {
crate::relaycell::msg::Body::encode_onto(self, w)
}
fn decode_from_reader(cmd: RelayCmd, r: &mut tor_bytes::Reader<'_>) -> tor_bytes::Result<Self> {
if cmd != crate::relaycell::RelayCmd::[< $body:snake:upper >] {
return Err(tor_bytes::Error::InvalidMessage(
format!("Expected {} command; got {cmd}", stringify!([< $body:snake:upper >])).into()
));
}
crate::relaycell::msg::Body::decode_from_reader(r)
}
})*
}}
}
msg_impl_relaymsg!(
Begin, Data, End, Connected, Sendme, Extend, Extended, Extend2, Extended2, Truncate, Truncated,
Drop, Resolve, Resolved, BeginDir,
);
#[cfg(feature = "experimental-udp")]
msg_impl_relaymsg!(ConnectUdp, ConnectedUdp, Datagram);
#[cfg(feature = "onion-service")]
msg_impl_relaymsg!(
EstablishIntro,
EstablishRendezvous,
Introduce1,
Introduce2,
Rendezvous1,
Rendezvous2,
IntroEstablished,
RendezvousEstablished,
IntroduceAck,
);

View File

@ -134,24 +134,22 @@ macro_rules! restricted_msg {
where
W: $crate::restrict::tor_bytes::Writer + ?Sized
{
use $body_type;
match self {
$(
$( #[cfg(feature=$feat)] )?
Self::$case(m) => m.encode_onto(w),
Self::$case(m) => $body_type::encode_onto(m, w),
)*
$(
Self::$unrecognized(u) => u.encode_onto(w),
Self::$unrecognized(u) => $body_type::encode_onto(u, w),
)?
}
}
fn decode_from_reader(cmd: $cmd_type, r: &mut $crate::restrict::tor_bytes::Reader<'_>) -> $crate::restrict::tor_bytes::Result<Self> {
use $body_type;
Ok(match cmd {
$(
$( #[cfg(feature=$feat)] )?
$cmd_type:: [<$case:snake:upper>] => Self::$case( $msg_mod :: $case :: decode_from_reader(r)? ),
$cmd_type:: [<$case:snake:upper>] => Self::$case( <$msg_mod :: $case as $body_type> :: decode_from_reader(r)? ),
)*
$(
_ => Self::$unrecognized($unrec_type::decode_with_cmd(cmd, r)?),

View File

@ -0,0 +1,135 @@
//! A Writer that can put its results into an buffer of known byte size without
//! changing that size.
//!
//! TODO: This API is crate-private in tor-cell because tor-cell is the only one
//! to use it -- and only uses it in one place. Its existence may be an argument
//! for refactoring the Writer API entirely.
//!
//! NOTE: This will likely change or go away in the future.
use thiserror::Error;
use tor_bytes::Writer;
/// An error that occurred while trying to unwrap a SliceWriter.
#[non_exhaustive]
#[derive(Clone, Debug, Error)]
pub(crate) enum SliceWriterError {
/// We've
#[error("Tried to write more than would fit into a fixed-size slice.")]
Truncated,
}
/// An object that supports writing into a byte-slice of fixed size.
///
/// Since the writer API does not allow all `write_*` functions to report errors,
/// this type defers any truncated-data errors until you try to retrieve the
/// inner data.
///
//
// TODO: in theory we could have a version of this that used MaybeUninit, but I
// don't think that would be worth it.
pub(crate) struct SliceWriter<T> {
/// The object we're writing into. Must have fewer than usize::LEN bytes.
data: T,
/// Our current write position within that object.
offset: usize,
}
impl<T> Writer for SliceWriter<T>
where
T: AsMut<[u8]>,
{
fn write_all(&mut self, b: &[u8]) {
let new_offset = self.offset.saturating_add(b.len());
if new_offset <= self.data.as_mut().len() {
// Note that if we reach this case, the addition was not saturating.
self.data.as_mut()[self.offset..new_offset].copy_from_slice(b);
self.offset = new_offset;
} else {
self.offset = usize::MAX;
}
}
}
impl<T> SliceWriter<T> {
/// Construct a new SliceWriter
///
/// Typically, you would want to use this on a type that implements
/// `AsMut<[u8]>`, or else it won't be very useful.
///
/// Preexisting bytes in the `data` object will be unchanged, unless you use
/// the [`Writer`] API to write to them.
pub(crate) fn new(data: T) -> Self {
Self { data, offset: 0 }
}
/// Try to extract the data from this `SliceWriter`.
///
/// On success (if we did not write "off the end" of the underlying object),
/// return the object and the number of bytes we wrote into it. (Bytes
/// after that position are unchanged.)
///
/// On failure (if we tried to write too much), return an error.
pub(crate) fn try_unwrap(self) -> Result<(T, usize), SliceWriterError> {
let offset = self.offset()?;
Ok((self.data, offset))
}
/// Return the number of bytes written into this `SliceWriter` so far,
/// or an error if it has overflowed.
pub(crate) fn offset(&self) -> Result<usize, SliceWriterError> {
if self.offset != usize::MAX {
Ok(self.offset)
} else {
Err(SliceWriterError::Truncated)
}
}
}
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
#[test]
fn basics() {
let mut w = SliceWriter::new([0_u8; 16]);
w.write_u8(b'h');
w.write_u16(0x656c);
w.write_u32(0x6c6f2077);
w.write_all(b"orld!");
let (a, len) = w.try_unwrap().unwrap();
assert_eq!(a.as_ref(), b"hello world!\0\0\0\0");
assert_eq!(len, 12);
}
#[test]
fn full_is_ok() {
let mut w = SliceWriter::new([0_u8; 4]);
w.write_u8(1);
w.write_u16(0x0203);
w.write_u8(4);
let (a, len) = w.try_unwrap().unwrap();
assert_eq!(a.as_ref(), [1, 2, 3, 4]);
assert_eq!(len, 4);
}
#[test]
fn too_full_is_not_ok() {
let mut w = SliceWriter::new([0_u8; 5]);
w.write_u32(12);
w.write_u32(12);
assert!(matches!(w.try_unwrap(), Err(SliceWriterError::Truncated)));
}
}

View File

@ -2,7 +2,9 @@
#![allow(clippy::uninlined_format_args)]
use tor_bytes::Error;
use tor_cell::relaycell::{msg, msg::AnyRelayMsg, AnyRelayCell, RelayCmd, RelayMsg, StreamId};
use tor_cell::relaycell::{
msg, msg::AnyRelayMsg, AnyRelayCell, RelayCmd, RelayMsg, StreamId, UnparsedRelayCell,
};
#[cfg(feature = "experimental-udp")]
use std::{
@ -34,7 +36,7 @@ impl rand::RngCore for BadRng {
// I won't tell if you don't.
impl rand::CryptoRng for BadRng {}
fn decode(body: &str) -> [u8; CELL_BODY_LEN] {
fn decode(body: &str) -> Box<[u8; CELL_BODY_LEN]> {
let mut body = body.to_string();
body.retain(|c| !c.is_whitespace());
let mut body = hex::decode(body).unwrap();
@ -42,7 +44,7 @@ fn decode(body: &str) -> [u8; CELL_BODY_LEN] {
let mut result = [0; CELL_BODY_LEN];
result.copy_from_slice(&body[..]);
result
Box::new(result)
}
fn cell(body: &str, id: StreamId, msg: AnyRelayMsg) {
@ -51,9 +53,19 @@ fn cell(body: &str, id: StreamId, msg: AnyRelayMsg) {
let expected = AnyRelayCell::new(id, msg);
let decoded = AnyRelayCell::decode(body).unwrap();
let decoded = AnyRelayCell::decode(body.clone()).unwrap();
let decoded_from_partial = UnparsedRelayCell::from_body(body)
.decode::<AnyRelayMsg>()
.unwrap();
assert_eq!(decoded_from_partial.stream_id(), decoded.stream_id());
assert_eq!(decoded_from_partial.cmd(), decoded.cmd());
assert_eq!(format!("{:?}", expected), format!("{:?}", decoded));
assert_eq!(
format!("{:?}", expected),
format!("{:?}", decoded_from_partial)
);
let encoded1 = decoded.encode(&mut bad_rng).unwrap();
let encoded2 = expected.encode(&mut bad_rng).unwrap();

View File

@ -33,6 +33,7 @@ bytes = "1"
cipher = { version = "0.4.1", features = ["zeroize"] }
coarsetime = "0.1.20"
derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" }
derive_more = "0.99.3"
digest = "0.10.0"
educe = "0.4.6"
futures = "0.3.14"

View File

@ -241,6 +241,11 @@ impl ClientCirc {
// TODO hs: rename this. "control_messages" is kind of ambiguous; we use
// "control" for a lot of other things. We say "meta" elsewhere in the
// reactor code, but "meta messages" just sounds odd.
//
// TODO hs: possibly this should take a more encoded message type.
//
// TODO hs: it might be nice to avoid exposing tor-cell APIs in the
// tor-proto interface.
#[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove
#[cfg(feature = "experimental-api")]
pub async fn send_control_message(&self, msg: AnyRelayMsg) -> Result<()> {
@ -279,6 +284,11 @@ impl ClientCirc {
// TODO hs: rename this. "control_messages" is kind of ambiguous; we use
// "control" for a lot of other things. We say "meta" elsewhere in the
// reactor code, but "meta messages" just sounds odd.
//
// TODO hs: This should return a stream of UnparsedRelayCell.
//
// TODO hs: it might be nice to avoid exposing tor-cell APIs in the
// tor-proto interface.
#[cfg(feature = "experimental-api")]
#[allow(clippy::missing_panics_doc, unused_variables)] // TODO hs remove
pub fn receive_control_messages(
@ -815,7 +825,7 @@ mod test {
use hex_literal::hex;
use std::time::Duration;
use tor_basic_utils::test_rng::testing_rng;
use tor_cell::chancell::{msg as chanmsg, AnyChanCell};
use tor_cell::chancell::{msg as chanmsg, AnyChanCell, BoxedCellBody};
use tor_cell::relaycell::{msg as relaymsg, AnyRelayCell, StreamId};
use tor_linkspec::OwnedCircTarget;
use tor_rtcompat::{Runtime, SleepProvider};
@ -825,11 +835,10 @@ mod test {
where
ID: Into<StreamId>,
{
let body: RelayCellBody = AnyRelayCell::new(id.into(), msg)
let body: BoxedCellBody = AnyRelayCell::new(id.into(), msg)
.encode(&mut testing_rng())
.unwrap()
.into();
let chanmsg = chanmsg::Relay::from_raw(body.into());
.unwrap();
let chanmsg = chanmsg::Relay::from(body);
ClientCircChanMsg::Relay(chanmsg)
}
@ -1244,9 +1253,10 @@ mod test {
let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
match error {
Error::CircProto(s) => {
assert_eq!(s, "wanted EXTENDED2; got EXTENDED");
}
Error::BytesErr {
err: tor_bytes::Error::InvalidMessage(_),
object: "extended2 message",
} => {}
_ => panic!(),
}
});

View File

@ -5,8 +5,8 @@
use crate::circuit::sendme::{StreamRecvWindow, StreamSendWindow};
use crate::{Error, Result};
use tor_cell::relaycell::{msg::AnyRelayMsg, RelayMsg};
use tor_error::internal;
use tor_cell::relaycell::UnparsedRelayCell;
use tor_cell::restricted_msg;
/// Type to track state of half-closed streams.
///
@ -27,6 +27,21 @@ pub(super) struct HalfStream {
connected_ok: bool,
}
restricted_msg! {
enum HalfStreamMsg : RelayMsg {
Sendme, Data, Connected, End, Resolved
}
}
/// A status value returned by [`HalfStream::handle_msg`].
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum HalfStreamStatus {
/// The stream has been closed successfully and can now be dropped.
Closed,
/// The stream is still half,open, and must still be tracked.
Open,
}
impl HalfStream {
/// Create a new half-closed stream.
pub(super) fn new(
@ -47,33 +62,36 @@ impl HalfStream {
/// The caller must handle END cells; it is an internal error to pass
/// END cells to this method.
/// no ends here.
pub(super) fn handle_msg(&mut self, msg: &AnyRelayMsg) -> Result<()> {
pub(super) fn handle_msg(&mut self, msg: UnparsedRelayCell) -> Result<HalfStreamStatus> {
use HalfStreamMsg::*;
use HalfStreamStatus::*;
let msg = msg
.decode::<HalfStreamMsg>()
.map_err(|e| Error::from_bytes_err(e, "message on half-closed stream"))?
.into_msg();
match msg {
AnyRelayMsg::Sendme(_) => {
Sendme(_) => {
self.sendw.put(Some(()))?;
Ok(())
Ok(Open)
}
AnyRelayMsg::Data(_) => {
Data(_) => {
self.recvw.take()?;
Ok(())
Ok(Open)
}
AnyRelayMsg::Connected(_) => {
Connected(_) => {
if self.connected_ok {
self.connected_ok = false;
Ok(())
Ok(Open)
} else {
Err(Error::CircProto(
"Bad CONNECTED cell on a closed stream!".into(),
))
}
}
AnyRelayMsg::End(_) => Err(Error::from(internal!(
"END cell in HalfStream::handle_msg()"
))),
_ => Err(Error::CircProto(format!(
"Bad {} cell on a closed stream!",
msg.cmd()
))),
End(_) => Ok(Closed),
// TODO XXXX: We should only allow a Resolved() on streams where we sent
// Resolve. My intended solution for #774 will fix this too.
Resolved(_) => Ok(Closed),
}
}
}
@ -92,20 +110,40 @@ mod test {
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use crate::circuit::sendme::{StreamRecvWindow, StreamSendWindow};
use tor_cell::relaycell::msg;
use rand::{CryptoRng, Rng};
use tor_basic_utils::test_rng::testing_rng;
use tor_cell::relaycell::{
msg::{self, AnyRelayMsg},
AnyRelayCell,
};
fn to_unparsed<R: Rng + CryptoRng>(rng: &mut R, val: AnyRelayMsg) -> UnparsedRelayCell {
UnparsedRelayCell::from_body(
AnyRelayCell::new(77.into(), val)
.encode(rng)
.expect("encoding failed"),
)
}
#[test]
fn halfstream_sendme() -> Result<()> {
let mut rng = testing_rng();
let mut sendw = StreamSendWindow::new(101);
sendw.take(&())?; // Make sure that it will accept one sendme.
let mut hs = HalfStream::new(sendw, StreamRecvWindow::new(20), true);
// one sendme is fine
let m = msg::Sendme::new_empty().into();
assert!(hs.handle_msg(&m).is_ok());
let m = msg::Sendme::new_empty();
assert!(hs
.handle_msg(to_unparsed(&mut rng, m.clone().into()))
.is_ok());
// but no more were expected!
let e = hs.handle_msg(&m).err().unwrap();
let e = hs
.handle_msg(to_unparsed(&mut rng, m.into()))
.err()
.unwrap();
assert_eq!(
format!("{}", e),
"Circuit protocol violation: Received a SENDME when none was expected"
@ -120,17 +158,21 @@ mod test {
#[test]
fn halfstream_data() {
let mut hs = hs_new();
let mut rng = testing_rng();
// 20 data cells are okay.
let m = msg::Data::new(&b"this offer is unrepeatable"[..])
.unwrap()
.into();
let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
for _ in 0_u8..20 {
assert!(hs.handle_msg(&m).is_ok());
assert!(hs
.handle_msg(to_unparsed(&mut rng, m.clone().into()))
.is_ok());
}
// But one more is a protocol violation.
let e = hs.handle_msg(&m).err().unwrap();
let e = hs
.handle_msg(to_unparsed(&mut rng, m.into()))
.err()
.unwrap();
assert_eq!(
format!("{}", e),
"Circuit protocol violation: Received a data cell in violation of a window"
@ -140,16 +182,24 @@ mod test {
#[test]
fn halfstream_connected() {
let mut hs = hs_new();
let mut rng = testing_rng();
// We were told to accept a connected, so we'll accept one
// and no more.
let m = msg::Connected::new_empty().into();
assert!(hs.handle_msg(&m).is_ok());
assert!(hs.handle_msg(&m).is_err());
let m = msg::Connected::new_empty();
assert!(hs
.handle_msg(to_unparsed(&mut rng, m.clone().into()))
.is_ok());
assert!(hs
.handle_msg(to_unparsed(&mut rng, m.clone().into()))
.is_err());
// If we try that again with connected_ok == false, we won't
// accept any.
let mut hs = HalfStream::new(StreamSendWindow::new(20), StreamRecvWindow::new(20), false);
let e = hs.handle_msg(&m).err().unwrap();
let e = hs
.handle_msg(to_unparsed(&mut rng, m.into()))
.err()
.unwrap();
assert_eq!(
format!("{}", e),
"Circuit protocol violation: Bad CONNECTED cell on a closed stream!"
@ -159,11 +209,15 @@ mod test {
#[test]
fn halfstream_other() {
let mut hs = hs_new();
let m = msg::Extended2::new(Vec::new()).into();
let e = hs.handle_msg(&m).err().unwrap();
let mut rng = testing_rng();
let m = msg::Extended2::new(Vec::new());
let e = hs
.handle_msg(to_unparsed(&mut rng, m.into()))
.err()
.unwrap();
assert_eq!(
format!("{}", e),
"Circuit protocol violation: Bad EXTENDED2 cell on a closed stream!"
"Unable to parse message on half-closed stream"
);
}
}

View File

@ -1,4 +1,5 @@
//! Code to handle incoming cells on a circuit.
use super::halfstream::HalfStreamStatus;
use super::streammap::{ShouldSendEnd, StreamEnt};
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
use crate::circuit::unique_id::UniqId;
@ -16,7 +17,7 @@ use std::marker::PhantomData;
use std::pin::Pin;
use tor_cell::chancell::msg::{AnyChanMsg, Relay};
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
use tor_cell::relaycell::{AnyRelayCell, RelayCmd, RelayMsg, StreamId};
use tor_cell::relaycell::{AnyRelayCell, RelayCmd, StreamId, UnparsedRelayCell};
use futures::channel::{mpsc, oneshot};
use futures::Sink;
@ -34,7 +35,7 @@ use crate::circuit::sendme::StreamSendWindow;
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
use safelog::sensitive as sv;
use tor_cell::chancell::{self, ChanMsg};
use tor_cell::chancell::{self, BoxedCellBody, ChanMsg};
use tor_cell::chancell::{AnyChanCell, CircId};
use tor_linkspec::{LinkSpec, OwnedChanTarget, RelayIds};
use tor_llcrypto::pk;
@ -118,7 +119,7 @@ pub(super) enum CtrlMsg {
/// SENDME cells once we've read enough out of the other end. If it *does* block, we
/// can assume someone is trying to send us more cells than they should, and abort
/// the stream.
sender: mpsc::Sender<AnyRelayMsg>,
sender: mpsc::Sender<UnparsedRelayCell>,
/// A channel to receive messages to send on this stream from.
rx: mpsc::Receiver<AnyRelayMsg>,
/// Oneshot channel to notify on completion, with the allocated stream ID.
@ -260,7 +261,9 @@ pub(super) trait MetaCellHandler: Send {
/// Called when the message we were waiting for arrives.
///
/// Gets a copy of the `Reactor` in order to do anything it likes there.
fn finish(&mut self, msg: AnyRelayMsg, reactor: &mut Reactor) -> Result<()>;
///
/// If this function returns an error, the reactor will shut down.
fn finish(&mut self, msg: UnparsedRelayCell, reactor: &mut Reactor) -> Result<()>;
}
/// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait.
@ -373,28 +376,12 @@ where
fn expected_hop(&self) -> HopNum {
self.expected_hop
}
fn finish(&mut self, msg: AnyRelayMsg, reactor: &mut Reactor) -> Result<()> {
// Did we get the right response?
if msg.cmd() != RelayCmd::EXTENDED2 {
return Err(Error::CircProto(format!(
"wanted EXTENDED2; got {}",
msg.cmd(),
)));
}
fn finish(&mut self, msg: UnparsedRelayCell, reactor: &mut Reactor) -> Result<()> {
let msg = msg
.decode::<tor_cell::relaycell::msg::Extended2>()
.map_err(|e| Error::from_bytes_err(e, "extended2 message"))?
.into_msg();
// ???? Do we need to shutdown the circuit for the remaining error
// ???? cases in this function?
let msg = match msg {
AnyRelayMsg::Extended2(e) => e,
_ => {
return Err(Error::from(internal!(
"Message body {:?} didn't match cmd {:?}",
msg,
msg.cmd()
)))
}
};
let relay_handshake = msg.into_body();
trace!(
@ -822,13 +809,30 @@ impl Reactor {
}
/// Handle a RELAY cell on this circuit with stream ID 0.
fn handle_meta_cell(&mut self, hopnum: HopNum, msg: AnyRelayMsg) -> Result<CellStatus> {
fn handle_meta_cell(&mut self, hopnum: HopNum, msg: UnparsedRelayCell) -> Result<CellStatus> {
// SENDME cells and TRUNCATED get handled internally by the circuit.
if let AnyRelayMsg::Sendme(s) = msg {
return self.handle_sendme(hopnum, s);
// TODO: This pattern (Check command, try to decode, map error) occurs
// several times, and would be good to extract simplify. Such
// simplification is obstructed by a couple of factors: First, that
// there is not currently a good way to get the RelayCmd from _type_ of
// a RelayMsg. Second, that decode() [correctly] consumes the
// UnparsedRelayMsg. I tried a macro-based approach, and didn't care
// for it. -nickm
if msg.cmd() == RelayCmd::SENDME {
let sendme = msg
.decode::<Sendme>()
.map_err(|e| Error::from_bytes_err(e, "sendme message"))?
.into_msg();
return self.handle_sendme(hopnum, sendme);
}
if let AnyRelayMsg::Truncated(t) = msg {
let reason = t.reason();
if msg.cmd() == RelayCmd::TRUNCATED {
let truncated = msg
.decode::<tor_cell::relaycell::msg::Truncated>()
.map_err(|e| Error::from_bytes_err(e, "truncated message"))?
.into_msg();
let reason = truncated.reason();
debug!(
"{}: Truncated from hop {}. Reason: {} [{}]",
self.unique_id,
@ -857,8 +861,12 @@ impl Reactor {
self.unique_id,
ret
);
let status = match &ret {
Ok(()) => Ok(CellStatus::Continue),
Err(e) => Err(e.clone()),
};
let _ = done.send(ret); // don't care if sender goes away
Ok(CellStatus::Continue)
status
} else {
// Somebody wanted a message from a different hop! Put this
// one back.
@ -976,7 +984,7 @@ impl Reactor {
early: bool,
cell: AnyRelayCell,
) -> Result<()> {
let c_t_w = sendme::cell_counts_towards_windows(&cell);
let c_t_w = sendme::cmd_counts_towards_windows(cell.cmd());
let stream_id = cell.stream_id();
// Check whether the hop send window is empty, if this cell counts towards windows.
// NOTE(eta): It is imperative this happens *before* calling encrypt() below, otherwise
@ -1005,7 +1013,7 @@ impl Reactor {
let tag = self.crypto_out.encrypt(&mut body, hop)?;
// NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
// the whole circuit (e.g. by returning an error).
let msg = chancell::msg::Relay::from_raw(body.into());
let msg = chancell::msg::Relay::from(BoxedCellBody::from(body));
let msg = if early {
AnyChanMsg::RelayEarly(msg.into())
} else {
@ -1159,7 +1167,7 @@ impl Reactor {
cx: &mut Context<'_>,
hopnum: HopNum,
message: AnyRelayMsg,
sender: mpsc::Sender<AnyRelayMsg>,
sender: mpsc::Sender<UnparsedRelayCell>,
rx: mpsc::Receiver<AnyRelayMsg>,
) -> Result<StreamId> {
let hop = self
@ -1241,9 +1249,8 @@ impl Reactor {
tag_copy.copy_from_slice(tag);
tag_copy
};
// Decode the cell.
let msg = AnyRelayCell::decode(body.into())
.map_err(|e| Error::from_bytes_err(e, "relay cell"))?;
// Put the cell into a format where we can make sense of it.
let msg = UnparsedRelayCell::from_body(body.into());
let c_t_w = sendme::cell_counts_towards_windows(&msg);
@ -1277,12 +1284,11 @@ impl Reactor {
.put();
}
// Break the message apart into its streamID and message.
let (streamid, msg) = msg.into_streamid_and_msg();
// If this cell wants/refuses to have a Stream ID, does it
// have/not have one?
if !msg.cmd().accepts_streamid_val(streamid) {
let cmd = msg.cmd();
let streamid = msg.stream_id();
if !cmd.accepts_streamid_val(streamid) {
return Err(Error::CircProto(format!(
"Invalid stream ID {} for relay command {}",
sv(streamid),
@ -1309,7 +1315,11 @@ impl Reactor {
}) => {
// The stream for this message exists, and is open.
if let AnyRelayMsg::Sendme(_) = msg {
if msg.cmd() == RelayCmd::SENDME {
let _sendme = msg
.decode::<Sendme>()
.map_err(|e| Error::from_bytes_err(e, "Sendme message on stream"))?
.into_msg();
// We need to handle sendmes here, not in the stream's
// recv() method, or else we'd never notice them if the
// stream isn't reading.
@ -1317,19 +1327,19 @@ impl Reactor {
return Ok(CellStatus::Continue);
}
if matches!(msg, AnyRelayMsg::Connected(_)) {
if msg.cmd() == RelayCmd::CONNECTED {
// Remember that we've received a Connected cell, and can't get another,
// even if we become a HalfStream. (This rule is enforced separately at
// DataStreamReader.)
// TODO: This is problematic; see #774.
*received_connected = true;
}
// Remember whether this was an end cell: if so we should
// close the stream.
let is_end_cell = matches!(msg, AnyRelayMsg::End(_));
let is_end_cell = msg.cmd() == RelayCmd::END;
// TODO: Add a wrapper type here to reject cells that should
// never go to a client, like BEGIN.
if let Err(e) = sink.try_send(msg) {
if e.is_full() {
// If we get here, we either have a logic bug (!), or an attacker
@ -1354,14 +1364,13 @@ impl Reactor {
Some(StreamEnt::EndSent(halfstream)) => {
// We sent an end but maybe the other side hasn't heard.
if matches!(msg, AnyRelayMsg::End(_)) {
hop.map.end_received(streamid)?;
} else {
halfstream.handle_msg(&msg)?;
match halfstream.handle_msg(msg)? {
HalfStreamStatus::Open => {}
HalfStreamStatus::Closed => hop.map.end_received(streamid)?,
}
}
_ => {
// No stream wants this message.
// No stream wants this message, or ever did.
return Err(Error::CircProto(
"Cell received on nonexistent stream!?".into(),
));

View File

@ -12,8 +12,8 @@
use std::collections::VecDeque;
use tor_cell::relaycell::msg::AnyRelayMsg;
use tor_cell::relaycell::AnyRelayCell;
use tor_cell::relaycell::RelayCmd;
use tor_cell::relaycell::UnparsedRelayCell;
use tor_error::internal;
use crate::{Error, Result};
@ -267,14 +267,21 @@ impl<P: WindowParams> RecvWindow<P> {
}
}
/// Return true if this message is counted by flow-control windows.
pub(crate) fn msg_counts_towards_windows(msg: &AnyRelayMsg) -> bool {
matches!(msg, AnyRelayMsg::Data(_))
/// Return true if this message type is counted by flow-control windows.
pub(crate) fn cmd_counts_towards_windows(cmd: RelayCmd) -> bool {
cmd == RelayCmd::DATA
}
/// Return true if this message is counted by flow-control windows.
pub(crate) fn cell_counts_towards_windows(cell: &AnyRelayCell) -> bool {
msg_counts_towards_windows(cell.msg())
#[cfg(test)]
pub(crate) fn msg_counts_towards_windows(msg: &tor_cell::relaycell::msg::AnyRelayMsg) -> bool {
use tor_cell::relaycell::RelayMsg;
cmd_counts_towards_windows(msg.cmd())
}
/// Return true if this message is counted by flow-control windows.
pub(crate) fn cell_counts_towards_windows(cell: &UnparsedRelayCell) -> bool {
cmd_counts_towards_windows(cell.cmd())
}
#[cfg(test)]
@ -290,24 +297,24 @@ mod test {
#![allow(clippy::unchecked_duration_subtraction)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use tor_basic_utils::test_rng::testing_rng;
use tor_cell::relaycell::{msg, AnyRelayCell};
#[test]
fn what_counts() {
let mut rng = testing_rng();
let m = msg::Begin::new("www.torproject.org", 443, 0)
.unwrap()
.into();
assert!(!msg_counts_towards_windows(&m));
assert!(!cell_counts_towards_windows(&AnyRelayCell::new(
77.into(),
m
assert!(!cell_counts_towards_windows(&UnparsedRelayCell::from_body(
AnyRelayCell::new(77.into(), m).encode(&mut rng).unwrap()
)));
let m = msg::Data::new(&b"Education is not a prerequisite to political control-political control is the cause of popular education."[..]).unwrap().into(); // Du Bois
assert!(msg_counts_towards_windows(&m));
assert!(cell_counts_towards_windows(&AnyRelayCell::new(
128.into(),
m
assert!(cell_counts_towards_windows(&UnparsedRelayCell::from_body(
AnyRelayCell::new(128.into(), m).encode(&mut rng).unwrap()
)));
}

View File

@ -3,6 +3,7 @@
use crate::circuit::halfstream::HalfStream;
use crate::circuit::sendme;
use crate::{Error, Result};
use tor_cell::relaycell::UnparsedRelayCell;
/// Mapping from stream ID to streams.
// NOTE: This is a work in progress and I bet I'll refactor it a lot;
// it needs to stay opaque!
@ -24,7 +25,7 @@ pub(super) enum StreamEnt {
/// An open stream.
Open {
/// Sink to send relay cells tagged for this stream into.
sink: mpsc::Sender<AnyRelayMsg>,
sink: mpsc::Sender<UnparsedRelayCell>,
/// Stream for cells that should be sent down this stream.
rx: mpsc::Receiver<AnyRelayMsg>,
/// Send window, for congestion control purposes.
@ -105,7 +106,7 @@ impl StreamMap {
/// Add an entry to this map; return the newly allocated StreamId.
pub(super) fn add_ent(
&mut self,
sink: mpsc::Sender<AnyRelayMsg>,
sink: mpsc::Sender<UnparsedRelayCell>,
rx: mpsc::Receiver<AnyRelayMsg>,
send_window: sendme::StreamSendWindow,
) -> Result<StreamId> {
@ -114,6 +115,10 @@ impl StreamMap {
rx,
send_window,
dropped: 0,
// TODO: This is true for all streams at this point, but it is
// problematic to accept even one CONNECTED for RESOLVE/RESOLVED
// streams, and will become more so once UDP streams are
// implemented. This is #774.
received_connected: false,
};
// This "65536" seems too aggressive, but it's what tor does.

View File

@ -8,25 +8,15 @@
//!
use crate::{Error, Result};
use tor_cell::chancell::RawCellBody;
use tor_cell::chancell::BoxedCellBody;
use tor_error::internal;
use generic_array::GenericArray;
/// Type for the body of a relay cell.
#[derive(Clone)]
pub(crate) struct RelayCellBody(RawCellBody);
#[derive(Clone, derive_more::From, derive_more::Into)]
pub(crate) struct RelayCellBody(BoxedCellBody);
impl From<RawCellBody> for RelayCellBody {
fn from(body: RawCellBody) -> Self {
RelayCellBody(body)
}
}
impl From<RelayCellBody> for RawCellBody {
fn from(cell: RelayCellBody) -> Self {
cell.0
}
}
impl AsRef<[u8]> for RelayCellBody {
fn as_ref(&self) -> &[u8] {
&self.0[..]
@ -447,7 +437,7 @@ mod test {
let mut rng = testing_rng();
for _ in 1..300 {
// outbound cell
let mut cell = [0_u8; 509];
let mut cell = Box::new([0_u8; 509]);
let mut cell_orig = [0_u8; 509];
rng.fill_bytes(&mut cell_orig);
cell.copy_from_slice(&cell_orig);
@ -461,7 +451,7 @@ mod test {
assert_eq!(&cell.as_ref()[9..], &cell_orig.as_ref()[9..]);
// inbound cell
let mut cell = [0_u8; 509];
let mut cell = Box::new([0_u8; 509]);
let mut cell_orig = [0_u8; 509];
rng.fill_bytes(&mut cell_orig);
cell.copy_from_slice(&cell_orig);
@ -480,14 +470,14 @@ mod test {
// Try a failure: sending a cell to a nonexistent hop.
{
let mut cell = [0_u8; 509].into();
let mut cell = Box::new([0_u8; 509]).into();
let err = cc_out.encrypt(&mut cell, 10.into());
assert!(matches!(err, Err(Error::NoSuchHop)));
}
// Try a failure: A junk cell with no correct auth from any layer.
{
let mut cell = [0_u8; 509].into();
let mut cell = Box::new([0_u8; 509]).into();
let err = cc_in.decrypt(&mut cell);
assert!(matches!(err, Err(Error::BadCellAuth)));
}
@ -527,7 +517,7 @@ mod test {
let mut j = 0;
for cellno in 0..51 {
let mut body = [0_u8; 509];
let mut body = Box::new([0_u8; 509]);
body[0] = 2; // command: data.
body[4] = 1; // streamid: 1.
body[9] = 1; // length: 498

View File

@ -3,7 +3,6 @@
use crate::{Error, Result};
use tor_cell::relaycell::msg::EndReason;
use tor_cell::relaycell::RelayMsg;
use futures::io::{AsyncRead, AsyncWrite};
use futures::task::{Context, Poll};
@ -15,6 +14,7 @@ use tokio_crate::io::ReadBuf;
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
#[cfg(feature = "tokio")]
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use tor_cell::restricted_msg;
use std::fmt::Debug;
use std::io::Result as IoResult;
@ -25,7 +25,7 @@ use educe::Educe;
use crate::circuit::StreamTarget;
use crate::stream::StreamReader;
use tor_basic_utils::skip_fmt;
use tor_cell::relaycell::msg::{AnyRelayMsg, Data};
use tor_cell::relaycell::msg::Data;
use tor_error::internal;
/// An anonymized stream over the Tor network.
@ -150,6 +150,14 @@ pub struct DataReader {
state: Option<DataReaderState>,
}
restricted_msg! {
/// An allowable incoming message on a data stream.
enum DataStreamMsg:RelayMsg {
// SENDME is handled by the reactor.
Data, End, Connected,
}
}
impl DataStream {
/// Wrap raw stream reader and target parts as a DataStream.
///
@ -592,26 +600,43 @@ impl DataReaderImpl {
/// This function takes ownership of self so that we can avoid
/// self-referential lifetimes.
async fn read_cell(mut self) -> (Self, Result<()>) {
let cell = self.s.recv().await;
use DataStreamMsg::*;
let msg = match self.s.recv().await {
Ok(unparsed) => match unparsed.decode::<DataStreamMsg>() {
Ok(cell) => cell.into_msg(),
Err(e) => {
self.s.protocol_error();
return (
self,
Err(Error::from_bytes_err(e, "message on a data stream")),
);
}
},
Err(e) => return (self, Err(e)),
};
let result = match cell {
Ok(AnyRelayMsg::Connected(_)) if !self.connected => {
let result = match msg {
Connected(_) if !self.connected => {
self.connected = true;
Ok(())
}
Ok(AnyRelayMsg::Data(d)) if self.connected => {
Connected(_) => {
self.s.protocol_error();
Err(Error::StreamProto(
"Received a second connect cell on a data stream".to_string(),
))
}
Data(d) if self.connected => {
self.add_data(d.into());
Ok(())
}
Ok(AnyRelayMsg::End(e)) => Err(Error::EndReceived(e.reason())),
Err(e) => Err(e),
Ok(m) => {
Data(_) => {
self.s.protocol_error();
Err(Error::StreamProto(format!(
"Unexpected {} cell on stream",
m.cmd()
)))
Err(Error::StreamProto(
"Received a data cell an unconnected stream".to_string(),
))
}
End(e) => Err(Error::EndReceived(e.reason())),
};
(self, result)

View File

@ -27,6 +27,8 @@ pub struct IncomingStream {
}
/// A message that can be sent to begin a stream.
//
// TODO hs perhaps this should be made with restricted_msg!()
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum IncomingStreamRequest {

View File

@ -3,7 +3,7 @@
use crate::circuit::{sendme, StreamTarget};
use crate::{Error, Result};
use tor_cell::relaycell::msg::AnyRelayMsg;
use tor_cell::relaycell::{RelayCmd, UnparsedRelayCell};
use crate::circuit::sendme::StreamRecvWindow;
use futures::channel::mpsc;
@ -15,7 +15,7 @@ pub struct StreamReader {
/// The underlying `StreamTarget` for this stream.
pub(crate) target: StreamTarget,
/// Channel to receive stream messages from the reactor.
pub(crate) receiver: mpsc::Receiver<AnyRelayMsg>,
pub(crate) receiver: mpsc::Receiver<UnparsedRelayCell>,
/// Congestion control receive window for this stream.
///
/// Having this here means we're only going to update it when the end consumer of this stream
@ -29,7 +29,7 @@ pub struct StreamReader {
impl StreamReader {
/// Try to read the next relay message from this stream.
async fn recv_raw(&mut self) -> Result<AnyRelayMsg> {
async fn recv_raw(&mut self) -> Result<UnparsedRelayCell> {
if self.ended {
// Prevent reading from streams after they've ended.
return Err(Error::NotConnected);
@ -44,7 +44,7 @@ impl StreamReader {
Error::StreamProto("stream channel disappeared without END cell?".into())
})?;
if sendme::msg_counts_towards_windows(&msg) && self.recv_window.take()? {
if sendme::cell_counts_towards_windows(&msg) && self.recv_window.take()? {
self.target.send_sendme()?;
self.recv_window.put();
}
@ -54,10 +54,13 @@ impl StreamReader {
/// As recv_raw, but if there is an error or an end cell, note that this
/// stream has ended.
pub async fn recv(&mut self) -> Result<AnyRelayMsg> {
pub async fn recv(&mut self) -> Result<UnparsedRelayCell> {
let val = self.recv_raw().await;
match val {
Err(_) | Ok(AnyRelayMsg::End(_)) => {
match &val {
Err(_) => {
self.ended = true;
}
Ok(m) if m.cmd() == RelayCmd::END => {
self.ended = true;
}
_ => {}

View File

@ -2,8 +2,8 @@
use crate::stream::StreamReader;
use crate::{Error, Result};
use tor_cell::relaycell::msg::{AnyRelayMsg, Resolved};
use tor_cell::relaycell::RelayMsg;
use tor_cell::relaycell::msg::Resolved;
use tor_cell::restricted_msg;
/// A ResolveStream represents a pending DNS request made with a RESOLVE
/// cell.
@ -12,11 +12,18 @@ pub struct ResolveStream {
s: StreamReader,
}
restricted_msg! {
/// An allowable reply for a RESOLVE message.
enum ResolveResponseMsg : RelayMsg {
End,
Resolved,
}
}
impl ResolveStream {
/// Wrap a RawCellStream into a ResolveStream.
///
/// Call only after sending a RESOLVE cell.
#[allow(dead_code)] // need to implement a caller for this.
pub(crate) fn new(s: StreamReader) -> Self {
ResolveStream { s }
}
@ -24,17 +31,18 @@ impl ResolveStream {
/// Read a message from this stream telling us the answer to our
/// name lookup request.
pub async fn read_msg(&mut self) -> Result<Resolved> {
use ResolveResponseMsg::*;
let cell = self.s.recv().await?;
match cell {
AnyRelayMsg::End(e) => Err(Error::EndReceived(e.reason())),
AnyRelayMsg::Resolved(r) => Ok(r),
m => {
let msg = match cell.decode::<ResolveResponseMsg>() {
Ok(cell) => cell.into_msg(),
Err(e) => {
self.s.protocol_error();
Err(Error::StreamProto(format!(
"Unexpected {} on resolve stream",
m.cmd()
)))
return Err(Error::from_bytes_err(e, "response on a resolve stream"));
}
};
match msg {
End(e) => Err(Error::EndReceived(e.reason())),
Resolved(r) => Ok(r),
}
}
}