diff --git a/crates/tor-proto/src/circuit/reactor.rs b/crates/tor-proto/src/circuit/reactor.rs index 1e0ab8393..f631bb190 100644 --- a/crates/tor-proto/src/circuit/reactor.rs +++ b/crates/tor-proto/src/circuit/reactor.rs @@ -168,12 +168,16 @@ pub(super) struct CircHop { sendwindow: sendme::CircSendWindow, /// Buffer for messages we can't send to this hop yet due to congestion control. /// - /// Contains the tag we should give to the send window, and the cell to send. + /// Contains the cell to send, and a boolean equivalent to the `early` parameter + /// in `Reactor::send_relay_cell` (as in, whether to send the cell using `RELAY_EARLY`). /// /// This shouldn't grow unboundedly: we try and pop things off it first before /// doing things that would result in it growing (and stop before growing it /// if popping things off it can't be done). - outbound: VecDeque<([u8; 20], ChanCell)>, + /// + /// NOTE: Control messages could potentially add unboundedly to this, although that's + /// not likely to happen (and isn't triggerable from the network, either). + outbound: VecDeque<(bool, RelayCell)>, } /// Enumeration to determine whether we require circuit-level SENDME cells to be @@ -413,8 +417,12 @@ pub struct Reactor { pub(super) control: mpsc::UnboundedReceiver, /// Buffer for cells we can't send out the channel yet due to it being full. /// - /// This should be used very very rarely: see `send_msg_direct`'s comments for more - /// information. (in fact, using it will generate a warning!) + /// We try and dequeue off this first before doing anything else, ensuring that + /// it cannot grow unboundedly (and if we start having to enqueue things on here after + /// the channel shows backpressure, we stop pulling from receivers that could send here). + /// + /// NOTE: Control messages could potentially add unboundedly to this, although that's + /// not likely to happen (and isn't triggerable from the network, either). 
pub(super) outbound: VecDeque, /// The channel this circuit is using to send cells through. pub(super) channel: Channel, @@ -538,28 +546,29 @@ impl Reactor { } // Let's look at our hops, and streams for each hop. - for (i, hop) in self.hops.iter_mut().enumerate() { + for i in 0..self.hops.len() { let hop_num = HopNum::from(i as u8); // If we can, drain our queue of things we tried to send earlier, but // couldn't due to congestion control. - if hop.sendwindow.window() > 0 { - 'hop: while let Some((tag, msg)) = hop.outbound.pop_front() { + if self.hops[i].sendwindow.window() > 0 { + 'hop: while let Some((early, cell)) = self.hops[i].outbound.pop_front() + { trace!( "{}: sending from hop-{}-enqueued: {:?}", self.unique_id, i, - msg + cell ); - Pin::new(&mut self.channel).start_send(msg)?; - hop.sendwindow.take(&tag)?; + self.send_relay_cell(cx, hop_num, early, cell)?; if !self.channel.poll_ready(cx)? { break 'outer; } - if hop.sendwindow.window() == 0 { + if self.hops[i].sendwindow.window() == 0 { break 'hop; } } } + let hop = &mut self.hops[i]; // Look at all of the streams on this hop. for (id, stream) in hop.map.inner().iter_mut() { if let StreamEnt::Open { @@ -897,23 +906,31 @@ impl Reactor { /// that would send here while you know you're unable to forward the messages on). fn send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()> { let cell = ChanCell::new(self.channel_id, msg); - if self.channel.poll_ready(cx)? { + // NOTE(eta): We need to check whether the outbound queue is empty before trying to send: + // if we just checked whether the channel was ready, it'd be possible for + // cells to be sent out of order, since it could transition from not ready to + // ready during one cycle of the reactor! + // (This manifests as a protocol violation.) + if self.outbound.is_empty() && self.channel.poll_ready(cx)? { Pin::new(&mut self.channel).start_send(cell)?; } else { - // This case shouldn't actually happen that often, if ever. 
We generally check whether - // the channel can be sent to before calling this function (the one exception at the - // time of writing is in circuit creation). + // This has been observed to happen in code that doesn't have bugs in it, simply due + // to the way `Channel`'s `poll_ready` implementation works (it can change due to + // the actions of another thread in between callers of this function checking it, + // and this function checking it). // - // If this is suddenly getting hit and it wasn't before, maybe you added something that - // doesn't bother to check the channel (`self.channel.poll_ready(cx)`) before calling - // this function, and that's getting used a lot? - // - // We don't want to drop cells on the floor, though, so this is good to have. - warn!( + // However, if it's happening a lot more than it used to, that probably indicates + // some caller that's not checking whether the channel is full before calling + // this function. + + debug!( "{}: having to enqueue cell due to backpressure: {:?}", self.unique_id, cell ); self.outbound.push_back(cell); + + // Ensure we absolutely get scheduled again to clear `self.outbound`. + cx.waker().wake_by_ref(); } Ok(()) } @@ -947,8 +964,30 @@ impl Reactor { ) -> Result<()> { let c_t_w = sendme::cell_counts_towards_windows(&cell); let stream_id = cell.stream_id(); + // Check whether the hop send window is empty, if this cell counts towards windows. + // NOTE(eta): It is imperative this happens *before* calling encrypt() below, otherwise + // we'll have cells rejected due to a protocol violation! (Cells have to be + // sent out in the order they were passed to encrypt().) + if c_t_w { + let hop_num = Into::::into(hop); + let hop = &mut self.hops[hop_num]; + if hop.sendwindow.window() == 0 { + // Send window is empty! Push this cell onto the hop's outbound queue, and it'll + // get sent later. 
+ trace!( + "{}: having to use onto hop {} queue for cell: {:?}", + self.unique_id, + hop_num, + cell + ); + hop.outbound.push_back((early, cell)); + return Ok(()); + } + } let mut body: RelayCellBody = cell.encode(&mut rand::thread_rng())?.into(); let tag = self.crypto_out.encrypt(&mut body, hop)?; + // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort + // the whole circuit (e.g. by returning an error). let msg = chancell::msg::Relay::from_raw(body.into()); let msg = if early { ChanMsg::RelayEarly(msg) @@ -960,19 +999,7 @@ impl Reactor { if c_t_w { let hop_num = Into::::into(hop); let hop = &mut self.hops[hop_num]; - if hop.sendwindow.window() == 0 { - let cell = ChanCell::new(self.channel_id, msg); - // Send window is empty! Push this cell onto the hop's outbound queue, and it'll - // get sent later. - trace!( - "{}: having to use onto hop {} queue for cell: {:?}", - self.unique_id, - hop_num, - cell - ); - hop.outbound.push_back((*tag, cell)); - return Ok(()); - } + // checked by earlier conditional, so this shouldn't fail hop.sendwindow.take(tag)?; if !stream_id.is_zero() { // We need to decrement the stream-level sendme window.