From a62717690b1dc7b3aef9ddca118c0fea915052ee Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Fri, 11 Sep 2020 15:06:25 -0400 Subject: [PATCH] proto: add minimal (raw) circuit support. This is about enough to make a circuit, send a create_fast cell, and get a created_fast cell back. --- client-demo/src/main.rs | 12 ++++++-- tor-proto/src/chancell/msg.rs | 17 ++++++++++ tor-proto/src/channel.rs | 53 ++++++++++++++++++++++++++------ tor-proto/src/channel/circmap.rs | 20 ++++++++---- tor-proto/src/circuit.rs | 50 ++++++++++++++++++++++++++++++ tor-proto/src/lib.rs | 1 + tor-proto/src/util/err.rs | 6 ++++ 7 files changed, 141 insertions(+), 18 deletions(-) create mode 100644 tor-proto/src/circuit.rs diff --git a/client-demo/src/main.rs b/client-demo/src/main.rs index d8c31a47b..346d68fd4 100644 --- a/client-demo/src/main.rs +++ b/client-demo/src/main.rs @@ -14,6 +14,7 @@ mod err; use log::{info, LevelFilter}; use std::path::PathBuf; use tor_linkspec::ChanTarget; +use tor_proto::chancell::msg; use tor_proto::channel::{self, Channel}; //use async_std::prelude::*; @@ -79,7 +80,7 @@ fn get_netdir() -> Result { } fn main() -> Result<()> { - simple_logging::log_to_stderr(LevelFilter::Info); + simple_logging::log_to_stderr(LevelFilter::Debug); let dir = get_netdir()?; // TODO CONFORMANCE: we should stop now if there are required @@ -90,7 +91,14 @@ fn main() -> Result<()> { .ok_or(Error::Misc("no usable relays"))?; async_std::task::block_on(async { - let _chan = connect(&g).await?; + let mut rng = thread_rng(); + let chan = connect(&g).await?; + + let mut circ = chan.new_circ(&mut rng).await?; + + circ.send_msg(msg::CreateFast::new(&mut rng).into()).await?; + let msg = circ.read_msg().await; + println!("{:?}", msg); Ok(()) }) diff --git a/tor-proto/src/chancell/msg.rs b/tor-proto/src/chancell/msg.rs index 113850a81..33f5cb6a7 100644 --- a/tor-proto/src/chancell/msg.rs +++ b/tor-proto/src/chancell/msg.rs @@ -5,6 +5,8 @@ use crate::crypto::cell::{RawCellBody, CELL_BODY_LEN}; use std::net::{IpAddr, Ipv4Addr}; use tor_bytes::{self, Error, Readable, Reader, Result, Writer}; +use rand::{CryptoRng, Rng}; + /// Trait for the 'bodies' of channel messages. pub trait Body: Readable { /// Convert this type into a ChanMsg, wrapped as appropriate. @@ -258,6 +260,14 @@ fixed_len! { /// the case where we don't know any onion key for the first hop. CreateFast, CREATE_FAST, FAST_C_HANDSHAKE_LEN } +impl CreateFast { + /// Create a new random CreateFast handshake. + pub fn new(r: &mut R) -> Self { + let mut handshake = vec![0; FAST_C_HANDSHAKE_LEN]; + r.fill_bytes(&mut handshake[..]); + CreateFast { handshake } + } +} fixed_len! { /// A CreatedFast cell responds to a CreateFast cell CreatedFast, CREATED_FAST, FAST_S_HANDSHAKE_LEN @@ -782,6 +792,13 @@ impl Readable for Unrecognized { } } +impl From for ChanMsg { + fn from(body: B) -> Self { + body.as_message() + } +} + +// XXXX should do From instead. // Helper: declare an Into implementation for cells that don't take a circid. macro_rules! msg_into_cell { ($body:ident) => { diff --git a/tor-proto/src/channel.rs b/tor-proto/src/channel.rs index 8176318e0..e1f8a0b70 100644 --- a/tor-proto/src/channel.rs +++ b/tor-proto/src/channel.rs @@ -11,9 +11,10 @@ mod handshake; mod reactor; use crate::chancell::{codec, msg, ChanCell}; +use crate::circuit; use crate::{Error, Result}; -use futures::channel::oneshot; +use futures::channel::{mpsc, oneshot}; use futures::io::{AsyncRead, AsyncWrite}; use futures::lock::Mutex; use futures::sink::SinkExt; @@ -22,6 +23,7 @@ use futures::stream::{SplitSink, StreamExt}; use std::sync::Arc; use log::trace; +use rand::Rng; // reexport pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel}; @@ -31,9 +33,8 @@ type CellFrame = futures_codec::Framed; /// An open client channel, ready to send and receive Tor cells. /// /// A channel is a direct connection to a Tor relay, implemented using TLS. -#[derive(Clone)] pub struct Channel { - inner: Arc>, + inner: Arc>>, } /// Main implementation type for a channel. @@ -42,8 +43,9 @@ struct ChannelImpl { // This uses a separate mutex from the circmap, since we only // need the circmap when we're making a new circuit, but we need // this _all the time_. - tls: Mutex, ChanCell>>, - // I wish I didn't need a second Arc here, but I guess I do? + tls: SplitSink, ChanCell>, + // TODO: I wish I didn't need a second Arc here, but I guess I do? + // An rwlock would be better. circmap: Arc>, sendclosed: oneshot::Sender<()>, } @@ -69,14 +71,14 @@ where /// Construct a channel and reactor. fn new(link_protocol: u16, tls: CellFrame) -> (Self, reactor::Reactor) { use circmap::{CircIDRange, CircMap}; - let circmap = Arc::new(Mutex::new(CircMap::new(CircIDRange::All))); + let circmap = Arc::new(Mutex::new(CircMap::new(CircIDRange::High))); let (sink, stream) = tls.split(); let (sendclosed, recvclosed) = oneshot::channel::<()>(); let inner = ChannelImpl { - tls: Mutex::new(sink), + tls: sink, link_protocol, circmap: circmap.clone(), sendclosed, @@ -85,7 +87,7 @@ where let reactor = reactor::Reactor::new(circmap, recvclosed, stream); let channel = Channel { - inner: Arc::new(inner), + inner: Arc::new(Mutex::new(inner)), }; (channel, reactor) @@ -110,13 +112,44 @@ where /// Transmit a single cell on a channel. pub async fn send_cell(&self, cell: ChanCell) -> Result<()> { - trace!("Sending {}", cell.get_msg().get_cmd()); + trace!( + "Sending {} on {}", + cell.get_msg().get_cmd(), + cell.get_circid() + ); self.check_cell(&cell)?; - let mut sink = self.inner.tls.lock().await; + let sink = &mut self.inner.lock().await.tls; + // XXXX I don't like holding the lock here. :( sink.send(cell).await?; Ok(()) } + + /// Return a newly allocated ClientCirc object. A circuit ID is + /// allocated, but no handshaking is done. + pub async fn new_circ(&self, rng: &mut R) -> Result> { + // TODO: blocking is risky, but so is unbounded. + let (sender, receiver) = mpsc::channel(128); + + let id = { + let inner = self.inner.lock().await; + let mut cmap = inner.circmap.lock().await; + cmap.add_ent(rng, sender)? + }; + + Ok(circuit::ClientCirc::new(id, self.clone(), receiver)) + } +} + +impl Clone for Channel +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fn clone(&self) -> Self { + Channel { + inner: Arc::clone(&self.inner), + } + } } /* diff --git a/tor-proto/src/channel/circmap.rs b/tor-proto/src/channel/circmap.rs index 9465e20fe..55bc52457 100644 --- a/tor-proto/src/channel/circmap.rs +++ b/tor-proto/src/channel/circmap.rs @@ -3,6 +3,7 @@ use crate::chancell::msg::ChanMsg; use crate::chancell::CircID; +use crate::{Error, Result}; use std::collections::HashMap; @@ -14,14 +15,11 @@ use rand::Rng; /// Which group of circuit IDs are we allowed to allocate in this map? /// -/// If we're a client, we can allocate any nonzero circid we want. If -/// we authenticated as a relay, we can allocate Low circuit IDs if we -/// launched the channel, and High circuit IDs if we received the -/// channal. +/// If we initiated the channel, we use High circuit ids. If we're the +/// responder, we use low circuit ids. pub(super) enum CircIDRange { Low, High, - All, } impl CircIDRange { @@ -38,7 +36,6 @@ impl CircIDRange { match self { CircIDRange::Low => v & 0x7fff_ffff, CircIDRange::High => v | 0x8000_0000, - CircIDRange::All => v, } .into() } @@ -81,6 +78,17 @@ impl CircMap { None } + pub(super) fn add_ent( + &mut self, + rng: &mut R, + sink: mpsc::Sender, + ) -> Result { + let id = self.gen_id(rng).ok_or(Error::CircIDRangeFull)?; + let ent = CircEnt::Open(sink); + self.m.insert(id, ent); + Ok(id) + } + fn get_ref(&self, id: CircID) -> Option<&CircEnt> { self.m.get(&id) } diff --git a/tor-proto/src/circuit.rs b/tor-proto/src/circuit.rs new file mode 100644 index 000000000..b0b37aa95 --- /dev/null +++ b/tor-proto/src/circuit.rs @@ -0,0 +1,50 @@ +//! Multi-hop paths over the Tor network. + +use crate::chancell::{msg::ChanMsg, ChanCell, CircID}; +use crate::channel::Channel; +use crate::{Error, Result}; + +use futures::channel::mpsc; +use futures::io::{AsyncRead, AsyncWrite}; +use futures::stream::StreamExt; + +/// A Circuit that we have constructed over the Tor network. +// TODO: I wish this weren't parameterized. +// TODO: need to send a destroy cell on drop +pub struct ClientCirc +where + T: AsyncRead + AsyncWrite + Unpin, +{ + id: CircID, + channel: Channel, + // TODO: could use a SPSC channel here instead. + input: mpsc::Receiver, +} + +impl ClientCirc +where + T: AsyncRead + AsyncWrite + Unpin, +{ + /// Instantiate a new circuit object. + pub(crate) fn new(id: CircID, channel: Channel, input: mpsc::Receiver) -> Self { + ClientCirc { id, channel, input } + } + + /// Put a cell onto this circuit. + /// + /// This takes a raw cell; you may need to encrypt it. + pub async fn send_msg(&mut self, msg: ChanMsg) -> Result<()> { + let cell = ChanCell::new(self.id, msg); + self.channel.send_cell(cell).await?; + Ok(()) + } + + /// Read a cell from this circuit. + /// + /// This is a raw cell as sent on the channel: if it's a relay cell, + /// it'll need to be decrypted. + pub async fn read_msg(&mut self) -> Result { + // XXXX handlebetter + self.input.next().await.ok_or(Error::CircuitClosed) + } +} diff --git a/tor-proto/src/lib.rs b/tor-proto/src/lib.rs index 16d2f105d..5ca85a95d 100644 --- a/tor-proto/src/lib.rs +++ b/tor-proto/src/lib.rs @@ -25,6 +25,7 @@ pub mod chancell; pub mod channel; +pub mod circuit; mod crypto; pub mod relaycell; mod util; diff --git a/tor-proto/src/util/err.rs b/tor-proto/src/util/err.rs index bed00a49c..69ae64643 100644 --- a/tor-proto/src/util/err.rs +++ b/tor-proto/src/util/err.rs @@ -38,4 +38,10 @@ pub enum Error { /// Protocol violation at the channel level #[error("channel protocol violation: {0}")] ChanProto(String), + /// Circuit is closed. + #[error("circuit closed")] + CircuitClosed, + /// Can't allocate any more circuit IDs on a channel. + #[error("too many circuits on channel: can't allocate circuit ID")] + CircIDRangeFull, }