tor-proto: Implement allow_stream_requests.

This commit is contained in:
Gabriela Moldovan 2023-07-28 21:11:11 +01:00
parent ffa8056437
commit 37154dca95
No known key found for this signature in database
GPG Key ID: 3946E0ADE72BAC99
2 changed files with 51 additions and 7 deletions

View File

@ -4,3 +4,5 @@ BREAKING: `IncomingStreamRequest::accept_data` is now async, takes `mut self`,
and returns a `Result`
BREAKING: `IncomingStreamRequest::reject` is now async, takes `&mut self`,
and returns a `Result`
BREAKING: `ClientCirc::allow_stream_requests` now expects `self` to be
`&Arc<ClientCirc>`

View File

@ -72,7 +72,11 @@ use tor_error::{bad_api_usage, internal, into_internal};
use tor_linkspec::{CircTarget, LinkSpecType, OwnedChanTarget, RelayIdType};
#[cfg(feature = "hs-service")]
use tor_cell::relaycell::msg as relaymsg;
use {
crate::circuit::reactor::IncomingStreamRequestContext,
crate::stream::{IncomingCmdChecker, IncomingStream},
tor_cell::relaycell::msg as relaymsg,
};
use futures::channel::{mpsc, oneshot};
@ -437,13 +441,51 @@ impl ClientCirc {
#[cfg(feature = "hs-service")]
#[allow(unused_variables)] // TODO hss remove
pub fn allow_stream_requests(
&self,
self: &Arc<ClientCirc>,
allow_commands: &[tor_cell::relaycell::RelayCmd],
) -> Result<impl futures::Stream<Item = crate::stream::IncomingStream>> {
if false {
return Ok(futures::stream::empty()); // TODO hss remove; this is just here for type inference.
}
todo!() // TODO hss implement.
) -> Result<impl futures::Stream<Item = Result<IncomingStream>>> {
use futures::stream::StreamExt;
/// The size of the channel receiving IncomingStreamRequestContexts.
// TODO HSS: decide what capacity this channel should have.
const INCOMING_BUFFER: usize = STREAM_READER_BUFFER;
let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
let (incoming_sender, incoming_receiver) = mpsc::channel(INCOMING_BUFFER);
self.control
.unbounded_send(CtrlMsg::AwaitStreamRequest {
cmd_checker,
incoming_sender,
})
.map_err(|_| Error::CircuitClosed)?;
let circ = Arc::clone(self);
Ok(incoming_receiver.map(move |req_ctx| {
let IncomingStreamRequestContext {
req,
stream_id,
hop_num,
receiver,
msg_tx,
} = req_ctx;
let target = StreamTarget {
circ: Arc::clone(&circ),
tx: msg_tx,
hop_num,
stream_id,
};
let reader = StreamReader {
target: target.clone(),
receiver,
recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
ended: false,
};
Ok(IncomingStream::new(req, target, reader))
}))
}
/// Extend the circuit via the ntor handshake to a new target last