tor-proto: reject() now waits until the control message is received.

As a result, by the time the `reject` future resolves, the stream has
been removed from the reactor's stream map and the corresponding END
cell has been sent.

Fixes #998.
This commit is contained in:
Gabriela Moldovan 2023-08-04 19:57:46 +01:00
parent f689e94f0f
commit 2eaa0fa52b
No known key found for this signature in database
GPG Key ID: 3946E0ADE72BAC99
2 changed files with 11 additions and 1 deletions

View File

@ -1053,14 +1053,20 @@ impl StreamTarget {
// called before)
#[cfg(feature = "hs-service")]
pub(crate) async fn close(&self, msg: relaymsg::End) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.circ
.control
.unbounded_send(CtrlMsg::ClosePendingStream {
stream_id: self.stream_id,
hop_num: self.hop_num,
message: msg,
done: tx,
})
.map_err(|_| Error::CircuitClosed)?;
// Check whether the ClosePendingStream was processed successfully.
rx.await.map_err(|_| Error::CircuitClosed)??;
Ok(())
}

View File

@ -177,6 +177,8 @@ pub(super) enum CtrlMsg {
stream_id: StreamId,
/// The END message to send.
message: End,
/// Oneshot channel to notify on completion.
done: ReactorResultChannel<()>,
},
/// Begin accepting streams on this circuit.
#[cfg(feature = "hs-service")]
@ -1394,8 +1396,10 @@ impl Reactor {
hop_num,
stream_id,
message,
done,
} => {
self.close_stream(cx, hop_num, stream_id, Some(message))?;
let ret = self.close_stream(cx, hop_num, stream_id, Some(message));
let _ = done.send(ret); // don't care if sender goes away
}
#[cfg(feature = "hs-service")]
CtrlMsg::AwaitStreamRequest {