tor-proto: allow_stream_requests now waits until the control message is received.

`ClientCirc::allow_stream_requests` is now `async` and waits until the
`AwaitIncomingStream` control message is processed by the reactor.

This guarantees that by the time the `allow_stream_requests` future
resolves, the reactor is ready to process BEGIN/BEGIN_DIR/RESOLVE cells.

Previously, the client tasks from allow_stream_requests tests had to
sleep before sending the BEGIN cell to give the reactor time to process
the `AwaitIncomingStream` control message (which tells the reactor to
expect incoming BEGIN/BEGIN_DIR/RESOLVE cells on the circuit).

Fixes #994
This commit is contained in:
Gabriela Moldovan 2023-08-04 18:53:14 +01:00
parent 36056906ad
commit f689e94f0f
No known key found for this signature in database
GPG Key ID: 3946E0ADE72BAC99
3 changed files with 18 additions and 13 deletions

View File

@ -11,3 +11,4 @@ ADDED: `ClientCirc::last_hop_num`
DEPRECATED: `ClientCirc::start_conversation_last_hop()`
ADDED: `ClientCirc::start_conversation()` to eventually replace
`ClientCirc::start_conversation_last_hop()`
BREAKING: `ClientCirc::allow_stream_requests` is now async

View File

@ -465,7 +465,7 @@ impl ClientCirc {
// TODO HSS: this function should return an error if allow_stream_requests()
// was already called on this circuit.
#[cfg(feature = "hs-service")]
pub fn allow_stream_requests(
pub async fn allow_stream_requests(
self: &Arc<ClientCirc>,
allow_commands: &[tor_cell::relaycell::RelayCmd],
) -> Result<impl futures::Stream<Item = Result<IncomingStream>>> {
@ -477,14 +477,19 @@ impl ClientCirc {
let cmd_checker = IncomingCmdChecker::new_any(allow_commands);
let (incoming_sender, incoming_receiver) = mpsc::channel(INCOMING_BUFFER);
let (tx, rx) = oneshot::channel();
self.control
.unbounded_send(CtrlMsg::AwaitStreamRequest {
cmd_checker,
incoming_sender,
done: tx,
})
.map_err(|_| Error::CircuitClosed)?;
// Check whether the AwaitStreamRequest was processed successfully.
rx.await.map_err(|_| Error::CircuitClosed)??;
let circ = Arc::clone(self);
Ok(incoming_receiver.map(move |req_ctx| {
let IncomingStreamRequestContext {
@ -1877,9 +1882,12 @@ mod test {
let _incoming = circ
.allow_stream_requests(&[tor_cell::relaycell::RelayCmd::BEGIN])
.await
.unwrap();
let incoming = circ.allow_stream_requests(&[tor_cell::relaycell::RelayCmd::BEGIN]);
let incoming = circ
.allow_stream_requests(&[tor_cell::relaycell::RelayCmd::BEGIN])
.await;
// There can only be one IncomingStream at a time on any given circuit.
assert!(incoming.is_err());
@ -1899,6 +1907,7 @@ mod test {
let (tx, rx) = oneshot::channel();
let mut incoming = circ
.allow_stream_requests(&[tor_cell::relaycell::RelayCmd::BEGIN])
.await
.unwrap();
let simulate_service = async move {
@ -1925,11 +1934,6 @@ mod test {
.unwrap();
let begin_msg = chanmsg::Relay::from(body);
// Ensure the reactor has had a chance to process the AwaitIncomingStream control
// message before sending the cell (otherwise it will shut down due to a CircProto
// error caused by the BEGIN unexpected cell).
// TODO HSS: replace sleep with a less flaky solution
rt.sleep(Duration::from_millis(100)).await;
// Pretend to be a client at the other end of the circuit sending a begin cell
send.send(ClientCircChanMsg::Relay(begin_msg))
.await
@ -1976,6 +1980,7 @@ mod test {
let mut incoming = circ
.allow_stream_requests(&[tor_cell::relaycell::RelayCmd::BEGIN])
.await
.unwrap();
let simulate_service = async move {
@ -2017,11 +2022,6 @@ mod test {
.unwrap();
let begin_msg = chanmsg::Relay::from(body);
// Ensure the reactor has had a chance to process the AwaitIncomingStream control
// message before sending the cell (otherwise it will shut down due to a CircProto
// error caused by the BEGIN unexpected cell).
// TODO HSS: replace sleep with a less flaky solution
rt.sleep(Duration::from_millis(200)).await;
// Pretend to be a client at the other end of the circuit sending 2 identical begin
// cells (the first one will be rejected by the test service).
for _ in 0..STREAM_COUNT {

View File

@ -185,6 +185,8 @@ pub(super) enum CtrlMsg {
incoming_sender: mpsc::Sender<IncomingStreamRequestContext>,
/// A `CmdChecker` to keep track of which message types are acceptable.
cmd_checker: AnyCmdChecker,
/// Oneshot channel to notify on completion.
done: ReactorResultChannel<()>,
// TODO HSS: add a hop_num field specifying which hop in the circuit is allowed to create
// streams, if any (if we find that a different hop in the circuit is attempting to create
// a stream we should return an error).
@ -1399,6 +1401,7 @@ impl Reactor {
CtrlMsg::AwaitStreamRequest {
cmd_checker,
incoming_sender,
done,
} => {
// TODO HSS: add a CtrlMsg for de-registering the handler.
// TODO HSS: ensure the handler is deregistered when the IncomingStream is dropped.
@ -1407,7 +1410,8 @@ impl Reactor {
cmd_checker,
};
self.set_incoming_stream_req_handler(handler)?;
let ret = self.set_incoming_stream_req_handler(handler);
let _ = done.send(ret); // don't care if sender goes away
}
CtrlMsg::SendSendme { stream_id, hop_num } => {
let sendme = Sendme::new_empty();