tor-proto: Handle RELAY_BEGIN cells if we have an incoming req handler.

This commit is contained in:
Gabriela Moldovan 2023-07-28 21:10:36 +01:00
parent f06e0e2df0
commit ffa8056437
No known key found for this signature in database
GPG Key ID: 3946E0ADE72BAC99
1 changed files with 84 additions and 0 deletions

View File

@ -1659,6 +1659,14 @@ impl Reactor {
StreamStatus::Closed => hop.map.ending_msg_received(streamid)?,
}
}
#[cfg(feature = "hs-service")]
None if matches!(
msg.cmd(),
RelayCmd::BEGIN | RelayCmd::BEGIN_DIR | RelayCmd::RESOLVE
) =>
{
self.handle_incoming_stream_request(msg, streamid, hopnum)?;
}
_ => {
// No stream wants this message, or ever did.
return Err(Error::CircProto(
@ -1669,6 +1677,82 @@ impl Reactor {
Ok(CellStatus::Continue)
}
/// A helper for handling incoming stream requests.
#[cfg(feature = "hs-service")]
fn handle_incoming_stream_request(
&mut self,
msg: UnparsedRelayCell,
stream_id: StreamId,
hop_num: HopNum,
) -> Result<()> {
let Some(handler) = self.incoming_stream_req_handler.as_mut() else {
return Err(Error::CircProto(
"Cannot handle BEGIN cells on this circuit".into()
));
};
let message_closes_stream = handler.cmd_checker.check_msg(&msg)? == StreamStatus::Closed;
// TODO HSS: we've already looked up the `hop` in handle_relay_cell, so we shouldn't
// have to look it up again! However, we can't pass the `&mut hop` reference from
// `handle_relay_cell` to this function, because that makes Rust angry (we'd be
// borrowing self as mutable more than once).
//
// TODO HSS: we _could_ use self.hops.get_mut(..) instead self.hop_mut(..) inside
// handle_relay_cell to work around the problem described above
let hop = self
.hops
.get_mut(Into::<usize>::into(hop_num))
.ok_or(Error::CircuitClosed)?;
if !message_closes_stream {
let begin = msg
.decode::<Begin>()
.map_err(|e| Error::from_bytes_err(e, "Invalid Begin message"))?
.into_msg();
let req = IncomingStreamRequest::Begin(begin);
let (sender, receiver) = mpsc::channel(STREAM_READER_BUFFER);
let (msg_tx, msg_rx) = mpsc::channel(super::CIRCUIT_BUFFER_SIZE);
let send_window = StreamSendWindow::new(SEND_WINDOW_INIT);
let cmd_checker = DataCmdChecker::new_connected();
hop.map
.add_ent_with_id(sender, msg_rx, send_window, stream_id, cmd_checker)?;
if let Err(e) = handler
.incoming_sender
.try_send(IncomingStreamRequestContext {
req,
stream_id,
hop_num,
msg_tx,
receiver,
})
{
// TODO HSS: we should not be dropping BEGIN requests. Consider using an
// unbounded channel instead.
if e.is_full() {
return Err(Error::CircProto(
concat!(
"Sending incoming stream request would block: ",
"we are receiving too many BEGIN cells on this channel"
)
.into(),
));
} else {
// TODO HSS: handle the case where the sender goes away more gracefully
return Err(Error::from(internal!(
"Incoming stream request receiver dropped"
)));
}
}
}
Ok(())
}
/// Helper: process a destroy cell.
#[allow(clippy::unnecessary_wraps)]
fn handle_destroy_cell(&mut self) -> Result<()> {