proto::channel: add piping to move cells around.

There's a "reactor" task to move cells to the appropriate circuit,
and a "send_cell" to send a cell directly.

This is client-only for now.
This commit is contained in:
Nick Mathewson 2020-09-10 15:10:54 -04:00
parent 49bbb4baa6
commit f20eb2f43f
7 changed files with 344 additions and 23 deletions

View File

@ -55,9 +55,11 @@ async fn connect<C: ChanTarget>(target: &C) -> Result<Channel<TlsStream<net::Tcp
let chan = chan.check(target, &peer_cert)?;
info!("Certificates validated; peer authenticated.");
let chan = chan.finish(&addr.ip()).await?;
let (chan, reactor) = chan.finish(&addr.ip()).await?;
info!("Channel complete.");
async_std::task::spawn(async { reactor.run().await });
Ok(chan)
}

View File

@ -23,6 +23,7 @@ futures_codec = "0.4.1"
generic-array = "0.14.4"
hkdf = "0.9.0"
hmac = "0.9.0"
log = "0.4.11"
rand = "0.7.3"
rand_core = "0.5.1"
# This is paused for now because of aes-ctr.

View File

@ -22,7 +22,7 @@ pub const CELL_DATA_LEN: usize = 509;
///
/// A circuit ID can be 2 or 4 bytes long; on modern versions of the Tor
/// protocol, it's 4 bytes long.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub struct CircID(u32);
impl From<u32> for CircID {

View File

@ -3,32 +3,58 @@
//! Right now, we only support connecting to a Tor relay as a client.
//!
//! To do so, launch a TLS connection, then call `start_client_handshake()`
//!
//! TODO: channel padding support.
mod circmap;
mod handshake;
mod reactor;
use crate::chancell::codec;
use crate::chancell::{codec, msg, ChanCell};
use crate::{Error, Result};
use futures::channel::oneshot;
use futures::io::{AsyncRead, AsyncWrite};
use futures::lock::Mutex;
use futures::sink::SinkExt;
use futures::stream::{SplitSink, StreamExt};
use std::sync::Arc;
use log::trace;
// reexport
pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
type CellFrame<T> = futures_codec::Framed<T, codec::ChannelCodec>;
/// An open client channel, ready to send and receive tor cells.
/// An open client channel, ready to send and receive Tor cells.
///
/// A channel is a direct connection to a Tor relay, implemented using TLS.
#[derive(Clone)]
pub struct Channel<T: AsyncRead + AsyncWrite + Unpin> {
inner: Arc<Mutex<ChannelImpl<T>>>,
inner: Arc<ChannelImpl<T>>,
}
/// Main implementation type for a channel.
struct ChannelImpl<T: AsyncRead + AsyncWrite + Unpin> {
link_protocol: u16,
tls: CellFrame<T>,
// This uses a separate mutex from the circmap, since we only
// need the circmap when we're making a new circuit, but we need
// this _all the time_.
tls: Mutex<SplitSink<CellFrame<T>, ChanCell>>,
// I wish I didn't need a second Arc here, but I guess I do?
circmap: Arc<Mutex<circmap::CircMap>>,
sendclosed: oneshot::Sender<()>,
}
/// Launch a new client handshake over a TLS stream.
///
/// After calling this function, you'll need to call `connect()` on
/// the result to start the handshake. If that succeeds, you'll have
/// authentication info from the relay: call `check()` on the result
/// to check that. Finally, to finish the handshake, call `finish()`
/// on the result of _that_.
pub fn start_client_handshake<T>(tls: T) -> OutboundClientHandshake<T>
where
T: AsyncRead + AsyncWrite + Unpin,
@ -36,22 +62,75 @@ where
handshake::OutboundClientHandshake::new(tls)
}
impl<T> ChannelImpl<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fn new(link_protocol: u16, tls: CellFrame<T>) -> Self {
ChannelImpl { link_protocol, tls }
}
}
impl<T> Channel<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fn from_inner(inner: ChannelImpl<T>) -> Self {
Channel {
inner: Arc::new(Mutex::new(inner)),
/// Construct a channel and reactor.
fn new(link_protocol: u16, tls: CellFrame<T>) -> (Self, reactor::Reactor<T>) {
use circmap::{CircIDRange, CircMap};
let circmap = Arc::new(Mutex::new(CircMap::new(CircIDRange::All)));
let (sink, stream) = tls.split();
let (sendclosed, recvclosed) = oneshot::channel::<()>();
let inner = ChannelImpl {
tls: Mutex::new(sink),
link_protocol,
circmap: circmap.clone(),
sendclosed,
};
let reactor = reactor::Reactor::new(circmap, recvclosed, stream);
let channel = Channel {
inner: Arc::new(inner),
};
(channel, reactor)
}
fn check_cell(&self, cell: &ChanCell) -> Result<()> {
use msg::ChanMsg::*;
let msg = cell.get_msg();
match msg {
Created(_) | Created2(_) | CreatedFast(_) => Err(Error::ChanProto(format!(
"Can't send {} cell on client channel",
msg.get_cmd()
))),
Certs(_) | Versions(_) | Authenticate(_) | Authorize(_) | AuthChallenge(_)
| Netinfo(_) => Err(Error::ChanProto(format!(
"Can't send {} cell after handshake is done",
msg.get_cmd()
))),
_ => Ok(()),
}
}
/// Transmit a single cell on a channel.
pub async fn send_cell(&self, cell: ChanCell) -> Result<()> {
trace!("Sending {}", cell.get_msg().get_cmd());
self.check_cell(&cell)?;
let mut sink = self.inner.tls.lock().await;
sink.send(cell).await?;
Ok(())
}
}
/*
XXXX this doesn't work, since we'd have to take ownership of the
XXXX oneshot.
XXXX maybe I should wrap the oneshot in a cell or something?
impl<T> Drop for ChannelImpl<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fn drop(&mut self) {
self.sendclosed.send(());
}
}
*/

View File

@ -0,0 +1,100 @@
// NOTE: This is a work in progress and I bet I'll refactor it a lot;
// it needs to stay opaque!
use crate::chancell::msg::ChanMsg;
use crate::chancell::CircID;
use std::collections::HashMap;
//use futures::sink::Sink;
//use futures::channel::oneshot;
use futures::channel::mpsc;
use rand::Rng;
/// Which group of circuit IDs are we allowed to allocate in this map?
///
/// If we're a client, we can allocate any nonzero circid we want. If
/// we authenticated as a relay, we can allocate Low circuit IDs if we
/// launched the channel, and High circuit IDs if we received the
/// channal.
pub(super) enum CircIDRange {
Low,
High,
All,
}
impl CircIDRange {
/// Return a random circuit ID in the appropriate range.
fn new_id<R: Rng>(&self, rng: &mut R) -> CircID {
// Make sure v is nonzero.
let v = loop {
match rng.gen() {
0u32 => (),
x => break x,
}
};
// Force the high bit of v to the appropriate value.
match self {
CircIDRange::Low => v & 0x7fff_ffff,
CircIDRange::High => v | 0x8000_0000,
CircIDRange::All => v,
}
.into()
}
}
/// An entry in the circuit map. Right now, we only have "here's the
/// way to send cells to a given circuit", but that's likely to
/// change.
pub(super) enum CircEnt {
Open(mpsc::Sender<ChanMsg>),
}
/// A map from circuit IDs to circuit entries. Each channel has one.
pub(super) struct CircMap {
idrange: CircIDRange,
m: HashMap<CircID, CircEnt>,
}
impl CircMap {
/// Make a new empty CircMap
pub(super) fn new(idrange: CircIDRange) -> Self {
CircMap {
idrange,
m: HashMap::new(),
}
}
/// Construct a new CircuitID for an outbound circuit; make sure
/// it is unused. This can fail if there are too many circuits on
/// this channel.
fn gen_id<R: Rng>(&self, rng: &mut R) -> Option<CircID> {
// How many times to we try before giving up?
const MAX_ATTEMPTS: usize = 16;
for _ in 0..MAX_ATTEMPTS {
let id = self.idrange.new_id(rng);
if !self.m.contains_key(&id) {
return Some(id);
}
}
None
}
fn get_ref(&self, id: CircID) -> Option<&CircEnt> {
self.m.get(&id)
}
/// Remove the entry for `id` on this map, if any.
fn remove(&mut self, id: CircID) {
self.m.remove(&id);
}
/// Return the entry for `id` in this map, if any.
pub(super) fn get_mut(&mut self, id: CircID) -> Option<&mut CircEnt> {
self.m.get_mut(&id)
}
// TODO: Eventually if we want relay support, we'll need to support
// circuit IDs chosen by somebody else. But for now, we don't need those.
}

View File

@ -247,12 +247,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin> UnverifiedChannel<T> {
impl<T: AsyncRead + AsyncWrite + Unpin> VerifiedChannel<T> {
/// Send a 'Netinfo' message to the relay to finish the handshake,
/// and create an open channel.
pub async fn finish(mut self, peer_addr: &net::IpAddr) -> Result<super::Channel<T>> {
/// and create an open channel and reactor.
///
/// The channel is used to send cells, and to create outgoing circuits.
/// The reactor is used to route incoming messages to their appropriate
/// circuit.
pub async fn finish(
mut self,
peer_addr: &net::IpAddr,
) -> Result<(super::Channel<T>, super::reactor::Reactor<T>)> {
let netinfo = msg::Netinfo::for_client(*peer_addr);
self.tls.send(netinfo.into()).await?;
let inner = super::ChannelImpl::new(self.link_protocol, self.tls);
Ok(super::Channel::from_inner(inner))
Ok(super::Channel::new(self.link_protocol, self.tls))
}
}

View File

@ -0,0 +1,133 @@
//! Code to handle incoming cells on a channel
//!
//! TODO: I have zero confidence in the close-and-cleanup behavior here,
//! or in the error handling behavior.
use super::circmap::{CircEnt, CircMap};
use super::CellFrame;
use crate::chancell::{msg::ChanMsg, ChanCell, CircID};
use crate::{Error, Result};
use futures::channel::oneshot;
use futures::future::Fuse;
use futures::io::AsyncRead;
use futures::lock::Mutex;
use futures::select_biased;
use futures::sink::SinkExt;
use futures::stream::{SplitStream, StreamExt};
use futures::FutureExt;
use std::sync::Arc;
use log::trace;
/// Object to handle incoming cells on a channel.
///
/// This type is returned when you finish a channel; you need to spawn a
/// new task that calls `run()` on it.
#[must_use = "If you don't call run() on a reactor, the channel won't work."]
pub struct Reactor<T>
where
T: AsyncRead + Unpin,
{
closeflag: Fuse<oneshot::Receiver<()>>,
input: SplitStream<CellFrame<T>>,
core: ReactorCore,
}
/// This is a separate; we use it when handling cells.
struct ReactorCore {
// TODO: This lock is used pretty asymmetrically. The reactor
// task needs to use the circmap all the time, whereas other tasks
// only need the circmap when dealing with circuit creation.
// Maybe it would be better to use some kind of channel to tell
// the reactor about new circuits?
circs: Arc<Mutex<CircMap>>,
}
impl<T> Reactor<T>
where
T: AsyncRead + Unpin,
{
/// Construct a new Reactor.
pub(super) fn new(
circmap: Arc<Mutex<CircMap>>,
closeflag: oneshot::Receiver<()>,
input: SplitStream<CellFrame<T>>,
) -> Self {
let core = ReactorCore { circs: circmap };
Reactor {
closeflag: closeflag.fuse(),
input,
core,
}
}
/// Launch the reactor, and run until the channel closes or we
/// encounter an error.
pub async fn run(mut self) -> Result<()> {
let mut close_future = self.closeflag;
loop {
let mut next_future = self.input.next().fuse();
let item = select_biased! {
_ = close_future => return Ok(()), // we were asked to close
item = next_future => item,
};
let item = match item {
None => return Ok(()), // the stream closed.
Some(r) => r?,
};
self.core.handle_cell(item).await?;
}
}
}
impl ReactorCore {
/// Helper: process a cell on a channel. Most cells get ignored
/// or rejected; a few get delivered to circuits.
async fn handle_cell(&mut self, cell: ChanCell) -> Result<()> {
let (circid, msg) = cell.into_circid_and_msg();
trace!("Received {} on {}", msg.get_cmd(), circid);
use ChanMsg::*;
match msg {
// These aren't allowed on clients.
Create(_) | CreateFast(_) | Create2(_) | RelayEarly(_) | PaddingNegotiate(_) => Err(
Error::ChanProto(format!("{} cell on client channel", msg.get_cmd())),
),
// We should never see this, since we don't use TAP.
Created(_) => Err(Error::ChanProto(format!("{} cell received", msg.get_cmd()))),
// These aren't allowed after handshaking is done.
Versions(_) | Certs(_) | Authorize(_) | Authenticate(_) | AuthChallenge(_)
| Netinfo(_) => Err(Error::ChanProto(format!(
"{} cell after handshake is done",
msg.get_cmd()
))),
// These are always ignored.
Padding(_) | VPadding(_) | Unrecognized(_) => Ok(()),
// These are allowed and need to be handled.
Relay(_) | CreatedFast(_) | Created2(_) | Destroy(_) => {
self.deliver_msg(circid, msg).await
}
}
}
/// Give `msg` to the appropriate circuid.
async fn deliver_msg(&mut self, circid: CircID, msg: ChanMsg) -> Result<()> {
let mut map = self.circs.lock().await;
if let Some(CircEnt::Open(s)) = map.get_mut(circid) {
// XXXX handle errors better.
// XXXX should we really be holding the mutex for this?
s.send(msg).await.map_err(|_| Error::ChanProto("x".into()))
} else {
// XXXX handle this case better; don't just drop the cell.
Ok(())
}
}
}