tor-proto: Change semantics of MetaCellHandler

Now, the MetaCellHandler is responsible for consuming the messages
it gets, and reporting status to whatever task is waiting for a
status message.

Additionally, the MetaCellHandler can decide to remain installed or
shut down the circuit after a successful message.  (Previously, it
could only uninstall itself on success and kill the circuit on
failure.)

These changes will enable MetaCellHandlers to be used as the basis
for handling more kinds of message.

(There is some moved and reformatted code here; you may want to
review it with `git {diff or show} --color-moved -b`.)
This commit is contained in:
Nick Mathewson 2023-03-03 08:49:46 -05:00
parent 7108f923e0
commit 094fdc0d8d
1 changed files with 126 additions and 72 deletions

View File

@ -281,7 +281,26 @@ pub(super) trait MetaCellHandler: Send {
/// Gets a copy of the `Reactor` in order to do anything it likes there.
///
/// If this function returns an error, the reactor will shut down.
fn finish(&mut self, msg: UnparsedRelayCell, reactor: &mut Reactor) -> Result<()>;
fn handle_msg(
&mut self,
msg: UnparsedRelayCell,
reactor: &mut Reactor,
) -> Result<MetaCellDisposition>;
}
/// A possible successful outcome of giving a message to a [`MetaCellHandler`].
#[derive(Debug, Copy, Clone)]
#[allow(dead_code)] // TODO HS remove
pub(super) enum MetaCellDisposition {
/// The message was consumed; the handler should remain installed.
Consumed,
/// The message was consumed; the handler should be uninstalled.
UninstallHandler,
/// The message was consumed; the circuit should be closed.
CloseCirc,
// TODO: Eventually we might want the ability to have multiple handlers
// installed, and to let them say "not for me, maybe for somebody else?".
// But right now we don't need that.
}
/// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait.
@ -307,6 +326,8 @@ where
unique_id: UniqId,
/// The hop we're expecting the EXTENDED2 cell to come back from.
expected_hop: HopNum,
/// A oneshot channel that we should inform when we are done with this extend operation.
operation_finished: Option<oneshot::Sender<Result<()>>>,
/// `PhantomData` used to make the other type parameters required for a circuit extension
/// part of the `struct`, instead of having them be provided during a function call.
///
@ -341,60 +362,68 @@ where
require_sendme_auth: RequireSendmeAuth,
params: CircParameters,
reactor: &mut Reactor,
done: ReactorResultChannel<()>,
) -> Result<Self> {
let mut rng = rand::thread_rng();
let unique_id = reactor.unique_id;
match (|| {
let mut rng = rand::thread_rng();
let unique_id = reactor.unique_id;
use tor_cell::relaycell::msg::Extend2;
// Perform the first part of the cryptographic handshake
let (state, msg) = H::client1(&mut rng, key)?;
use tor_cell::relaycell::msg::Extend2;
// Perform the first part of the cryptographic handshake
let (state, msg) = H::client1(&mut rng, key)?;
let n_hops = reactor.crypto_out.n_layers();
let hop = ((n_hops - 1) as u8).into();
let n_hops = reactor.crypto_out.n_layers();
let hop = ((n_hops - 1) as u8).into();
debug!(
"{}: Extending circuit to hop {} with {:?}",
unique_id,
n_hops + 1,
linkspecs
);
debug!(
"{}: Extending circuit to hop {} with {:?}",
unique_id,
n_hops + 1,
linkspecs
);
let extend_msg = Extend2::new(linkspecs, handshake_id, msg);
let cell = AnyRelayCell::new(0.into(), extend_msg.into());
let extend_msg = Extend2::new(linkspecs, handshake_id, msg);
let cell = AnyRelayCell::new(0.into(), extend_msg.into());
// Send the message to the last hop...
reactor.send_relay_cell(
cx, hop, true, // use a RELAY_EARLY cell
cell,
)?;
trace!("{}: waiting for EXTENDED2 cell", unique_id);
// ... and now we wait for a response.
// Send the message to the last hop...
reactor.send_relay_cell(
cx, hop, true, // use a RELAY_EARLY cell
cell,
)?;
trace!("{}: waiting for EXTENDED2 cell", unique_id);
// ... and now we wait for a response.
Ok(Self {
peer_id,
state: Some(state),
require_sendme_auth,
params,
unique_id,
expected_hop: hop,
phantom: Default::default(),
})
Ok::<CircuitExtender<_, _, _, _>, Error>(Self {
peer_id,
state: Some(state),
require_sendme_auth,
params,
unique_id,
expected_hop: hop,
operation_finished: None,
phantom: Default::default(),
})
})() {
Ok(mut result) => {
result.operation_finished = Some(done);
Ok(result)
}
Err(e) => {
// It's okay if the receiver went away.
let _ = done.send(Err(e.clone()));
Err(e)
}
}
}
}
impl<H, L, FWD, REV> MetaCellHandler for CircuitExtender<H, L, FWD, REV>
where
H: ClientHandshake,
H::StateType: Send,
H::KeyGen: KeyGenerator,
L: CryptInit + ClientLayer<FWD, REV> + Send,
FWD: OutboundClientLayer + 'static + Send,
REV: InboundClientLayer + 'static + Send,
{
fn expected_hop(&self) -> HopNum {
self.expected_hop
}
fn finish(&mut self, msg: UnparsedRelayCell, reactor: &mut Reactor) -> Result<()> {
/// Perform the work of extending the circuit another hop.
///
/// This is a separate function to simplify the error-handling work of handle_msg().
fn extend_circuit(
&mut self,
msg: UnparsedRelayCell,
reactor: &mut Reactor,
) -> Result<MetaCellDisposition> {
let msg = msg
.decode::<tor_cell::relaycell::msg::Extended2>()
.map_err(|e| Error::from_bytes_err(e, "extended2 message"))?
@ -427,7 +456,38 @@ where
Box::new(layer_back),
&self.params,
);
Ok(())
Ok(MetaCellDisposition::UninstallHandler)
}
}
impl<H, L, FWD, REV> MetaCellHandler for CircuitExtender<H, L, FWD, REV>
where
H: ClientHandshake,
H::StateType: Send,
H::KeyGen: KeyGenerator,
L: CryptInit + ClientLayer<FWD, REV> + Send,
FWD: OutboundClientLayer + 'static + Send,
REV: InboundClientLayer + 'static + Send,
{
fn expected_hop(&self) -> HopNum {
self.expected_hop
}
fn handle_msg(
&mut self,
msg: UnparsedRelayCell,
reactor: &mut Reactor,
) -> Result<MetaCellDisposition> {
let status = self.extend_circuit(msg, reactor);
if let Some(done) = self.operation_finished.take() {
// ignore it if the receiving channel went away.
let _ = done.send(status.clone().map(|_| ()));
status
} else {
Err(Error::from(internal!(
"Passed two messages to an CircuitExtender!"
)))
}
}
}
@ -469,7 +529,7 @@ pub struct Reactor {
/// This circuit's identifier on the upstream channel.
channel_id: CircId,
/// A handler for a meta cell, together with a result channel to notify on completion.
meta_handler: Option<(Box<dyn MetaCellHandler>, ReactorResultChannel<()>)>,
meta_handler: Option<Box<dyn MetaCellHandler>>,
}
impl Reactor {
@ -904,25 +964,28 @@ impl Reactor {
// TODO: that means that service-introduction circuits will need
// a different implementation, but that should be okay. We'll work
// something out.
if let Some((mut handler, done)) = self.meta_handler.take() {
if let Some(mut handler) = self.meta_handler.take() {
if handler.expected_hop() == hopnum {
// Somebody was waiting for a message -- maybe this message
let ret = handler.finish(msg, self);
let ret = handler.handle_msg(msg, self);
trace!(
"{}: meta handler completed with result: {:?}",
self.unique_id,
ret
);
let status = match &ret {
Ok(()) => Ok(CellStatus::Continue),
Err(e) => Err(e.clone()),
};
let _ = done.send(ret); // don't care if sender goes away
status
match ret {
Ok(MetaCellDisposition::Consumed) => {
self.meta_handler = Some(handler);
Ok(CellStatus::Continue)
}
Ok(MetaCellDisposition::UninstallHandler) => Ok(CellStatus::Continue),
Ok(MetaCellDisposition::CloseCirc) => Ok(CellStatus::CleanShutdown),
Err(e) => Err(e),
}
} else {
// Somebody wanted a message from a different hop! Put this
// one back.
self.meta_handler = Some((handler, done));
self.meta_handler = Some(handler);
Err(Error::CircProto(format!(
"Unexpected {} cell from hop {} on client circuit",
msg.cmd(),
@ -1101,13 +1164,9 @@ impl Reactor {
/// Try to install a given meta-cell handler to receive any unusual cells on
/// this circuit, along with a result channel to notify on completion.
fn set_meta_handler(
&mut self,
handler: Box<dyn MetaCellHandler>,
done: ReactorResultChannel<()>,
) -> Result<()> {
fn set_meta_handler(&mut self, handler: Box<dyn MetaCellHandler>) -> Result<()> {
if self.meta_handler.is_none() {
self.meta_handler = Some((handler, done));
self.meta_handler = Some(handler);
Ok(())
} else {
Err(Error::from(internal!(
@ -1132,7 +1191,7 @@ impl Reactor {
params,
done,
} => {
match CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
let extender = CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
cx,
peer_id,
0x02,
@ -1141,14 +1200,9 @@ impl Reactor {
require_sendme_auth,
params,
self,
) {
Ok(e) => {
self.set_meta_handler(Box::new(e), done)?;
}
Err(e) => {
let _ = done.send(Err(e));
}
};
done,
)?;
self.set_meta_handler(Box::new(extender))?;
}
CtrlMsg::BeginStream {
hop_num,