tor-proto: Add types for sharing stream request info with the reactor.

This commit is contained in:
Gabriela Moldovan 2023-07-28 21:06:27 +01:00
parent 18b01f94cc
commit 950d0da0b5
No known key found for this signature in database
GPG Key ID: 3946E0ADE72BAC99
1 changed files with 49 additions and 0 deletions

View File

@ -35,6 +35,8 @@ use std::pin::Pin;
use tor_cell::chancell::msg::{AnyChanMsg, Relay};
use tor_cell::relaycell::msg::{AnyRelayMsg, End, Sendme};
use tor_cell::relaycell::{AnyRelayCell, RelayCmd, StreamId, UnparsedRelayCell};
#[cfg(feature = "hs-service")]
use crate::stream::IncomingStreamRequest;
use futures::channel::{mpsc, oneshot};
use futures::Stream;
@ -537,6 +539,35 @@ pub struct Reactor {
channel_id: CircId,
/// A handler for a meta cell, together with a result channel to notify on completion.
meta_handler: Option<Box<dyn MetaCellHandler>>,
/// A handler for incoming stream requests.
#[cfg(feature = "hs-service")]
incoming_stream_req_handler: Option<IncomingStreamRequestHandler>,
}
/// Information about an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(Debug)]
pub(super) struct IncomingStreamRequestContext {
/// The [`IncomingStreamRequest`].
pub(super) req: IncomingStreamRequest,
/// The ID of the stream being requested.
pub(super) stream_id: StreamId,
/// The [`HopNum`].
pub(super) hop_num: HopNum,
/// A channel for receiving messages from this stream.
pub(super) receiver: mpsc::Receiver<UnparsedRelayCell>,
/// A channel for sending messages to be sent on this stream.
pub(super) msg_tx: mpsc::Sender<AnyRelayMsg>,
}
/// Data required for handling an incoming stream request.
#[cfg(feature = "hs-service")]
#[derive(Debug)]
struct IncomingStreamRequestHandler {
/// A sender for sharing information about an incoming stream request.
incoming_sender: mpsc::Sender<IncomingStreamRequestContext>,
/// A [`AnyCmdChecker`] for validating incoming stream requests.
cmd_checker: AnyCmdChecker,
}
impl Reactor {
@ -573,6 +604,8 @@ impl Reactor {
channel_id,
crypto_out,
meta_handler: None,
#[cfg(feature = "hs-service")]
incoming_stream_req_handler: None,
mutable: mutable.clone(),
};
@ -1247,6 +1280,22 @@ impl Reactor {
}
}
/// Try to install a given cell handler on this circuit.
#[cfg(feature = "hs-service")]
fn set_incoming_stream_req_handler(
&mut self,
handler: IncomingStreamRequestHandler,
) -> Result<()> {
if self.incoming_stream_req_handler.is_none() {
self.incoming_stream_req_handler = Some(handler);
Ok(())
} else {
Err(Error::from(internal!(
"Tried to install a BEGIN cell handler before the old one was gone."
)))
}
}
/// Handle a CtrlMsg other than Create and Shutdown.
fn handle_control(&mut self, cx: &mut Context<'_>, msg: CtrlMsg) -> Result<()> {
trace!("{}: reactor received {:?}", self.unique_id, msg);