proto: add minimal (raw) circuit support.
This is about enough to make a circuit, send a create_fast cell, and get a created_fast cell back.
This commit is contained in:
parent
f20eb2f43f
commit
a62717690b
|
@ -14,6 +14,7 @@ mod err;
|
|||
use log::{info, LevelFilter};
|
||||
use std::path::PathBuf;
|
||||
use tor_linkspec::ChanTarget;
|
||||
use tor_proto::chancell::msg;
|
||||
use tor_proto::channel::{self, Channel};
|
||||
|
||||
//use async_std::prelude::*;
|
||||
|
@ -79,7 +80,7 @@ fn get_netdir() -> Result<tor_netdir::NetDir> {
|
|||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
simple_logging::log_to_stderr(LevelFilter::Info);
|
||||
simple_logging::log_to_stderr(LevelFilter::Debug);
|
||||
|
||||
let dir = get_netdir()?;
|
||||
// TODO CONFORMANCE: we should stop now if there are required
|
||||
|
@ -90,7 +91,14 @@ fn main() -> Result<()> {
|
|||
.ok_or(Error::Misc("no usable relays"))?;
|
||||
|
||||
async_std::task::block_on(async {
|
||||
let _chan = connect(&g).await?;
|
||||
let mut rng = thread_rng();
|
||||
let chan = connect(&g).await?;
|
||||
|
||||
let mut circ = chan.new_circ(&mut rng).await?;
|
||||
|
||||
circ.send_msg(msg::CreateFast::new(&mut rng).into()).await?;
|
||||
let msg = circ.read_msg().await;
|
||||
println!("{:?}", msg);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
|
|
|
@ -5,6 +5,8 @@ use crate::crypto::cell::{RawCellBody, CELL_BODY_LEN};
|
|||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use tor_bytes::{self, Error, Readable, Reader, Result, Writer};
|
||||
|
||||
use rand::{CryptoRng, Rng};
|
||||
|
||||
/// Trait for the 'bodies' of channel messages.
|
||||
pub trait Body: Readable {
|
||||
/// Convert this type into a ChanMsg, wrapped as appropriate.
|
||||
|
@ -258,6 +260,14 @@ fixed_len! {
|
|||
/// the case where we don't know any onion key for the first hop.
|
||||
CreateFast, CREATE_FAST, FAST_C_HANDSHAKE_LEN
|
||||
}
|
||||
impl CreateFast {
|
||||
/// Create a new random CreateFast handshake.
|
||||
pub fn new<R: Rng + CryptoRng>(r: &mut R) -> Self {
|
||||
let mut handshake = vec![0; FAST_C_HANDSHAKE_LEN];
|
||||
r.fill_bytes(&mut handshake[..]);
|
||||
CreateFast { handshake }
|
||||
}
|
||||
}
|
||||
fixed_len! {
|
||||
/// A CreatedFast cell responds to a CreateFast cell
|
||||
CreatedFast, CREATED_FAST, FAST_S_HANDSHAKE_LEN
|
||||
|
@ -782,6 +792,13 @@ impl Readable for Unrecognized {
|
|||
}
|
||||
}
|
||||
|
||||
impl<B: Body> From<B> for ChanMsg {
|
||||
fn from(body: B) -> Self {
|
||||
body.as_message()
|
||||
}
|
||||
}
|
||||
|
||||
// XXXX should do From instead.
|
||||
// Helper: declare an Into implementation for cells that don't take a circid.
|
||||
macro_rules! msg_into_cell {
|
||||
($body:ident) => {
|
||||
|
|
|
@ -11,9 +11,10 @@ mod handshake;
|
|||
mod reactor;
|
||||
|
||||
use crate::chancell::{codec, msg, ChanCell};
|
||||
use crate::circuit;
|
||||
use crate::{Error, Result};
|
||||
|
||||
use futures::channel::oneshot;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::io::{AsyncRead, AsyncWrite};
|
||||
use futures::lock::Mutex;
|
||||
use futures::sink::SinkExt;
|
||||
|
@ -22,6 +23,7 @@ use futures::stream::{SplitSink, StreamExt};
|
|||
use std::sync::Arc;
|
||||
|
||||
use log::trace;
|
||||
use rand::Rng;
|
||||
|
||||
// reexport
|
||||
pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
|
||||
|
@ -31,9 +33,8 @@ type CellFrame<T> = futures_codec::Framed<T, codec::ChannelCodec>;
|
|||
/// An open client channel, ready to send and receive Tor cells.
|
||||
///
|
||||
/// A channel is a direct connection to a Tor relay, implemented using TLS.
|
||||
#[derive(Clone)]
|
||||
pub struct Channel<T: AsyncRead + AsyncWrite + Unpin> {
|
||||
inner: Arc<ChannelImpl<T>>,
|
||||
inner: Arc<Mutex<ChannelImpl<T>>>,
|
||||
}
|
||||
|
||||
/// Main implementation type for a channel.
|
||||
|
@ -42,8 +43,9 @@ struct ChannelImpl<T: AsyncRead + AsyncWrite + Unpin> {
|
|||
// This uses a separate mutex from the circmap, since we only
|
||||
// need the circmap when we're making a new circuit, but we need
|
||||
// this _all the time_.
|
||||
tls: Mutex<SplitSink<CellFrame<T>, ChanCell>>,
|
||||
// I wish I didn't need a second Arc here, but I guess I do?
|
||||
tls: SplitSink<CellFrame<T>, ChanCell>,
|
||||
// TODO: I wish I didn't need a second Arc here, but I guess I do?
|
||||
// An rwlock would be better.
|
||||
circmap: Arc<Mutex<circmap::CircMap>>,
|
||||
sendclosed: oneshot::Sender<()>,
|
||||
}
|
||||
|
@ -69,14 +71,14 @@ where
|
|||
/// Construct a channel and reactor.
|
||||
fn new(link_protocol: u16, tls: CellFrame<T>) -> (Self, reactor::Reactor<T>) {
|
||||
use circmap::{CircIDRange, CircMap};
|
||||
let circmap = Arc::new(Mutex::new(CircMap::new(CircIDRange::All)));
|
||||
let circmap = Arc::new(Mutex::new(CircMap::new(CircIDRange::High)));
|
||||
|
||||
let (sink, stream) = tls.split();
|
||||
|
||||
let (sendclosed, recvclosed) = oneshot::channel::<()>();
|
||||
|
||||
let inner = ChannelImpl {
|
||||
tls: Mutex::new(sink),
|
||||
tls: sink,
|
||||
link_protocol,
|
||||
circmap: circmap.clone(),
|
||||
sendclosed,
|
||||
|
@ -85,7 +87,7 @@ where
|
|||
let reactor = reactor::Reactor::new(circmap, recvclosed, stream);
|
||||
|
||||
let channel = Channel {
|
||||
inner: Arc::new(inner),
|
||||
inner: Arc::new(Mutex::new(inner)),
|
||||
};
|
||||
|
||||
(channel, reactor)
|
||||
|
@ -110,13 +112,44 @@ where
|
|||
|
||||
/// Transmit a single cell on a channel.
|
||||
pub async fn send_cell(&self, cell: ChanCell) -> Result<()> {
|
||||
trace!("Sending {}", cell.get_msg().get_cmd());
|
||||
trace!(
|
||||
"Sending {} on {}",
|
||||
cell.get_msg().get_cmd(),
|
||||
cell.get_circid()
|
||||
);
|
||||
self.check_cell(&cell)?;
|
||||
let mut sink = self.inner.tls.lock().await;
|
||||
let sink = &mut self.inner.lock().await.tls;
|
||||
// XXXX I don't like holding the lock here. :(
|
||||
sink.send(cell).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return a newly allocated ClientCirc object. A circuit ID is
|
||||
/// allocated, but no handshaking is done.
|
||||
pub async fn new_circ<R: Rng>(&self, rng: &mut R) -> Result<circuit::ClientCirc<T>> {
|
||||
// TODO: blocking is risky, but so is unbounded.
|
||||
let (sender, receiver) = mpsc::channel(128);
|
||||
|
||||
let id = {
|
||||
let inner = self.inner.lock().await;
|
||||
let mut cmap = inner.circmap.lock().await;
|
||||
cmap.add_ent(rng, sender)?
|
||||
};
|
||||
|
||||
Ok(circuit::ClientCirc::new(id, self.clone(), receiver))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Channel<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Channel {
|
||||
inner: Arc::clone(&self.inner),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
use crate::chancell::msg::ChanMsg;
|
||||
use crate::chancell::CircID;
|
||||
use crate::{Error, Result};
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
|
@ -14,14 +15,11 @@ use rand::Rng;
|
|||
|
||||
/// Which group of circuit IDs are we allowed to allocate in this map?
|
||||
///
|
||||
/// If we're a client, we can allocate any nonzero circid we want. If
|
||||
/// we authenticated as a relay, we can allocate Low circuit IDs if we
|
||||
/// launched the channel, and High circuit IDs if we received the
|
||||
/// channal.
|
||||
/// If we initiated the channel, we use High circuit ids. If we're the
|
||||
/// responder, we use low circuit ids.
|
||||
pub(super) enum CircIDRange {
|
||||
Low,
|
||||
High,
|
||||
All,
|
||||
}
|
||||
|
||||
impl CircIDRange {
|
||||
|
@ -38,7 +36,6 @@ impl CircIDRange {
|
|||
match self {
|
||||
CircIDRange::Low => v & 0x7fff_ffff,
|
||||
CircIDRange::High => v | 0x8000_0000,
|
||||
CircIDRange::All => v,
|
||||
}
|
||||
.into()
|
||||
}
|
||||
|
@ -81,6 +78,17 @@ impl CircMap {
|
|||
None
|
||||
}
|
||||
|
||||
pub(super) fn add_ent<R: Rng>(
|
||||
&mut self,
|
||||
rng: &mut R,
|
||||
sink: mpsc::Sender<ChanMsg>,
|
||||
) -> Result<CircID> {
|
||||
let id = self.gen_id(rng).ok_or(Error::CircIDRangeFull)?;
|
||||
let ent = CircEnt::Open(sink);
|
||||
self.m.insert(id, ent);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
fn get_ref(&self, id: CircID) -> Option<&CircEnt> {
|
||||
self.m.get(&id)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
//! Multi-hop paths over the Tor network.
|
||||
|
||||
use crate::chancell::{msg::ChanMsg, ChanCell, CircID};
|
||||
use crate::channel::Channel;
|
||||
use crate::{Error, Result};
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::io::{AsyncRead, AsyncWrite};
|
||||
use futures::stream::StreamExt;
|
||||
|
||||
/// A Circuit that we have constructed over the Tor network.
|
||||
// TODO: I wish this weren't parameterized.
|
||||
// TODO: need to send a destroy cell on drop
|
||||
pub struct ClientCirc<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
id: CircID,
|
||||
channel: Channel<T>,
|
||||
// TODO: could use a SPSC channel here instead.
|
||||
input: mpsc::Receiver<ChanMsg>,
|
||||
}
|
||||
|
||||
impl<T> ClientCirc<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
/// Instantiate a new circuit object.
|
||||
pub(crate) fn new(id: CircID, channel: Channel<T>, input: mpsc::Receiver<ChanMsg>) -> Self {
|
||||
ClientCirc { id, channel, input }
|
||||
}
|
||||
|
||||
/// Put a cell onto this circuit.
|
||||
///
|
||||
/// This takes a raw cell; you may need to encrypt it.
|
||||
pub async fn send_msg(&mut self, msg: ChanMsg) -> Result<()> {
|
||||
let cell = ChanCell::new(self.id, msg);
|
||||
self.channel.send_cell(cell).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read a cell from this circuit.
|
||||
///
|
||||
/// This is a raw cell as sent on the channel: if it's a relay cell,
|
||||
/// it'll need to be decrypted.
|
||||
pub async fn read_msg(&mut self) -> Result<ChanMsg> {
|
||||
// XXXX handlebetter
|
||||
self.input.next().await.ok_or(Error::CircuitClosed)
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
pub mod chancell;
|
||||
pub mod channel;
|
||||
pub mod circuit;
|
||||
mod crypto;
|
||||
pub mod relaycell;
|
||||
mod util;
|
||||
|
|
|
@ -38,4 +38,10 @@ pub enum Error {
|
|||
/// Protocol violation at the channel level
|
||||
#[error("channel protocol violation: {0}")]
|
||||
ChanProto(String),
|
||||
/// Circuit is closed.
|
||||
#[error("circuit closed")]
|
||||
CircuitClosed,
|
||||
/// Can't allocate any more circuit IDs on a channel.
|
||||
#[error("too many circuits on channel: can't allocate circuit ID")]
|
||||
CircIDRangeFull,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue