Provide maybe_send on postage::Sender, via extension trait

We need to replace the AtomicBool for dormant mode with something that
can wake up tasks.  postage::watch is the right shape.

But we want to be able to update it but suppress no-op updates.
(There is going to be a call site where no-op updates can occur.)

In the absence of a suitable upstream method as requested here
  https://github.com/austinjones/postage-rs/issues/56
we introduce this facility via an extension trait.
This commit is contained in:
Ian Jackson 2022-07-18 12:04:49 +01:00
parent 44f37b2d07
commit 7acdd21750
3 changed files with 91 additions and 0 deletions

2
Cargo.lock generated
View File

@ -3428,10 +3428,12 @@ dependencies = [
"futures-await-test",
"hex",
"pin-project",
"postage",
"rand 0.8.5",
"rand_chacha 0.3.1",
"thiserror",
"tokio",
"void",
]
[[package]]

View File

@ -16,9 +16,11 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
futures = "0.3.14"
hex = "0.4"
pin-project = "1"
postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] }
rand = "0.8"
rand_chacha = "0.3"
thiserror = "1"
void = "1"
[dev-dependencies]
derive_more = "0.99"

View File

@ -9,6 +9,7 @@ use futures::future::FusedFuture;
use futures::ready;
use futures::Sink;
use pin_project::pin_project;
use void::{ResultVoidExt as _, Void};
/// Switch to the nontrivial version of this, to get debugging output on stderr
macro_rules! dprintln { { $f:literal $($a:tt)* } => { () } }
@ -412,6 +413,53 @@ where
}
}
/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
///
/// Ideally these, or something like them, would be upstream:
/// See <https://github.com/austinjones/postage-rs/issues/56>.
///
/// We provide this as an extension trait became the implementation is a bit fiddly.
/// This lets us concentrate on the actual logic, when we use it.
pub trait PostageWatchSenderExt<T> {
/// Update, by calling a fallible function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>;
/// Update, by calling a function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn maybe_send<F>(&mut self, update: F)
where
T: PartialEq,
F: FnOnce(&T) -> T,
{
self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
.void_unwrap();
}
}
impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>,
{
let lock = self.borrow();
let new = update(&*lock)?;
if new != *lock {
drop(lock);
*self.borrow_mut() = new;
}
Ok(())
}
}
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
@ -540,4 +588,43 @@ mod test {
assert_eq!(*sunk.lock().unwrap(), &[42, 43]);
}
}
#[async_test]
async fn postage_sender_ext() {
use futures::stream::StreamExt;
use futures::FutureExt;
let (mut s, mut r) = postage::watch::channel_with(20);
// Receiver of a fresh watch wakes once, but let's not rely on this
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(20)),
_ = futures::future::ready(()) => { }, // tolerate nothing
};
// Now, not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i);
// Still not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i + 1);
// Ready, with 21
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(21)),
_ = futures::future::ready(()) => panic!(),
};
let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
// Not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
}
}