diff --git a/tor-proto/src/circuit.rs b/tor-proto/src/circuit.rs index b00f4ebbb..cb2110b21 100644 --- a/tor-proto/src/circuit.rs +++ b/tor-proto/src/circuit.rs @@ -116,6 +116,7 @@ pub(crate) struct StreamTarget { // XXXX truncated and then re-extended. hop: HopNum, circ: ClientCirc, + window: sendme::StreamSendWindow, stream_closed: Cell>>, } @@ -280,7 +281,8 @@ impl ClientCirc { let mut c = self.c.lock().await; let hopnum = c.hops.len() - 1; - let id = c.hops[hopnum].map.add_ent(sender)?; + let window = sendme::StreamSendWindow::new(StreamTarget::WINDOW_INIT); + let id = c.hops[hopnum].map.add_ent(sender, window.new_ref())?; let relaycell = RelayCell::new(id, begin_msg); let hopnum = (hopnum as u8).into(); let (send_close, recv_close) = oneshot::channel::(); @@ -294,6 +296,7 @@ impl ClientCirc { circ: self.clone(), stream_id: id, hop: hopnum, + window, stream_closed: Cell::new(Some(send_close)), }; @@ -584,12 +587,18 @@ impl CreateHandshakeWrap for Create2Wrap { } impl StreamTarget { + const WINDOW_INIT: u16 = 500; + const WINDOW_MAX: u16 = 500; + /// Deliver a relay message for the stream that owns this StreamTarget. /// /// The StreamTarget will set the correct stream ID and pick the /// right hop, but will not validate that the message is well-formed /// or meaningful in context. pub(crate) async fn send(&mut self, msg: RelayMsg) -> Result<()> { + if msg.counts_towards_windows() { + self.window.take(&()).await; + } let cell = RelayCell::new(self.stream_id, msg); let mut c = self.circ.c.lock().await; c.send_relay_cell(self.hop, false, cell).await diff --git a/tor-proto/src/circuit/reactor.rs b/tor-proto/src/circuit/reactor.rs index a03652e31..7839e5556 100644 --- a/tor-proto/src/circuit/reactor.rs +++ b/tor-proto/src/circuit/reactor.rs @@ -214,9 +214,16 @@ impl ReactorCore { return circ.handle_meta_cell(hopnum, msg); } - if let Some(StreamEnt::Open(s)) = hop.map.get_mut(streamid) { + if let Some(StreamEnt::Open(s, w)) = hop.map.get_mut(streamid) { // The stream for this message exists, and is open. + if let RelayMsg::Sendme(_) = msg { + // We need to handle sendmes here, not in the stream, or + // else we'd never notice them if we aren't reading. + w.put(()).await; + return Ok(()); + } + // Remember if this was an end cell: if so we should close // the stram. let end_cell = matches!(msg, RelayMsg::End(_)); diff --git a/tor-proto/src/circuit/sendme.rs b/tor-proto/src/circuit/sendme.rs index d02eb1567..8c22c57fa 100644 --- a/tor-proto/src/circuit/sendme.rs +++ b/tor-proto/src/circuit/sendme.rs @@ -9,8 +9,8 @@ use std::sync::Arc; pub type CircTag = [u8; 20]; pub type NoTag = (); -pub type CircSendWindow = SendWindow; -pub type StreamSendWindow = SendWindow; +pub type CircSendWindow = SendWindow; +pub type StreamSendWindow = SendWindow; pub type CircRecvWindow = RecvWindow; pub type StreamRecvWindow = RecvWindow; @@ -18,7 +18,7 @@ pub type StreamRecvWindow = RecvWindow; pub struct SendWindow where I: WindowInc, - T: PartialEq + Eq, + T: PartialEq + Eq + Clone, { // TODO could use a bilock if that becomes non-experimental. // TODO I wish we could do this without locking; we could make a bunch @@ -29,7 +29,7 @@ where struct SendWindowInner where - T: PartialEq + Eq, + T: PartialEq + Eq + Clone, { window: u16, tags: VecDeque, @@ -55,7 +55,7 @@ impl WindowInc for StreamInc { impl SendWindow where I: WindowInc, - T: PartialEq + Eq, + T: PartialEq + Eq + Clone, { pub fn new(window: u16) -> SendWindow { let increment = I::get_val(); @@ -71,12 +71,23 @@ where } } - pub async fn take(&mut self) -> u16 { + pub fn new_ref(&self) -> Self { + SendWindow { + w: Arc::clone(&self.w), + _dummy: std::marker::PhantomData, + } + } + + pub async fn take(&mut self, tag: &T) -> u16 { loop { let wait_on = { let mut w = self.w.lock().await; if let Some(val) = w.window.checked_sub(1) { w.window = val; + if val % I::get_val() == 0 { + // We record this tag. + w.tags.push_back(tag.clone()); + } return val; } @@ -96,11 +107,6 @@ where } } - pub async fn push_tag(&mut self, tag: T) { - let mut w = self.w.lock().await; - w.tags.push_back(tag); - } - pub async fn put(&mut self, tag: T) -> Option { let mut w = self.w.lock().await; @@ -137,12 +143,14 @@ impl RecvWindow { } } - pub fn take(&mut self) -> Option { + pub fn take(&mut self) -> Option { let v = self.window.checked_sub(1); if let Some(x) = v { self.window = x; + Some(x % I::get_val() == 0) + } else { + None } - v } pub fn put(&mut self) { diff --git a/tor-proto/src/circuit/streammap.rs b/tor-proto/src/circuit/streammap.rs index bced2066b..899685f52 100644 --- a/tor-proto/src/circuit/streammap.rs +++ b/tor-proto/src/circuit/streammap.rs @@ -1,3 +1,4 @@ +use crate::circuit::sendme; /// Mapping from stream ID to streams. // NOTE: This is a work in progress and I bet I'll refactor it a lot; // it needs to stay opaque! @@ -14,7 +15,7 @@ use rand::Rng; pub(super) enum StreamEnt { /// An open stream: any relay cells tagged for this stream should get /// sent over the mpsc::Sender. - Open(mpsc::Sender), + Open(mpsc::Sender, sendme::StreamSendWindow), /// A stream for which we have received an END cell, but not yet /// had the stream object get dropped. Closing, @@ -41,8 +42,12 @@ impl StreamMap { } /// Add an entry to this map; return the newly allocated StreamID. - pub(super) fn add_ent(&mut self, sink: mpsc::Sender) -> Result { - let ent = StreamEnt::Open(sink); + pub(super) fn add_ent( + &mut self, + sink: mpsc::Sender, + window: sendme::StreamSendWindow, + ) -> Result { + let ent = StreamEnt::Open(sink, window); let mut iter = (&mut self.i).map(|x| x.into()).take(65536); self.m.add_ent(&mut iter, ent) } @@ -60,7 +65,7 @@ impl StreamMap { match old { None => false, Some(StreamEnt::Closing) => false, - Some(StreamEnt::Open(_)) => true, + Some(StreamEnt::Open(_, _)) => true, } } @@ -71,7 +76,7 @@ impl StreamMap { match old { None => false, Some(StreamEnt::Closing) => false, - Some(StreamEnt::Open(_)) => true, + Some(StreamEnt::Open(_, _)) => true, } } diff --git a/tor-proto/src/relaycell/msg.rs b/tor-proto/src/relaycell/msg.rs index 62f191f1d..ddcab0c3f 100644 --- a/tor-proto/src/relaycell/msg.rs +++ b/tor-proto/src/relaycell/msg.rs @@ -235,6 +235,14 @@ impl RelayMsg { Unrecognized(b) => b.encode_onto(w), } } + + /// Return true if this message is counted by flow-control windows. + pub(crate) fn counts_towards_windows(&self) -> bool { + match self { + RelayMsg::Sendme(_) => false, + _ => true, + } + } } /// Message to create a enw stream @@ -442,7 +450,21 @@ impl Body for Connected { pub struct Sendme { digest: Option>, } - +impl Sendme { + /// Return a new empty sendme cell + /// + /// This format is used on streams, and on circuits without sendme + /// authentication. + pub fn new_empty() -> Self { + Sendme { digest: None } + } + /// This format is used on circuits with sendme authentication. + fn new_tag(x: [u8; 20]) -> Self { + Sendme { + digest: Some(x.into()), + } + } +} impl Body for Sendme { fn as_message(self) -> RelayMsg { RelayMsg::Sendme(self) diff --git a/tor-proto/src/stream.rs b/tor-proto/src/stream.rs index 27d51ec82..ee0648a69 100644 --- a/tor-proto/src/stream.rs +++ b/tor-proto/src/stream.rs @@ -12,8 +12,8 @@ //! //! XXXX TODO: There is no fariness, rate-limiting, or flow control. -use crate::circuit::StreamTarget; -use crate::relaycell::msg::{Data, RelayMsg, Resolved}; +use crate::circuit::{sendme, StreamTarget}; +use crate::relaycell::msg::{Data, RelayMsg, Resolved, Sendme}; use crate::{Error, Result}; use futures::channel::mpsc; @@ -26,39 +26,62 @@ pub struct TorStream { /// /// TODO: do something similar with circuits? target: StreamTarget, + /// Window to track incoming cells and SENDMEs. + recvwindow: sendme::StreamRecvWindow, /// A Stream over which we receive relay messages. Only relay messages /// that can be associated with a stream ID will be received. receiver: mpsc::Receiver, - /// Have we been informed that this stream is closed? If so this is /// the message or the error that told us. received_end: Option>, } impl TorStream { + const RECV_INIT: u16 = 500; + /// Internal: build a new TorStream. pub(crate) fn new(target: StreamTarget, receiver: mpsc::Receiver) -> Self { TorStream { target, receiver, + recvwindow: sendme::StreamRecvWindow::new(500), received_end: None, } } /// Try to read the next relay message from this stream. pub async fn recv(&mut self) -> Result { - self.receiver + let msg = self + .receiver .next() .await // This probably means that the other side closed the // mpsc channel. - .ok_or_else(|| Error::StreamClosed("stream channel disappeared without END cell?")) + .ok_or_else(|| Error::StreamClosed("stream channel disappeared without END cell?"))?; + + if msg.counts_towards_windows() { + match self.recvwindow.take() { + Some(true) => self.send_sendme().await?, + Some(false) => {} + None => return Err(Error::StreamProto("stream violated SENDME window".into())), + } + } + + Ok(msg) } /// Send a relay message along this stream pub async fn send(&mut self, msg: RelayMsg) -> Result<()> { self.target.send(msg).await } + + /// Send a SENDME cell and adjust the receive window. + async fn send_sendme(&mut self) -> Result<()> { + let sendme = Sendme::new_empty(); + self.target.send(sendme.into()).await?; + self.recvwindow.put(); + Ok(()) + } } /// A DataStream is a wrapper around a TorStream for byte-oriented IO.