Use event_listener to find out about sendme replenishment.

Previously we would fail if a sendme window was exhausted and two
tasks were waiting for it to be replenished at the same time.
This commit is contained in:
Nick Mathewson 2020-10-26 14:25:46 -04:00
parent 4348faff3c
commit a796f7bf6a
2 changed files with 11 additions and 29 deletions

View File

@ -19,6 +19,7 @@ arrayref = "0.3.6"
bytes = "0.5.6"
crypto-mac = "0.9.1"
digest = "0.9.0"
event-listener = "2.5.1"
futures = "0.3.6"
futures_codec = "0.4.1"
generic-array = "0.14.4"

View File

@ -10,7 +10,6 @@
//! other side of the circuit really has read all of the data that it's
//! acknowledging.
use futures::channel::oneshot;
use futures::lock::Mutex;
use std::collections::VecDeque;
@ -72,9 +71,8 @@ where
/// Tag values that incoming "SENDME" messages need to match in order
/// for us to send more data.
tags: VecDeque<T>,
/// If present, a oneshot that we are blocking on before we can send
/// any more data.
unblock: Option<oneshot::Sender<()>>,
/// An event to wait on if we find that we are out of cells.
unblock: event_listener::Event,
}
/// Helper: parameterizes a window to determine its maximum and its increment.
@ -123,7 +121,7 @@ where
let inner = SendWindowInner {
window,
tags: VecDeque::with_capacity(capacity as usize),
unblock: None,
unblock: event_listener::Event::new(),
};
SendWindow {
w: Arc::new(Mutex::new(inner)),
@ -163,27 +161,11 @@ where
}
// Window is zero; can't send yet.
let (send, recv) = oneshot::channel::<()>();
let old = w.unblock.replace(send);
if old.is_some() {
w.unblock.replace(old.unwrap());
// XXXXM3 find a better approach here; maybe this
// _can_ happen.
return Err(Error::InternalError(
"Two tasks trying to block on the same sendme window at once!".into(),
));
}
recv
w.unblock.listen()
};
// Wait on this receiver while _not_ holding the lock.
// I believe this unwrap can't fail, the sender can't be
// cancelled as long as there's a refcount to it. But
// let's be careful.
wait_on
.await
.map_err(|_| Error::InternalError("sendme window was cancelled?".into()))?;
// Wait on this event while _not_ holding the lock.
wait_on.await;
}
}
@ -210,15 +192,14 @@ where
}
w.tags.pop_front();
let was_zero = w.window == 0;
let v = w.window.checked_add(P::increment())?;
w.window = v;
if let Some(send) = w.unblock.take() {
// if we get a failure, nothing cares about this window any more.
// XXXXM3 is that true?
let _ignore = send.send(());
if was_zero {
w.unblock.notify(usize::MAX)
}
Some(v)
}
}