Merge branch 'proto-circuit-refactor' into 'main'

Completely overhaul the tor-proto circuit reactor

See merge request tpo/core/arti!126
This commit is contained in:
eta 2021-11-12 15:22:21 +00:00
commit 5bdc44d14d
16 changed files with 1556 additions and 1381 deletions

1
Cargo.lock generated
View File

@ -2823,7 +2823,6 @@ dependencies = [
"coarsetime",
"crypto-mac",
"digest",
"event-listener",
"futures",
"generic-array",
"hex",

View File

@ -90,6 +90,7 @@ async fn create_common<RNG: CryptoRng + Rng + Send, RT: Runtime, CT: ChanTarget>
Ok(pending_circ)
}
// FIXME(eta): de-Arc-ify this
#[async_trait]
impl Buildable for Arc<ClientCirc> {
async fn create_chantarget<RNG: CryptoRng + Rng + Send, RT: Runtime>(
@ -100,7 +101,8 @@ impl Buildable for Arc<ClientCirc> {
params: &CircParameters,
) -> Result<Self> {
let circ = create_common(chanmgr, rt, rng, ct).await?;
Ok(circ.create_firsthop_fast(rng, params).await?)
// FIXME(eta): don't clone the params?
Ok(Arc::new(circ.create_firsthop_fast(params.clone()).await?))
}
async fn create<RNG: CryptoRng + Rng + Send, RT: Runtime>(
chanmgr: &ChanMgr<RT>,
@ -110,16 +112,19 @@ impl Buildable for Arc<ClientCirc> {
params: &CircParameters,
) -> Result<Self> {
let circ = create_common(chanmgr, rt, rng, ct).await?;
Ok(circ.create_firsthop_ntor(rng, ct, params).await?)
Ok(Arc::new(
circ.create_firsthop_ntor(ct, params.clone()).await?,
))
}
async fn extend<RNG: CryptoRng + Rng + Send, RT: Runtime>(
&self,
_rt: &RT,
rng: &mut RNG,
// FIXME(eta): get rid of this RNG parameter?
_rng: &mut RNG,
ct: &OwnedCircTarget,
params: &CircParameters,
) -> Result<()> {
ClientCirc::extend_ntor(self, rng, ct, params).await?;
ClientCirc::extend_ntor(self, ct, params).await?;
Ok(())
}
}

View File

@ -31,7 +31,6 @@ bytes = "1.0.1"
cipher = "0.3.0"
crypto-mac = "0.11.0"
digest = "0.9.0"
event-listener = "2.5.1"
futures = "0.3.13"
asynchronous-codec = "0.6.0"
generic-array = "0.14.4"
@ -51,6 +50,6 @@ tokio-util = { version = "0.6", features = ["compat"], optional = true }
coarsetime = { version = "0.1.20", optional = true }
[dev-dependencies]
tokio-crate = { package = "tokio", version = "1.7.0", features = ["macros", "rt"] }
tokio-crate = { package = "tokio", version = "1.7.0", features = ["macros", "rt", "time"] }
hex-literal = "0.3.1"
hex = "0.4.3"

View File

@ -67,6 +67,7 @@ pub use crate::channel::unique_id::UniqId;
use crate::circuit;
use crate::circuit::celltypes::CreateResponse;
use crate::{Error, Result};
use std::pin::Pin;
use tor_cell::chancell::{msg, ChanCell, CircId};
use tor_linkspec::ChanTarget;
use tor_llcrypto::pk::ed25519::Ed25519Identity;
@ -76,9 +77,10 @@ use asynchronous_codec as futures_codec;
use futures::channel::{mpsc, oneshot};
use futures::io::{AsyncRead, AsyncWrite};
use futures::SinkExt;
use futures::{Sink, SinkExt};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tracing::trace;
@ -109,6 +111,55 @@ pub struct Channel {
cell_tx: mpsc::Sender<ChanCell>,
}
impl Sink<ChanCell> for Channel {
type Error = Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.get_mut();
Pin::new(&mut this.cell_tx)
.poll_ready(cx)
.map_err(|_| Error::ChannelClosed)
}
fn start_send(self: Pin<&mut Self>, cell: ChanCell) -> Result<()> {
let this = self.get_mut();
if this.closed.load(Ordering::SeqCst) {
return Err(Error::ChannelClosed);
}
this.check_cell(&cell)?;
{
use msg::ChanMsg::*;
match cell.msg() {
Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log.
_ => trace!(
"{}: Sending {} for {}",
this.unique_id,
cell.msg().cmd(),
cell.circid()
),
}
}
Pin::new(&mut this.cell_tx)
.start_send(cell)
.map_err(|_| Error::ChannelClosed)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.get_mut();
Pin::new(&mut this.cell_tx)
.poll_flush(cx)
.map_err(|_| Error::ChannelClosed)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.get_mut();
Pin::new(&mut this.cell_tx)
.poll_close(cx)
.map_err(|_| Error::ChannelClosed)
}
}
/// Structure for building and launching a Tor channel.
pub struct ChannelBuilder {
/// If present, a description of the address we're trying to connect to,
@ -261,29 +312,18 @@ impl Channel {
}
}
/// Like `futures::Sink::poll_ready`.
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool> {
Ok(match Pin::new(&mut self.cell_tx).poll_ready(cx) {
Poll::Ready(Ok(_)) => true,
Poll::Ready(Err(_)) => return Err(Error::CircuitClosed),
Poll::Pending => false,
})
}
/// Transmit a single cell on a channel.
pub async fn send_cell(&mut self, cell: ChanCell) -> Result<()> {
if self.closed.load(Ordering::SeqCst) {
return Err(Error::ChannelClosed);
}
self.check_cell(&cell)?;
{
use msg::ChanMsg::*;
match cell.msg() {
Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log.
_ => trace!(
"{}: Sending {} for {}",
self.unique_id,
cell.msg().cmd(),
cell.circid()
),
}
}
self.cell_tx
.send(cell)
.await
.map_err(|_| Error::InternalError("Reactor not alive to receive cells".into()))?;
self.send(cell).await?;
Ok(())
}
@ -318,13 +358,10 @@ impl Channel {
trace!("{}: Allocated CircId {}", circ_unique_id, id);
let destroy_handle = CircDestroyHandle::new(id, self.control.clone());
Ok(circuit::PendingClientCirc::new(
id,
self.clone(),
createdreceiver,
Some(destroy_handle),
receiver,
circ_unique_id,
))
@ -342,27 +379,13 @@ impl Channel {
pub fn terminate(&self) {
let _ = self.control.unbounded_send(CtrlMsg::Shutdown);
}
}
/// Helper structure: when this is dropped, the reactor is told to kill
/// the circuit.
pub(crate) struct CircDestroyHandle {
/// The circuit ID in question
id: CircId,
/// A sender to tell the reactor.
sender: mpsc::UnboundedSender<CtrlMsg>,
}
impl CircDestroyHandle {
/// Create a new CircDestroyHandle
fn new(id: CircId, sender: mpsc::UnboundedSender<CtrlMsg>) -> Self {
CircDestroyHandle { id, sender }
}
}
impl Drop for CircDestroyHandle {
fn drop(&mut self) {
let _ignore_cancel = self.sender.unbounded_send(CtrlMsg::CloseCircuit(self.id));
/// Tell the reactor that the circuit with the given ID has gone away.
pub fn close_circuit(&self, circid: CircId) -> Result<()> {
self.control
.unbounded_send(CtrlMsg::CloseCircuit(circid))
.map_err(|_| Error::ChannelClosed)?;
Ok(())
}
}

View File

@ -281,11 +281,11 @@ impl Reactor {
match self.circs.get_mut(circid) {
Some(CircEnt::Open(s)) => {
// There's an open circuit; we can give it the RELAY cell.
// XXXX I think that this one actually means the other side
// is closed. If we see it IRL we should maybe ignore it.
s.send(msg.try_into()?).await.map_err(|_| {
Error::InternalError("Circuit queue rejected message. Is it closing?".into())
})
if s.send(msg.try_into()?).await.is_err() {
// The circuit's receiver went away, so we should destroy the circuit.
self.outbound_destroy_circ(circid).await?;
}
Ok(())
}
Some(CircEnt::Opening(_, _)) => Err(Error::ChanProto(
"Relay cell on pending circuit before CREATED* received".into(),
@ -397,6 +397,7 @@ pub(crate) mod test {
use futures::stream::StreamExt;
use tokio::test as async_test;
use tokio_crate as tokio;
use tokio_crate::runtime::Handle;
use crate::circuit::CircParameters;
@ -469,10 +470,11 @@ pub(crate) mod test {
let (chan, mut reactor, mut output, _input) = new_reactor();
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
let (pending, _circr) = ret.unwrap();
let (pending, circr) = ret.unwrap();
Handle::current().spawn(circr.run());
assert!(reac.is_ok());
let id = pending.peek_circid().await;
let id = pending.peek_circid();
let ent = reactor.circs.get_mut(id);
assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
@ -492,34 +494,38 @@ pub(crate) mod test {
#[async_test]
async fn new_circ_create_failure() {
use tor_cell::chancell::msg;
let mut rng = rand::thread_rng();
let (chan, mut reactor, mut output, mut input) = new_reactor();
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
let (pending, _circr) = ret.unwrap();
let (pending, circr) = ret.unwrap();
Handle::current().spawn(circr.run());
assert!(reac.is_ok());
let circparams = CircParameters::default();
let id = pending.peek_circid().await;
let id = pending.peek_circid();
eprintln!("abc");
let ent = reactor.circs.get_mut(id);
assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
// We'll get a bad handshake result from this createdfast cell.
let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into());
input.send(Ok(created_cell)).await.unwrap();
eprintln!("def");
let (circ, reac) = futures::join!(
pending.create_firsthop_fast(&mut rng, &circparams),
reactor.run_once()
);
let (circ, reac) =
futures::join!(pending.create_firsthop_fast(circparams), reactor.run_once());
// Make sure statuses are as expected.
assert!(matches!(circ.err().unwrap(), Error::BadHandshake));
assert!(reac.is_ok());
eprintln!("ghi");
reactor.run_once().await.unwrap();
// Make sure that the createfast cell got sent
let cell_sent = output.next().await.unwrap();
assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_)));
eprintln!("jkkl");
// The circid now counts as open, since as far as the reactor knows,
// it was accepted. (TODO: is this a bug?)

File diff suppressed because it is too large Load Diff

View File

@ -46,10 +46,10 @@ impl HalfStream {
/// The caller must handle END cells; it is an internal error to pass
/// END cells to this method.
/// no ends here.
pub(super) async fn handle_msg(&mut self, msg: &RelayMsg) -> Result<()> {
pub(super) fn handle_msg(&mut self, msg: &RelayMsg) -> Result<()> {
match msg {
RelayMsg::Sendme(_) => {
self.sendw.put(Some(())).await.ok_or_else(|| {
self.sendw.put(Some(())).ok_or_else(|| {
Error::CircProto("Too many sendmes on a closed stream!".into())
})?;
Ok(())
@ -91,15 +91,15 @@ mod test {
#[async_test]
async fn halfstream_sendme() -> Result<()> {
let mut sendw = StreamSendWindow::new(101);
sendw.take(&()).await?; // Make sure that it will accept one sendme.
sendw.take(&())?; // Make sure that it will accept one sendme.
let mut hs = HalfStream::new(sendw, StreamRecvWindow::new(20), true);
// one sendme is fine
let m = msg::Sendme::new_empty().into();
assert!(hs.handle_msg(&m).await.is_ok());
assert!(hs.handle_msg(&m).is_ok());
// but no more were expected!
let e = hs.handle_msg(&m).await.err().unwrap();
let e = hs.handle_msg(&m).err().unwrap();
assert_eq!(
format!("{}", e),
"circuit protocol violation: Too many sendmes on a closed stream!"
@ -120,11 +120,11 @@ mod test {
.unwrap()
.into();
for _ in 0_u8..20 {
assert!(hs.handle_msg(&m).await.is_ok());
assert!(hs.handle_msg(&m).is_ok());
}
// But one more is a protocol violation.
let e = hs.handle_msg(&m).await.err().unwrap();
let e = hs.handle_msg(&m).err().unwrap();
assert_eq!(
format!("{}", e),
"circuit protocol violation: Received a data cell in violation of a window"
@ -137,13 +137,13 @@ mod test {
// We were told to accept a connected, so we'll accept one
// and no more.
let m = msg::Connected::new_empty().into();
assert!(hs.handle_msg(&m).await.is_ok());
assert!(hs.handle_msg(&m).await.is_err());
assert!(hs.handle_msg(&m).is_ok());
assert!(hs.handle_msg(&m).is_err());
// If we try that again with connected_ok == false, we won't
// accept any.
let mut hs = HalfStream::new(StreamSendWindow::new(20), StreamRecvWindow::new(20), false);
let e = hs.handle_msg(&m).await.err().unwrap();
let e = hs.handle_msg(&m).err().unwrap();
assert_eq!(
format!("{}", e),
"circuit protocol violation: Bad CONNECTED cell on a closed stream!"
@ -154,7 +154,7 @@ mod test {
async fn halfstream_other() {
let mut hs = hs_new();
let m = msg::Extended2::new(Vec::new()).into();
let e = hs.handle_msg(&m).await.err().unwrap();
let e = hs.handle_msg(&m).err().unwrap();
assert_eq!(
format!("{}", e),
"circuit protocol violation: Bad EXTENDED2 cell on a closed stream!"

File diff suppressed because it is too large Load Diff

View File

@ -10,10 +10,7 @@
//! other side of the circuit really has read all of the data that it's
//! acknowledging.
use futures::lock::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use tor_cell::relaycell::msg::RelayMsg;
use tor_cell::relaycell::RelayCell;
@ -51,28 +48,14 @@ pub(crate) struct SendWindow<P, T>
where
P: WindowParams,
T: PartialEq + Eq + Clone,
{
// TODO could use a bilock if that becomes non-experimental.
// TODO I wish we could do this without locking; we could make a bunch
// of these functions non-async if that happened.
/// Actual SendWindow object.
w: Arc<Mutex<SendWindowInner<T>>>,
/// Marker type to tell the compiler that the P type is used.
_dummy: std::marker::PhantomData<P>,
}
/// Interior (locked) code for SendWindowInner.
struct SendWindowInner<T>
where
T: PartialEq + Eq + Clone,
{
/// Current value for this window
window: u16,
/// Tag values that incoming "SENDME" messages need to match in order
/// for us to send more data.
tags: VecDeque<T>,
/// An event to wait on if we find that we are out of cells.
unblock: event_listener::Event,
/// Marker type to tell the compiler that the P type is used.
_dummy: std::marker::PhantomData<P>,
}
/// Helper: parameterizes a window to determine its maximum and its increment.
@ -117,54 +100,36 @@ where
pub(crate) fn new(window: u16) -> SendWindow<P, T> {
let increment = P::increment();
let capacity = (window + increment - 1) / increment;
let inner = SendWindowInner {
SendWindow {
window,
tags: VecDeque::with_capacity(capacity as usize),
unblock: event_listener::Event::new(),
};
SendWindow {
w: Arc::new(Mutex::new(inner)),
_dummy: std::marker::PhantomData,
}
}
/// Add a reference-count to SendWindow and return a new handle to it.
pub(crate) fn new_ref(&self) -> Self {
SendWindow {
w: Arc::clone(&self.w),
_dummy: std::marker::PhantomData,
}
}
/// Remove one item from this window (since we've sent a cell).
/// If the window was empty, returns an error.
///
/// The provided tag is the one associated with the crypto layer that
/// originated the cell. It will get cloned and recorded if we'll
/// need to check for it later.
///
/// Return the number of cells left in the window.
pub(crate) async fn take(&mut self, tag: &T) -> Result<u16> {
loop {
let wait_on = {
let mut w = self.w.lock().await;
if let Some(val) = w.window.checked_sub(1) {
w.window = val;
if w.window % P::increment() == 0 {
// We record this tag.
// TODO: I'm not saying that this cell in particular
// matches the spec, but Tor seems to like it.
w.tags.push_back(tag.clone());
}
pub(crate) fn take(&mut self, tag: &T) -> Result<u16> {
if let Some(val) = self.window.checked_sub(1) {
self.window = val;
if self.window % P::increment() == 0 {
// We record this tag.
// TODO: I'm not saying that this cell in particular
// matches the spec, but Tor seems to like it.
self.tags.push_back(tag.clone());
}
return Ok(val);
}
// Window is zero; can't send yet.
w.unblock.listen()
};
// Wait on this event while _not_ holding the lock.
wait_on.await;
Ok(val)
} else {
Err(Error::CircProto(
"Called SendWindow::take() on empty SendWindow".into(),
))
}
}
@ -179,36 +144,32 @@ where
/// On failure, return None: the caller should close the stream
/// or circuit with a protocol error.
#[must_use = "didn't check whether SENDME tag was right."]
pub(crate) async fn put(&mut self, tag: Option<T>) -> Option<u16> {
let mut w = self.w.lock().await;
match (w.tags.front(), tag) {
pub(crate) fn put(&mut self, tag: Option<T>) -> Option<u16> {
match (self.tags.front(), tag) {
(Some(t), Some(tag)) if t == &tag => {} // this is the right tag.
(Some(_), None) => {} // didn't need a tag.
_ => {
return None;
} // Bad tag or unexpected sendme.
}
w.tags.pop_front();
self.tags.pop_front();
let was_zero = w.window == 0;
let v = w.window.checked_add(P::increment())?;
w.window = v;
if was_zero {
w.unblock.notify(usize::MAX)
}
let v = self.window.checked_add(P::increment())?;
self.window = v;
Some(v)
}
/// Return the current send window value.
pub(crate) fn window(&self) -> u16 {
self.window
}
/// For testing: get a copy of the current send window, and the
/// expected incoming tags.
#[cfg(test)]
pub(crate) async fn window_and_expected_tags(&self) -> (u16, Vec<T>) {
let inner = self.w.lock().await;
let tags = inner.tags.iter().map(Clone::clone).collect();
(inner.window, tags)
pub(crate) fn window_and_expected_tags(&self) -> (u16, Vec<T>) {
let tags = self.tags.iter().map(Clone::clone).collect();
(self.window, tags)
}
}
@ -231,7 +192,7 @@ impl<P: WindowParams> RecvWindow<P> {
}
}
/// Called when we've just sent a cell; return true if we need to send
/// Called when we've just received a cell; return true if we need to send
/// a sendme, and false otherwise.
///
/// Returns None if we should not have sent the cell, and we just
@ -286,7 +247,6 @@ pub(crate) fn cell_counts_towards_windows(cell: &RelayCell) -> bool {
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use futures::FutureExt;
use tokio::test as async_test;
use tokio_crate as tokio;
use tor_cell::relaycell::{msg, RelayCell};
@ -335,37 +295,37 @@ mod test {
async fn sendwindow_basic() -> Result<()> {
let mut w = new_sendwindow();
let n = w.take(&"Hello").await?;
let n = w.take(&"Hello")?;
assert_eq!(n, 999);
for _ in 0_usize..98 {
w.take(&"world").await?;
w.take(&"world")?;
}
assert_eq!(w.w.lock().await.window, 901);
assert_eq!(w.w.lock().await.tags.len(), 0);
assert_eq!(w.window, 901);
assert_eq!(w.tags.len(), 0);
let n = w.take(&"and").await?;
let n = w.take(&"and")?;
assert_eq!(n, 900);
assert_eq!(w.w.lock().await.tags.len(), 1);
assert_eq!(w.w.lock().await.tags[0], "and");
assert_eq!(w.tags.len(), 1);
assert_eq!(w.tags[0], "and");
let n = w.take(&"goodbye").await?;
let n = w.take(&"goodbye")?;
assert_eq!(n, 899);
assert_eq!(w.w.lock().await.tags.len(), 1);
assert_eq!(w.tags.len(), 1);
// Try putting a good tag.
let n = w.put(Some("and")).await;
let n = w.put(Some("and"));
assert_eq!(n, Some(999));
assert_eq!(w.w.lock().await.tags.len(), 0);
assert_eq!(w.tags.len(), 0);
for _ in 0_usize..300 {
w.take(&"dreamland").await?;
w.take(&"dreamland")?;
}
assert_eq!(w.w.lock().await.tags.len(), 3);
assert_eq!(w.tags.len(), 3);
// Put without a tag.
let n = w.put(None).await;
let n = w.put(None);
assert_eq!(n, Some(799));
assert_eq!(w.w.lock().await.tags.len(), 2);
assert_eq!(w.tags.len(), 2);
Ok(())
}
@ -374,44 +334,41 @@ mod test {
async fn sendwindow_bad_put() -> Result<()> {
let mut w = new_sendwindow();
for _ in 0_usize..250 {
w.take(&"correct").await?;
w.take(&"correct")?;
}
// wrong tag: won't work.
assert_eq!(w.w.lock().await.window, 750);
let n = w.put(Some("incorrect")).await;
assert_eq!(w.window, 750);
let n = w.put(Some("incorrect"));
assert!(n.is_none());
let n = w.put(Some("correct")).await;
let n = w.put(Some("correct"));
assert_eq!(n, Some(850));
let n = w.put(Some("correct")).await;
let n = w.put(Some("correct"));
assert_eq!(n, Some(950));
// no tag expected: won't work.
let n = w.put(Some("correct")).await;
let n = w.put(Some("correct"));
assert_eq!(n, None);
assert_eq!(w.w.lock().await.window, 950);
assert_eq!(w.window, 950);
let n = w.put(None).await;
let n = w.put(None);
assert_eq!(n, None);
assert_eq!(w.w.lock().await.window, 950);
assert_eq!(w.window, 950);
Ok(())
}
#[async_test]
async fn sendwindow_blocking() -> Result<()> {
async fn sendwindow_erroring() -> Result<()> {
let mut w = new_sendwindow();
for _ in 0_usize..1000 {
w.take(&"here a string").await?;
w.take(&"here a string")?;
}
assert_eq!(w.w.lock().await.window, 0);
assert_eq!(w.window, 0);
// This is going to block -- make sure it doesn't say it's ready.
let ready = w.take(&"there a string").now_or_never();
assert!(ready.is_none());
// TODO: test that this actually wakes up when somebody else says "put".
let ready = w.take(&"there a string");
assert!(ready.is_err());
Ok(())
}
}

View File

@ -14,19 +14,24 @@ use std::collections::HashMap;
use rand::Rng;
use crate::circuit::reactor::RECV_WINDOW_INIT;
use crate::circuit::sendme::StreamRecvWindow;
use tracing::info;
/// The entry for a stream.
pub(super) enum StreamEnt {
/// An open stream: any relay cells tagged for this stream should get
/// sent over the mpsc::Sender.
///
/// The StreamSendWindow is used to make sure that incoming SENDME
/// cells; the u16 is a count of cells that we have dropped due to
/// the stream disappearing before we can transform this into an
/// EndSent.
// TODO: is this the best way?
Open(mpsc::Sender<RelayMsg>, sendme::StreamSendWindow, u16),
/// An open stream.
Open {
/// Sink to send relay cells tagged for this stream into.
sink: mpsc::Sender<RelayMsg>,
/// Stream for cells that should be sent down this stream.
rx: mpsc::Receiver<RelayMsg>,
/// Send window, for congestion control purposes.
send_window: sendme::StreamSendWindow,
/// Number of cells dropped due to the stream disappearing before we can
/// transform this into an `EndSent`.
dropped: u16,
},
/// A stream for which we have received an END cell, but not yet
/// had the stream object get dropped.
EndReceived,
@ -74,13 +79,24 @@ impl StreamMap {
}
}
/// Get the `HashMap` inside this stream map.
pub(super) fn inner(&mut self) -> &mut HashMap<StreamId, StreamEnt> {
&mut self.m
}
/// Add an entry to this map; return the newly allocated StreamId.
pub(super) fn add_ent(
&mut self,
sink: mpsc::Sender<RelayMsg>,
window: sendme::StreamSendWindow,
rx: mpsc::Receiver<RelayMsg>,
send_window: sendme::StreamSendWindow,
) -> Result<StreamId> {
let stream_ent = StreamEnt::Open(sink, window, 0);
let stream_ent = StreamEnt::Open {
sink,
rx,
send_window,
dropped: 0,
};
// This "65536" seems too aggressive, but it's what tor does.
//
// Also, going around in a loop here is (sadly) needed in order
@ -133,7 +149,7 @@ impl StreamMap {
stream_entry.remove_entry();
Ok(())
}
StreamEnt::Open(_, _, _) => {
StreamEnt::Open { .. } => {
stream_entry.insert(StreamEnt::EndReceived);
Ok(())
}
@ -143,38 +159,30 @@ impl StreamMap {
/// Handle a termination of the stream with `id` from this side of
/// the circuit. Return true if the stream was open and an END
/// ought to be sent.
pub(super) fn terminate(
&mut self,
id: StreamId,
mut recvw: sendme::StreamRecvWindow,
) -> Result<ShouldSendEnd> {
use ShouldSendEnd::*;
// Check the hashmap for the right stream. Bail if not found.
// Also keep the hashmap handle so that we can do more efficient inserts/removals
let mut stream_entry = match self.m.entry(id) {
Entry::Vacant(_) => {
return Err(Error::InternalError(
"Somehow we terminated a nonexistent connection‽".into(),
))
}
Entry::Occupied(o) => o,
};
pub(super) fn terminate(&mut self, id: StreamId) -> Result<ShouldSendEnd> {
// Progress the stream's state machine accordingly
match stream_entry.get() {
StreamEnt::EndReceived => {
stream_entry.remove_entry();
Ok(DontSend)
}
StreamEnt::Open(_, sendw, n) => {
recvw.decrement_n(*n)?;
match self.m.remove(&id).ok_or_else(|| {
Error::InternalError("Somehow we terminated a nonexistent connection‽".into())
})? {
StreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
StreamEnt::Open {
send_window,
dropped,
// notably absent: the channels for sink and stream, which will get dropped and
// closed (meaning reads/writes from/to this stream will now fail)
..
} => {
// FIXME(eta): we don't copy the receive window, instead just creating a new one,
// so a malicious peer can send us slightly more data than they should
// be able to; see arti#230.
let mut recv_window = StreamRecvWindow::new(RECV_WINDOW_INIT);
recv_window.decrement_n(dropped)?;
// TODO: would be nice to avoid new_ref.
// XXXX: We should set connected_ok properly.
let connected_ok = true;
let halfstream = HalfStream::new(sendw.new_ref(), recvw, connected_ok);
stream_entry.insert(StreamEnt::EndSent(halfstream));
Ok(Send)
let halfstream = HalfStream::new(send_window, recv_window, connected_ok);
self.m.insert(id, StreamEnt::EndSent(halfstream));
Ok(ShouldSendEnd::Send)
}
StreamEnt::EndSent(_) => {
panic!("Hang on! We're sending an END on a stream where we already sent an END‽");
@ -190,7 +198,7 @@ impl StreamMap {
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::circuit::sendme::{StreamRecvWindow, StreamSendWindow};
use crate::circuit::sendme::StreamSendWindow;
#[test]
fn streammap_basics() -> Result<()> {
@ -200,8 +208,9 @@ mod test {
// Try add_ent
for _ in 0..128 {
let (sink, _) = mpsc::channel(2);
let id = map.add_ent(sink, StreamSendWindow::new(500))?;
let (sink, _) = mpsc::channel(128);
let (_, rx) = mpsc::channel(2);
let id = map.add_ent(sink, rx, StreamSendWindow::new(500))?;
let expect_id: StreamId = next_id.into();
assert_eq!(expect_id, id);
next_id = next_id.wrapping_add(1);
@ -213,10 +222,7 @@ mod test {
// Test get_mut.
let nonesuch_id = next_id.into();
assert!(matches!(
map.get_mut(ids[0]),
Some(StreamEnt::Open(_, _, _))
));
assert!(matches!(map.get_mut(ids[0]), Some(StreamEnt::Open { .. })));
assert!(map.get_mut(nonesuch_id).is_none());
// Test end_received
@ -226,17 +232,10 @@ mod test {
assert!(map.end_received(ids[1]).is_err());
// Test terminate
let window = StreamRecvWindow::new(25);
assert!(map.terminate(nonesuch_id, window.clone()).is_err());
assert_eq!(
map.terminate(ids[2], window.clone()).unwrap(),
ShouldSendEnd::Send
);
assert!(map.terminate(nonesuch_id).is_err());
assert_eq!(map.terminate(ids[2]).unwrap(), ShouldSendEnd::Send);
assert!(matches!(map.get_mut(ids[2]), Some(StreamEnt::EndSent(_))));
assert_eq!(
map.terminate(ids[1], window).unwrap(),
ShouldSendEnd::DontSend
);
assert_eq!(map.terminate(ids[1]).unwrap(), ShouldSendEnd::DontSend);
assert!(matches!(map.get_mut(ids[1]), None));
// Try receiving an end after a terminate.

View File

@ -50,7 +50,7 @@ impl super::ServerHandshake for NtorServer {
}
/// A set of public keys used by a client to initiate an ntor handshake.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) struct NtorPublicKey {
/// Public RSA identity fingerprint for the relay; used in authentication
/// calculation.

View File

@ -16,7 +16,7 @@ mod resolve;
pub use data::DataStream;
pub use params::StreamParameters;
pub use raw::RawCellStream;
pub use raw::StreamReader;
pub use resolve::ResolveStream;
pub use tor_cell::relaycell::msg::IpVersionPreference;

View File

@ -1,7 +1,6 @@
//! Declare DataStream, a type that wraps RawCellStream so as to be useful
//! for byte-oriented communication.
use super::RawCellStream;
use crate::{Error, Result};
use tor_cell::relaycell::msg::EndReason;
@ -18,8 +17,9 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use std::io::Result as IoResult;
use std::pin::Pin;
use std::sync::Arc;
use crate::circuit::StreamTarget;
use crate::stream::StreamReader;
use tor_cell::relaycell::msg::{Data, RelayMsg};
/// An anonymized stream over the Tor network.
@ -111,15 +111,14 @@ pub struct DataReader {
}
impl DataStream {
/// Wrap a RawCellStream as a DataStream.
/// Wrap raw stream reader and target parts as a DataStream.
///
/// For non-optimistic stream, function `wait_for_connection`
/// must be called after to make sure CONNECTED is received.
pub(crate) fn new(s: RawCellStream) -> Self {
let s = Arc::new(s);
pub(crate) fn new(reader: StreamReader, target: StreamTarget) -> Self {
let r = DataReader {
state: Some(DataReaderState::Ready(DataReaderImpl {
s: Arc::clone(&s),
s: reader,
pending: Vec::new(),
offset: 0,
connected: false,
@ -127,7 +126,7 @@ impl DataStream {
};
let w = DataWriter {
state: Some(DataWriterState::Ready(DataWriterImpl {
s,
s: target,
buf: Box::new([0; Data::MAXLEN]),
n_pending: 0,
})),
@ -234,8 +233,8 @@ enum DataWriterState {
/// Internal: the write part of a DataStream
struct DataWriterImpl {
/// The underlying RawCellStream object.
s: Arc<RawCellStream>,
/// The underlying StreamTarget object.
s: StreamTarget,
/// Buffered data to send over the connection.
// TODO: this buffer is probably smaller than we want, but it's good
@ -423,8 +422,8 @@ enum DataReaderState {
/// Wrapper for the read part of a DataStream
struct DataReaderImpl {
/// The underlying RawCellStream object.
s: Arc<RawCellStream>,
/// The underlying StreamReader object.
s: StreamReader,
/// If present, data that we received on this stream but have not
/// been able to send to the caller yet.
@ -546,7 +545,7 @@ impl DataReaderImpl {
Ok(RelayMsg::End(e)) => Err(Error::EndReceived(e.reason())),
Err(e) => Err(e),
Ok(m) => {
self.s.protocol_error().await;
self.s.protocol_error();
Err(Error::StreamProto(format!(
"Unexpected {} cell on stream",
m.cmd()

View File

@ -3,45 +3,38 @@
use crate::circuit::{sendme, StreamTarget};
use crate::{Error, Result};
use tor_cell::relaycell::msg::{RelayMsg, Sendme};
use tor_cell::relaycell::msg::RelayMsg;
use crate::circuit::sendme::StreamRecvWindow;
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::stream::StreamExt;
use std::sync::atomic::{AtomicBool, Ordering};
/// A RawCellStream is a client's cell-oriented view of a stream over the
/// Tor network.
pub struct RawCellStream {
/// Wrapped view of the circuit, hop, and streamid that we're using.
/// The read part of a stream on a particular circuit.
pub struct StreamReader {
/// The underlying `StreamTarget` for this stream.
pub(crate) target: StreamTarget,
/// Channel to receive stream messages from the reactor.
pub(crate) receiver: mpsc::Receiver<RelayMsg>,
/// Congestion control receive window for this stream.
///
/// TODO: do something similar with circuits?
target: Mutex<StreamTarget>,
/// A Stream over which we receive relay messages. Only relay messages
/// that can be associated with a stream ID will be received.
receiver: Mutex<mpsc::Receiver<RelayMsg>>,
/// Have we been informed that this stream is closed, or received a fatal
/// error?
stream_ended: AtomicBool,
/// Having this here means we're only going to update it when the end consumer of this stream
/// actually reads things, meaning we don't ask for more data until it's actually needed (as
/// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
/// with having to buffer data).
pub(crate) recv_window: StreamRecvWindow,
/// Whether or not this stream has ended.
pub(crate) ended: bool,
}
impl RawCellStream {
/// Internal: build a new RawCellStream.
pub(crate) fn new(target: StreamTarget, receiver: mpsc::Receiver<RelayMsg>) -> Self {
RawCellStream {
target: Mutex::new(target),
receiver: Mutex::new(receiver),
stream_ended: AtomicBool::new(false),
}
}
impl StreamReader {
/// Try to read the next relay message from this stream.
async fn recv_raw(&self) -> Result<RelayMsg> {
async fn recv_raw(&mut self) -> Result<RelayMsg> {
if self.ended {
// Prevent reading from streams after they've ended.
return Err(Error::StreamEnded);
}
let msg = self
.receiver
.lock()
.await
.next()
.await
// This probably means that the other side closed the
@ -50,13 +43,9 @@ impl RawCellStream {
Error::StreamProto("stream channel disappeared without END cell?".into())
})?;
// Possibly decrement the window for the cell we just received, and
// send a SENDME if doing so took us under the threshold.
if sendme::msg_counts_towards_windows(&msg) {
let mut target = self.target.lock().await;
if target.recvwindow.take()? {
self.send_sendme(&mut target).await?;
}
if sendme::msg_counts_towards_windows(&msg) && self.recv_window.take()? {
self.target.send_sendme()?;
self.recv_window.put();
}
Ok(msg)
@ -64,51 +53,19 @@ impl RawCellStream {
/// As recv_raw, but if there is an error or an end cell, note that this
/// stream has ended.
pub async fn recv(&self) -> Result<RelayMsg> {
pub async fn recv(&mut self) -> Result<RelayMsg> {
let val = self.recv_raw().await;
match val {
Err(_) | Ok(RelayMsg::End(_)) => {
self.note_ended();
self.ended = true;
}
_ => {}
}
val
}
/// Send a relay message along this stream
pub async fn send(&self, msg: RelayMsg) -> Result<()> {
self.target.lock().await.send(msg).await
}
/// Return true if this stream is marked as having ended.
pub fn has_ended(&self) -> bool {
self.stream_ended.load(Ordering::SeqCst)
}
/// Mark this stream as having ended because of an incoming cell.
fn note_ended(&self) {
self.stream_ended.store(true, Ordering::SeqCst);
}
/// Inform the circuit-side of this stream about a protocol error
pub async fn protocol_error(&self) {
// TODO: Should this call note_ended?
self.target.lock().await.protocol_error().await
}
/// Send a SENDME cell and adjust the receive window.
async fn send_sendme(&self, target: &mut StreamTarget) -> Result<()> {
let sendme = Sendme::new_empty();
target.send(sendme.into()).await?;
target.recvwindow.put();
Ok(())
}
/// Ensure that all the data in this stream has been flushed in to
/// the circuit, and close it.
pub async fn close(self) -> Result<()> {
// Not much to do here right now.
drop(self);
Ok(())
/// Shut down this stream.
pub fn protocol_error(&mut self) {
self.target.protocol_error();
}
}

View File

@ -1,6 +1,6 @@
//! Declare a type for streams that do hostname lookups
use super::RawCellStream;
use crate::stream::StreamReader;
use crate::{Error, Result};
use tor_cell::relaycell::msg::{RelayMsg, Resolved};
@ -8,7 +8,7 @@ use tor_cell::relaycell::msg::{RelayMsg, Resolved};
/// cell.
pub struct ResolveStream {
/// The underlying RawCellStream.
s: RawCellStream,
s: StreamReader,
}
impl ResolveStream {
@ -16,7 +16,7 @@ impl ResolveStream {
///
/// Call only after sending a RESOLVE cell.
#[allow(dead_code)] // need to implement a caller for this.
pub(crate) fn new(s: RawCellStream) -> Self {
pub(crate) fn new(s: StreamReader) -> Self {
ResolveStream { s }
}
@ -28,7 +28,7 @@ impl ResolveStream {
RelayMsg::End(e) => Err(Error::EndReceived(e.reason())),
RelayMsg::Resolved(r) => Ok(r),
m => {
self.s.protocol_error().await;
self.s.protocol_error();
Err(Error::StreamProto(format!(
"Unexpected {} on resolve stream",
m.cmd()

View File

@ -55,6 +55,9 @@ pub enum Error {
/// Circuit is closed.
#[error("circuit closed")]
CircuitClosed,
/// Stream has ended.
#[error("stream ended")]
StreamEnded,
/// Can't allocate any more circuit or stream IDs on a channel.
#[error("too many entries in map: can't allocate ID")]
IdRangeFull,
@ -115,7 +118,9 @@ impl From<Error> for std::io::Error {
EndReceived(end_reason) => end_reason.into(),
CircDestroy(_) | ChannelClosed | CircuitClosed => ErrorKind::ConnectionReset,
CircDestroy(_) | ChannelClosed | CircuitClosed | StreamEnded => {
ErrorKind::ConnectionReset
}
BytesErr(_) | MissingKey | BadCellAuth | BadHandshake | ChanProto(_) | CircProto(_)
| CellErr(_) | ChanMismatch(_) | StreamProto(_) => ErrorKind::InvalidData,