Merge branch 'bug219'

This commit is contained in:
Nick Mathewson 2021-11-02 15:32:44 -04:00
commit b0265c490e
3 changed files with 28 additions and 63 deletions

View File

@ -6,23 +6,15 @@
use crate::pending::{GuardStatus, RequestId};
use crate::GuardMgrInner;
use futures::{
channel::{mpsc, oneshot},
stream::{self, StreamExt},
};
#[cfg(test)]
use futures::channel::oneshot;
use futures::{channel::mpsc, stream::StreamExt};
use std::sync::{Mutex, Weak};
/// A message sent by to the [`report_status_events()`] task.
#[derive(Debug)]
pub(crate) enum Msg {
/// Tells the task to add another [`oneshot::Receiver`] to the list
/// of receivers it's listening to.
///
/// This message is sent by guard manager whenever it hands out a
/// guard; the receiver will be notified when the requester's circuit
/// succeeds, fails, or is abandoned. The receiver corresponds
/// to the sender in some [`GuardMonitor`](crate::GuardMonitor).
Observe(oneshot::Receiver<Msg>),
/// A message sent by a [`GuardMonitor`](crate::GuardMonitor) to
/// report the status of an attempt to use a guard.
Status(RequestId, GuardStatus),
@ -33,9 +25,6 @@ pub(crate) enum Msg {
Ping(oneshot::Sender<()>),
}
/// Wrapper type to unify returns from mpsc and oneshots
pub(crate) type MsgResult = Result<Msg, futures::channel::oneshot::Canceled>;
/// Background task: wait for messages about guard statuses, and
/// tell a guard manager about them. Runs indefinitely.
///
@ -47,28 +36,11 @@ pub(crate) type MsgResult = Result<Msg, futures::channel::oneshot::Canceled>;
pub(crate) async fn report_status_events(
runtime: impl tor_rtcompat::SleepProvider,
inner: Weak<Mutex<GuardMgrInner>>,
ctrl: mpsc::UnboundedReceiver<MsgResult>,
mut events: mpsc::UnboundedReceiver<Msg>,
) {
// Multiplexes a bunch of one-shot receivers to tell us about guard
// status outcomes.
let notifications = stream::FuturesUnordered::new();
// If I don't put this dummy receiver into notifications, then
// notifications will be finished prematurely and not get polled any more.
// TODO: Is there a better way to do this?
let (_dummy_snd, rcv) = oneshot::channel();
notifications.push(rcv);
// Multiplexes `notifications` with events from `ctrl`.
let mut events = stream::select(notifications, ctrl);
loop {
match events.next().await {
Some(Ok(Msg::Observe(rcv))) => {
// We've been told to wait for a new event; add it to
// `notifications`.
events.get_ref().0.push(rcv);
}
Some(Ok(Msg::Status(id, status))) => {
Some(Msg::Status(id, status)) => {
// We've got a report about a guard status.
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock().expect("Poisoned lock");
@ -78,14 +50,8 @@ pub(crate) async fn report_status_events(
return;
}
}
Some(Err(_)) => {
// TODO: Unfortunately, we don't know which future was cancelled.
// It shouldn't be possible for this to occur, though, since
// GuardMonitor always sends a message, even on drop.
tracing::warn!("bug: Somehow a guard success event was dropped.");
}
#[cfg(test)]
Some(Ok(Msg::Ping(sender))) => {
Some(Msg::Ping(sender)) => {
let _ignore = sender.send(());
}
// The streams have all closed. (I think this is impossible?)

View File

@ -209,7 +209,7 @@ struct GuardMgrInner {
/// backpressure in the event that the task running
/// [`daemon::report_status_events`] fails to read from this
/// channel.
ctrl: mpsc::UnboundedSender<daemon::MsgResult>,
ctrl: mpsc::UnboundedSender<daemon::Msg>,
/// Information about guards that we've given out, but where we have
/// not yet heard whether the guard was successful.
@ -413,7 +413,8 @@ impl<R: Runtime> GuardMgr<R> {
(u, Some(snd))
};
let request_id = pending::RequestId::next();
let (monitor, rcv) = GuardMonitor::new(request_id);
let ctrl = inner.ctrl.clone();
let monitor = GuardMonitor::new(request_id, ctrl);
// Note that the network can be down even if all the primary guards
// are not yet marked as unreachable. But according to guard-spec we
@ -434,11 +435,6 @@ impl<R: Runtime> GuardMgr<R> {
inner.active_guards.record_attempt(&guard_id, now);
inner
.ctrl
.unbounded_send(Ok(daemon::Msg::Observe(rcv)))
.expect("Guard observer task exited prematurely");
Ok((guard, monitor, usable))
}
@ -452,7 +448,7 @@ impl<R: Runtime> GuardMgr<R> {
let inner = self.inner.lock().expect("Poisoned lock");
inner
.ctrl
.unbounded_send(Ok(pingmsg))
.unbounded_send(pingmsg)
.expect("Guard observer task exited permaturely.");
}
let _ = rcv.await;

View File

@ -9,7 +9,10 @@
//! handled via [`GuardUsable`].
use crate::{daemon, GuardId};
use futures::{channel::oneshot, Future};
use futures::{
channel::{mpsc::UnboundedSender, oneshot},
Future,
};
use pin_project::pin_project;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
@ -111,22 +114,22 @@ pub struct GuardMonitor {
ignore_indeterminate: bool,
/// A sender that needs to get told when the attempt to use the guard is
/// finished or abandoned.
snd: Option<oneshot::Sender<daemon::Msg>>,
///
/// TODO: This doesn't really need to be an Option, but we use None
/// here to indicate that we've already used the sender, and it can't
/// be used again.
snd: Option<UnboundedSender<daemon::Msg>>,
}
impl GuardMonitor {
/// Create a new GuardMonitor object.
pub(crate) fn new(id: RequestId) -> (Self, oneshot::Receiver<daemon::Msg>) {
let (snd, rcv) = oneshot::channel();
(
GuardMonitor {
id,
pending_status: GuardStatus::AttemptAbandoned,
ignore_indeterminate: false,
snd: Some(snd),
},
rcv,
)
pub(crate) fn new(id: RequestId, snd: UnboundedSender<daemon::Msg>) -> Self {
GuardMonitor {
id,
pending_status: GuardStatus::AttemptAbandoned,
ignore_indeterminate: false,
snd: Some(snd),
}
}
/// Report that a circuit was successfully built in a way that
@ -196,7 +199,7 @@ impl GuardMonitor {
.snd
.take()
.expect("GuardMonitor initialized with no sender")
.send(daemon::Msg::Status(self.id, msg));
.unbounded_send(daemon::Msg::Status(self.id, msg));
}
/// Report the pending message for his guard, whatever it is.