diff --git a/Cargo.lock b/Cargo.lock index 076ef98e9..e314a8354 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2823,7 +2823,6 @@ dependencies = [ "coarsetime", "crypto-mac", "digest", - "event-listener", "futures", "generic-array", "hex", diff --git a/crates/tor-circmgr/src/build.rs b/crates/tor-circmgr/src/build.rs index 8794f2f0a..5c3fafe0f 100644 --- a/crates/tor-circmgr/src/build.rs +++ b/crates/tor-circmgr/src/build.rs @@ -90,6 +90,7 @@ async fn create_common Ok(pending_circ) } +// FIXME(eta): de-Arc-ify this #[async_trait] impl Buildable for Arc { async fn create_chantarget( @@ -100,7 +101,8 @@ impl Buildable for Arc { params: &CircParameters, ) -> Result { let circ = create_common(chanmgr, rt, rng, ct).await?; - Ok(circ.create_firsthop_fast(rng, params).await?) + // FIXME(eta): don't clone the params? + Ok(Arc::new(circ.create_firsthop_fast(params.clone()).await?)) } async fn create( chanmgr: &ChanMgr, @@ -110,16 +112,19 @@ impl Buildable for Arc { params: &CircParameters, ) -> Result { let circ = create_common(chanmgr, rt, rng, ct).await?; - Ok(circ.create_firsthop_ntor(rng, ct, params).await?) + Ok(Arc::new( + circ.create_firsthop_ntor(ct, params.clone()).await?, + )) } async fn extend( &self, _rt: &RT, - rng: &mut RNG, + // FIXME(eta): get rid of this RNG parameter? + _rng: &mut RNG, ct: &OwnedCircTarget, params: &CircParameters, ) -> Result<()> { - ClientCirc::extend_ntor(self, rng, ct, params).await?; + ClientCirc::extend_ntor(self, ct, params).await?; Ok(()) } } diff --git a/crates/tor-proto/Cargo.toml b/crates/tor-proto/Cargo.toml index 4b51034d7..3797f94a0 100644 --- a/crates/tor-proto/Cargo.toml +++ b/crates/tor-proto/Cargo.toml @@ -31,7 +31,6 @@ bytes = "1.0.1" cipher = "0.3.0" crypto-mac = "0.11.0" digest = "0.9.0" -event-listener = "2.5.1" futures = "0.3.13" asynchronous-codec = "0.6.0" generic-array = "0.14.4" @@ -51,6 +50,6 @@ tokio-util = { version = "0.6", features = ["compat"], optional = true } coarsetime = { version = "0.1.20", optional = true } [dev-dependencies] -tokio-crate = { package = "tokio", version = "1.7.0", features = ["macros", "rt"] } +tokio-crate = { package = "tokio", version = "1.7.0", features = ["macros", "rt", "time"] } hex-literal = "0.3.1" hex = "0.4.3" diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index c444ab79c..16e9b9a57 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -67,6 +67,7 @@ pub use crate::channel::unique_id::UniqId; use crate::circuit; use crate::circuit::celltypes::CreateResponse; use crate::{Error, Result}; +use std::pin::Pin; use tor_cell::chancell::{msg, ChanCell, CircId}; use tor_linkspec::ChanTarget; use tor_llcrypto::pk::ed25519::Ed25519Identity; @@ -76,9 +77,10 @@ use asynchronous_codec as futures_codec; use futures::channel::{mpsc, oneshot}; use futures::io::{AsyncRead, AsyncWrite}; -use futures::SinkExt; +use futures::{Sink, SinkExt}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::task::{Context, Poll}; use tracing::trace; @@ -109,6 +111,55 @@ pub struct Channel { cell_tx: mpsc::Sender, } +impl Sink for Channel { + type Error = Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.cell_tx) + .poll_ready(cx) + .map_err(|_| Error::ChannelClosed) + } + + fn start_send(self: Pin<&mut Self>, cell: ChanCell) -> Result<()> { + let this = self.get_mut(); + if this.closed.load(Ordering::SeqCst) { + return Err(Error::ChannelClosed); + } + this.check_cell(&cell)?; + { + use msg::ChanMsg::*; + match cell.msg() { + Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log. + _ => trace!( + "{}: Sending {} for {}", + this.unique_id, + cell.msg().cmd(), + cell.circid() + ), + } + } + + Pin::new(&mut this.cell_tx) + .start_send(cell) + .map_err(|_| Error::ChannelClosed) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.cell_tx) + .poll_flush(cx) + .map_err(|_| Error::ChannelClosed) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.cell_tx) + .poll_close(cx) + .map_err(|_| Error::ChannelClosed) + } +} + /// Structure for building and launching a Tor channel. pub struct ChannelBuilder { /// If present, a description of the address we're trying to connect to, @@ -261,29 +312,18 @@ impl Channel { } } + /// Like `futures::Sink::poll_ready`. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result { + Ok(match Pin::new(&mut self.cell_tx).poll_ready(cx) { + Poll::Ready(Ok(_)) => true, + Poll::Ready(Err(_)) => return Err(Error::CircuitClosed), + Poll::Pending => false, + }) + } + /// Transmit a single cell on a channel. pub async fn send_cell(&mut self, cell: ChanCell) -> Result<()> { - if self.closed.load(Ordering::SeqCst) { - return Err(Error::ChannelClosed); - } - self.check_cell(&cell)?; - { - use msg::ChanMsg::*; - match cell.msg() { - Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log. - _ => trace!( - "{}: Sending {} for {}", - self.unique_id, - cell.msg().cmd(), - cell.circid() - ), - } - } - - self.cell_tx - .send(cell) - .await - .map_err(|_| Error::InternalError("Reactor not alive to receive cells".into()))?; + self.send(cell).await?; Ok(()) } @@ -318,13 +358,10 @@ impl Channel { trace!("{}: Allocated CircId {}", circ_unique_id, id); - let destroy_handle = CircDestroyHandle::new(id, self.control.clone()); - Ok(circuit::PendingClientCirc::new( id, self.clone(), createdreceiver, - Some(destroy_handle), receiver, circ_unique_id, )) @@ -342,27 +379,13 @@ impl Channel { pub fn terminate(&self) { let _ = self.control.unbounded_send(CtrlMsg::Shutdown); } -} -/// Helper structure: when this is dropped, the reactor is told to kill -/// the circuit. -pub(crate) struct CircDestroyHandle { - /// The circuit ID in question - id: CircId, - /// A sender to tell the reactor. - sender: mpsc::UnboundedSender, -} - -impl CircDestroyHandle { - /// Create a new CircDestroyHandle - fn new(id: CircId, sender: mpsc::UnboundedSender) -> Self { - CircDestroyHandle { id, sender } - } -} - -impl Drop for CircDestroyHandle { - fn drop(&mut self) { - let _ignore_cancel = self.sender.unbounded_send(CtrlMsg::CloseCircuit(self.id)); + /// Tell the reactor that the circuit with the given ID has gone away. + pub fn close_circuit(&self, circid: CircId) -> Result<()> { + self.control + .unbounded_send(CtrlMsg::CloseCircuit(circid)) + .map_err(|_| Error::ChannelClosed)?; + Ok(()) } } diff --git a/crates/tor-proto/src/channel/reactor.rs b/crates/tor-proto/src/channel/reactor.rs index 74a338d6b..6d26376b6 100644 --- a/crates/tor-proto/src/channel/reactor.rs +++ b/crates/tor-proto/src/channel/reactor.rs @@ -281,11 +281,11 @@ impl Reactor { match self.circs.get_mut(circid) { Some(CircEnt::Open(s)) => { // There's an open circuit; we can give it the RELAY cell. - // XXXX I think that this one actually means the other side - // is closed. If we see it IRL we should maybe ignore it. - s.send(msg.try_into()?).await.map_err(|_| { - Error::InternalError("Circuit queue rejected message. Is it closing?".into()) - }) + if s.send(msg.try_into()?).await.is_err() { + // The circuit's receiver went away, so we should destroy the circuit. + self.outbound_destroy_circ(circid).await?; + } + Ok(()) } Some(CircEnt::Opening(_, _)) => Err(Error::ChanProto( "Relay cell on pending circuit before CREATED* received".into(), @@ -397,6 +397,7 @@ pub(crate) mod test { use futures::stream::StreamExt; use tokio::test as async_test; use tokio_crate as tokio; + use tokio_crate::runtime::Handle; use crate::circuit::CircParameters; @@ -469,10 +470,11 @@ pub(crate) mod test { let (chan, mut reactor, mut output, _input) = new_reactor(); let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); - let (pending, _circr) = ret.unwrap(); + let (pending, circr) = ret.unwrap(); + Handle::current().spawn(circr.run()); assert!(reac.is_ok()); - let id = pending.peek_circid().await; + let id = pending.peek_circid(); let ent = reactor.circs.get_mut(id); assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); @@ -492,34 +494,38 @@ pub(crate) mod test { #[async_test] async fn new_circ_create_failure() { use tor_cell::chancell::msg; - let mut rng = rand::thread_rng(); let (chan, mut reactor, mut output, mut input) = new_reactor(); let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); - let (pending, _circr) = ret.unwrap(); + let (pending, circr) = ret.unwrap(); + Handle::current().spawn(circr.run()); assert!(reac.is_ok()); let circparams = CircParameters::default(); - let id = pending.peek_circid().await; + let id = pending.peek_circid(); + eprintln!("abc"); let ent = reactor.circs.get_mut(id); assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); // We'll get a bad handshake result from this createdfast cell. let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into()); input.send(Ok(created_cell)).await.unwrap(); + eprintln!("def"); - let (circ, reac) = futures::join!( - pending.create_firsthop_fast(&mut rng, &circparams), - reactor.run_once() - ); + let (circ, reac) = + futures::join!(pending.create_firsthop_fast(circparams), reactor.run_once()); // Make sure statuses are as expected. assert!(matches!(circ.err().unwrap(), Error::BadHandshake)); assert!(reac.is_ok()); + eprintln!("ghi"); + + reactor.run_once().await.unwrap(); // Make sure that the createfast cell got sent let cell_sent = output.next().await.unwrap(); assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_))); + eprintln!("jkkl"); // The circid now counts as open, since as far as the reactor knows, // it was accepted. (TODO: is this a bug?) diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 6f3555d8c..83d7aff45 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -46,54 +46,50 @@ pub(crate) mod sendme; mod streammap; mod unique_id; -use crate::channel::{Channel, CircDestroyHandle}; +use crate::channel::Channel; use crate::circuit::celltypes::*; -use crate::circuit::reactor::CtrlMsg; -pub use crate::circuit::unique_id::UniqId; -use crate::crypto::cell::{ - ClientLayer, CryptInit, HopNum, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer, - RelayCellBody, +use crate::circuit::reactor::{ + CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER, }; -use crate::crypto::handshake::{ClientHandshake, KeyGenerator}; -use crate::stream::{DataStream, RawCellStream, ResolveStream, StreamParameters}; +pub use crate::circuit::unique_id::UniqId; +use crate::crypto::cell::{HopNum, InboundClientCrypt, OutboundClientCrypt}; +use crate::stream::{DataStream, ResolveStream, StreamParameters, StreamReader}; use crate::{Error, Result}; -use tor_cell::chancell::{self, msg::ChanMsg, ChanCell, CircId}; -use tor_cell::relaycell::msg::{Begin, RelayMsg, Resolve, Resolved, ResolvedVal, Sendme}; -use tor_cell::relaycell::{RelayCell, RelayCmd, StreamId}; +use tor_cell::{ + chancell::{self, msg::ChanMsg, CircId}, + relaycell::msg::{Begin, RelayMsg, Resolve, Resolved, ResolvedVal}, +}; -use tor_linkspec::{ChanTarget, CircTarget, LinkSpec}; +use tor_linkspec::{CircTarget, LinkSpec}; use futures::channel::{mpsc, oneshot}; -use futures::lock::Mutex; +use crate::circuit::sendme::StreamRecvWindow; +use futures::SinkExt; use std::net::IpAddr; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; +use tor_cell::relaycell::StreamId; // use std::time::Duration; -use rand::{thread_rng, CryptoRng, Rng}; +use crate::crypto::handshake::ntor::NtorPublicKey; -use tracing::{debug, trace}; +/// The size of the buffer for communication between `ClientCirc` and its reactor. +pub const CIRCUIT_BUFFER_SIZE: usize = 128; +#[derive(Clone, Debug)] /// A circuit that we have constructed over the Tor network. pub struct ClientCirc { - /// This circuit can't be used because it has been closed, locally - /// or remotely. - closed: AtomicBool, + /// Number of hops on this circuit. + /// + /// This value is incremented after the circuit successfully completes extending to a new hop. + hops: Arc, /// A unique identifier for this circuit. unique_id: UniqId, - - /// Reference-counted locked reference to the inner circuit object. - c: Mutex, -} - -impl std::fmt::Debug for ClientCirc { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ClientCirc") - .field("unique_id", &self.unique_id) - .field("closed", &self.closed) - .finish() - } + /// Channel to send control messages to the reactor. + control: mpsc::UnboundedSender, + /// For testing purposes: the CircId, for use in peek_circid(). + circid: CircId, } /// A ClientCirc that needs to send a create cell and receive a created* cell. @@ -105,7 +101,7 @@ pub struct PendingClientCirc { /// or a DESTROY cell. recvcreated: oneshot::Receiver, /// The ClientCirc object that we can expose on success. - circ: Arc, + circ: ClientCirc, } /// Description of the network's current rules for building circuits. @@ -163,279 +159,26 @@ impl CircParameters { } } -/// A result type used to tell a circuit about some a "meta-cell" -/// (like extended, intro_established, etc). -type MetaResult = Result; - -/// The implementation type for this circuit. -struct ClientCircImpl { - /// This circuit's ID on the upstream channel. - id: CircId, - /// The channel that this circuit uses to send its cells to the - /// next hop. - channel: Channel, - /// The cryptographic state for this circuit for outbound cells. - /// This object is divided into multiple layers, each of which is - /// shared with one hop of the circuit - crypto_out: OutboundClientCrypt, - /// When this is dropped, the channel reactor is told to send a DESTROY - /// cell. - circ_closed: Option, - /// Per-hop circuit information. - /// - /// Note that hops.len() must be the same as crypto.n_layers(). - hops: Vec, - /// Control channel to send messages to the reactor. - control: mpsc::UnboundedSender, - /// A oneshot sender that can be used by the reactor to report a - /// meta-cell to an owning task. - /// - /// This comes along with a hop number saying which hop we expect a - /// meta-cell from. Cells from other hops won't go to this sender. - /// - /// For the purposes of this implementation, a "meta" cell - /// is a RELAY cell with a stream ID value of 0. - sendmeta: Option<(HopNum, oneshot::Sender)>, - - /// An identifier for this circuit, for logging purposes. - /// TODO: Make this field go away in favor of the one in ClientCirc. - unique_id: UniqId, -} - -/// A handle to a circuit as held by a stream. Used to send cells. -/// -/// Rather than using the stream directly, the stream uses this object -/// to send its relay cells to the correct hop, using the correct stream ID. -/// -/// When this object is dropped, the reactor will be told to close the stream. -// XXXX TODO: rename this +/// A stream on a particular circuit. +#[derive(Clone, Debug)] pub(crate) struct StreamTarget { - /// The stream ID for this stream on its circuit. + /// Which hop of the circuit this stream is with. + hop_num: HopNum, + /// Reactor ID for this stream. stream_id: StreamId, - /// Which hop on this circuit is this stream built from? - // XXXX Using 'hop' by number here will cause bugs if circuits can get - // XXXX truncated and then re-extended. - hop: HopNum, + /// Channel to send cells down. + tx: mpsc::Sender, /// Reference to the circuit that this stream is on. - circ: Arc, - /// Window for sending cells on this circuit. - window: sendme::StreamSendWindow, - /// Control sender, for notifying the reactor when this stream gets dropped. - control: mpsc::UnboundedSender, - /// Window to track incoming cells and SENDMEs. - // XXXX Putting this field here in this object means that this - // object isn't really so much a "target", since a "target" - // doesn't know how to receive. Maybe we should rename it to be - // some kind of a "handle" or something? - pub(crate) recvwindow: sendme::StreamRecvWindow, -} - -/// Information about a single hop of a client circuit, from the sender-side -/// point of view. -/// -/// (see also circuit::reactor::InboundHop) -struct CircHop { - /// If true, this hop is using an older link protocol and we - /// shouldn't expect good authenticated SENDMEs from it. - auth_sendme_optional: bool, - /// Window used to say how many cells we can send. - sendwindow: sendme::CircSendWindow, -} - -impl CircHop { - /// Construct a new (sender-side) view of a circuit hop. - fn new(auth_sendme_optional: bool, initial_window: u16) -> Self { - CircHop { - auth_sendme_optional, - sendwindow: sendme::CircSendWindow::new(initial_window), - } - } + circ: ClientCirc, } impl ClientCirc { - /// Helper: return the number of hops for this circuit - #[cfg(test)] - async fn n_hops(&self) -> usize { - let c = self.c.lock().await; - c.crypto_out.n_layers() - } - - /// Helper: extend the circuit by one hop. - /// - /// The `rng` is used to generate handshake material. The - /// `handshake_id` is the numeric identifier for what kind of - /// handshake we're doing. The `key is the relay's onion key that - /// goes along with the handshake, and the `linkspecs` are the - /// link specifiers to include in the EXTEND cell to tell the - /// current last hop which relay to connect to. - async fn extend_impl( - &self, - rng: &mut R, - handshake_id: u16, - key: &H::KeyType, - linkspecs: Vec, - supports_flowctrl_1: bool, - params: &CircParameters, - ) -> Result<()> - where - R: Rng + CryptoRng, - L: CryptInit + ClientLayer, - FWD: OutboundClientLayer + 'static + Send, - REV: InboundClientLayer + 'static + Send, - H: ClientHandshake, - H::KeyGen: KeyGenerator, - { - use tor_cell::relaycell::msg::{Body, Extend2}; - // Perform the first part of the cryptographic handshake - let (state, msg) = H::client1(rng, key)?; - // Cloning linkspecs is only necessary because of the log - // below. Would be nice to fix that. - let extend_msg = Extend2::new(linkspecs.clone(), handshake_id, msg); - let cell = RelayCell::new(0.into(), extend_msg.into_message()); - - // Now send the EXTEND2 cell to the the last hop... - let (unique_id, _hop, receiver) = { - let mut c = self.c.lock().await; - let n_hops = c.crypto_out.n_layers(); - let hop = ((n_hops - 1) as u8).into(); - debug!( - "{}: Extending circuit to hop {} with {:?}", - c.unique_id, - n_hops + 1, - linkspecs - ); - - // We'll be waiting for an EXTENDED2 cell; install the handler. - let receiver = c.register_meta_handler(hop)?; - - // Send the message to the last hop... - c.send_relay_cell( - hop, true, // use a RELAY_EARLY cell - cell, - ) - .await?; - - (c.unique_id, hop, receiver) - // note that we're dropping the lock here, since we're going - // to wait for a response. - }; - - trace!("{}: waiting for EXTENDED2 cell", unique_id); - // ... and now we wait for a response. - let msg = match receiver.await { - Ok(Ok(m)) => Ok(m), - Err(_) => Err(Error::InternalError( - "Receiver cancelled while waiting for EXTENDED2".into(), - )), - Ok(Err(Error::CircuitClosed)) => Err(Error::CircDestroy( - "Circuit closed while waiting for EXTENDED2".into(), - )), - Ok(Err(e)) => Err(e), - }?; - - // XXXX If two EXTEND cells are of these are launched on the - // same circuit at once, could they collide in this part of - // the function? I don't _think_ so, but it might be a good idea - // to have an "extending" bit that keeps two tasks from entering - // extend_impl at the same time. - // - // Also we could enforce that `hop` is still what we expect it - // to be at this point. - - // Did we get the right response? - if msg.cmd() != RelayCmd::EXTENDED2 { - self.protocol_error().await; - return Err(Error::CircProto(format!( - "wanted EXTENDED2; got {}", - msg.cmd(), - ))); - } - - // ???? Do we need to shutdown the circuit for the remaining error - // ???? cases in this function? - - let msg = match msg { - RelayMsg::Extended2(e) => e, - _ => return Err(Error::InternalError("Body didn't match cmd".into())), - }; - let relay_handshake = msg.into_body(); - - trace!( - "{}: Received EXTENDED2 cell; completing handshake.", - unique_id - ); - // Now perform the second part of the handshake, and see if it - // succeeded. - let keygen = H::client2(state, relay_handshake)?; - let layer = L::construct(keygen)?; - - debug!("{}: Handshake complete; circuit extended.", unique_id); - - // If we get here, it succeeded. Add a new hop to the circuit. - let (layer_fwd, layer_back) = layer.split(); - self.add_hop( - supports_flowctrl_1, - Box::new(layer_fwd), - Box::new(layer_back), - params, - ) - .await - } - - /// Add a hop to the end of this circuit. - /// - /// This function is a bit tricky, since we need to add the - /// hop to our own structures, and tell the reactor to add it to the - /// reactor's structures as well, and wait for the reactor to tell us - /// that it did. - async fn add_hop<'a>( - &'a self, - supports_flowctrl_1: bool, - fwd: Box, - rev: Box, - params: &'a CircParameters, - ) -> Result<()> { - let inbound_hop = crate::circuit::reactor::InboundHop::new(); - let (snd, rcv) = oneshot::channel(); - { - let c = self.c.lock().await; - c.control - .unbounded_send(CtrlMsg::AddHop(inbound_hop, rev, snd)) - .map_err(|_| Error::InternalError("Can't queue AddHop request".into()))?; - } - - // I think we don't need to worry about two hops being added at - // once, because there can only be on meta-message receiver at - // a time. - - rcv.await - .map_err(|_| Error::InternalError("AddHop request cancelled".into()))?; - - { - let mut c = self.c.lock().await; - let hop = CircHop::new(supports_flowctrl_1, params.initial_send_window()); - c.hops.push(hop); - c.crypto_out.add_layer(fwd); - } - Ok(()) - } - /// Extend the circuit via the ntor handshake to a new target last /// hop. - /// - /// The same caveats apply from extend_impl. - pub async fn extend_ntor( - &self, - rng: &mut R, - target: &Tg, - params: &CircParameters, - ) -> Result<()> + pub async fn extend_ntor(&self, target: &Tg, params: &CircParameters) -> Result<()> where - R: Rng + CryptoRng, Tg: CircTarget, { - use crate::crypto::cell::Tor1RelayCrypto; - use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey}; let key = NtorPublicKey { id: *target.rsa_identity(), pk: *target.ntor_onion_key(), @@ -448,15 +191,22 @@ impl ClientCirc { let supports_flowctrl_1 = target .protovers() .supports_known_subver(tor_protover::ProtoKind::FlowCtrl, 1); - self.extend_impl::( - rng, - 0x0002, - &key, - linkspecs, - supports_flowctrl_1, - params, - ) - .await + + let (tx, rx) = oneshot::channel(); + + self.control + .unbounded_send(CtrlMsg::ExtendNtor { + public_key: key, + linkspecs, + supports_authenticated_sendme: supports_flowctrl_1, + params: params.clone(), + done: tx, + }) + .map_err(|_| Error::CircuitClosed)?; + + rx.await.map_err(|_| Error::CircuitClosed)??; + + Ok(()) } /// Helper, used to begin a stream. @@ -466,53 +216,44 @@ impl ClientCirc { /// /// The caller will typically want to see the first cell in response, /// to see whether it is e.g. an END or a CONNECTED. - async fn begin_stream_impl(self: &Arc, begin_msg: RelayMsg) -> Result { + async fn begin_stream_impl(&self, begin_msg: RelayMsg) -> Result<(StreamReader, StreamTarget)> { // TODO: Possibly this should take a hop, rather than just // assuming it's the last hop. - // XXXX Both a bound and a lack of bound are scary here :/ - let (sender, receiver) = mpsc::channel(128); + let num_hops = self.hops.load(Ordering::SeqCst); + // FIXME(eta): could panic if num_hops is zero + let hop_num: HopNum = (num_hops - 1).into(); + let (sender, receiver) = mpsc::channel(STREAM_READER_BUFFER); + let (tx, rx) = oneshot::channel(); + let (msg_tx, msg_rx) = mpsc::channel(CIRCUIT_BUFFER_SIZE); - let window = sendme::StreamSendWindow::new(StreamTarget::SEND_WINDOW_INIT); + self.control + .unbounded_send(CtrlMsg::BeginStream { + hop_num, + message: begin_msg, + sender, + rx: msg_rx, + done: tx, + }) + .map_err(|_| Error::CircuitClosed)?; - let (id_snd, id_rcv) = oneshot::channel(); - let hopnum; - { - let c = self.c.lock().await; - let h = c.hops.len() - 1; - hopnum = (h as u8).into(); - - c.control - .unbounded_send(CtrlMsg::AddStream(hopnum, sender, window.new_ref(), id_snd)) - .map_err(|_| Error::InternalError("Can't queue new-stream request.".into()))?; - } - - let id = id_rcv - .await - .map_err(|_| Error::InternalError("Didn't receive a stream ID.".into()))?; - let id = id?; - - let relaycell = RelayCell::new(id, begin_msg); - - let control_tx = { - let mut c = self.c.lock().await; - c.send_relay_cell(hopnum, false, relaycell).await?; - c.control.clone() - }; - - /// Initial value for inbound flow-control window on streams. - const STREAM_RECV_INIT: u16 = 500; + let stream_id = rx.await.map_err(|_| Error::CircuitClosed)??; let target = StreamTarget { - circ: Arc::clone(self), - stream_id: id, - hop: hopnum, - window, - recvwindow: sendme::StreamRecvWindow::new(STREAM_RECV_INIT), - control: control_tx, + circ: self.clone(), + tx: msg_tx, + hop_num, + stream_id, }; - Ok(RawCellStream::new(target, receiver)) + let reader = StreamReader { + target: target.clone(), + receiver, + recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT), + ended: false, + }; + + Ok((reader, target)) } /// Start a DataStream (anonymized connection) to the given @@ -522,8 +263,8 @@ impl ClientCirc { msg: RelayMsg, optimistic: bool, ) -> Result { - let raw_s = self.begin_stream_impl(msg).await?; - let mut stream = DataStream::new(raw_s); + let (reader, target) = self.begin_stream_impl(msg).await?; + let mut stream = DataStream::new(reader, target); if !optimistic { stream.wait_for_connection().await?; } @@ -550,6 +291,7 @@ impl ClientCirc { /// Start a new stream to the last relay in the circuit, using /// a BEGIN_DIR cell. + // FIXME(eta): get rid of Arc here!!! pub async fn begin_dir_stream(self: Arc) -> Result { self.begin_data_stream(RelayMsg::BeginDir, false).await } @@ -599,28 +341,17 @@ impl ClientCirc { .collect() } - /// Helper: Encode the relay cell `cell`, encrypt it, and send it to the - /// 'hop'th hop. - /// - /// Does not check whether the cell is well-formed or reasonable. - async fn send_relay_cell(&self, hop: HopNum, early: bool, cell: RelayCell) -> Result<()> { - if self.closed.load(Ordering::SeqCst) { - return Err(Error::CircuitClosed); - } - let mut c = self.c.lock().await; - c.send_relay_cell(hop, early, cell).await - } - /// Helper: Send the resolve message, and read resolved message from /// resolve stream. async fn try_resolve(self: &Arc, msg: Resolve) -> Result { - let rc_stream = self.begin_stream_impl(msg.into()).await?; - let mut resolve_stream = ResolveStream::new(rc_stream); + let (reader, _) = self.begin_stream_impl(msg.into()).await?; + let mut resolve_stream = ResolveStream::new(reader); resolve_stream.read_msg().await } - /// Shut down this circuit immediately, along with all streams that - /// are using it. + /// Shut down this circuit, along with all streams that are using it. + /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down + /// immediately after this function returns!). /// /// Note that other references to this circuit may exist. If they /// do, they will stop working after you call this function. @@ -628,14 +359,8 @@ impl ClientCirc { /// It's not necessary to call this method if you're just done /// with a circuit: the channel should close on its own once nothing /// is using it any more. - pub async fn terminate(&self) { - let outcome = self - .closed - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst); - if outcome == Ok(false) { - // The old value was false and the new value is true. - self.c.lock().await.shutdown_reactor(); - } + pub fn terminate(&self) { + let _ = self.control.unbounded_send(CtrlMsg::Shutdown); } /// Called when a circuit-level protocol error has occurred and the @@ -643,13 +368,15 @@ impl ClientCirc { /// /// This is a separate function because we may eventually want to have /// it do more than just shut down. - pub(crate) async fn protocol_error(&self) { - self.terminate().await; + /// + /// As with `terminate`, this function is asynchronous. + pub(crate) fn protocol_error(&self) { + self.terminate(); } /// Return true if this circuit is closed and therefore unusable. pub fn is_closing(&self) -> bool { - self.closed.load(Ordering::SeqCst) + self.control.is_closed() } /// Return a process-unique identifier for this circuit. @@ -657,190 +384,9 @@ impl ClientCirc { self.unique_id } - /// Helper: register a meta-handler for this circuit. #[cfg(test)] - async fn register_meta_handler(&self, hop: HopNum) -> Result> { - let mut c = self.c.lock().await; - c.register_meta_handler(hop) - } -} - -impl ClientCircImpl { - /// Return a mutable reference to the nth hop of this circuit, if one - /// exists. - fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> { - self.hops.get_mut(Into::::into(hopnum)) - } - - /// Helper: Register a handler that will be told about the RELAY message - /// with StreamId 0. - /// - /// This pattern is useful for parts of the protocol where the circuit - /// originator sends a single request, and waits for a single relay - /// message in response. (For example, EXTEND/EXTENDED, - /// ESTABLISH_RENDEZVOUS/RENDEZVOUS_ESTABLISHED, and so on.) - /// - /// It isn't suitable for SENDME cells, INTRODUCE2 cells, or TRUNCATED - /// cells. - /// - /// Only one handler can be registered at a time; until it fires or is - /// cancelled, you can't register another. - /// - /// Note that you should register a meta handler _before_ you send whatever - /// cell you're waiting a response to, or you might miss the response. - // TODO: It would be cool for this to take a list of allowable - // cell types to get in response, so that any other cell types are - // treated as circuit protocol violations automatically. - fn register_meta_handler(&mut self, hop: HopNum) -> Result> { - // Was there previously a handler? - if self.sendmeta.is_some() { - return Err(Error::InternalError( - "Tried to register a second meta-cell handler".into(), - )); - } - let (sender, receiver) = oneshot::channel(); - - self.sendmeta = Some((hop, sender)); - - trace!( - "{}: Registered a meta-cell handler for hop {}", - self.unique_id, - hop - ); - - Ok(receiver) - } - - /// Handle a RELAY cell on this circuit with stream ID 0. - async fn handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<()> { - // SENDME cells and TRUNCATED get handled internally by the circuit. - if let RelayMsg::Sendme(s) = msg { - return self.handle_sendme(hopnum, s).await; - } - if let RelayMsg::Truncated(_) = msg { - // XXXX need to handle Truncated cells. This isn't the right - // way, but at least it's safe. - // TODO: If we ever do handle Truncate cells more - // correctly, we will need to audit all our use of HopNum - // to identify a layer. Otherwise we could confuse a - // message from the previous hop N with a message from the - // new hop N. - return Err(Error::CircuitClosed); - } - - trace!("{}: Received meta-cell {:?}", self.unique_id, msg); - - // For all other command types, we'll only get them in response - // to another command, which should have registered a responder. - // - // TODO: that means that service-introduction circuits will need - // a different implementation, but that should be okay. We'll work - // something out. - if let Some((expected_hop, sender)) = self.sendmeta.take() { - if expected_hop == hopnum { - // Somebody was waiting for a message -- maybe this message - sender - .send(Ok(msg)) - // I think this means that the channel got closed. - .map_err(|_| Error::CircuitClosed) - } else { - // Somebody wanted a message from a different hop! Put this - // one back. - self.sendmeta = Some((expected_hop, sender)); - Err(Error::CircProto(format!( - "Unexpected {} cell from hop {} on client circuit", - msg.cmd(), - hopnum, - ))) - } - } else { - // No need to call shutdown here, since this error will - // propagate to the reactor shut it down. - Err(Error::CircProto(format!( - "Unexpected {} cell on client circuit", - msg.cmd() - ))) - } - } - - /// Handle a RELAY_SENDME cell on this circuit with stream ID 0. - async fn handle_sendme(&mut self, hopnum: HopNum, msg: Sendme) -> Result<()> { - // No need to call "shutdown" on errors in this function; - // it's called from the reactor task and errors will propagate there. - let hop = self - .hop_mut(hopnum) - .ok_or_else(|| Error::CircProto(format!("Couldn't find {} hop", hopnum)))?; - - let auth: Option<[u8; 20]> = match msg.into_tag() { - Some(v) if v.len() == 20 => { - // XXXX ugly code. - let mut tag = [0_u8; 20]; - (&mut tag).copy_from_slice(&v[..]); - Some(tag) - } - Some(_) => return Err(Error::CircProto("malformed tag on circuit sendme".into())), - None => { - if !hop.auth_sendme_optional { - return Err(Error::CircProto("missing tag on circuit sendme".into())); - } else { - None - } - } - }; - match hop.sendwindow.put(auth).await { - Some(_) => Ok(()), - None => Err(Error::CircProto("bad auth tag on circuit sendme".into())), - } - } - - /// Helper: Put a cell onto this circuit's channel. - /// - /// This takes a raw cell that has already been encrypted, puts - /// a circuit ID on it, and sends it. - /// - /// Does not check whether the cell is well-formed or reasonable. - async fn send_msg(&mut self, msg: ChanMsg) -> Result<()> { - let cell = ChanCell::new(self.id, msg); - self.channel.send_cell(cell).await?; - Ok(()) - } - - /// Helper: Encode the relay cell `cell`, encrypt it, and send it to the - /// 'hop'th hop. - /// - /// Does not check whether the cell is well-formed or reasonable. - async fn send_relay_cell(&mut self, hop: HopNum, early: bool, cell: RelayCell) -> Result<()> { - let c_t_w = sendme::cell_counts_towards_windows(&cell); - let mut body: RelayCellBody = cell.encode(&mut thread_rng())?.into(); - let tag = self.crypto_out.encrypt(&mut body, hop)?; - let msg = chancell::msg::Relay::from_raw(body.into()); - let msg = if early { - ChanMsg::RelayEarly(msg) - } else { - ChanMsg::Relay(msg) - }; - // If the cell counted towards our sendme window, decrement - // that window, and maybe remember the authentication tag. - if c_t_w { - // TODO: I'd like to use get_hops_mut here, but the borrow checker - // won't let me. - // This blocks if the send window is empty. - self.hops[Into::::into(hop)] - .sendwindow - .take(tag) - .await?; - } - self.send_msg(msg).await - } - - /// Shut down this circuit's reactor and send a DESTROY cell. - /// - /// This is idempotent and safe to call more than once. - fn shutdown_reactor(&mut self) { - let _ = self.control.unbounded_send(CtrlMsg::Shutdown); - // Drop the circuit destroy handle now so that a DESTROY cell - // gets sent. - drop(self.circ_closed.take()); + pub fn n_hops(&self) -> u8 { + self.hops.load(Ordering::SeqCst) } } @@ -854,113 +400,45 @@ impl PendingClientCirc { id: CircId, channel: Channel, createdreceiver: oneshot::Receiver, - circ_closed: Option, input: mpsc::Receiver, unique_id: UniqId, ) -> (PendingClientCirc, reactor::Reactor) { let crypto_out = OutboundClientCrypt::new(); let (control_tx, control_rx) = mpsc::unbounded(); - let hops = Vec::new(); + let num_hops = Arc::new(AtomicU8::new(0)); - let circuit_impl = ClientCircImpl { - id, + let reactor = Reactor { + control: control_rx, + outbound: Default::default(), channel, + input, + crypto_in: InboundClientCrypt::new(), + hops: vec![], + unique_id, + channel_id: id, crypto_out, - hops, - circ_closed, - control: control_tx, - sendmeta: None, - unique_id, + meta_handler: None, + num_hops: Arc::clone(&num_hops), }; + let circuit = ClientCirc { - closed: AtomicBool::new(false), - c: Mutex::new(circuit_impl), + hops: num_hops, unique_id, + control: control_tx, + circid: id, }; - let circuit = Arc::new(circuit); + let pending = PendingClientCirc { recvcreated: createdreceiver, - circ: Arc::clone(&circuit), + circ: circuit, }; - let reactor = reactor::Reactor::new(&circuit, control_rx, input, unique_id); (pending, reactor) } - /// Check whether this pending circuit matches a given channel target; - /// return an error if it doesn't. - async fn check_chan_match(&self, target: &T) -> Result<()> { - let c = self.circ.c.lock().await; - c.channel.check_match(target) - } - - /// Testing only: extract the circuit ID for this pending circuit. + /// Testing only: Extract the circuit ID for this pending circuit. #[cfg(test)] - pub(crate) async fn peek_circid(&self) -> CircId { - let c = self.circ.c.lock().await; - c.id - } - - /// Helper: create the first hop of a circuit. - /// - /// This is parameterized not just on the RNG, but a wrapper object to - /// build the right kind of create cell, a handshake object to perform - /// the cryptographic cryptographic handshake, and a layer type to - /// handle relay crypto after this hop is built. - async fn create_impl( - self, - rng: &mut R, - wrap: &W, - key: &H::KeyType, - supports_flowctrl_1: bool, - params: &CircParameters, - ) -> Result> - where - R: Rng + CryptoRng, - L: CryptInit + ClientLayer + 'static + Send, // need all this?XXXX - FWD: OutboundClientLayer + 'static + Send, - REV: InboundClientLayer + 'static + Send, - H: ClientHandshake, - W: CreateHandshakeWrap, - H::KeyGen: KeyGenerator, - { - // We don't need to shut down the circuit on failure here, since this - // function consumes the PendingClientCirc and only returns - // a ClientCirc on success. - - let PendingClientCirc { circ, recvcreated } = self; - let (state, msg) = H::client1(rng, key)?; - let create_cell = wrap.to_chanmsg(msg); - let unique_id = { - let mut c = circ.c.lock().await; - debug!( - "{}: Extending to hop 1 with {}", - c.unique_id, - create_cell.cmd() - ); - c.send_msg(create_cell).await?; - c.unique_id - }; - - let reply = recvcreated - .await - .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?; - - let relay_handshake = wrap.from_chanmsg(reply)?; - let keygen = H::client2(state, relay_handshake)?; - - let layer = L::construct(keygen)?; - - debug!("{}: Handshake complete; circuit created.", unique_id); - - let (layer_fwd, layer_back) = layer.split(); - circ.add_hop( - supports_flowctrl_1, - Box::new(layer_fwd), - Box::new(layer_back), - params, - ) - .await?; - Ok(circ) + pub(crate) fn peek_circid(&self) -> CircId { + self.circ.circid } /// Use the (questionable!) CREATE_FAST handshake to connect to the @@ -969,66 +447,60 @@ impl PendingClientCirc { /// There's no authentication in CRATE_FAST, /// so we don't need to know whom we're connecting to: we're just /// connecting to whichever relay the channel is for. - pub async fn create_firsthop_fast( - self, - rng: &mut R, - params: &CircParameters, - ) -> Result> - where - R: Rng + CryptoRng, - { - use crate::crypto::cell::Tor1RelayCrypto; - use crate::crypto::handshake::fast::CreateFastClient; - let wrap = CreateFastWrap; - self.create_impl::( - rng, - &wrap, - &(), - false, - params, - ) - .await + pub async fn create_firsthop_fast(self, params: CircParameters) -> Result { + let (tx, rx) = oneshot::channel(); + self.circ + .control + .unbounded_send(CtrlMsg::Create { + recv_created: self.recvcreated, + handshake: CircuitHandshake::CreateFast, + supports_authenticated_sendme: false, + params: params.clone(), + done: tx, + }) + .map_err(|_| Error::CircuitClosed)?; + + rx.await.map_err(|_| Error::CircuitClosed)??; + + Ok(self.circ) } /// Use the ntor handshake to connect to the first hop of this circuit. /// /// Note that the provided 'target' must match the channel's target, /// or the handshake will fail. - pub async fn create_firsthop_ntor( + pub async fn create_firsthop_ntor( self, - rng: &mut R, target: &Tg, - params: &CircParameters, - ) -> Result> + params: CircParameters, + ) -> Result where - R: Rng + CryptoRng, Tg: tor_linkspec::CircTarget, { - use crate::crypto::cell::Tor1RelayCrypto; - use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey}; - - // Exit now if we have an Ed25519 or RSA identity mismatch. - self.check_chan_match(target).await?; - - let wrap = Create2Wrap { - handshake_type: 0x0002, // ntor - }; - let key = NtorPublicKey { - id: *target.rsa_identity(), - pk: *target.ntor_onion_key(), - }; - // FlowCtrl=1 means that this hop supports authenticated SENDMEs + let (tx, rx) = oneshot::channel(); let supports_flowctrl_1 = target .protovers() .supports_known_subver(tor_protover::ProtoKind::FlowCtrl, 1); - self.create_impl::( - rng, - &wrap, - &key, - supports_flowctrl_1, - params, - ) - .await + self.circ + .control + .unbounded_send(CtrlMsg::Create { + recv_created: self.recvcreated, + handshake: CircuitHandshake::Ntor { + public_key: NtorPublicKey { + id: *target.rsa_identity(), + pk: *target.ntor_onion_key(), + }, + ed_identity: *target.ed_identity(), + }, + supports_authenticated_sendme: supports_flowctrl_1, + params: params.clone(), + done: tx, + }) + .map_err(|_| Error::CircuitClosed)?; + + rx.await.map_err(|_| Error::CircuitClosed)??; + + Ok(self.circ) } } @@ -1085,48 +557,32 @@ impl CreateHandshakeWrap for Create2Wrap { } impl StreamTarget { - /// Initial value for outbound flow-control window on streams. - const SEND_WINDOW_INIT: u16 = 500; - /// Deliver a relay message for the stream that owns this StreamTarget. /// /// The StreamTarget will set the correct stream ID and pick the /// right hop, but will not validate that the message is well-formed /// or meaningful in context. pub(crate) async fn send(&mut self, msg: RelayMsg) -> Result<()> { - if sendme::msg_counts_towards_windows(&msg) { - // Decrement the stream window (and block if it's empty) - self.window.take(&()).await?; - } - let cell = RelayCell::new(self.stream_id, msg); - self.circ.send_relay_cell(self.hop, false, cell).await + self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?; + Ok(()) } /// Called when a circuit-level protocol error has occurred and the /// circuit needs to shut down. - pub(crate) async fn protocol_error(&mut self) { - self.circ.protocol_error().await; + pub(crate) fn protocol_error(&mut self) { + self.circ.protocol_error(); } -} -impl Drop for ClientCircImpl { - fn drop(&mut self) { - self.shutdown_reactor(); - } -} - -impl Drop for StreamTarget { - fn drop(&mut self) { - // This "clone" call is a bit dangerous: it means that we might - // allow the other side to send a couple of cells that get - // decremented from self.recvwindow but don't get reflected - // in the circuit-owned view of the window. - let window = self.recvwindow.clone(); - let _ = self + /// Send a SENDME cell for this stream. + pub(crate) fn send_sendme(&mut self) -> Result<()> { + self.circ .control - .unbounded_send(CtrlMsg::CloseStream(self.hop, self.stream_id, window)); - // If there's an error, no worries: it's hard-cancel, and we - // can just ignore it. XXXX (I hope?) + .unbounded_send(CtrlMsg::SendSendme { + stream_id: self.stream_id, + hop_num: self.hop_num, + }) + .map_err(|_| Error::CircuitClosed)?; + Ok(()) } } @@ -1150,17 +606,20 @@ mod test { use super::*; use crate::channel::test::new_reactor; + use crate::crypto::cell::RelayCellBody; use chanmsg::{ChanMsg, Created2, CreatedFast}; use futures::channel::mpsc::{Receiver, Sender}; use futures::io::{AsyncReadExt, AsyncWriteExt}; use futures::sink::SinkExt; use futures::stream::StreamExt; use hex_literal::hex; + use rand::thread_rng; + use std::time::Duration; use tokio::runtime::Handle; use tokio_crate as tokio; use tokio_crate::test as async_test; - use tor_cell::chancell::msg as chanmsg; - use tor_cell::relaycell::msg as relaymsg; + use tor_cell::chancell::{msg as chanmsg, ChanCell}; + use tor_cell::relaycell::{msg as relaymsg, RelayCell, StreamId}; use tor_llcrypto::pk; fn rmsg_to_ccmsg(id: ID, msg: relaymsg::RelayMsg) -> ClientCircChanMsg @@ -1233,7 +692,6 @@ mod test { // via a crate_fast handshake. use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake}; - use futures::future::FutureExt; let (chan, mut rx, _sink) = working_fake_channel(); let circid = 128.into(); @@ -1241,14 +699,10 @@ mod test { let (_circmsg_send, circmsg_recv) = mpsc::channel(64); let unique_id = UniqId::new(23, 17); - let (pending, mut reactor) = PendingClientCirc::new( - circid, - chan, - created_recv, - None, // circ_closed. - circmsg_recv, - unique_id, - ); + let (pending, reactor) = + PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id); + + Handle::current().spawn(reactor.run()); // Future to pretend to be a relay on the other end of the circuit. let simulate_relay_fut = async move { @@ -1275,25 +729,20 @@ mod test { }; // Future to pretend to be a client. let client_fut = async move { - let mut rng = rand::thread_rng(); let target = example_target(); let params = CircParameters::default(); let ret = if fast { - trace!("doing fast create"); - pending.create_firsthop_fast(&mut rng, ¶ms).await + eprintln!("doing fast create"); + pending.create_firsthop_fast(params).await } else { - trace!("doing ntor create"); - pending - .create_firsthop_ntor(&mut rng, &target, ¶ms) - .await + eprintln!("doing ntor create"); + pending.create_firsthop_ntor(&target, params).await }; - trace!("create done: result {:?}", ret); + eprintln!("create done: result {:?}", ret); ret }; - // Future to run the reactor. - let reactor_fut = reactor.run_once().map(|_| ()); - let (circ, _, _) = futures::join!(client_fut, reactor_fut, simulate_relay_fut); + let (circ, _) = futures::join!(client_fut, simulate_relay_fut); let _circ = circ.unwrap(); @@ -1315,7 +764,7 @@ mod test { // An encryption layer that doesn't do any crypto. Can be used // as inbound or outbound, but not both at once. - struct DummyCrypto { + pub(crate) struct DummyCrypto { counter_tag: [u8; 20], counter: u32, lasthop: bool, @@ -1348,7 +797,7 @@ mod test { } } impl DummyCrypto { - fn new(lasthop: bool) -> Self { + pub(crate) fn new(lasthop: bool) -> Self { DummyCrypto { counter_tag: [0; 20], counter: 0, @@ -1362,24 +811,16 @@ mod test { async fn newcirc_ext( chan: Channel, next_msg_from: HopNum, - ) -> ( - Arc, - reactor::Reactor, - mpsc::Sender, - ) { + ) -> (ClientCirc, mpsc::Sender) { let circid = 128.into(); let (_created_send, created_recv) = oneshot::channel(); let (circmsg_send, circmsg_recv) = mpsc::channel(64); let unique_id = UniqId::new(23, 17); - let (pending, mut reactor) = PendingClientCirc::new( - circid, - chan, - created_recv, - None, // circ_closed. - circmsg_recv, - unique_id, - ); + let (pending, reactor) = + PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id); + + Handle::current().spawn(reactor.run()); let PendingClientCirc { circ, @@ -1388,31 +829,25 @@ mod test { for idx in 0_u8..3 { let params = CircParameters::default(); - let (hopf, reacf) = futures::join!( - circ.add_hop( - true, - Box::new(DummyCrypto::new(idx == 2)), - Box::new(DummyCrypto::new(idx == next_msg_from.into())), - ¶ms, - ), - reactor.run_once() - ); - assert!(hopf.is_ok()); - assert!(reacf.is_ok()); + let (tx, rx) = oneshot::channel(); + circ.control + .unbounded_send(CtrlMsg::AddFakeHop { + supports_flowctrl_1: true, + fwd_lasthop: idx == 2, + rev_lasthop: idx == next_msg_from.into(), + params, + done: tx, + }) + .unwrap(); + rx.await.unwrap().unwrap(); } - (circ, reactor, circmsg_send) + (circ, circmsg_send) } // Helper: set up a 3-hop circuit with no encryption, where the // next inbound message seems to come from hop next_msg_from - async fn newcirc( - chan: Channel, - ) -> ( - Arc, - reactor::Reactor, - mpsc::Sender, - ) { + async fn newcirc(chan: Channel) -> (ClientCirc, mpsc::Sender) { newcirc_ext(chan, 2.into()).await } @@ -1420,10 +855,14 @@ mod test { #[async_test] async fn send_simple() { let (chan, mut rx, _sink) = working_fake_channel(); - let (circ, _reactor, _send) = newcirc(chan).await; + let (circ, _send) = newcirc(chan).await; let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir); - circ.send_relay_cell(2.into(), false, begindir) - .await + circ.control + .unbounded_send(CtrlMsg::SendRelayCell { + hop: 2.into(), + early: false, + cell: begindir, + }) .unwrap(); // Here's what we tried to put on the TLS channel. Note that @@ -1437,6 +876,11 @@ mod test { assert!(matches!(m.msg(), RelayMsg::BeginDir)); } + // NOTE(eta): this test is commented out because it basically tested implementation details + // of the old code which are hard to port to the reactor version, and the behaviour + // is covered by the extend tests anyway, so I don't think it's worth it. + + /* // Try getting a "meta-cell", which is what we're calling those not // for a specific circuit. #[async_test] @@ -1496,19 +940,19 @@ mod test { "circuit protocol violation: Unexpected EXTENDED2 cell from hop 1 on client circuit" ); } + */ #[async_test] async fn extend() { use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake}; let (chan, mut rx, _sink) = working_fake_channel(); - let (circ, mut reactor, mut sink) = newcirc(chan).await; + let (circ, mut sink) = newcirc(chan).await; let params = CircParameters::default(); let extend_fut = async move { let target = example_target(); - let mut rng = thread_rng(); - circ.extend_ntor(&mut rng, &target, ¶ms).await.unwrap(); + circ.extend_ntor(&target, ¶ms).await.unwrap(); circ // gotta keep the circ alive, or the reactor would exit. }; let reply_fut = async move { @@ -1531,41 +975,30 @@ mod test { sink.send(rmsg_to_ccmsg(0, extended2)).await.unwrap(); sink // gotta keep the sink alive, or the reactor will exit. }; - let reactor_fut = async move { - reactor.run_once().await.unwrap(); // to deliver the relay cell - reactor.run_once().await.unwrap(); // to handle the AddHop - }; - let (circ, _, _) = futures::join!(extend_fut, reply_fut, reactor_fut); + let (circ, _) = futures::join!(extend_fut, reply_fut); // Did we really add another hop? - assert_eq!(circ.n_hops().await, 4); + assert_eq!(circ.n_hops(), 4); } async fn bad_extend_test_impl(reply_hop: HopNum, bad_reply: ClientCircChanMsg) -> Error { let (chan, _rx, _sink) = working_fake_channel(); - let (circ, mut reactor, mut sink) = newcirc_ext(chan, reply_hop).await; + let (circ, mut sink) = newcirc_ext(chan, reply_hop).await; let params = CircParameters::default(); let extend_fut = async move { let target = example_target(); - let mut rng = thread_rng(); - let outcome = circ.extend_ntor(&mut rng, &target, ¶ms).await; + let outcome = circ.extend_ntor(&target, ¶ms).await; (outcome, circ) // keep the circ alive, or the reactor will exit. }; let bad_reply_fut = async move { sink.send(bad_reply).await.unwrap(); sink // keep the sink alive, or the reactor will exit. }; - let reactor_fut = async move { - let res = reactor.run_once().await; - if res.is_err() { - reactor.propagate_close().await; - } - }; - let ((outcome, circ), _, _) = futures::join!(extend_fut, bad_reply_fut, reactor_fut); + let ((outcome, circ), _) = futures::join!(extend_fut, bad_reply_fut); - assert_eq!(circ.n_hops().await, 3); + assert_eq!(circ.n_hops(), 3); assert!(outcome.is_err()); outcome.unwrap_err() } @@ -1581,9 +1014,7 @@ mod test { // code's meta-handler. Instead the unexpected message will cause // the circuit to get torn down. match error { - Error::CircDestroy(s) => { - assert_eq!(s, "Circuit closed while waiting for EXTENDED2"); - } + Error::CircuitClosed => {} x => panic!("got other error: {}", x), } } @@ -1607,7 +1038,7 @@ mod test { let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into())); let error = bad_extend_test_impl(2.into(), cc).await; match error { - Error::CircDestroy(s) => assert_eq!(s, "Circuit closed while waiting for EXTENDED2"), + Error::CircuitClosed => {} _ => panic!(), } } @@ -1623,12 +1054,12 @@ mod test { #[async_test] async fn begindir() { let (chan, mut rx, _sink) = working_fake_channel(); - let (circ, mut reactor, mut sink) = newcirc(chan).await; + let (circ, mut sink) = newcirc(chan).await; let begin_and_send_fut = async move { // Here we'll say we've got a circuit, and we want to // make a simple BEGINDIR request with it. - let mut stream = circ.begin_dir_stream().await.unwrap(); + let mut stream = Arc::new(circ).begin_dir_stream().await.unwrap(); stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap(); stream.flush().await.unwrap(); let mut buf = [0_u8; 1024]; @@ -1681,33 +1112,26 @@ mod test { sink // gotta keep the sink alive, or the reactor will exit. }; - let reactor_fut = async move { - reactor.run_once().await.unwrap(); // AddStream - reactor.run_once().await.unwrap(); // Connected cell - reactor.run_once().await.unwrap(); // Data cell - reactor.run_once().await.unwrap(); // End cell - reactor - }; - let (_stream, _, _) = futures::join!(begin_and_send_fut, reply_fut, reactor_fut); + let (_stream, _) = futures::join!(begin_and_send_fut, reply_fut); } // Set up a circuit and stream that expects some incoming SENDMEs. async fn setup_incoming_sendme_case( n_to_send: usize, ) -> ( - Arc, + ClientCirc, DataStream, mpsc::Sender, StreamId, - crate::circuit::reactor::Reactor, usize, + Receiver, + Sender>, ) { - let (chan, mut rx, _sink) = working_fake_channel(); - let (circ, mut reactor, mut sink) = newcirc(chan).await; - let (snd_done, mut rcv_done) = oneshot::channel::<()>(); + let (chan, mut rx, sink2) = working_fake_channel(); + let (circ, mut sink) = newcirc(chan).await; - let circ_clone = Arc::clone(&circ); + let circ_clone = Arc::new(circ.clone()); let begin_and_send_fut = async move { // Take our circuit and make a stream on it. let mut stream = circ_clone @@ -1758,40 +1182,33 @@ mod test { panic!() } } - snd_done.send(()).unwrap(); - (sink, streamid, cells_received) + (sink, streamid, cells_received, rx) }; - let reactor_fut = async move { - use futures::FutureExt; - loop { - futures::select! { - r = reactor.run_once().fuse() => r.unwrap(), - _ = rcv_done => break, - } - } - reactor - }; + let (stream, (sink, streamid, cells_received, rx)) = + futures::join!(begin_and_send_fut, receive_fut); - let (stream, (sink, streamid, cells_received), reactor) = - futures::join!(begin_and_send_fut, receive_fut, reactor_fut); - - (circ, stream, sink, streamid, reactor, cells_received) + (circ, stream, sink, streamid, cells_received, rx, sink2) } #[async_test] async fn accept_valid_sendme() { - let (circ, _stream, mut sink, streamid, mut reactor, cells_received) = + let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) = setup_incoming_sendme_case(300 * 498 + 3).await; assert_eq!(cells_received, 301); // Make sure that the circuit is indeed expecting the right sendmes { - let mut c = circ.c.lock().await; - let hop = c.hop_mut(2.into()).unwrap(); - let (window, tags) = hop.sendwindow.window_and_expected_tags().await; + let (tx, rx) = oneshot::channel(); + circ.control + .unbounded_send(CtrlMsg::QuerySendWindow { + hop: 2.into(), + done: tx, + }) + .unwrap(); + let (window, tags) = rx.await.unwrap().unwrap(); assert_eq!(window, 1000 - 301); assert_eq!(tags.len(), 3); // 100 @@ -1815,20 +1232,22 @@ mod test { sink }; - let reactor_fut = async move { - reactor.run_once().await.unwrap(); // circuit sendme - reactor.run_once().await.unwrap(); // stream sendme - reactor - }; - - let (_, _) = futures::join!(reply_with_sendme_fut, reactor_fut); + let _sink = reply_with_sendme_fut.await; + // FIXME(eta): this is a hacky way of waiting for the reactor to run before doing the below + // query; should find some way to properly synchronize to avoid flakiness + tokio::time::sleep(Duration::from_millis(100)).await; // Now make sure that the circuit is still happy, and its // window is updated. { - let mut c = circ.c.lock().await; - let hop = c.hop_mut(2.into()).unwrap(); - let (window, _tags) = hop.sendwindow.window_and_expected_tags().await; + let (tx, rx) = oneshot::channel(); + circ.control + .unbounded_send(CtrlMsg::QuerySendWindow { + hop: 2.into(), + done: tx, + }) + .unwrap(); + let (window, _tags) = rx.await.unwrap().unwrap(); assert_eq!(window, 1000 - 201); } } @@ -1838,7 +1257,7 @@ mod test { // Same setup as accept_valid_sendme() test above but try giving // a sendme with the wrong tag. - let (_circ, _stream, mut sink, _streamid, mut reactor, _cells_received) = + let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) = setup_incoming_sendme_case(300 * 498 + 3).await; let reply_with_sendme_fut = async move { @@ -1849,19 +1268,18 @@ mod test { sink }; - let reactor_fut = async move { - use crate::util::err::ReactorError; - let r = reactor.run_once().await; - match r { - Err(ReactorError::Err(Error::CircProto(m))) => { - assert_eq!(m, "bad auth tag on circuit sendme") - } - _ => panic!(), - } - reactor - }; + let _sink = reply_with_sendme_fut.await; - let (_, _) = futures::join!(reply_with_sendme_fut, reactor_fut); + let mut tries = 0; + // FIXME(eta): we aren't testing the error message like we used to; however, we can at least + // check whether the reactor dies as a result of receiving invalid data. + while !circ.control.is_closed() { + tokio::time::sleep(Duration::from_millis(100)).await; + tries += 1; + if tries > 10 { + panic!("reactor continued running after invalid sendme"); + } + } // TODO: check that the circuit is shut down too } diff --git a/crates/tor-proto/src/circuit/halfstream.rs b/crates/tor-proto/src/circuit/halfstream.rs index d1901dfcc..f175aa67a 100644 --- a/crates/tor-proto/src/circuit/halfstream.rs +++ b/crates/tor-proto/src/circuit/halfstream.rs @@ -46,10 +46,10 @@ impl HalfStream { /// The caller must handle END cells; it is an internal error to pass /// END cells to this method. /// no ends here. - pub(super) async fn handle_msg(&mut self, msg: &RelayMsg) -> Result<()> { + pub(super) fn handle_msg(&mut self, msg: &RelayMsg) -> Result<()> { match msg { RelayMsg::Sendme(_) => { - self.sendw.put(Some(())).await.ok_or_else(|| { + self.sendw.put(Some(())).ok_or_else(|| { Error::CircProto("Too many sendmes on a closed stream!".into()) })?; Ok(()) @@ -91,15 +91,15 @@ mod test { #[async_test] async fn halfstream_sendme() -> Result<()> { let mut sendw = StreamSendWindow::new(101); - sendw.take(&()).await?; // Make sure that it will accept one sendme. + sendw.take(&())?; // Make sure that it will accept one sendme. let mut hs = HalfStream::new(sendw, StreamRecvWindow::new(20), true); // one sendme is fine let m = msg::Sendme::new_empty().into(); - assert!(hs.handle_msg(&m).await.is_ok()); + assert!(hs.handle_msg(&m).is_ok()); // but no more were expected! - let e = hs.handle_msg(&m).await.err().unwrap(); + let e = hs.handle_msg(&m).err().unwrap(); assert_eq!( format!("{}", e), "circuit protocol violation: Too many sendmes on a closed stream!" @@ -120,11 +120,11 @@ mod test { .unwrap() .into(); for _ in 0_u8..20 { - assert!(hs.handle_msg(&m).await.is_ok()); + assert!(hs.handle_msg(&m).is_ok()); } // But one more is a protocol violation. - let e = hs.handle_msg(&m).await.err().unwrap(); + let e = hs.handle_msg(&m).err().unwrap(); assert_eq!( format!("{}", e), "circuit protocol violation: Received a data cell in violation of a window" @@ -137,13 +137,13 @@ mod test { // We were told to accept a connected, so we'll accept one // and no more. let m = msg::Connected::new_empty().into(); - assert!(hs.handle_msg(&m).await.is_ok()); - assert!(hs.handle_msg(&m).await.is_err()); + assert!(hs.handle_msg(&m).is_ok()); + assert!(hs.handle_msg(&m).is_err()); // If we try that again with connected_ok == false, we won't // accept any. let mut hs = HalfStream::new(StreamSendWindow::new(20), StreamRecvWindow::new(20), false); - let e = hs.handle_msg(&m).await.err().unwrap(); + let e = hs.handle_msg(&m).err().unwrap(); assert_eq!( format!("{}", e), "circuit protocol violation: Bad CONNECTED cell on a closed stream!" @@ -154,7 +154,7 @@ mod test { async fn halfstream_other() { let mut hs = hs_new(); let m = msg::Extended2::new(Vec::new()).into(); - let e = hs.handle_msg(&m).await.err().unwrap(); + let e = hs.handle_msg(&m).err().unwrap(); assert_eq!( format!("{}", e), "circuit protocol violation: Bad EXTENDED2 cell on a closed stream!" diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index ee92d6515..2a1c78ed2 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -1,69 +1,155 @@ -//! Code to handle incoming cells on a circuit -//! -//! TODO: I don't have so much confidence in the close-and-cleanup -//! behavior here, or in the error handling behavior. -//! -//! TODO: perhaps this should share code with channel::reactor; perhaps -//! it should just not exist. - +//! Code to handle incoming cells on a circuit. use super::streammap::{ShouldSendEnd, StreamEnt}; -use crate::circuit::celltypes::ClientCircChanMsg; +use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse}; use crate::circuit::unique_id::UniqId; -use crate::circuit::{sendme, streammap}; -use crate::crypto::cell::{HopNum, InboundClientCrypt, InboundClientLayer}; +use crate::circuit::{ + sendme, streammap, CircParameters, Create2Wrap, CreateFastWrap, CreateHandshakeWrap, +}; +use crate::crypto::cell::{ + ClientLayer, CryptInit, HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt, + OutboundClientLayer, RelayCellBody, Tor1RelayCrypto, +}; use crate::util::err::ReactorError; use crate::{Error, Result}; -use tor_cell::chancell::msg::Relay; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::pin::Pin; +use tor_cell::chancell::msg::{ChanMsg, Relay}; use tor_cell::relaycell::msg::{End, RelayMsg, Sendme}; -use tor_cell::relaycell::{RelayCell, StreamId}; +use tor_cell::relaycell::{RelayCell, RelayCmd, StreamId}; use futures::channel::{mpsc, oneshot}; -use futures::select_biased; -use futures::sink::SinkExt; -use futures::stream::{self, StreamExt}; +use futures::Sink; +use futures::Stream; -use std::sync::atomic::Ordering; -use std::sync::{Arc, Weak}; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; -use tracing::{debug, trace}; +use crate::channel::Channel; +#[cfg(test)] +use crate::circuit::sendme::CircTag; +use crate::circuit::sendme::StreamSendWindow; +use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey}; +use crate::crypto::handshake::{ClientHandshake, KeyGenerator}; +use tor_cell::chancell; +use tor_cell::chancell::{ChanCell, CircId}; +use tor_linkspec::LinkSpec; +use tor_llcrypto::pk; +use tracing::{debug, trace, warn}; + +/// Initial value for outbound flow-control window on streams. +pub(super) const SEND_WINDOW_INIT: u16 = 500; +/// Initial value for inbound flow-control window on streams. +pub(super) const RECV_WINDOW_INIT: u16 = 500; +/// Size of the buffer used between the reactor and a `StreamReader`. +/// +/// FIXME(eta): We pick 2Ă— the receive window, which is very conservative (we arguably shouldn't +/// get sent more than the receive window anyway!). We might do due to things that +/// don't count towards the window though. +pub(super) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize; + +/// The type of a oneshot channel used to inform reactor users of the result of an operation. +pub(super) type ReactorResultChannel = oneshot::Sender>; + +/// A handshake type, to be used when creating circuit hops. +#[derive(Clone, Debug)] +pub(super) enum CircuitHandshake { + /// Use the CREATE_FAST handshake. + CreateFast, + /// Use the ntor handshake. + Ntor { + /// The public key of the relay. + public_key: NtorPublicKey, + /// The first hop's Ed25519 identity, which is verified against + /// the identity held in the circuit's channel. + ed_identity: pk::ed25519::Ed25519Identity, + }, +} /// A message telling the reactor to do something. +#[derive(Debug)] pub(super) enum CtrlMsg { + /// Create the first hop of this circuit. + Create { + /// A oneshot channel on which we'll receive the creation response. + recv_created: oneshot::Receiver, + /// The handshake type to use for the first hop. + handshake: CircuitHandshake, + /// Whether the hop supports authenticated SENDME cells. + supports_authenticated_sendme: bool, + /// Other parameters relevant for circuit creation. + params: CircParameters, + /// Oneshot channel to notify on completion. + done: ReactorResultChannel<()>, + }, + /// Extend a circuit by one hop, using the ntor handshake. + ExtendNtor { + /// The handshake type to use for this hop. + public_key: NtorPublicKey, + /// Information about how to connect to the relay we're extending to. + linkspecs: Vec, + /// Whether the hop supports authenticated SENDME cells. + supports_authenticated_sendme: bool, + /// Other parameters relevant for circuit extension. + params: CircParameters, + /// Oneshot channel to notify on completion. + done: ReactorResultChannel<()>, + }, + /// Begin a stream with the provided hop in this circuit. + /// + /// Allocates a stream ID, and sends the provided message to that hop. + BeginStream { + /// The hop number to begin the stream with. + hop_num: HopNum, + /// The message to send. + message: RelayMsg, + /// A channel to send messages on this stream down. + /// + /// This sender shouldn't ever block, because we use congestion control and only send + /// SENDME cells once we've read enough out of the other end. If it *does* block, we + /// can assume someone is trying to send us more cells than they should, and abort + /// the stream. + sender: mpsc::Sender, + /// A channel to receive messages to send on this stream from. + rx: mpsc::Receiver, + /// Oneshot channel to notify on completion, with the allocated stream ID. + done: ReactorResultChannel, + }, + /// Send a SENDME cell (used to ask for more data to be sent) on the given stream. + SendSendme { + /// The stream ID to send a SENDME for. + stream_id: StreamId, + /// The hop number the stream is on. + hop_num: HopNum, + }, /// Shut down the reactor. Shutdown, - /// Register a new one-shot receiver that can send a CtrlMsg to the - /// reactor. - /// Tell the reactor that a given stream has gone away. - CloseStream(HopNum, StreamId, sendme::StreamRecvWindow), - /// Ask the reactor for a new stream ID, and allocate a circuit for it. - AddStream( - HopNum, - mpsc::Sender, - sendme::StreamSendWindow, - oneshot::Sender>, - ), - /// Tell the reactor to add a new hop to its view of the circuit, and - /// then tell us when it has done so. - AddHop( - InboundHop, - Box, - oneshot::Sender<()>, - ), -} - -impl std::fmt::Debug for CtrlMsg { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use CtrlMsg::*; - match self { - Shutdown => write!(f, "Shutdown"), - CloseStream(h, s, _) => write!(f, "CloseStream({:?}, {:?}, _)", h, s), - AddStream(h, _, _, _) => write!(f, "AddStream({:?}, _, _, _)", h), - AddHop(_, _, _) => write!(f, "AddHop(_, _, _)"), - } - } + /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography. + #[cfg(test)] + AddFakeHop { + supports_flowctrl_1: bool, + fwd_lasthop: bool, + rev_lasthop: bool, + params: CircParameters, + done: ReactorResultChannel<()>, + }, + /// (tests only) Get the send window and expected tags for a given hop. + #[cfg(test)] + QuerySendWindow { + hop: HopNum, + done: ReactorResultChannel<(u16, Vec)>, + }, + /// (tests only) Send a raw relay cell with send_relay_cell(). + #[cfg(test)] + SendRelayCell { + hop: HopNum, + early: bool, + cell: RelayCell, + }, } /// Represents the reactor's view of a single hop. -pub(super) struct InboundHop { +pub(super) struct CircHop { /// Map from stream IDs to streams. /// /// We store this with the reactor instead of the circuit, since the @@ -72,86 +158,256 @@ pub(super) struct InboundHop { map: streammap::StreamMap, /// Window used to say how many cells we can receive. recvwindow: sendme::CircRecvWindow, + /// If true, this hop is using an older link protocol and we + /// shouldn't expect good authenticated SENDMEs from it. + auth_sendme_optional: bool, + /// Window used to say how many cells we can send. + sendwindow: sendme::CircSendWindow, + /// Buffer for messages we can't send to this hop yet due to congestion control. + /// + /// Contains the tag we should give to the send window, and the cell to send. + /// + /// This shouldn't grow unboundedly: we try and pop things off it first before + /// doing things that would result in it growing (and stop before growing it + /// if popping things off it can't be done). + outbound: VecDeque<([u8; 20], ChanCell)>, } -impl InboundHop { +impl CircHop { /// Create a new hop. - pub(super) fn new() -> Self { - InboundHop { + pub(super) fn new(auth_sendme_optional: bool, initial_window: u16) -> Self { + CircHop { map: streammap::StreamMap::new(), recvwindow: sendme::CircRecvWindow::new(1000), + auth_sendme_optional, + sendwindow: sendme::CircSendWindow::new(initial_window), + outbound: VecDeque::new(), } } } +/// An object that's waiting for a meta cell (one not associated with a stream) in order to make +/// progress. +/// +/// # Background +/// +/// The `Reactor` can't have async functions that send and receive cells, because its job is to +/// send and receive cells: if one of its functions tried to do that, it would just hang forever. +/// +/// To get around this problem, the reactor can send some cells, and then make one of these +/// `MetaCellHandler` objects, which will be run when the reply arrives. +pub(super) trait MetaCellHandler: Send { + /// The hop we're expecting the message to come from. This is compared against the hop + /// from which we actually receive messages, and an error is thrown if the two don't match. + fn expected_hop(&self) -> HopNum; + /// Called when the message we were waiting for arrives. + /// + /// Gets a copy of the `Reactor` in order to do anything it likes there. + fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>; +} + +/// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait. +/// +/// Yes, I know having trait bounds on structs is bad, but in this case it's necessary +/// since we want to be able to use `H::KeyType`. +struct CircuitExtender +where + H: ClientHandshake, +{ + /// Handshake state. + state: Option, + /// Whether the hop supports authenticated SENDME cells. + supports_flowctrl_1: bool, + /// Parameters used for this extension. + params: CircParameters, + /// An identifier for logging about this reactor's circuit. + unique_id: UniqId, + /// The hop we're expecting the EXTENDED2 cell to come back from. + expected_hop: HopNum, + /// `PhantomData` used to make the other type parameters required for a circuit extension + /// part of the `struct`, instead of having them be provided during a function call. + /// + /// This is done this way so we can implement `MetaCellHandler` for this type, which + /// doesn't include any generic type parameters; we need them to be part of the type + /// so we know what they are for that `impl` block. + phantom: PhantomData<(L, FWD, REV)>, +} +impl CircuitExtender +where + H: ClientHandshake, + H::KeyGen: KeyGenerator, + L: CryptInit + ClientLayer, + FWD: OutboundClientLayer + 'static + Send, + REV: InboundClientLayer + 'static + Send, +{ + /// Start extending a circuit, sending the necessary EXTEND cell and returning a + /// new `CircuitExtender` to be called when the reply arrives. + /// + /// The `handshake_id` is the numeric identifier for what kind of + /// handshake we're doing. The `key` is the relay's onion key that + /// goes along with the handshake, and the `linkspecs` are the + /// link specifiers to include in the EXTEND cell to tell the + /// current last hop which relay to connect to. + fn begin( + cx: &mut Context<'_>, + handshake_id: u16, + key: &H::KeyType, + linkspecs: Vec, + supports_flowctrl_1: bool, + params: CircParameters, + reactor: &mut Reactor, + ) -> Result { + let mut rng = rand::thread_rng(); + let unique_id = reactor.unique_id; + + use tor_cell::relaycell::msg::{Body, Extend2}; + // Perform the first part of the cryptographic handshake + let (state, msg) = H::client1(&mut rng, key)?; + + let n_hops = reactor.crypto_out.n_layers(); + let hop = ((n_hops - 1) as u8).into(); + + debug!( + "{}: Extending circuit to hop {} with {:?}", + unique_id, + n_hops + 1, + linkspecs + ); + + let extend_msg = Extend2::new(linkspecs, handshake_id, msg); + let cell = RelayCell::new(0.into(), extend_msg.into_message()); + + // Send the message to the last hop... + reactor.send_relay_cell( + cx, hop, true, // use a RELAY_EARLY cell + cell, + )?; + trace!("{}: waiting for EXTENDED2 cell", unique_id); + // ... and now we wait for a response. + + Ok(Self { + state: Some(state), + supports_flowctrl_1, + params, + unique_id, + expected_hop: hop, + phantom: Default::default(), + }) + } +} + +impl MetaCellHandler for CircuitExtender +where + H: ClientHandshake, + H::StateType: Send, + H::KeyGen: KeyGenerator, + L: CryptInit + ClientLayer + Send, + FWD: OutboundClientLayer + 'static + Send, + REV: InboundClientLayer + 'static + Send, +{ + fn expected_hop(&self) -> HopNum { + self.expected_hop + } + fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()> { + // XXXX If two EXTEND cells are of these are launched on the + // same circuit at once, could they collide in this part of + // the function? I don't _think_ so, but it might be a good idea + // to have an "extending" bit that keeps two tasks from entering + // extend_impl at the same time. + // + // Also we could enforce that `hop` is still what we expect it + // to be at this point. + + // Did we get the right response? + if msg.cmd() != RelayCmd::EXTENDED2 { + return Err(Error::CircProto(format!( + "wanted EXTENDED2; got {}", + msg.cmd(), + ))); + } + + // ???? Do we need to shutdown the circuit for the remaining error + // ???? cases in this function? + + let msg = match msg { + RelayMsg::Extended2(e) => e, + _ => return Err(Error::InternalError("Body didn't match cmd".into())), + }; + let relay_handshake = msg.into_body(); + + trace!( + "{}: Received EXTENDED2 cell; completing handshake.", + self.unique_id + ); + // Now perform the second part of the handshake, and see if it + // succeeded. + let keygen = H::client2( + self.state + .take() + .expect("CircuitExtender::finish() called twice"), + relay_handshake, + )?; + let layer = L::construct(keygen)?; + + debug!("{}: Handshake complete; circuit extended.", self.unique_id); + + // If we get here, it succeeded. Add a new hop to the circuit. + let (layer_fwd, layer_back) = layer.split(); + reactor.add_hop( + self.supports_flowctrl_1, + Box::new(layer_fwd), + Box::new(layer_back), + &self.params, + ); + Ok(()) + } +} + /// Object to handle incoming cells and background tasks on a circuit /// /// This type is returned when you finish a circuit; you need to spawn a /// new task that calls `run()` on it. #[must_use = "If you don't call run() on a reactor, the circuit won't work."] pub struct Reactor { - /// A stream of oneshot receivers that tell this reactor about things it - /// needs to handle, like closed streams. - // - // The actual type here is quite ugly! Is there a better way? - // - // See documentation of CtrlMsg and CtrlResult for info about why - // we're using this ugly type. - control: mpsc::UnboundedReceiver, - - /// Input Stream, on which we receive ChanMsg objects from this circuit's + /// Receiver for control messages for this reactor, sent by `ClientCirc` objects. + pub(super) control: mpsc::UnboundedReceiver, + /// Buffer for cells we can't send out the channel yet due to it being full. + /// + /// This should be used very very rarely: see `send_msg_direct`'s comments for more + /// information. (in fact, using it will generate a warning!) + pub(super) outbound: VecDeque, + /// The channel this circuit is using to send cells through. + pub(super) channel: Channel, + /// Input stream, on which we receive ChanMsg objects from this circuit's /// channel. // TODO: could use a SPSC channel here instead. - input: stream::Fuse>, - - /// Reference to the circuit. - circuit: Weak, + pub(super) input: mpsc::Receiver, /// The cryptographic state for this circuit for inbound cells. /// This object is divided into multiple layers, each of which is /// shared with one hop of the circuit. - /// - /// We keep this separately from the state for outbound cells, since - /// it is convenient for the reactor to be able to use this without - /// locking the circuit. - crypto_in: InboundClientCrypt, + pub(super) crypto_in: InboundClientCrypt, + /// The cryptographic state for this circuit for outbound cells. + pub(super) crypto_out: OutboundClientCrypt, /// List of hops state objects used by the reactor - hops: Vec, + pub(super) hops: Vec, + /// Shared atomic for the number of hops this circuit has. + pub(super) num_hops: Arc, /// An identifier for logging about this reactor's circuit. - unique_id: UniqId, + pub(super) unique_id: UniqId, + /// This circuit's identifier on the upstream channel. + pub(super) channel_id: CircId, + /// A handler for a meta cell, together with a result channel to notify on completion. + pub(super) meta_handler: Option<(Box, ReactorResultChannel<()>)>, } impl Reactor { - /// Construct a new Reactor. - pub(super) fn new( - circuit: &Arc, - control: mpsc::UnboundedReceiver, - input: mpsc::Receiver, - unique_id: UniqId, - ) -> Self { - Reactor { - input: input.fuse(), - control, - circuit: Arc::downgrade(circuit), - crypto_in: InboundClientCrypt::new(), - hops: Vec::new(), - unique_id, - } - } - /// Launch the reactor, and run until the circuit closes or we /// encounter an error. /// /// Once this method returns, the circuit is dead and cannot be /// used again. pub async fn run(mut self) -> Result<()> { - if let Some(circ) = self.circuit.upgrade() { - if circ.is_closing() { - return Err(Error::CircuitClosed); - } - } else { - return Err(Error::CircuitClosed); - } - debug!("{}: Running circuit reactor", self.unique_id); + trace!("{}: Running circuit reactor", self.unique_id); let result: Result<()> = loop { match self.run_once().await { Ok(()) => (), @@ -160,96 +416,638 @@ impl Reactor { } }; debug!("{}: Circuit reactor stopped: {:?}", self.unique_id, result); - self.propagate_close().await; result } - /// Tell the circuit that this reactor has been closed. - pub(super) async fn propagate_close(self) { - if let Some(circ) = self.circuit.upgrade() { - // TODO: should this call terminate? - circ.closed.store(true, Ordering::SeqCst); - let mut circ = circ.c.lock().await; - if let Some((_, sender)) = circ.sendmeta.take() { - let _ignore_err = sender.send(Err(Error::CircuitClosed)); - } - } - } - /// Helper for run: doesn't mark the circuit closed on finish. Only /// processes one cell or control message. pub(super) async fn run_once(&mut self) -> std::result::Result<(), ReactorError> { - // What's next to do? - let item = select_biased! { - // Got a control message! - ctrl = self.control.next() => { - match ctrl { - Some(CtrlMsg::Shutdown) | None => return Err(ReactorError::Shutdown), - Some(msg) => self.handle_control(msg).await?, - } - return Ok(()); - } - // we got a message on our channel, or it closed. - item = self.input.next() => item, - }; - let item = match item { - // the channel closed; we're done. - None => return Err(ReactorError::Shutdown), - // we got a ChanMsg! - Some(r) => r, - }; + #[allow(clippy::cognitive_complexity)] + let fut = futures::future::poll_fn(|cx| -> Poll> { + let mut create_message = None; + let mut did_things = false; - let exit = self.handle_cell(item).await?; - if exit { - return Err(ReactorError::Shutdown); + // Check whether we've got a control message pending. + if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) { + match ret { + None => { + trace!("{}: reactor shutdown due to control drop", self.unique_id); + return Poll::Ready(Err(ReactorError::Shutdown)); + } + Some(CtrlMsg::Shutdown) => { + trace!( + "{}: reactor shutdown due to explicit request", + self.unique_id + ); + return Poll::Ready(Err(ReactorError::Shutdown)); + } + // This message requires actually blocking, so we can't handle it inside + // this nonblocking poll_fn. + Some(x @ CtrlMsg::Create { .. }) => create_message = Some(x), + Some(msg) => { + self.handle_control(cx, msg)?; + did_things = true; + } + } + } + + // Check whether we've got an input message pending. + if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) { + match ret { + None => { + trace!("{}: reactor shutdown due to input drop", self.unique_id); + return Poll::Ready(Err(ReactorError::Shutdown)); + } + Some(cell) => { + if self.handle_cell(cx, cell)? { + trace!("{}: reactor shutdown due to handled cell", self.unique_id); + return Poll::Ready(Err(ReactorError::Shutdown)); + } + did_things = true; + } + } + } + + // Now for the tricky part. We want to grab some relay cells from all of our streams + // and forward them on to the channel, but we need to pay attention to both whether + // the channel can accept cells right now, and whether congestion control allows us + // to send them. + // + // We also have to do somewhat cursed things and call start_send inside this poll_fn, + // since we need to check whether the channel can still receive cells after each one + // that we send. + + let mut streams_to_close = vec![]; + let mut stream_relaycells = vec![]; + + // Is the channel ready to receive anything at all? + if self.channel.poll_ready(cx)? { + // (using this as a named block for early returns; not actually a loop) + #[allow(clippy::never_loop)] + 'outer: loop { + // First, drain our queue of things we tried to send earlier, but couldn't. + while let Some(msg) = self.outbound.pop_front() { + trace!("{}: sending from enqueued: {:?}", self.unique_id, msg); + Pin::new(&mut self.channel).start_send(msg)?; + + // `futures::Sink::start_send` dictates we need to call `poll_ready` before + // each `start_send` call. + if !self.channel.poll_ready(cx)? { + break 'outer; + } + } + + // Let's look at our hops, and streams for each hop. + for (i, hop) in self.hops.iter_mut().enumerate() { + let hop_num = HopNum::from(i as u8); + // If we can, drain our queue of things we tried to send earlier, but + // couldn't due to congestion control. + if hop.sendwindow.window() > 0 { + 'hop: while let Some((tag, msg)) = hop.outbound.pop_front() { + trace!( + "{}: sending from hop-{}-enqueued: {:?}", + self.unique_id, + i, + msg + ); + Pin::new(&mut self.channel).start_send(msg)?; + hop.sendwindow.take(&tag)?; + if !self.channel.poll_ready(cx)? { + break 'outer; + } + if hop.sendwindow.window() == 0 { + break 'hop; + } + } + } + // Look at all of the streams on this hop. + for (id, stream) in hop.map.inner().iter_mut() { + if let StreamEnt::Open { + rx, send_window, .. + } = stream + { + // Do the stream and hop send windows allow us to obtain and + // send something? + // + // FIXME(eta): not everything counts toward congestion control! + if send_window.window() > 0 && hop.sendwindow.window() > 0 { + match Pin::new(rx).poll_next(cx) { + Poll::Ready(Some(m)) => { + stream_relaycells + .push((hop_num, RelayCell::new(*id, m))); + } + Poll::Ready(None) => { + // Stream receiver was dropped; close the stream. + // We can't close it here though due to borrowck; that + // will happen later. + streams_to_close.push((hop_num, *id)); + } + Poll::Pending => {} + } + } + } + } + } + + break; + } + } + + // Close the streams we said we'd close. + for (hopn, id) in streams_to_close { + self.close_stream(cx, hopn, id)?; + did_things = true; + } + // Send messages we said we'd send. + for (hopn, rc) in stream_relaycells { + self.send_relay_cell(cx, hopn, false, rc)?; + did_things = true; + } + + let _ = Pin::new(&mut self.channel) + .poll_flush(cx) + .map_err(|_| Error::ChannelClosed)?; + if create_message.is_some() { + Poll::Ready(Ok(create_message)) + } else if did_things { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + }); + let create_message = fut.await?; + if let Some(CtrlMsg::Create { + recv_created, + handshake, + supports_authenticated_sendme, + params, + done, + }) = create_message + { + let ret = match handshake { + CircuitHandshake::CreateFast => { + self.create_firsthop_fast(recv_created, ¶ms).await + } + CircuitHandshake::Ntor { + public_key, + ed_identity, + } => { + self.create_firsthop_ntor( + recv_created, + ed_identity, + public_key, + supports_authenticated_sendme, + ¶ms, + ) + .await + } + }; + let _ = done.send(ret); // don't care if sender goes away + futures::future::poll_fn(|cx| -> Poll> { + let _ = Pin::new(&mut self.channel) + .poll_flush(cx) + .map_err(|_| Error::ChannelClosed)?; + Poll::Ready(Ok(())) + }) + .await?; } Ok(()) } + /// Helper: create the first hop of a circuit. + /// + /// This is parameterized not just on the RNG, but a wrapper object to + /// build the right kind of create cell, a handshake object to perform + /// the cryptographic cryptographic handshake, and a layer type to + /// handle relay crypto after this hop is built. + async fn create_impl( + &mut self, + recvcreated: oneshot::Receiver, + wrap: &W, + key: &H::KeyType, + supports_flowctrl_1: bool, + params: &CircParameters, + ) -> Result<()> + where + L: CryptInit + ClientLayer + 'static + Send, // need all this?XXXX + FWD: OutboundClientLayer + 'static + Send, + REV: InboundClientLayer + 'static + Send, + H: ClientHandshake, + W: CreateHandshakeWrap, + H::KeyGen: KeyGenerator, + { + // We don't need to shut down the circuit on failure here, since this + // function consumes the PendingClientCirc and only returns + // a ClientCirc on success. + + let (state, msg) = { + // done like this because holding the RNG across an await boundary makes the future + // non-Send + let mut rng = rand::thread_rng(); + H::client1(&mut rng, key)? + }; + let create_cell = wrap.to_chanmsg(msg); + debug!( + "{}: Extending to hop 1 with {}", + self.unique_id, + create_cell.cmd() + ); + self.send_msg(create_cell).await?; + + let reply = recvcreated + .await + .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?; + + let relay_handshake = wrap.from_chanmsg(reply)?; + let keygen = H::client2(state, relay_handshake)?; + + let layer = L::construct(keygen)?; + + debug!("{}: Handshake complete; circuit created.", self.unique_id); + + let (layer_fwd, layer_back) = layer.split(); + self.add_hop( + supports_flowctrl_1, + Box::new(layer_fwd), + Box::new(layer_back), + params, + ); + Ok(()) + } + + /// Use the (questionable!) CREATE_FAST handshake to connect to the + /// first hop of this circuit. + /// + /// There's no authentication in CREATE_FAST, + /// so we don't need to know whom we're connecting to: we're just + /// connecting to whichever relay the channel is for. + async fn create_firsthop_fast( + &mut self, + recvcreated: oneshot::Receiver, + params: &CircParameters, + ) -> Result<()> { + use crate::crypto::handshake::fast::CreateFastClient; + let wrap = CreateFastWrap; + self.create_impl::( + recvcreated, + &wrap, + &(), + false, + params, + ) + .await + } + + /// Use the ntor handshake to connect to the first hop of this circuit. + /// + /// Note that the provided 'target' must match the channel's target, + /// or the handshake will fail. + async fn create_firsthop_ntor( + &mut self, + recvcreated: oneshot::Receiver, + ed_identity: pk::ed25519::Ed25519Identity, + pubkey: NtorPublicKey, + supports_flowctrl_1: bool, + params: &CircParameters, + ) -> Result<()> { + // Exit now if we have an Ed25519 or RSA identity mismatch. + // FIXME(eta): this is copypasta from Channel::check_match! + if self.channel.peer_rsa_id() != &pubkey.id { + return Err(Error::ChanMismatch(format!( + "Identity {} does not match target {}", + self.channel.peer_rsa_id(), + pubkey.id, + ))); + } + if self.channel.peer_ed25519_id() != &ed_identity { + return Err(Error::ChanMismatch(format!( + "Identity {} does not match target {}", + self.channel.peer_ed25519_id(), + ed_identity + ))); + } + + let wrap = Create2Wrap { + handshake_type: 0x0002, // ntor + }; + self.create_impl::( + recvcreated, + &wrap, + &pubkey, + supports_flowctrl_1, + params, + ) + .await + } + + /// Add a hop to the end of this circuit. + fn add_hop( + &mut self, + supports_flowctrl_1: bool, + fwd: Box, + rev: Box, + params: &CircParameters, + ) { + let hop = crate::circuit::reactor::CircHop::new( + supports_flowctrl_1, + params.initial_send_window(), + ); + self.hops.push(hop); + self.crypto_in.add_layer(rev); + self.crypto_out.add_layer(fwd); + self.num_hops.fetch_add(1, Ordering::SeqCst); + } + + /// Handle a RELAY cell on this circuit with stream ID 0. + fn handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<()> { + // SENDME cells and TRUNCATED get handled internally by the circuit. + if let RelayMsg::Sendme(s) = msg { + return self.handle_sendme(hopnum, s); + } + if let RelayMsg::Truncated(_) = msg { + // XXXX need to handle Truncated cells. This isn't the right + // way, but at least it's safe. + // TODO: If we ever do handle Truncate cells more + // correctly, we will need to audit all our use of HopNum + // to identify a layer. Otherwise we could confuse a + // message from the previous hop N with a message from the + // new hop N. + return Err(Error::CircuitClosed); + } + + trace!("{}: Received meta-cell {:?}", self.unique_id, msg); + + // For all other command types, we'll only get them in response + // to another command, which should have registered a responder. + // + // TODO: that means that service-introduction circuits will need + // a different implementation, but that should be okay. We'll work + // something out. + if let Some((mut handler, done)) = self.meta_handler.take() { + if handler.expected_hop() == hopnum { + // Somebody was waiting for a message -- maybe this message + let ret = handler.finish(msg, self); + trace!( + "{}: meta handler completed with result: {:?}", + self.unique_id, + ret + ); + let _ = done.send(ret); // don't care if sender goes away + Ok(()) + } else { + // Somebody wanted a message from a different hop! Put this + // one back. + self.meta_handler = Some((handler, done)); + Err(Error::CircProto(format!( + "Unexpected {} cell from hop {} on client circuit", + msg.cmd(), + hopnum, + ))) + } + } else { + // No need to call shutdown here, since this error will + // propagate to the reactor shut it down. + Err(Error::CircProto(format!( + "Unexpected {} cell on client circuit", + msg.cmd() + ))) + } + } + + /// Handle a RELAY_SENDME cell on this circuit with stream ID 0. + fn handle_sendme(&mut self, hopnum: HopNum, msg: Sendme) -> Result<()> { + // No need to call "shutdown" on errors in this function; + // it's called from the reactor task and errors will propagate there. + let hop = self + .hop_mut(hopnum) + .ok_or_else(|| Error::CircProto(format!("Couldn't find {} hop", hopnum)))?; + + let auth: Option<[u8; 20]> = match msg.into_tag() { + Some(v) if v.len() == 20 => { + // XXXX ugly code. + let mut tag = [0_u8; 20]; + (&mut tag).copy_from_slice(&v[..]); + Some(tag) + } + Some(_) => return Err(Error::CircProto("malformed tag on circuit sendme".into())), + None => { + if !hop.auth_sendme_optional { + return Err(Error::CircProto("missing tag on circuit sendme".into())); + } else { + None + } + } + }; + match hop.sendwindow.put(auth) { + Some(_) => Ok(()), + None => Err(Error::CircProto("bad auth tag on circuit sendme".into())), + } + } + + /// Send a message onto the circuit's channel (to be called with a `Context`) + /// + /// If the channel is ready to accept messages, it will be sent immediately. If not, the message + /// will be enqueued for sending at a later iteration of the reactor loop. + /// + /// # Note + /// + /// Making use of the enqueuing capbilities of this function is discouraged! You should first + /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and + /// ideally use this to implement backpressure (such that you do not read from other sources + /// that would send here while you know you're unable to forward the messages on). + fn send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()> { + let cell = ChanCell::new(self.channel_id, msg); + if self.channel.poll_ready(cx)? { + Pin::new(&mut self.channel).start_send(cell)?; + } else { + // This case shouldn't actually happen that often, if ever. We generally check whether + // the channel can be sent to before calling this function (the one exception at the + // time of writing is in circuit creation). + // + // If this is suddenly getting hit and it wasn't before, maybe you added something that + // doesn't bother to check the channel (`self.channel.poll_ready(cx)`) before calling + // this function, and that's getting used a lot? + // + // We don't want to drop cells on the floor, though, so this is good to have. + warn!( + "{}: having to enqueue cell due to backpressure: {:?}", + self.unique_id, cell + ); + self.outbound.push_back(cell); + } + Ok(()) + } + + /// Wrapper around `send_msg_direct` that uses `futures::future::poll_fn` to get a `Context`. + async fn send_msg(&mut self, msg: ChanMsg) -> Result<()> { + // HACK(eta): technically the closure passed to `poll_fn` is a `FnMut` closure, since it + // can be polled multiple times. + // We're going to return Ready immediately since we're only using `poll_fn` to + // get a `Context`, but the compiler doesn't know that, so use an `Option` + // which we can `take()` in order to move out of it. + // (if we do get polled again this'll panic, but that shouldn't happen!) + let mut msg = Some(msg); + futures::future::poll_fn(|cx| -> Poll> { + self.send_msg_direct(cx, msg.take().expect("poll_fn called twice?"))?; + Poll::Ready(Ok(())) + }) + .await?; + Ok(()) + } + + /// Encode the relay cell `cell`, encrypt it, and send it to the 'hop'th hop. + /// + /// Does not check whether the cell is well-formed or reasonable. + fn send_relay_cell( + &mut self, + cx: &mut Context<'_>, + hop: HopNum, + early: bool, + cell: RelayCell, + ) -> Result<()> { + let c_t_w = sendme::cell_counts_towards_windows(&cell); + let mut body: RelayCellBody = cell.encode(&mut rand::thread_rng())?.into(); + let tag = self.crypto_out.encrypt(&mut body, hop)?; + let msg = chancell::msg::Relay::from_raw(body.into()); + let msg = if early { + ChanMsg::RelayEarly(msg) + } else { + ChanMsg::Relay(msg) + }; + // If the cell counted towards our sendme window, decrement + // that window, and maybe remember the authentication tag. + if c_t_w { + let hop_num = Into::::into(hop); + let hop = &mut self.hops[hop_num]; + if hop.sendwindow.window() == 0 { + let cell = ChanCell::new(self.channel_id, msg); + // Send window is empty! Push this cell onto the hop's outbound queue, and it'll + // get sent later. + trace!( + "{}: having to use onto hop {} queue for cell: {:?}", + self.unique_id, + hop_num, + cell + ); + hop.outbound.push_back((*tag, cell)); + return Ok(()); + } + hop.sendwindow.take(tag)?; + } + self.send_msg_direct(cx, msg) + } + /// Handle a CtrlMsg other than Shutdown. - async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> { + fn handle_control(&mut self, cx: &mut Context<'_>, msg: CtrlMsg) -> Result<()> { trace!("{}: reactor received {:?}", self.unique_id, msg); match msg { - CtrlMsg::Shutdown => panic!(), // was handled in reactor loop. - CtrlMsg::CloseStream(hop, id, recvwindow) => { - self.close_stream(hop, id, recvwindow).await? + // This is handled earlier, since it requires blocking. + CtrlMsg::Create { .. } => panic!("got a CtrlMsg::Create in handle_control"), + // This is handled earlier, since it requires generating a ReactorError. + CtrlMsg::Shutdown => panic!("got a CtrlMsg::Shutdown in handle_control"), + CtrlMsg::ExtendNtor { + public_key, + linkspecs, + supports_authenticated_sendme, + params, + done, + } => { + match CircuitExtender::::begin( + cx, + 0x02, + &public_key, + linkspecs, + supports_authenticated_sendme, + params, + self, + ) { + Ok(e) => { + self.meta_handler = Some((Box::new(e), done)); + } + Err(e) => { + let _ = done.send(Err(e)); + } + }; } - CtrlMsg::AddStream(hop, sink, window, sender) => { - let hop = self.hop_mut(hop); - if let Some(hop) = hop { - let r = hop.map.add_ent(sink, window); - // XXXX not sure if this is right to ignore - let _ignore = sender.send(r); - } - // If there was no hop with this index, dropping the sender - // will cancel the attempt to add the stream. + CtrlMsg::BeginStream { + hop_num, + message, + sender, + rx, + done, + } => { + let ret = self.begin_stream(cx, hop_num, message, sender, rx); + let _ = done.send(ret); // don't care if sender goes away } - CtrlMsg::AddHop(hop, layer, sender) => { - self.hops.push(hop); - self.crypto_in.add_layer(layer); - // XXXX not sure if this is right to ignore - let _ignore = sender.send(()); + CtrlMsg::SendSendme { stream_id, hop_num } => { + let sendme = Sendme::new_empty(); + let cell = RelayCell::new(stream_id, sendme.into()); + self.send_relay_cell(cx, hop_num, false, cell)?; + } + #[cfg(test)] + CtrlMsg::AddFakeHop { + supports_flowctrl_1, + fwd_lasthop, + rev_lasthop, + params, + done, + } => { + use crate::circuit::test::DummyCrypto; + + let fwd = Box::new(DummyCrypto::new(fwd_lasthop)); + let rev = Box::new(DummyCrypto::new(rev_lasthop)); + self.add_hop(supports_flowctrl_1, fwd, rev, ¶ms); + let _ = done.send(Ok(())); + } + #[cfg(test)] + CtrlMsg::QuerySendWindow { hop, done } => { + let _ = done.send(if let Some(hop) = self.hop_mut(hop) { + Ok(hop.sendwindow.window_and_expected_tags()) + } else { + Err(Error::InternalError( + "received QuerySendWindow for unknown hop".into(), + )) + }); + } + #[cfg(test)] + CtrlMsg::SendRelayCell { hop, early, cell } => { + self.send_relay_cell(cx, hop, early, cell)?; } } Ok(()) } + /// Start a stream. Creates an entry in the stream map with the given channels, and sends the + /// `message` to the provided hop. + fn begin_stream( + &mut self, + cx: &mut Context<'_>, + hopnum: HopNum, + message: RelayMsg, + sender: mpsc::Sender, + rx: mpsc::Receiver, + ) -> Result { + let hop = self + .hop_mut(hopnum) + .ok_or_else(|| Error::InternalError(format!("No such hop {:?}", hopnum)))?; + let send_window = StreamSendWindow::new(SEND_WINDOW_INIT); + let r = hop.map.add_ent(sender, rx, send_window)?; + let cell = RelayCell::new(r, message); + self.send_relay_cell(cx, hopnum, false, cell)?; + Ok(r) + } + /// Close the stream associated with `id` because the stream was /// dropped. /// /// If we have not already received an END cell on this stream, send one. - async fn close_stream( - &mut self, - hopnum: HopNum, - id: StreamId, - window: sendme::StreamRecvWindow, - ) -> Result<()> { + fn close_stream(&mut self, cx: &mut Context<'_>, hopnum: HopNum, id: StreamId) -> Result<()> { // Mark the stream as closing. let hop = self.hop_mut(hopnum).ok_or_else(|| { Error::InternalError("Tried to close a stream on a hop that wasn't there?".into()) })?; - let should_send_end = hop.map.terminate(id, window)?; + let should_send_end = hop.map.terminate(id)?; trace!( "{}: Ending stream {}; should_send_end={:?}", self.unique_id, @@ -260,11 +1058,7 @@ impl Reactor { // we didn't already get an END cell. But I should double-check! if should_send_end == ShouldSendEnd::Send { let end_cell = RelayCell::new(id, End::new_misc().into()); - if let Some(circ) = self.circuit.upgrade() { - circ.send_relay_cell(hopnum, false, end_cell).await?; - } else { - return Err(Error::CircuitClosed); - } + self.send_relay_cell(cx, hopnum, false, end_cell)?; } Ok(()) } @@ -273,11 +1067,12 @@ impl Reactor { /// or rejected; a few get delivered to circuits. /// /// Return true if we should exit. - async fn handle_cell(&mut self, cell: ClientCircChanMsg) -> Result { + fn handle_cell(&mut self, cx: &mut Context<'_>, cell: ClientCircChanMsg) -> Result { + trace!("{}: handling cell: {:?}", self.unique_id, cell); use ClientCircChanMsg::*; match cell { Relay(r) => { - self.handle_relay_cell(r).await?; + self.handle_relay_cell(cx, r)?; Ok(false) } Destroy(_) => { @@ -288,7 +1083,7 @@ impl Reactor { } /// React to a Relay or RelayEarly cell. - async fn handle_relay_cell(&mut self, cell: Relay) -> Result<()> { + fn handle_relay_cell(&mut self, cx: &mut Context<'_>, cell: Relay) -> Result<()> { let mut body = cell.into_relay_body().into(); // Decrypt the cell. If it's recognized, then find the @@ -322,11 +1117,7 @@ impl Reactor { if send_circ_sendme { let sendme = Sendme::new_tag(tag); let cell = RelayCell::new(0.into(), sendme.into()); - if let Some(circ) = self.circuit.upgrade() { - circ.send_relay_cell(hopnum, false, cell).await?; - } else { - return Err(Error::CircuitClosed); - } + self.send_relay_cell(cx, hopnum, false, cell)?; self.hop_mut(hopnum) .ok_or_else(|| { Error::InternalError("Trying to send SENDME to nonexistent hop".to_string()) @@ -351,26 +1142,28 @@ impl Reactor { // If this has a reasonable streamID value of 0, it's a meta cell, // not meant for a particular stream. if streamid.is_zero() { - if let Some(circ) = self.circuit.upgrade() { - let mut circ = circ.c.lock().await; - return circ.handle_meta_cell(hopnum, msg).await; - } else { - return Err(Error::CircuitClosed); - } + return self.handle_meta_cell(hopnum, msg); } let hop = self .hop_mut(hopnum) .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?; match hop.map.get_mut(streamid) { - Some(StreamEnt::Open(s, w, ref mut dropped)) => { + Some(StreamEnt::Open { + sink, + send_window, + dropped, + .. + }) => { // The stream for this message exists, and is open. if let RelayMsg::Sendme(_) = msg { // We need to handle sendmes here, not in the stream's // recv() method, or else we'd never notice them if the // stream isn't reading. - w.put(Some(())).await; + // FIXME(eta): I think ignoring the must_use return value here is okay, since + // the tag is () anyway? or something??? + let _ = send_window.put(Some(())); return Ok(()); } @@ -380,35 +1173,44 @@ impl Reactor { // TODO: Add a wrapper type here to reject cells that should // never go to a client, like BEGIN. - let result = s.send(msg).await; - if result.is_err() && c_t_w { - // the other side of the stream has gone away; remember - // that we received a cell that we couldn't queue for it. - // - // Later this value will be recorded in a half-stream. - *dropped += 1; + if let Err(e) = sink.try_send(msg) { + if e.is_full() { + // If we get here, we either have a logic bug (!), or an attacker + // is sending us more cells than we asked for via congestion control. + return Err(Error::CircProto(format!( + "Stream sink would block; received too many cells on stream ID {}", + streamid, + ))); + } + if e.is_disconnected() && c_t_w { + // the other side of the stream has gone away; remember + // that we received a cell that we couldn't queue for it. + // + // Later this value will be recorded in a half-stream. + *dropped += 1; + } } if is_end_cell { hop.map.end_received(streamid)?; } - Ok(()) } Some(StreamEnt::EndSent(halfstream)) => { // We sent an end but maybe the other side hasn't heard. if matches!(msg, RelayMsg::End(_)) { - hop.map.end_received(streamid) + hop.map.end_received(streamid)?; } else { - halfstream.handle_msg(&msg).await + halfstream.handle_msg(&msg)?; } } _ => { // No stream wants this message. - Err(Error::CircProto( + return Err(Error::CircProto( "Cell received on nonexistent stream!?".into(), - )) + )); } } + Ok(()) } /// Helper: process a destroy cell. @@ -419,10 +1221,16 @@ impl Reactor { } /// Return the hop corresponding to `hopnum`, if there is one. - fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut InboundHop> { + fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> { self.hops.get_mut(Into::::into(hopnum)) } } +impl Drop for Reactor { + fn drop(&mut self) { + let _ = self.channel.close_circuit(self.channel_id); + } +} + #[cfg(test)] mod test {} diff --git a/crates/tor-proto/src/circuit/sendme.rs b/crates/tor-proto/src/circuit/sendme.rs index 1110852d8..2740bb26e 100644 --- a/crates/tor-proto/src/circuit/sendme.rs +++ b/crates/tor-proto/src/circuit/sendme.rs @@ -10,10 +10,7 @@ //! other side of the circuit really has read all of the data that it's //! acknowledging. -use futures::lock::Mutex; - use std::collections::VecDeque; -use std::sync::Arc; use tor_cell::relaycell::msg::RelayMsg; use tor_cell::relaycell::RelayCell; @@ -51,28 +48,14 @@ pub(crate) struct SendWindow where P: WindowParams, T: PartialEq + Eq + Clone, -{ - // TODO could use a bilock if that becomes non-experimental. - // TODO I wish we could do this without locking; we could make a bunch - // of these functions non-async if that happened. - /// Actual SendWindow object. - w: Arc>>, - /// Marker type to tell the compiler that the P type is used. - _dummy: std::marker::PhantomData

