diff --git a/Cargo.lock b/Cargo.lock index 254e80409..cfc444332 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -176,6 +176,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tor-async-utils", "tor-basic-utils", "tor-cell", "tor-chanmgr", @@ -3610,8 +3611,12 @@ name = "tor-async-utils" version = "0.0.1" dependencies = [ "futures", + "futures-await-test", "pin-project", + "postage", "thiserror", + "tokio", + "void", ] [[package]] @@ -3620,20 +3625,14 @@ version = "0.5.1" dependencies = [ "derive_more", "educe", - "futures", - "futures-await-test", "hex", "libc", "paste", - "pin-project", - "postage", "rand 0.8.5", "rand_chacha 0.3.1", "serde", "slab", "thiserror", - "tokio", - "void", ] [[package]] @@ -4238,6 +4237,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tor-async-utils", "tor-basic-utils", "tor-bytes", "tor-cell", diff --git a/crates/arti-client/Cargo.toml b/crates/arti-client/Cargo.toml index 16b1eb99a..6cb6d4a71 100644 --- a/crates/arti-client/Cargo.toml +++ b/crates/arti-client/Cargo.toml @@ -85,15 +85,14 @@ postage = { version = "0.5.0", default-features = false, features = ["futures-tr safelog = { path = "../safelog", version = "0.2.1" } serde = { version = "1.0.103", features = ["derive"] } thiserror = "1" +tor-async-utils = { path = "../tor-async-utils", version = "0.0.1" } tor-basic-utils = { path = "../tor-basic-utils", version = "0.5.0" } tor-cell = { path = "../tor-cell", version = "0.9.0" } tor-chanmgr = { path = "../tor-chanmgr", version = "0.8.2" } tor-checkable = { path = "../tor-checkable", version = "0.4.2" } tor-circmgr = { path = "../tor-circmgr", version = "0.7.2" } tor-config = { path = "../tor-config", version = "0.7.2" } -tor-dirmgr = { path = "../tor-dirmgr", version = "0.9.2", default-features = false, features = [ - "mmap", -] } +tor-dirmgr = { path = "../tor-dirmgr", version = "0.9.2", default-features = false, features = ["mmap"] } tor-error = { path = "../tor-error", version = "0.4.1" } tor-guardmgr = { path = "../tor-guardmgr", version = "0.8.2" } tor-hsclient = { path = "../tor-hsclient", version = "0.1.1", optional = true } diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index e90f82675..e915d94f8 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -8,7 +8,7 @@ use crate::address::{IntoTorAddr, ResolveInstructions, StreamInstructions}; use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig}; use safelog::{sensitive, Sensitive}; -use tor_basic_utils::futures::{DropNotifyWatchSender, PostageWatchSenderExt}; +use tor_async_utils::{DropNotifyWatchSender, PostageWatchSenderExt}; use tor_circmgr::isolation::{Isolation, StreamIsolation}; use tor_circmgr::{isolation::StreamIsolationBuilder, IsolationToken, TargetPort}; use tor_config::MutCfg; diff --git a/crates/tor-async-utils/Cargo.toml b/crates/tor-async-utils/Cargo.toml index f8105c266..e4d9d4e1b 100644 --- a/crates/tor-async-utils/Cargo.toml +++ b/crates/tor-async-utils/Cargo.toml @@ -15,5 +15,11 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/" [dependencies] futures = "0.3.14" pin-project = "1" +postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] } thiserror = "1" +void = "1" + +[dev-dependencies] +futures-await-test = "0.3.0" +tokio = { version = "1.7", features = ["macros", "rt", "rt-multi-thread", "time"] } diff --git a/crates/tor-async-utils/src/lib.rs b/crates/tor-async-utils/src/lib.rs index 8a502ac0b..125d1e5ef 100644 --- a/crates/tor-async-utils/src/lib.rs +++ b/crates/tor-async-utils/src/lib.rs @@ -37,3 +37,10 @@ #![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945 #![allow(clippy::result_large_err)] // temporary workaround for arti#587 //! + +mod sinkext; +mod watch; + +pub use sinkext::{SinkExt, SinkPrepareSendFuture, SinkSendable}; + +pub use watch::{DropNotifyEofSignallable, DropNotifyWatchSender, PostageWatchSenderExt}; diff --git a/crates/tor-basic-utils/src/futures.rs b/crates/tor-async-utils/src/sinkext.rs similarity index 78% rename from crates/tor-basic-utils/src/futures.rs rename to crates/tor-async-utils/src/sinkext.rs index ce1baaecd..1d6fa38f2 100644 --- a/crates/tor-basic-utils/src/futures.rs +++ b/crates/tor-async-utils/src/sinkext.rs @@ -1,8 +1,7 @@ -//! Futures helpers +//! Extension trait for using [`Sink`] more safely. use std::future::Future; use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -10,7 +9,6 @@ use futures::future::FusedFuture; use futures::ready; use futures::Sink; use pin_project::pin_project; -use void::{ResultVoidExt as _, Void}; /// Switch to the nontrivial version of this, to get debugging output on stderr macro_rules! dprintln { { $f:literal $($a:tt)* } => { () } } @@ -25,7 +23,7 @@ where /// /// ``` /// # use futures::channel::mpsc; - /// # use tor_basic_utils::futures::SinkExt as _; + /// # use tor_async_utils::SinkExt as _; /// # /// # #[tokio::main] /// # async fn main() -> Result<(),mpsc::SendError> { @@ -134,7 +132,7 @@ where /// # async fn main() { /// use futures::select; /// use futures::{SinkExt as _, StreamExt as _}; - /// use tor_basic_utils::futures::SinkExt as _; + /// use tor_async_utils::SinkExt as _; /// /// let (mut input_w, mut input_r) = futures::channel::mpsc::unbounded::(); /// let (mut output_w, mut output_r) = futures::channel::mpsc::unbounded::(); @@ -414,121 +412,6 @@ where } } -/// Extension trait for some `postage::watch::Sender` to provide `maybe_send` -/// -/// Ideally these, or something like them, would be upstream: -/// See . -/// -/// We provide this as an extension trait became the implementation is a bit fiddly. -/// This lets us concentrate on the actual logic, when we use it. -pub trait PostageWatchSenderExt { - /// Update, by calling a fallible function, sending only if necessary - /// - /// Calls `update` on the current value in the watch, to obtain a new value. - /// If the new value doesn't compare equal, updates the watch, notifying receivers. - fn try_maybe_send(&mut self, update: F) -> Result<(), E> - where - T: PartialEq, - F: FnOnce(&T) -> Result; - - /// Update, by calling a function, sending only if necessary - /// - /// Calls `update` on the current value in the watch, to obtain a new value. - /// If the new value doesn't compare equal, updates the watch, notifying receivers. - fn maybe_send(&mut self, update: F) - where - T: PartialEq, - F: FnOnce(&T) -> T, - { - self.try_maybe_send(|t| Ok::<_, Void>(update(t))) - .void_unwrap(); - } -} - -impl PostageWatchSenderExt for postage::watch::Sender { - fn try_maybe_send(&mut self, update: F) -> Result<(), E> - where - T: PartialEq, - F: FnOnce(&T) -> Result, - { - let lock = self.borrow(); - let new = update(&*lock)?; - if new != *lock { - // We must drop the lock guard, because otherwise borrow_mut will deadlock. - // There is no race, because we hold &mut self, so no-one else can get a look in. - // (postage::watch::Sender is not one of those facilities which is mereely a - // handle, and Clone.) - drop(lock); - *self.borrow_mut() = new; - } - Ok(()) - } -} - -#[derive(Debug)] -/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped -/// -/// Derefs to the inner `Sender`. -/// -/// Ideally this would be behaviour promised by upstream, or something -/// See . -pub struct DropNotifyWatchSender(Option>); - -/// Values that can signal EOF -/// -/// Implemented for `Option`, which is usually what you want to use. -pub trait DropNotifyEofSignallable { - /// Generate the EOF value - fn eof() -> Self; - - /// Does this value indicate EOF - fn is_eof(&self) -> bool; -} - -impl DropNotifyEofSignallable for Option { - fn eof() -> Self { - None - } - - fn is_eof(&self) -> bool { - self.is_none() - } -} - -impl DropNotifyWatchSender { - /// Arrange to send `T::Default` when `inner` is dropped - pub fn new(inner: postage::watch::Sender) -> Self { - DropNotifyWatchSender(Some(inner)) - } - - /// Unwrap the inner sender, defusing the drop notification - pub fn into_inner(mut self) -> postage::watch::Sender { - self.0.take().expect("inner was None") - } -} - -impl Deref for DropNotifyWatchSender { - type Target = postage::watch::Sender; - fn deref(&self) -> &Self::Target { - self.0.as_ref().expect("inner was None") - } -} - -impl DerefMut for DropNotifyWatchSender { - fn deref_mut(&mut self) -> &mut Self::Target { - self.0.as_mut().expect("inner was None") - } -} - -impl Drop for DropNotifyWatchSender { - fn drop(&mut self) { - if let Some(mut inner) = self.0.take() { - // None means into_inner() was called - *inner.borrow_mut() = DropNotifyEofSignallable::eof(); - } - } -} - #[cfg(test)] mod test { // @@ begin test lint list maintained by maint/add_warning @@ @@ -659,72 +542,4 @@ mod test { assert_eq!(*sunk.lock().unwrap(), &[42, 43]); } } - - #[async_test] - async fn postage_sender_ext() { - use futures::stream::StreamExt; - use futures::FutureExt; - - let (mut s, mut r) = postage::watch::channel_with(20); - // Receiver of a fresh watch wakes once, but let's not rely on this - select_biased! { - i = r.next().fuse() => assert_eq!(i, Some(20)), - _ = futures::future::ready(()) => { }, // tolerate nothing - }; - // Now, not ready - select_biased! { - _ = r.next().fuse() => panic!(), - _ = futures::future::ready(()) => { }, - }; - - s.maybe_send(|i| *i); - // Still not ready - select_biased! { - _ = r.next().fuse() => panic!(), - _ = futures::future::ready(()) => { }, - }; - - s.maybe_send(|i| *i + 1); - // Ready, with 21 - select_biased! { - i = r.next().fuse() => assert_eq!(i, Some(21)), - _ = futures::future::ready(()) => panic!(), - }; - - let () = s.try_maybe_send(|_i| Err(())).unwrap_err(); - // Not ready - select_biased! { - _ = r.next().fuse() => panic!(), - _ = futures::future::ready(()) => { }, - }; - } - - #[async_test] - async fn postage_drop() { - #[derive(Clone, Copy, Debug, Eq, PartialEq)] - struct I(i32); - - impl DropNotifyEofSignallable for I { - fn eof() -> I { - I(0) - } - fn is_eof(&self) -> bool { - self.0 == 0 - } - } - - let (s, r) = postage::watch::channel_with(I(20)); - let s = DropNotifyWatchSender::new(s); - - assert_eq!(*r.borrow(), I(20)); - drop(s); - assert_eq!(*r.borrow(), I(0)); - - let (s, r) = postage::watch::channel_with(I(44)); - let s = DropNotifyWatchSender::new(s); - - assert_eq!(*r.borrow(), I(44)); - drop(s.into_inner()); - assert_eq!(*r.borrow(), I(44)); - } } diff --git a/crates/tor-async-utils/src/watch.rs b/crates/tor-async-utils/src/watch.rs new file mode 100644 index 000000000..57b2b3e4f --- /dev/null +++ b/crates/tor-async-utils/src/watch.rs @@ -0,0 +1,207 @@ +//! Extension trait for more efficient use of [`postage::watch`]. +use std::ops::{Deref, DerefMut}; +use void::{ResultVoidExt as _, Void}; + +/// Extension trait for some `postage::watch::Sender` to provide `maybe_send` +/// +/// Ideally these, or something like them, would be upstream: +/// See . +/// +/// We provide this as an extension trait became the implementation is a bit fiddly. +/// This lets us concentrate on the actual logic, when we use it. +pub trait PostageWatchSenderExt { + /// Update, by calling a fallible function, sending only if necessary + /// + /// Calls `update` on the current value in the watch, to obtain a new value. + /// If the new value doesn't compare equal, updates the watch, notifying receivers. + fn try_maybe_send(&mut self, update: F) -> Result<(), E> + where + T: PartialEq, + F: FnOnce(&T) -> Result; + + /// Update, by calling a function, sending only if necessary + /// + /// Calls `update` on the current value in the watch, to obtain a new value. + /// If the new value doesn't compare equal, updates the watch, notifying receivers. + fn maybe_send(&mut self, update: F) + where + T: PartialEq, + F: FnOnce(&T) -> T, + { + self.try_maybe_send(|t| Ok::<_, Void>(update(t))) + .void_unwrap(); + } +} + +impl PostageWatchSenderExt for postage::watch::Sender { + fn try_maybe_send(&mut self, update: F) -> Result<(), E> + where + T: PartialEq, + F: FnOnce(&T) -> Result, + { + let lock = self.borrow(); + let new = update(&*lock)?; + if new != *lock { + // We must drop the lock guard, because otherwise borrow_mut will deadlock. + // There is no race, because we hold &mut self, so no-one else can get a look in. + // (postage::watch::Sender is not one of those facilities which is mereely a + // handle, and Clone.) + drop(lock); + *self.borrow_mut() = new; + } + Ok(()) + } +} + +#[derive(Debug)] +/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped +/// +/// Derefs to the inner `Sender`. +/// +/// Ideally this would be behaviour promised by upstream, or something +/// See . +pub struct DropNotifyWatchSender(Option>); + +/// Values that can signal EOF +/// +/// Implemented for `Option`, which is usually what you want to use. +pub trait DropNotifyEofSignallable { + /// Generate the EOF value + fn eof() -> Self; + + /// Does this value indicate EOF + fn is_eof(&self) -> bool; +} + +impl DropNotifyEofSignallable for Option { + fn eof() -> Self { + None + } + + fn is_eof(&self) -> bool { + self.is_none() + } +} + +impl DropNotifyWatchSender { + /// Arrange to send `T::Default` when `inner` is dropped + pub fn new(inner: postage::watch::Sender) -> Self { + DropNotifyWatchSender(Some(inner)) + } + + /// Unwrap the inner sender, defusing the drop notification + pub fn into_inner(mut self) -> postage::watch::Sender { + self.0.take().expect("inner was None") + } +} + +impl Deref for DropNotifyWatchSender { + type Target = postage::watch::Sender; + fn deref(&self) -> &Self::Target { + self.0.as_ref().expect("inner was None") + } +} + +impl DerefMut for DropNotifyWatchSender { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.as_mut().expect("inner was None") + } +} + +impl Drop for DropNotifyWatchSender { + fn drop(&mut self) { + if let Some(mut inner) = self.0.take() { + // None means into_inner() was called + *inner.borrow_mut() = DropNotifyEofSignallable::eof(); + } + } +} + +#[cfg(test)] +mod test { + // @@ begin test lint list maintained by maint/add_warning @@ + #![allow(clippy::bool_assert_comparison)] + #![allow(clippy::clone_on_copy)] + #![allow(clippy::dbg_macro)] + #![allow(clippy::print_stderr)] + #![allow(clippy::print_stdout)] + #![allow(clippy::single_char_pattern)] + #![allow(clippy::unwrap_used)] + #![allow(clippy::unchecked_duration_subtraction)] + //! + + use super::*; + use futures::select_biased; + use futures_await_test::async_test; + + #[derive(Debug, Eq, PartialEq)] + struct TestError(char); + + #[async_test] + async fn postage_sender_ext() { + use futures::stream::StreamExt; + use futures::FutureExt; + + let (mut s, mut r) = postage::watch::channel_with(20); + // Receiver of a fresh watch wakes once, but let's not rely on this + select_biased! { + i = r.next().fuse() => assert_eq!(i, Some(20)), + _ = futures::future::ready(()) => { }, // tolerate nothing + }; + // Now, not ready + select_biased! { + _ = r.next().fuse() => panic!(), + _ = futures::future::ready(()) => { }, + }; + + s.maybe_send(|i| *i); + // Still not ready + select_biased! { + _ = r.next().fuse() => panic!(), + _ = futures::future::ready(()) => { }, + }; + + s.maybe_send(|i| *i + 1); + // Ready, with 21 + select_biased! { + i = r.next().fuse() => assert_eq!(i, Some(21)), + _ = futures::future::ready(()) => panic!(), + }; + + let () = s.try_maybe_send(|_i| Err(())).unwrap_err(); + // Not ready + select_biased! { + _ = r.next().fuse() => panic!(), + _ = futures::future::ready(()) => { }, + }; + } + + #[async_test] + async fn postage_drop() { + #[derive(Clone, Copy, Debug, Eq, PartialEq)] + struct I(i32); + + impl DropNotifyEofSignallable for I { + fn eof() -> I { + I(0) + } + fn is_eof(&self) -> bool { + self.0 == 0 + } + } + + let (s, r) = postage::watch::channel_with(I(20)); + let s = DropNotifyWatchSender::new(s); + + assert_eq!(*r.borrow(), I(20)); + drop(s); + assert_eq!(*r.borrow(), I(0)); + + let (s, r) = postage::watch::channel_with(I(44)); + let s = DropNotifyWatchSender::new(s); + + assert_eq!(*r.borrow(), I(44)); + drop(s.into_inner()); + assert_eq!(*r.borrow(), I(44)); + } +} diff --git a/crates/tor-basic-utils/Cargo.toml b/crates/tor-basic-utils/Cargo.toml index d0375f6da..17d8bc158 100644 --- a/crates/tor-basic-utils/Cargo.toml +++ b/crates/tor-basic-utils/Cargo.toml @@ -13,16 +13,12 @@ categories = ["rust-patterns"] repository = "https://gitlab.torproject.org/tpo/core/arti.git/" [dependencies] -futures = "0.3.14" hex = "0.4" paste = "1" -pin-project = "1" -postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] } rand = "0.8" rand_chacha = "0.3" slab = "0.4.4" thiserror = "1" -void = "1" [target.'cfg(unix)'.dependencies] libc = { version = "0.2", default-features = false } @@ -30,9 +26,7 @@ libc = { version = "0.2", default-features = false } [dev-dependencies] derive_more = "0.99.3" educe = "0.4.6" -futures-await-test = "0.3.0" serde = { version = "1.0.103", features = ["derive"] } -tokio = { version = "1.7", features = ["macros", "rt", "rt-multi-thread", "time"] } [package.metadata.docs.rs] all-features = true diff --git a/crates/tor-basic-utils/semver.md b/crates/tor-basic-utils/semver.md new file mode 100644 index 000000000..c6135d677 --- /dev/null +++ b/crates/tor-basic-utils/semver.md @@ -0,0 +1 @@ +BREAKING: futures-related functionality moved to tor-async-utils. diff --git a/crates/tor-basic-utils/src/lib.rs b/crates/tor-basic-utils/src/lib.rs index 0e457d037..92ced86c2 100644 --- a/crates/tor-basic-utils/src/lib.rs +++ b/crates/tor-basic-utils/src/lib.rs @@ -42,7 +42,6 @@ use std::collections::BinaryHeap; use std::fmt; use std::mem; -pub mod futures; pub mod iter; pub mod n_key_set; pub mod retry; diff --git a/crates/tor-proto/Cargo.toml b/crates/tor-proto/Cargo.toml index a215b7bd5..67ff5d9a5 100644 --- a/crates/tor-proto/Cargo.toml +++ b/crates/tor-proto/Cargo.toml @@ -21,7 +21,7 @@ hs-client = ["hs-common"] hs-service = ["hs-common"] hs-common = [] experimental-api = ["send-control-msg"] -send-control-msg = ["visibility"] # send_control_message etc.; TODO HS should this be in hs-client? +send-control-msg = ["visibility"] # send_control_message etc.; TODO HS should this be in hs-client? # Enable testing-only APIs. APIs under this feature are not # covered by semver. testing = [] @@ -49,6 +49,7 @@ subtle = "2" thiserror = "1" tokio-crate = { package = "tokio", version = "1.7", optional = true } tokio-util = { version = "0.7.0", features = ["compat"], optional = true } +tor-async-utils = { path = "../tor-async-utils", version = "0.0.1" } tor-basic-utils = { path = "../tor-basic-utils", version = "0.5.0" } tor-bytes = { path = "../tor-bytes", version = "0.6.2" } tor-cell = { path = "../tor-cell", version = "0.9.0" } @@ -75,10 +76,7 @@ itertools = "0.10.1" regex = { version = "1", default-features = false, features = ["std"] } statrs = "0.16.0" tokio-crate = { package = "tokio", version = "1.7", features = ["full"] } -tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = [ - "tokio", - "native-tls", -] } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = ["tokio", "native-tls"] } [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/crates/tor-proto/src/channel/reactor.rs b/crates/tor-proto/src/channel/reactor.rs index ec6875a7e..9c7e1b517 100644 --- a/crates/tor-proto/src/channel/reactor.rs +++ b/crates/tor-proto/src/channel/reactor.rs @@ -12,7 +12,7 @@ use crate::channel::OpenChanMsgS2C; use crate::circuit::halfcirc::HalfCirc; use crate::util::err::{ChannelClosed, ReactorError}; use crate::{Error, Result}; -use tor_basic_utils::futures::SinkExt as _; +use tor_async_utils::SinkExt as _; use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate}; use tor_cell::chancell::ChanMsg; use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};