tor-proto: Implement IncomingStream::{accept_data, request, reject}.

This commit is contained in:
Gabriela Moldovan 2023-07-28 11:47:06 +01:00
parent 445c052420
commit f06e0e2df0
No known key found for this signature in database
GPG Key ID: 3946E0ADE72BAC99
2 changed files with 38 additions and 6 deletions

View File

@ -0,0 +1,6 @@
BREAKING: `IncomingStream::request` returns an `&IncomingStreamRequest` instead
of `IncomingStreamRequest`
BREAKING: `IncomingStreamRequest::accept_data` is now async, takes `mut self`,
and returns a `Result`
BREAKING: `IncomingStreamRequest::reject` is now async, takes `&mut self`,
and returns a `Result`

View File

@ -7,6 +7,7 @@ use crate::circuit::StreamTarget;
use crate::{Error, Result};
use tor_cell::relaycell::{msg, RelayCmd, UnparsedRelayCell};
use tor_cell::restricted_msg;
use tor_error::internal;
/// A pending request from the other end of the circuit for us to open a new
/// stream.
@ -25,6 +26,10 @@ pub struct IncomingStream {
stream: StreamTarget,
/// The underlying `StreamReader`.
reader: StreamReader,
/// Whether we have rejected the stream using [`StreamTarget::close`].
///
/// If set to `true`, any attempts to use this `IncomingStream` will return an error.
is_rejected: bool,
}
/// A message that can be sent to begin a stream.
@ -50,23 +55,44 @@ impl IncomingStream {
request,
stream,
reader,
is_rejected: false,
}
}
/// Return the underlying message that was used to try to begin this stream.
pub fn request(&self) -> IncomingStreamRequest {
todo!()
pub fn request(&self) -> &IncomingStreamRequest {
&self.request
}
/// Whether we have rejected this `IncomingStream` using [`IncomingStream::reject`].
pub fn is_rejected(&self) -> bool {
self.is_rejected
}
/// Accept this stream as a new [`DataStream`], and send the client a
/// message letting them know the stream was accepted.
pub fn accept_data(self, message: msg::Connected) -> DataStream {
todo!()
pub async fn accept_data(mut self, message: msg::Connected) -> Result<DataStream> {
if self.is_rejected {
return Err(internal!("Cannot accept data on a closed stream").into());
}
match self.request {
IncomingStreamRequest::Begin(_) => {
self.stream.send(message.into()).await?;
Ok(DataStream::new_connected(self.reader, self.stream))
} // TODO HSS: return an error if the request was RESOLVE, or any other request that
// we cannot respond with CONNECTED to
}
}
/// Reject this request and send an error message to the client.
pub fn reject(self, message: msg::End) {
todo!() // TODO hss
pub async fn reject(&mut self, message: msg::End) -> Result<()> {
if self.is_rejected {
return Err(internal!("IncomingStream::reject() called twice").into());
}
self.is_rejected = true;
self.stream.close(message).await
}
/// Ignore this request without replying to the client.