, -} - -/// Interior (locked) code for SendWindowInner. -struct SendWindowInner -where - T: PartialEq + Eq + Clone, { /// Current value for this window window: u16, /// Tag values that incoming "SENDME" messages need to match in order /// for us to send more data. tags: VecDeque, - /// An event to wait on if we find that we are out of cells. - unblock: event_listener::Event, + /// Marker type to tell the compiler that the P type is used. + _dummy: std::marker::PhantomData

, } /// Helper: parameterizes a window to determine its maximum and its increment. @@ -117,54 +100,36 @@ where pub(crate) fn new(window: u16) -> SendWindow { let increment = P::increment(); let capacity = (window + increment - 1) / increment; - let inner = SendWindowInner { + SendWindow { window, tags: VecDeque::with_capacity(capacity as usize), - unblock: event_listener::Event::new(), - }; - SendWindow { - w: Arc::new(Mutex::new(inner)), - _dummy: std::marker::PhantomData, - } - } - - /// Add a reference-count to SendWindow and return a new handle to it. - pub(crate) fn new_ref(&self) -> Self { - SendWindow { - w: Arc::clone(&self.w), _dummy: std::marker::PhantomData, } } /// Remove one item from this window (since we've sent a cell). + /// If the window was empty, returns an error. /// /// The provided tag is the one associated with the crypto layer that /// originated the cell. It will get cloned and recorded if we'll /// need to check for it later. /// /// Return the number of cells left in the window. - pub(crate) async fn take(&mut self, tag: &T) -> Result { - loop { - let wait_on = { - let mut w = self.w.lock().await; - if let Some(val) = w.window.checked_sub(1) { - w.window = val; - if w.window % P::increment() == 0 { - // We record this tag. - // TODO: I'm not saying that this cell in particular - // matches the spec, but Tor seems to like it. - w.tags.push_back(tag.clone()); - } + pub(crate) fn take(&mut self, tag: &T) -> Result { + if let Some(val) = self.window.checked_sub(1) { + self.window = val; + if self.window % P::increment() == 0 { + // We record this tag. + // TODO: I'm not saying that this cell in particular + // matches the spec, but Tor seems to like it. + self.tags.push_back(tag.clone()); + } - return Ok(val); - } - - // Window is zero; can't send yet. - w.unblock.listen() - }; - - // Wait on this event while _not_ holding the lock. - wait_on.await; + Ok(val) + } else { + Err(Error::CircProto( + "Called SendWindow::take() on empty SendWindow".into(), + )) } } @@ -179,36 +144,32 @@ where /// On failure, return None: the caller should close the stream /// or circuit with a protocol error. #[must_use = "didn't check whether SENDME tag was right."] - pub(crate) async fn put(&mut self, tag: Option) -> Option { - let mut w = self.w.lock().await; - - match (w.tags.front(), tag) { + pub(crate) fn put(&mut self, tag: Option) -> Option { + match (self.tags.front(), tag) { (Some(t), Some(tag)) if t == &tag => {} // this is the right tag. (Some(_), None) => {} // didn't need a tag. _ => { return None; } // Bad tag or unexpected sendme. } - w.tags.pop_front(); + self.tags.pop_front(); - let was_zero = w.window == 0; - - let v = w.window.checked_add(P::increment())?; - w.window = v; - - if was_zero { - w.unblock.notify(usize::MAX) - } + let v = self.window.checked_add(P::increment())?; + self.window = v; Some(v) } + /// Return the current send window value. + pub(crate) fn window(&self) -> u16 { + self.window + } + /// For testing: get a copy of the current send window, and the /// expected incoming tags. #[cfg(test)] - pub(crate) async fn window_and_expected_tags(&self) -> (u16, Vec) { - let inner = self.w.lock().await; - let tags = inner.tags.iter().map(Clone::clone).collect(); - (inner.window, tags) + pub(crate) fn window_and_expected_tags(&self) -> (u16, Vec) { + let tags = self.tags.iter().map(Clone::clone).collect(); + (self.window, tags) } } @@ -231,7 +192,7 @@ impl RecvWindow

