From a796f7bf6ae53f166dff58fa7177dbd03fba221b Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 26 Oct 2020 14:25:46 -0400 Subject: [PATCH] Use event_listener to find out about sendme replenishment. Previously we would fail if a sendme window was exhausted and two tasks were waiting for it to be replenished at the same time. --- tor-proto/Cargo.toml | 1 + tor-proto/src/circuit/sendme.rs | 39 +++++++++------------------------ 2 files changed, 11 insertions(+), 29 deletions(-) diff --git a/tor-proto/Cargo.toml b/tor-proto/Cargo.toml index e042dd7f2..0f7150090 100644 --- a/tor-proto/Cargo.toml +++ b/tor-proto/Cargo.toml @@ -19,6 +19,7 @@ arrayref = "0.3.6" bytes = "0.5.6" crypto-mac = "0.9.1" digest = "0.9.0" +event-listener = "2.5.1" futures = "0.3.6" futures_codec = "0.4.1" generic-array = "0.14.4" diff --git a/tor-proto/src/circuit/sendme.rs b/tor-proto/src/circuit/sendme.rs index fe4dd1a04..186cf3934 100644 --- a/tor-proto/src/circuit/sendme.rs +++ b/tor-proto/src/circuit/sendme.rs @@ -10,7 +10,6 @@ //! other side of the circuit really has read all of the data that it's //! acknowledging. -use futures::channel::oneshot; use futures::lock::Mutex; use std::collections::VecDeque; @@ -72,9 +71,8 @@ where /// Tag values that incoming "SENDME" messages need to match in order /// for us to send more data. tags: VecDeque, - /// If present, a oneshot that we are blocking on before we can send - /// any more data. - unblock: Option>, + /// An event to wait on if we find that we are out of cells. + unblock: event_listener::Event, } /// Helper: parameterizes a window to determine its maximum and its increment. @@ -123,7 +121,7 @@ where let inner = SendWindowInner { window, tags: VecDeque::with_capacity(capacity as usize), - unblock: None, + unblock: event_listener::Event::new(), }; SendWindow { w: Arc::new(Mutex::new(inner)), @@ -163,27 +161,11 @@ where } // Window is zero; can't send yet. - let (send, recv) = oneshot::channel::<()>(); - - let old = w.unblock.replace(send); - if old.is_some() { - w.unblock.replace(old.unwrap()); - // XXXXM3 find a better approach here; maybe this - // _can_ happen. - return Err(Error::InternalError( - "Two tasks trying to block on the same sendme window at once!".into(), - )); - } - recv + w.unblock.listen() }; - // Wait on this receiver while _not_ holding the lock. - // I believe this unwrap can't fail, the sender can't be - // cancelled as long as there's a refcount to it. But - // let's be careful. - wait_on - .await - .map_err(|_| Error::InternalError("sendme window was cancelled?".into()))?; + // Wait on this event while _not_ holding the lock. + wait_on.await; } } @@ -210,15 +192,14 @@ where } w.tags.pop_front(); + let was_zero = w.window == 0; + let v = w.window.checked_add(P::increment())?; w.window = v; - if let Some(send) = w.unblock.take() { - // if we get a failure, nothing cares about this window any more. - // XXXXM3 is that true? - let _ignore = send.send(()); + if was_zero { + w.unblock.notify(usize::MAX) } - Some(v) } }