Merge branch 'eta/reactor-2.5' into 'main'

Fix severe reactor ordering problems

See merge request tpo/core/arti!282
This commit is contained in:
eta 2022-02-03 16:27:57 +00:00
commit e9087e1fed
1 changed files with 61 additions and 34 deletions

View File

@ -168,12 +168,16 @@ pub(super) struct CircHop {
sendwindow: sendme::CircSendWindow,
/// Buffer for messages we can't send to this hop yet due to congestion control.
///
/// Contains the tag we should give to the send window, and the cell to send.
/// Contains the cell to send, and a boolean equivalent to the `early` parameter
/// in `Reactor::send_relay_cell` (as in, whether to send the cell using `RELAY_EARLY`).
///
/// This shouldn't grow unboundedly: we try and pop things off it first before
/// doing things that would result in it growing (and stop before growing it
/// if popping things off it can't be done).
outbound: VecDeque<([u8; 20], ChanCell)>,
///
/// NOTE: Control messages could potentially add unboundedly to this, although that's
/// not likely to happen (and isn't triggereable from the network, either).
outbound: VecDeque<(bool, RelayCell)>,
}
/// Enumeration to determine whether we require circuit-level SENDME cells to be
@ -413,8 +417,12 @@ pub struct Reactor {
pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
/// Buffer for cells we can't send out the channel yet due to it being full.
///
/// This should be used very very rarely: see `send_msg_direct`'s comments for more
/// information. (in fact, using it will generate a warning!)
/// We try and dequeue off this first before doing anything else, ensuring that
/// it cannot grow unboundedly (and if we start having to enqueue things on here after
/// the channel shows backpressure, we stop pulling from receivers that could send here).
///
/// NOTE: Control messages could potentially add unboundedly to this, although that's
/// not likely to happen (and isn't triggereable from the network, either).
pub(super) outbound: VecDeque<ChanCell>,
/// The channel this circuit is using to send cells through.
pub(super) channel: Channel,
@ -538,28 +546,29 @@ impl Reactor {
}
// Let's look at our hops, and streams for each hop.
for (i, hop) in self.hops.iter_mut().enumerate() {
for i in 0..self.hops.len() {
let hop_num = HopNum::from(i as u8);
// If we can, drain our queue of things we tried to send earlier, but
// couldn't due to congestion control.
if hop.sendwindow.window() > 0 {
'hop: while let Some((tag, msg)) = hop.outbound.pop_front() {
if self.hops[i].sendwindow.window() > 0 {
'hop: while let Some((early, cell)) = self.hops[i].outbound.pop_front()
{
trace!(
"{}: sending from hop-{}-enqueued: {:?}",
self.unique_id,
i,
msg
cell
);
Pin::new(&mut self.channel).start_send(msg)?;
hop.sendwindow.take(&tag)?;
self.send_relay_cell(cx, hop_num, early, cell)?;
if !self.channel.poll_ready(cx)? {
break 'outer;
}
if hop.sendwindow.window() == 0 {
if self.hops[i].sendwindow.window() == 0 {
break 'hop;
}
}
}
let hop = &mut self.hops[i];
// Look at all of the streams on this hop.
for (id, stream) in hop.map.inner().iter_mut() {
if let StreamEnt::Open {
@ -897,23 +906,31 @@ impl Reactor {
/// that would send here while you know you're unable to forward the messages on).
fn send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()> {
let cell = ChanCell::new(self.channel_id, msg);
if self.channel.poll_ready(cx)? {
// NOTE(eta): We need to check whether the outbound queue is empty before trying to send:
// if we just checked whether the channel was ready, it'd be possible for
// cells to be sent out of order, since it could transition from not ready to
// ready during one cycle of the reactor!
// (This manifests as a protocol violation.)
if self.outbound.is_empty() && self.channel.poll_ready(cx)? {
Pin::new(&mut self.channel).start_send(cell)?;
} else {
// This case shouldn't actually happen that often, if ever. We generally check whether
// the channel can be sent to before calling this function (the one exception at the
// time of writing is in circuit creation).
// This has been observed to happen in code that doesn't have bugs in it, simply due
// to the way `Channel`'s `poll_ready` implementation works (it can change due to
// the actions of another thread in between callers of this function checking it,
// and this function checking it).
//
// If this is suddenly getting hit and it wasn't before, maybe you added something that
// doesn't bother to check the channel (`self.channel.poll_ready(cx)`) before calling
// this function, and that's getting used a lot?
//
// We don't want to drop cells on the floor, though, so this is good to have.
warn!(
// However, if it's happening a lot more than it used to, that probably indicates
// some caller that's not checking whether the channel is full before calling
// this function.
debug!(
"{}: having to enqueue cell due to backpressure: {:?}",
self.unique_id, cell
);
self.outbound.push_back(cell);
// Ensure we absolutely get scheduled again to clear `self.outbound`.
cx.waker().wake_by_ref();
}
Ok(())
}
@ -947,8 +964,30 @@ impl Reactor {
) -> Result<()> {
let c_t_w = sendme::cell_counts_towards_windows(&cell);
let stream_id = cell.stream_id();
// Check whether the hop send window is empty, if this cell counts towards windows.
// NOTE(eta): It is imperative this happens *before* calling encrypt() below, otherwise
// we'll have cells rejected due to a protocol violation! (Cells have to be
// sent out in the order they were passed to encrypt().)
if c_t_w {
let hop_num = Into::<usize>::into(hop);
let hop = &mut self.hops[hop_num];
if hop.sendwindow.window() == 0 {
// Send window is empty! Push this cell onto the hop's outbound queue, and it'll
// get sent later.
trace!(
"{}: having to use onto hop {} queue for cell: {:?}",
self.unique_id,
hop_num,
cell
);
hop.outbound.push_back((early, cell));
return Ok(());
}
}
let mut body: RelayCellBody = cell.encode(&mut rand::thread_rng())?.into();
let tag = self.crypto_out.encrypt(&mut body, hop)?;
// NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
// the whole circuit (e.g. by returning an error).
let msg = chancell::msg::Relay::from_raw(body.into());
let msg = if early {
ChanMsg::RelayEarly(msg)
@ -960,19 +999,7 @@ impl Reactor {
if c_t_w {
let hop_num = Into::<usize>::into(hop);
let hop = &mut self.hops[hop_num];
if hop.sendwindow.window() == 0 {
let cell = ChanCell::new(self.channel_id, msg);
// Send window is empty! Push this cell onto the hop's outbound queue, and it'll
// get sent later.
trace!(
"{}: having to use onto hop {} queue for cell: {:?}",
self.unique_id,
hop_num,
cell
);
hop.outbound.push_back((*tag, cell));
return Ok(());
}
// checked by earlier conditional, so this shouldn't fail
hop.sendwindow.take(tag)?;
if !stream_id.is_zero() {
// We need to decrement the stream-level sendme window.