{ } } - /// Called when we've just sent a cell; return true if we need to send + /// Called when we've just received a cell; return true if we need to send /// a sendme, and false otherwise. /// /// Returns None if we should not have sent the cell, and we just @@ -286,7 +247,6 @@ pub(crate) fn cell_counts_towards_windows(cell: &RelayCell) -> bool { mod test { #![allow(clippy::unwrap_used)] use super::*; - use futures::FutureExt; use tokio::test as async_test; use tokio_crate as tokio; use tor_cell::relaycell::{msg, RelayCell}; @@ -335,37 +295,37 @@ mod test { async fn sendwindow_basic() -> Result<()> { let mut w = new_sendwindow(); - let n = w.take(&"Hello").await?; + let n = w.take(&"Hello")?; assert_eq!(n, 999); for _ in 0_usize..98 { - w.take(&"world").await?; + w.take(&"world")?; } - assert_eq!(w.w.lock().await.window, 901); - assert_eq!(w.w.lock().await.tags.len(), 0); + assert_eq!(w.window, 901); + assert_eq!(w.tags.len(), 0); - let n = w.take(&"and").await?; + let n = w.take(&"and")?; assert_eq!(n, 900); - assert_eq!(w.w.lock().await.tags.len(), 1); - assert_eq!(w.w.lock().await.tags[0], "and"); + assert_eq!(w.tags.len(), 1); + assert_eq!(w.tags[0], "and"); - let n = w.take(&"goodbye").await?; + let n = w.take(&"goodbye")?; assert_eq!(n, 899); - assert_eq!(w.w.lock().await.tags.len(), 1); + assert_eq!(w.tags.len(), 1); // Try putting a good tag. - let n = w.put(Some("and")).await; + let n = w.put(Some("and")); assert_eq!(n, Some(999)); - assert_eq!(w.w.lock().await.tags.len(), 0); + assert_eq!(w.tags.len(), 0); for _ in 0_usize..300 { - w.take(&"dreamland").await?; + w.take(&"dreamland")?; } - assert_eq!(w.w.lock().await.tags.len(), 3); + assert_eq!(w.tags.len(), 3); // Put without a tag. - let n = w.put(None).await; + let n = w.put(None); assert_eq!(n, Some(799)); - assert_eq!(w.w.lock().await.tags.len(), 2); + assert_eq!(w.tags.len(), 2); Ok(()) } @@ -374,44 +334,41 @@ mod test { async fn sendwindow_bad_put() -> Result<()> { let mut w = new_sendwindow(); for _ in 0_usize..250 { - w.take(&"correct").await?; + w.take(&"correct")?; } // wrong tag: won't work. - assert_eq!(w.w.lock().await.window, 750); - let n = w.put(Some("incorrect")).await; + assert_eq!(w.window, 750); + let n = w.put(Some("incorrect")); assert!(n.is_none()); - let n = w.put(Some("correct")).await; + let n = w.put(Some("correct")); assert_eq!(n, Some(850)); - let n = w.put(Some("correct")).await; + let n = w.put(Some("correct")); assert_eq!(n, Some(950)); // no tag expected: won't work. - let n = w.put(Some("correct")).await; + let n = w.put(Some("correct")); assert_eq!(n, None); - assert_eq!(w.w.lock().await.window, 950); + assert_eq!(w.window, 950); - let n = w.put(None).await; + let n = w.put(None); assert_eq!(n, None); - assert_eq!(w.w.lock().await.window, 950); + assert_eq!(w.window, 950); Ok(()) } #[async_test] - async fn sendwindow_blocking() -> Result<()> { + async fn sendwindow_erroring() -> Result<()> { let mut w = new_sendwindow(); for _ in 0_usize..1000 { - w.take(&"here a string").await?; + w.take(&"here a string")?; } - assert_eq!(w.w.lock().await.window, 0); + assert_eq!(w.window, 0); - // This is going to block -- make sure it doesn't say it's ready. - let ready = w.take(&"there a string").now_or_never(); - assert!(ready.is_none()); - - // TODO: test that this actually wakes up when somebody else says "put". + let ready = w.take(&"there a string"); + assert!(ready.is_err()); Ok(()) } } diff --git a/crates/tor-proto/src/circuit/streammap.rs b/crates/tor-proto/src/circuit/streammap.rs index 8fd070747..4f918a3e7 100644 --- a/crates/tor-proto/src/circuit/streammap.rs +++ b/crates/tor-proto/src/circuit/streammap.rs @@ -14,19 +14,24 @@ use std::collections::HashMap; use rand::Rng; +use crate::circuit::reactor::RECV_WINDOW_INIT; +use crate::circuit::sendme::StreamRecvWindow; use tracing::info; /// The entry for a stream. pub(super) enum StreamEnt { - /// An open stream: any relay cells tagged for this stream should get - /// sent over the mpsc::Sender. - /// - /// The StreamSendWindow is used to make sure that incoming SENDME - /// cells; the u16 is a count of cells that we have dropped due to - /// the stream disappearing before we can transform this into an - /// EndSent. - // TODO: is this the best way? - Open(mpsc::Sender, sendme::StreamSendWindow, u16), + /// An open stream. + Open { + /// Sink to send relay cells tagged for this stream into. + sink: mpsc::Sender, + /// Stream for cells that should be sent down this stream. + rx: mpsc::Receiver, + /// Send window, for congestion control purposes. + send_window: sendme::StreamSendWindow, + /// Number of cells dropped due to the stream disappearing before we can + /// transform this into an `EndSent`. + dropped: u16, + }, /// A stream for which we have received an END cell, but not yet /// had the stream object get dropped. EndReceived, @@ -74,13 +79,24 @@ impl StreamMap { } } + /// Get the `HashMap` inside this stream map. + pub(super) fn inner(&mut self) -> &mut HashMap { + &mut self.m + } + /// Add an entry to this map; return the newly allocated StreamId. pub(super) fn add_ent( &mut self, sink: mpsc::Sender, - window: sendme::StreamSendWindow, + rx: mpsc::Receiver, + send_window: sendme::StreamSendWindow, ) -> Result { - let stream_ent = StreamEnt::Open(sink, window, 0); + let stream_ent = StreamEnt::Open { + sink, + rx, + send_window, + dropped: 0, + }; // This "65536" seems too aggressive, but it's what tor does. // // Also, going around in a loop here is (sadly) needed in order @@ -133,7 +149,7 @@ impl StreamMap { stream_entry.remove_entry(); Ok(()) } - StreamEnt::Open(_, _, _) => { + StreamEnt::Open { .. } => { stream_entry.insert(StreamEnt::EndReceived); Ok(()) } @@ -143,38 +159,30 @@ impl StreamMap { /// Handle a termination of the stream with `id` from this side of /// the circuit. Return true if the stream was open and an END /// ought to be sent. - pub(super) fn terminate( - &mut self, - id: StreamId, - mut recvw: sendme::StreamRecvWindow, - ) -> Result { - use ShouldSendEnd::*; - - // Check the hashmap for the right stream. Bail if not found. - // Also keep the hashmap handle so that we can do more efficient inserts/removals - let mut stream_entry = match self.m.entry(id) { - Entry::Vacant(_) => { - return Err(Error::InternalError( - "Somehow we terminated a nonexistent connection‽".into(), - )) - } - Entry::Occupied(o) => o, - }; - + pub(super) fn terminate(&mut self, id: StreamId) -> Result { // Progress the stream's state machine accordingly - match stream_entry.get() { - StreamEnt::EndReceived => { - stream_entry.remove_entry(); - Ok(DontSend) - } - StreamEnt::Open(_, sendw, n) => { - recvw.decrement_n(*n)?; + match self.m.remove(&id).ok_or_else(|| { + Error::InternalError("Somehow we terminated a nonexistent connection‽".into()) + })? { + StreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend), + StreamEnt::Open { + send_window, + dropped, + // notably absent: the channels for sink and stream, which will get dropped and + // closed (meaning reads/writes from/to this stream will now fail) + .. + } => { + // FIXME(eta): we don't copy the receive window, instead just creating a new one, + // so a malicious peer can send us slightly more data than they should + // be able to; see arti#230. + let mut recv_window = StreamRecvWindow::new(RECV_WINDOW_INIT); + recv_window.decrement_n(dropped)?; // TODO: would be nice to avoid new_ref. // XXXX: We should set connected_ok properly. let connected_ok = true; - let halfstream = HalfStream::new(sendw.new_ref(), recvw, connected_ok); - stream_entry.insert(StreamEnt::EndSent(halfstream)); - Ok(Send) + let halfstream = HalfStream::new(send_window, recv_window, connected_ok); + self.m.insert(id, StreamEnt::EndSent(halfstream)); + Ok(ShouldSendEnd::Send) } StreamEnt::EndSent(_) => { panic!("Hang on! We're sending an END on a stream where we already sent an END‽"); @@ -190,7 +198,7 @@ impl StreamMap { mod test { #![allow(clippy::unwrap_used)] use super::*; - use crate::circuit::sendme::{StreamRecvWindow, StreamSendWindow}; + use crate::circuit::sendme::StreamSendWindow; #[test] fn streammap_basics() -> Result<()> { @@ -200,8 +208,9 @@ mod test { // Try add_ent for _ in 0..128 { - let (sink, _) = mpsc::channel(2); - let id = map.add_ent(sink, StreamSendWindow::new(500))?; + let (sink, _) = mpsc::channel(128); + let (_, rx) = mpsc::channel(2); + let id = map.add_ent(sink, rx, StreamSendWindow::new(500))?; let expect_id: StreamId = next_id.into(); assert_eq!(expect_id, id); next_id = next_id.wrapping_add(1); @@ -213,10 +222,7 @@ mod test { // Test get_mut. let nonesuch_id = next_id.into(); - assert!(matches!( - map.get_mut(ids[0]), - Some(StreamEnt::Open(_, _, _)) - )); + assert!(matches!(map.get_mut(ids[0]), Some(StreamEnt::Open { .. }))); assert!(map.get_mut(nonesuch_id).is_none()); // Test end_received @@ -226,17 +232,10 @@ mod test { assert!(map.end_received(ids[1]).is_err()); // Test terminate - let window = StreamRecvWindow::new(25); - assert!(map.terminate(nonesuch_id, window.clone()).is_err()); - assert_eq!( - map.terminate(ids[2], window.clone()).unwrap(), - ShouldSendEnd::Send - ); + assert!(map.terminate(nonesuch_id).is_err()); + assert_eq!(map.terminate(ids[2]).unwrap(), ShouldSendEnd::Send); assert!(matches!(map.get_mut(ids[2]), Some(StreamEnt::EndSent(_)))); - assert_eq!( - map.terminate(ids[1], window).unwrap(), - ShouldSendEnd::DontSend - ); + assert_eq!(map.terminate(ids[1]).unwrap(), ShouldSendEnd::DontSend); assert!(matches!(map.get_mut(ids[1]), None)); // Try receiving an end after a terminate. diff --git a/crates/tor-proto/src/crypto/handshake/ntor.rs b/crates/tor-proto/src/crypto/handshake/ntor.rs index 08711b997..48910a797 100644 --- a/crates/tor-proto/src/crypto/handshake/ntor.rs +++ b/crates/tor-proto/src/crypto/handshake/ntor.rs @@ -50,7 +50,7 @@ impl super::ServerHandshake for NtorServer { } /// A set of public keys used by a client to initiate an ntor handshake. -#[derive(Clone)] +#[derive(Clone, Debug)] pub(crate) struct NtorPublicKey { /// Public RSA identity fingerprint for the relay; used in authentication /// calculation. diff --git a/crates/tor-proto/src/stream.rs b/crates/tor-proto/src/stream.rs index f8b91b644..1c92a60a0 100644 --- a/crates/tor-proto/src/stream.rs +++ b/crates/tor-proto/src/stream.rs @@ -16,7 +16,7 @@ mod resolve; pub use data::DataStream; pub use params::StreamParameters; -pub use raw::RawCellStream; +pub use raw::StreamReader; pub use resolve::ResolveStream; pub use tor_cell::relaycell::msg::IpVersionPreference; diff --git a/crates/tor-proto/src/stream/data.rs b/crates/tor-proto/src/stream/data.rs index 1158cb51b..3153fb339 100644 --- a/crates/tor-proto/src/stream/data.rs +++ b/crates/tor-proto/src/stream/data.rs @@ -1,7 +1,6 @@ //! Declare DataStream, a type that wraps RawCellStream so as to be useful //! for byte-oriented communication. -use super::RawCellStream; use crate::{Error, Result}; use tor_cell::relaycell::msg::EndReason; @@ -18,8 +17,9 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use std::io::Result as IoResult; use std::pin::Pin; -use std::sync::Arc; +use crate::circuit::StreamTarget; +use crate::stream::StreamReader; use tor_cell::relaycell::msg::{Data, RelayMsg}; /// An anonymized stream over the Tor network. @@ -111,15 +111,14 @@ pub struct DataReader { } impl DataStream { - /// Wrap a RawCellStream as a DataStream. + /// Wrap raw stream reader and target parts as a DataStream. /// /// For non-optimistic stream, function `wait_for_connection` /// must be called after to make sure CONNECTED is received. - pub(crate) fn new(s: RawCellStream) -> Self { - let s = Arc::new(s); + pub(crate) fn new(reader: StreamReader, target: StreamTarget) -> Self { let r = DataReader { state: Some(DataReaderState::Ready(DataReaderImpl { - s: Arc::clone(&s), + s: reader, pending: Vec::new(), offset: 0, connected: false, @@ -127,7 +126,7 @@ impl DataStream { }; let w = DataWriter { state: Some(DataWriterState::Ready(DataWriterImpl { - s, + s: target, buf: Box::new([0; Data::MAXLEN]), n_pending: 0, })), @@ -234,8 +233,8 @@ enum DataWriterState { /// Internal: the write part of a DataStream struct DataWriterImpl { - /// The underlying RawCellStream object. - s: Arc, + /// The underlying StreamTarget object. + s: StreamTarget, /// Buffered data to send over the connection. // TODO: this buffer is probably smaller than we want, but it's good @@ -423,8 +422,8 @@ enum DataReaderState { /// Wrapper for the read part of a DataStream struct DataReaderImpl { - /// The underlying RawCellStream object. - s: Arc, + /// The underlying StreamReader object. + s: StreamReader, /// If present, data that we received on this stream but have not /// been able to send to the caller yet. @@ -546,7 +545,7 @@ impl DataReaderImpl { Ok(RelayMsg::End(e)) => Err(Error::EndReceived(e.reason())), Err(e) => Err(e), Ok(m) => { - self.s.protocol_error().await; + self.s.protocol_error(); Err(Error::StreamProto(format!( "Unexpected {} cell on stream", m.cmd() diff --git a/crates/tor-proto/src/stream/raw.rs b/crates/tor-proto/src/stream/raw.rs index 3a3758bfe..9f8f2aa02 100644 --- a/crates/tor-proto/src/stream/raw.rs +++ b/crates/tor-proto/src/stream/raw.rs @@ -3,45 +3,38 @@ use crate::circuit::{sendme, StreamTarget}; use crate::{Error, Result}; -use tor_cell::relaycell::msg::{RelayMsg, Sendme}; +use tor_cell::relaycell::msg::RelayMsg; +use crate::circuit::sendme::StreamRecvWindow; use futures::channel::mpsc; -use futures::lock::Mutex; use futures::stream::StreamExt; -use std::sync::atomic::{AtomicBool, Ordering}; - -/// A RawCellStream is a client's cell-oriented view of a stream over the -/// Tor network. -pub struct RawCellStream { - /// Wrapped view of the circuit, hop, and streamid that we're using. +/// The read part of a stream on a particular circuit. +pub struct StreamReader { + /// The underlying `StreamTarget` for this stream. + pub(crate) target: StreamTarget, + /// Channel to receive stream messages from the reactor. + pub(crate) receiver: mpsc::Receiver, + /// Congestion control receive window for this stream. /// - /// TODO: do something similar with circuits? - target: Mutex, - /// A Stream over which we receive relay messages. Only relay messages - /// that can be associated with a stream ID will be received. - receiver: Mutex>, - /// Have we been informed that this stream is closed, or received a fatal - /// error? - stream_ended: AtomicBool, + /// Having this here means we're only going to update it when the end consumer of this stream + /// actually reads things, meaning we don't ask for more data until it's actually needed (as + /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself + /// with having to buffer data). + pub(crate) recv_window: StreamRecvWindow, + /// Whether or not this stream has ended. + pub(crate) ended: bool, } -impl RawCellStream { - /// Internal: build a new RawCellStream. - pub(crate) fn new(target: StreamTarget, receiver: mpsc::Receiver) -> Self { - RawCellStream { - target: Mutex::new(target), - receiver: Mutex::new(receiver), - stream_ended: AtomicBool::new(false), - } - } - +impl StreamReader { /// Try to read the next relay message from this stream. - async fn recv_raw(&self) -> Result { + async fn recv_raw(&mut self) -> Result { + if self.ended { + // Prevent reading from streams after they've ended. + return Err(Error::StreamEnded); + } let msg = self .receiver - .lock() - .await .next() .await // This probably means that the other side closed the @@ -50,13 +43,9 @@ impl RawCellStream { Error::StreamProto("stream channel disappeared without END cell?".into()) })?; - // Possibly decrement the window for the cell we just received, and - // send a SENDME if doing so took us under the threshold. - if sendme::msg_counts_towards_windows(&msg) { - let mut target = self.target.lock().await; - if target.recvwindow.take()? { - self.send_sendme(&mut target).await?; - } + if sendme::msg_counts_towards_windows(&msg) && self.recv_window.take()? { + self.target.send_sendme()?; + self.recv_window.put(); } Ok(msg) @@ -64,51 +53,19 @@ impl RawCellStream { /// As recv_raw, but if there is an error or an end cell, note that this /// stream has ended. - pub async fn recv(&self) -> Result { + pub async fn recv(&mut self) -> Result { let val = self.recv_raw().await; match val { Err(_) | Ok(RelayMsg::End(_)) => { - self.note_ended(); + self.ended = true; } _ => {} } val } - /// Send a relay message along this stream - pub async fn send(&self, msg: RelayMsg) -> Result<()> { - self.target.lock().await.send(msg).await - } - - /// Return true if this stream is marked as having ended. - pub fn has_ended(&self) -> bool { - self.stream_ended.load(Ordering::SeqCst) - } - - /// Mark this stream as having ended because of an incoming cell. - fn note_ended(&self) { - self.stream_ended.store(true, Ordering::SeqCst); - } - - /// Inform the circuit-side of this stream about a protocol error - pub async fn protocol_error(&self) { - // TODO: Should this call note_ended? - self.target.lock().await.protocol_error().await - } - - /// Send a SENDME cell and adjust the receive window. - async fn send_sendme(&self, target: &mut StreamTarget) -> Result<()> { - let sendme = Sendme::new_empty(); - target.send(sendme.into()).await?; - target.recvwindow.put(); - Ok(()) - } - - /// Ensure that all the data in this stream has been flushed in to - /// the circuit, and close it. - pub async fn close(self) -> Result<()> { - // Not much to do here right now. - drop(self); - Ok(()) + /// Shut down this stream. + pub fn protocol_error(&mut self) { + self.target.protocol_error(); } } diff --git a/crates/tor-proto/src/stream/resolve.rs b/crates/tor-proto/src/stream/resolve.rs index ae33fe405..1f1ef2313 100644 --- a/crates/tor-proto/src/stream/resolve.rs +++ b/crates/tor-proto/src/stream/resolve.rs @@ -1,6 +1,6 @@ //! Declare a type for streams that do hostname lookups -use super::RawCellStream; +use crate::stream::StreamReader; use crate::{Error, Result}; use tor_cell::relaycell::msg::{RelayMsg, Resolved}; @@ -8,7 +8,7 @@ use tor_cell::relaycell::msg::{RelayMsg, Resolved}; /// cell. pub struct ResolveStream { /// The underlying RawCellStream. - s: RawCellStream, + s: StreamReader, } impl ResolveStream { @@ -16,7 +16,7 @@ impl ResolveStream { /// /// Call only after sending a RESOLVE cell. #[allow(dead_code)] // need to implement a caller for this. - pub(crate) fn new(s: RawCellStream) -> Self { + pub(crate) fn new(s: StreamReader) -> Self { ResolveStream { s } } @@ -28,7 +28,7 @@ impl ResolveStream { RelayMsg::End(e) => Err(Error::EndReceived(e.reason())), RelayMsg::Resolved(r) => Ok(r), m => { - self.s.protocol_error().await; + self.s.protocol_error(); Err(Error::StreamProto(format!( "Unexpected {} on resolve stream", m.cmd() diff --git a/crates/tor-proto/src/util/err.rs b/crates/tor-proto/src/util/err.rs index 0ea13ac91..cdad2a6d8 100644 --- a/crates/tor-proto/src/util/err.rs +++ b/crates/tor-proto/src/util/err.rs @@ -55,6 +55,9 @@ pub enum Error { /// Circuit is closed. #[error("circuit closed")] CircuitClosed, + /// Stream has ended. + #[error("stream ended")] + StreamEnded, /// Can't allocate any more circuit or stream IDs on a channel. #[error("too many entries in map: can't allocate ID")] IdRangeFull, @@ -115,7 +118,9 @@ impl From for std::io::Error { EndReceived(end_reason) => end_reason.into(), - CircDestroy(_) | ChannelClosed | CircuitClosed => ErrorKind::ConnectionReset, + CircDestroy(_) | ChannelClosed | CircuitClosed | StreamEnded => { + ErrorKind::ConnectionReset + } BytesErr(_) | MissingKey | BadCellAuth | BadHandshake | ChanProto(_) | CircProto(_) | CellErr(_) | ChanMismatch(_) | StreamProto(_) => ErrorKind::InvalidData,