Attempted implementation of stream sendme cells

This commit is contained in:
Nick Mathewson 2020-09-25 10:25:24 -04:00
parent 92463dc03e
commit 06ede86f7f
6 changed files with 100 additions and 26 deletions

View File

@ -116,6 +116,7 @@ pub(crate) struct StreamTarget {
// XXXX truncated and then re-extended.
hop: HopNum,
circ: ClientCirc,
window: sendme::StreamSendWindow,
stream_closed: Cell<Option<oneshot::Sender<CtrlMsg>>>,
}
@ -280,7 +281,8 @@ impl ClientCirc {
let mut c = self.c.lock().await;
let hopnum = c.hops.len() - 1;
let id = c.hops[hopnum].map.add_ent(sender)?;
let window = sendme::StreamSendWindow::new(StreamTarget::WINDOW_INIT);
let id = c.hops[hopnum].map.add_ent(sender, window.new_ref())?;
let relaycell = RelayCell::new(id, begin_msg);
let hopnum = (hopnum as u8).into();
let (send_close, recv_close) = oneshot::channel::<CtrlMsg>();
@ -294,6 +296,7 @@ impl ClientCirc {
circ: self.clone(),
stream_id: id,
hop: hopnum,
window,
stream_closed: Cell::new(Some(send_close)),
};
@ -584,12 +587,18 @@ impl CreateHandshakeWrap for Create2Wrap {
}
impl StreamTarget {
const WINDOW_INIT: u16 = 500;
const WINDOW_MAX: u16 = 500;
/// Deliver a relay message for the stream that owns this StreamTarget.
///
/// The StreamTarget will set the correct stream ID and pick the
/// right hop, but will not validate that the message is well-formed
/// or meaningful in context.
pub(crate) async fn send(&mut self, msg: RelayMsg) -> Result<()> {
if msg.counts_towards_windows() {
self.window.take(&()).await;
}
let cell = RelayCell::new(self.stream_id, msg);
let mut c = self.circ.c.lock().await;
c.send_relay_cell(self.hop, false, cell).await

View File

@ -214,9 +214,16 @@ impl ReactorCore {
return circ.handle_meta_cell(hopnum, msg);
}
if let Some(StreamEnt::Open(s)) = hop.map.get_mut(streamid) {
if let Some(StreamEnt::Open(s, w)) = hop.map.get_mut(streamid) {
// The stream for this message exists, and is open.
if let RelayMsg::Sendme(_) = msg {
// We need to handle sendmes here, not in the stream, or
// else we'd never notice them if we aren't reading.
w.put(()).await;
return Ok(());
}
// Remember if this was an end cell: if so we should close
// the stram.
let end_cell = matches!(msg, RelayMsg::End(_));

View File

@ -9,8 +9,8 @@ use std::sync::Arc;
pub type CircTag = [u8; 20];
pub type NoTag = ();
pub type CircSendWindow = SendWindow<CircTag, CircInc>;
pub type StreamSendWindow = SendWindow<NoTag, StreamInc>;
pub type CircSendWindow = SendWindow<CircInc, CircTag>;
pub type StreamSendWindow = SendWindow<StreamInc, NoTag>;
pub type CircRecvWindow = RecvWindow<CircInc>;
pub type StreamRecvWindow = RecvWindow<StreamInc>;
@ -18,7 +18,7 @@ pub type StreamRecvWindow = RecvWindow<StreamInc>;
pub struct SendWindow<I, T>
where
I: WindowInc,
T: PartialEq + Eq,
T: PartialEq + Eq + Clone,
{
// TODO could use a bilock if that becomes non-experimental.
// TODO I wish we could do this without locking; we could make a bunch
@ -29,7 +29,7 @@ where
struct SendWindowInner<T>
where
T: PartialEq + Eq,
T: PartialEq + Eq + Clone,
{
window: u16,
tags: VecDeque<T>,
@ -55,7 +55,7 @@ impl WindowInc for StreamInc {
impl<I, T> SendWindow<I, T>
where
I: WindowInc,
T: PartialEq + Eq,
T: PartialEq + Eq + Clone,
{
pub fn new(window: u16) -> SendWindow<I, T> {
let increment = I::get_val();
@ -71,12 +71,23 @@ where
}
}
pub async fn take(&mut self) -> u16 {
pub fn new_ref(&self) -> Self {
SendWindow {
w: Arc::clone(&self.w),
_dummy: std::marker::PhantomData,
}
}
pub async fn take(&mut self, tag: &T) -> u16 {
loop {
let wait_on = {
let mut w = self.w.lock().await;
if let Some(val) = w.window.checked_sub(1) {
w.window = val;
if val % I::get_val() == 0 {
// We record this tag.
w.tags.push_back(tag.clone());
}
return val;
}
@ -96,11 +107,6 @@ where
}
}
pub async fn push_tag(&mut self, tag: T) {
let mut w = self.w.lock().await;
w.tags.push_back(tag);
}
pub async fn put(&mut self, tag: T) -> Option<u16> {
let mut w = self.w.lock().await;
@ -137,12 +143,14 @@ impl<I: WindowInc> RecvWindow<I> {
}
}
pub fn take(&mut self) -> Option<u16> {
pub fn take(&mut self) -> Option<bool> {
let v = self.window.checked_sub(1);
if let Some(x) = v {
self.window = x;
Some(x % I::get_val() == 0)
} else {
None
}
v
}
pub fn put(&mut self) {

View File

@ -1,3 +1,4 @@
use crate::circuit::sendme;
/// Mapping from stream ID to streams.
// NOTE: This is a work in progress and I bet I'll refactor it a lot;
// it needs to stay opaque!
@ -14,7 +15,7 @@ use rand::Rng;
pub(super) enum StreamEnt {
/// An open stream: any relay cells tagged for this stream should get
/// sent over the mpsc::Sender.
Open(mpsc::Sender<RelayMsg>),
Open(mpsc::Sender<RelayMsg>, sendme::StreamSendWindow),
/// A stream for which we have received an END cell, but not yet
/// had the stream object get dropped.
Closing,
@ -41,8 +42,12 @@ impl StreamMap {
}
/// Add an entry to this map; return the newly allocated StreamID.
pub(super) fn add_ent(&mut self, sink: mpsc::Sender<RelayMsg>) -> Result<StreamID> {
let ent = StreamEnt::Open(sink);
pub(super) fn add_ent(
&mut self,
sink: mpsc::Sender<RelayMsg>,
window: sendme::StreamSendWindow,
) -> Result<StreamID> {
let ent = StreamEnt::Open(sink, window);
let mut iter = (&mut self.i).map(|x| x.into()).take(65536);
self.m.add_ent(&mut iter, ent)
}
@ -60,7 +65,7 @@ impl StreamMap {
match old {
None => false,
Some(StreamEnt::Closing) => false,
Some(StreamEnt::Open(_)) => true,
Some(StreamEnt::Open(_, _)) => true,
}
}
@ -71,7 +76,7 @@ impl StreamMap {
match old {
None => false,
Some(StreamEnt::Closing) => false,
Some(StreamEnt::Open(_)) => true,
Some(StreamEnt::Open(_, _)) => true,
}
}

View File

@ -235,6 +235,14 @@ impl RelayMsg {
Unrecognized(b) => b.encode_onto(w),
}
}
/// Return true if this message is counted by flow-control windows.
pub(crate) fn counts_towards_windows(&self) -> bool {
match self {
RelayMsg::Sendme(_) => false,
_ => true,
}
}
}
/// Message to create a enw stream
@ -442,7 +450,21 @@ impl Body for Connected {
pub struct Sendme {
digest: Option<Vec<u8>>,
}
impl Sendme {
/// Return a new empty sendme cell
///
/// This format is used on streams, and on circuits without sendme
/// authentication.
pub fn new_empty() -> Self {
Sendme { digest: None }
}
/// This format is used on circuits with sendme authentication.
fn new_tag(x: [u8; 20]) -> Self {
Sendme {
digest: Some(x.into()),
}
}
}
impl Body for Sendme {
fn as_message(self) -> RelayMsg {
RelayMsg::Sendme(self)

View File

@ -12,8 +12,8 @@
//!
//! XXXX TODO: There is no fariness, rate-limiting, or flow control.
use crate::circuit::StreamTarget;
use crate::relaycell::msg::{Data, RelayMsg, Resolved};
use crate::circuit::{sendme, StreamTarget};
use crate::relaycell::msg::{Data, RelayMsg, Resolved, Sendme};
use crate::{Error, Result};
use futures::channel::mpsc;
@ -26,39 +26,62 @@ pub struct TorStream {
///
/// TODO: do something similar with circuits?
target: StreamTarget,
/// Window to track incoming cells and SENDMEs.
recvwindow: sendme::StreamRecvWindow,
/// A Stream over which we receive relay messages. Only relay messages
/// that can be associated with a stream ID will be received.
receiver: mpsc::Receiver<RelayMsg>,
/// Have we been informed that this stream is closed? If so this is
/// the message or the error that told us.
received_end: Option<Result<RelayMsg>>,
}
impl TorStream {
const RECV_INIT: u16 = 500;
/// Internal: build a new TorStream.
pub(crate) fn new(target: StreamTarget, receiver: mpsc::Receiver<RelayMsg>) -> Self {
TorStream {
target,
receiver,
recvwindow: sendme::StreamRecvWindow::new(500),
received_end: None,
}
}
/// Try to read the next relay message from this stream.
pub async fn recv(&mut self) -> Result<RelayMsg> {
self.receiver
let msg = self
.receiver
.next()
.await
// This probably means that the other side closed the
// mpsc channel.
.ok_or_else(|| Error::StreamClosed("stream channel disappeared without END cell?"))
.ok_or_else(|| Error::StreamClosed("stream channel disappeared without END cell?"))?;
if msg.counts_towards_windows() {
match self.recvwindow.take() {
Some(true) => self.send_sendme().await?,
Some(false) => {}
None => return Err(Error::StreamProto("stream violated SENDME window".into())),
}
}
Ok(msg)
}
/// Send a relay message along this stream
pub async fn send(&mut self, msg: RelayMsg) -> Result<()> {
self.target.send(msg).await
}
/// Send a SENDME cell and adjust the receive window.
async fn send_sendme(&mut self) -> Result<()> {
let sendme = Sendme::new_empty();
self.target.send(sendme.into()).await?;
self.recvwindow.put();
Ok(())
}
}
/// A DataStream is a wrapper around a TorStream for byte-oriented IO.