diff --git a/crates/tor-guardmgr/src/daemon.rs b/crates/tor-guardmgr/src/daemon.rs index 2c9c82dea..3426d1b5e 100644 --- a/crates/tor-guardmgr/src/daemon.rs +++ b/crates/tor-guardmgr/src/daemon.rs @@ -6,23 +6,15 @@ use crate::pending::{GuardStatus, RequestId}; use crate::GuardMgrInner; -use futures::{ - channel::{mpsc, oneshot}, - stream::{self, StreamExt}, -}; +#[cfg(test)] +use futures::channel::oneshot; +use futures::{channel::mpsc, stream::StreamExt}; + use std::sync::{Mutex, Weak}; /// A message sent by to the [`report_status_events()`] task. #[derive(Debug)] pub(crate) enum Msg { - /// Tells the task to add another [`oneshot::Receiver`] to the list - /// of receivers it's listening to. - /// - /// This message is sent by guard manager whenever it hands out a - /// guard; the receiver will be notified when the requester's circuit - /// succeeds, fails, or is abandoned. The receiver corresponds - /// to the sender in some [`GuardMonitor`](crate::GuardMonitor). - Observe(oneshot::Receiver), /// A message sent by a [`GuardMonitor`](crate::GuardMonitor) to /// report the status of an attempt to use a guard. Status(RequestId, GuardStatus), @@ -33,9 +25,6 @@ pub(crate) enum Msg { Ping(oneshot::Sender<()>), } -/// Wrapper type to unify returns from mpsc and oneshots -pub(crate) type MsgResult = Result; - /// Background task: wait for messages about guard statuses, and /// tell a guard manager about them. Runs indefinitely. /// @@ -47,28 +36,11 @@ pub(crate) type MsgResult = Result; pub(crate) async fn report_status_events( runtime: impl tor_rtcompat::SleepProvider, inner: Weak>, - ctrl: mpsc::UnboundedReceiver, + mut events: mpsc::UnboundedReceiver, ) { - // Multiplexes a bunch of one-shot receivers to tell us about guard - // status outcomes. - let notifications = stream::FuturesUnordered::new(); - // If I don't put this dummy receiver into notifications, then - // notifications will be finished prematurely and not get polled any more. - // TODO: Is there a better way to do this? - let (_dummy_snd, rcv) = oneshot::channel(); - notifications.push(rcv); - - // Multiplexes `notifications` with events from `ctrl`. - let mut events = stream::select(notifications, ctrl); - loop { match events.next().await { - Some(Ok(Msg::Observe(rcv))) => { - // We've been told to wait for a new event; add it to - // `notifications`. - events.get_ref().0.push(rcv); - } - Some(Ok(Msg::Status(id, status))) => { + Some(Msg::Status(id, status)) => { // We've got a report about a guard status. if let Some(inner) = inner.upgrade() { let mut inner = inner.lock().expect("Poisoned lock"); @@ -78,14 +50,8 @@ pub(crate) async fn report_status_events( return; } } - Some(Err(_)) => { - // TODO: Unfortunately, we don't know which future was cancelled. - // It shouldn't be possible for this to occur, though, since - // GuardMonitor always sends a message, even on drop. - tracing::warn!("bug: Somehow a guard success event was dropped."); - } #[cfg(test)] - Some(Ok(Msg::Ping(sender))) => { + Some(Msg::Ping(sender)) => { let _ignore = sender.send(()); } // The streams have all closed. (I think this is impossible?) diff --git a/crates/tor-guardmgr/src/lib.rs b/crates/tor-guardmgr/src/lib.rs index 9fafc899f..1ac390472 100644 --- a/crates/tor-guardmgr/src/lib.rs +++ b/crates/tor-guardmgr/src/lib.rs @@ -214,7 +214,7 @@ struct GuardMgrInner { /// backpressure in the event that the task running /// [`daemon::report_status_events`] fails to read from this /// channel. - ctrl: mpsc::UnboundedSender, + ctrl: mpsc::UnboundedSender, /// Information about guards that we've given out, but where we have /// not yet heard whether the guard was successful. @@ -435,7 +435,8 @@ impl GuardMgr { (u, Some(snd)) }; let request_id = pending::RequestId::next(); - let (monitor, rcv) = GuardMonitor::new(request_id); + let ctrl = inner.ctrl.clone(); + let monitor = GuardMonitor::new(request_id, ctrl); let pending_request = pending::PendingRequest::new(guard_id.clone(), usage, usable_sender, now); @@ -443,11 +444,6 @@ impl GuardMgr { inner.active_guards.record_attempt(&guard_id, now); - inner - .ctrl - .unbounded_send(Ok(daemon::Msg::Observe(rcv))) - .expect("Guard observer task exited prematurely"); - Ok((guard, monitor, usable)) } @@ -461,7 +457,7 @@ impl GuardMgr { let inner = self.inner.lock().expect("Poisoned lock"); inner .ctrl - .unbounded_send(Ok(pingmsg)) + .unbounded_send(pingmsg) .expect("Guard observer task exited permaturely."); } let _ = rcv.await; diff --git a/crates/tor-guardmgr/src/pending.rs b/crates/tor-guardmgr/src/pending.rs index 8ea57993d..3ad63d931 100644 --- a/crates/tor-guardmgr/src/pending.rs +++ b/crates/tor-guardmgr/src/pending.rs @@ -9,7 +9,10 @@ //! handled via [`GuardUsable`]. use crate::{daemon, GuardId}; -use futures::{channel::oneshot, Future}; +use futures::{ + channel::{mpsc::UnboundedSender, oneshot}, + Future, +}; use pin_project::pin_project; use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; @@ -111,22 +114,22 @@ pub struct GuardMonitor { ignore_indeterminate: bool, /// A sender that needs to get told when the attempt to use the guard is /// finished or abandoned. - snd: Option>, + /// + /// TODO: This doesn't really need to be an Option, but we use None + /// here to indicate that we've already used the sender, and it can't + /// be used again. + snd: Option>, } impl GuardMonitor { /// Create a new GuardMonitor object. - pub(crate) fn new(id: RequestId) -> (Self, oneshot::Receiver) { - let (snd, rcv) = oneshot::channel(); - ( - GuardMonitor { - id, - pending_status: GuardStatus::AttemptAbandoned, - ignore_indeterminate: false, - snd: Some(snd), - }, - rcv, - ) + pub(crate) fn new(id: RequestId, snd: UnboundedSender) -> Self { + GuardMonitor { + id, + pending_status: GuardStatus::AttemptAbandoned, + ignore_indeterminate: false, + snd: Some(snd), + } } /// Report that a circuit was successfully built in a way that @@ -189,7 +192,7 @@ impl GuardMonitor { .snd .take() .expect("GuardMonitor initialized with no sender") - .send(daemon::Msg::Status(self.id, msg)); + .unbounded_send(daemon::Msg::Status(self.id, msg)); } /// Report the pending message for his guard, whatever it is.