Move functionality from tor_basic_utils to tor_async_utils

This commit is mostly code movement; I'd recommend reviewing it
with git's `--color-moved` option.
This commit is contained in:
Nick Mathewson 2023-03-29 09:17:13 -04:00
parent 1ee4a98a27
commit a62affd66e
12 changed files with 237 additions and 211 deletions

12
Cargo.lock generated
View File

@ -176,6 +176,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tor-async-utils",
"tor-basic-utils",
"tor-cell",
"tor-chanmgr",
@ -3610,8 +3611,12 @@ name = "tor-async-utils"
version = "0.0.1"
dependencies = [
"futures",
"futures-await-test",
"pin-project",
"postage",
"thiserror",
"tokio",
"void",
]
[[package]]
@ -3620,20 +3625,14 @@ version = "0.5.1"
dependencies = [
"derive_more",
"educe",
"futures",
"futures-await-test",
"hex",
"libc",
"paste",
"pin-project",
"postage",
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
"slab",
"thiserror",
"tokio",
"void",
]
[[package]]
@ -4238,6 +4237,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"tor-async-utils",
"tor-basic-utils",
"tor-bytes",
"tor-cell",

View File

@ -85,15 +85,14 @@ postage = { version = "0.5.0", default-features = false, features = ["futures-tr
safelog = { path = "../safelog", version = "0.2.1" }
serde = { version = "1.0.103", features = ["derive"] }
thiserror = "1"
tor-async-utils = { path = "../tor-async-utils", version = "0.0.1" }
tor-basic-utils = { path = "../tor-basic-utils", version = "0.5.0" }
tor-cell = { path = "../tor-cell", version = "0.9.0" }
tor-chanmgr = { path = "../tor-chanmgr", version = "0.8.2" }
tor-checkable = { path = "../tor-checkable", version = "0.4.2" }
tor-circmgr = { path = "../tor-circmgr", version = "0.7.2" }
tor-config = { path = "../tor-config", version = "0.7.2" }
tor-dirmgr = { path = "../tor-dirmgr", version = "0.9.2", default-features = false, features = [
"mmap",
] }
tor-dirmgr = { path = "../tor-dirmgr", version = "0.9.2", default-features = false, features = ["mmap"] }
tor-error = { path = "../tor-error", version = "0.4.1" }
tor-guardmgr = { path = "../tor-guardmgr", version = "0.8.2" }
tor-hsclient = { path = "../tor-hsclient", version = "0.1.1", optional = true }

View File

@ -8,7 +8,7 @@ use crate::address::{IntoTorAddr, ResolveInstructions, StreamInstructions};
use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig};
use safelog::{sensitive, Sensitive};
use tor_basic_utils::futures::{DropNotifyWatchSender, PostageWatchSenderExt};
use tor_async_utils::{DropNotifyWatchSender, PostageWatchSenderExt};
use tor_circmgr::isolation::{Isolation, StreamIsolation};
use tor_circmgr::{isolation::StreamIsolationBuilder, IsolationToken, TargetPort};
use tor_config::MutCfg;

View File

@ -15,5 +15,11 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
[dependencies]
futures = "0.3.14"
pin-project = "1"
postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] }
thiserror = "1"
void = "1"
[dev-dependencies]
futures-await-test = "0.3.0"
tokio = { version = "1.7", features = ["macros", "rt", "rt-multi-thread", "time"] }

View File

@ -37,3 +37,10 @@
#![allow(clippy::significant_drop_in_scrutinee)] // arti/-/merge_requests/588/#note_2812945
#![allow(clippy::result_large_err)] // temporary workaround for arti#587
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->
mod sinkext;
mod watch;
pub use sinkext::{SinkExt, SinkPrepareSendFuture, SinkSendable};
pub use watch::{DropNotifyEofSignallable, DropNotifyWatchSender, PostageWatchSenderExt};

View File

@ -1,8 +1,7 @@
//! Futures helpers
//! Extension trait for using [`Sink`] more safely.
use std::future::Future;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
@ -10,7 +9,6 @@ use futures::future::FusedFuture;
use futures::ready;
use futures::Sink;
use pin_project::pin_project;
use void::{ResultVoidExt as _, Void};
/// Switch to the nontrivial version of this, to get debugging output on stderr
macro_rules! dprintln { { $f:literal $($a:tt)* } => { () } }
@ -25,7 +23,7 @@ where
///
/// ```
/// # use futures::channel::mpsc;
/// # use tor_basic_utils::futures::SinkExt as _;
/// # use tor_async_utils::SinkExt as _;
/// #
/// # #[tokio::main]
/// # async fn main() -> Result<(),mpsc::SendError> {
@ -134,7 +132,7 @@ where
/// # async fn main() {
/// use futures::select;
/// use futures::{SinkExt as _, StreamExt as _};
/// use tor_basic_utils::futures::SinkExt as _;
/// use tor_async_utils::SinkExt as _;
///
/// let (mut input_w, mut input_r) = futures::channel::mpsc::unbounded::<usize>();
/// let (mut output_w, mut output_r) = futures::channel::mpsc::unbounded::<String>();
@ -414,121 +412,6 @@ where
}
}
/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
///
/// Ideally these, or something like them, would be upstream:
/// See <https://github.com/austinjones/postage-rs/issues/56>.
///
/// We provide this as an extension trait became the implementation is a bit fiddly.
/// This lets us concentrate on the actual logic, when we use it.
pub trait PostageWatchSenderExt<T> {
/// Update, by calling a fallible function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>;
/// Update, by calling a function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn maybe_send<F>(&mut self, update: F)
where
T: PartialEq,
F: FnOnce(&T) -> T,
{
self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
.void_unwrap();
}
}
impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>,
{
let lock = self.borrow();
let new = update(&*lock)?;
if new != *lock {
// We must drop the lock guard, because otherwise borrow_mut will deadlock.
// There is no race, because we hold &mut self, so no-one else can get a look in.
// (postage::watch::Sender is not one of those facilities which is mereely a
// handle, and Clone.)
drop(lock);
*self.borrow_mut() = new;
}
Ok(())
}
}
#[derive(Debug)]
/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped
///
/// Derefs to the inner `Sender`.
///
/// Ideally this would be behaviour promised by upstream, or something
/// See <https://github.com/austinjones/postage-rs/issues/57>.
pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
/// Values that can signal EOF
///
/// Implemented for `Option`, which is usually what you want to use.
pub trait DropNotifyEofSignallable {
/// Generate the EOF value
fn eof() -> Self;
/// Does this value indicate EOF
fn is_eof(&self) -> bool;
}
impl<T> DropNotifyEofSignallable for Option<T> {
fn eof() -> Self {
None
}
fn is_eof(&self) -> bool {
self.is_none()
}
}
impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
/// Arrange to send `T::Default` when `inner` is dropped
pub fn new(inner: postage::watch::Sender<T>) -> Self {
DropNotifyWatchSender(Some(inner))
}
/// Unwrap the inner sender, defusing the drop notification
pub fn into_inner(mut self) -> postage::watch::Sender<T> {
self.0.take().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
type Target = postage::watch::Sender<T>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
fn drop(&mut self) {
if let Some(mut inner) = self.0.take() {
// None means into_inner() was called
*inner.borrow_mut() = DropNotifyEofSignallable::eof();
}
}
}
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
@ -659,72 +542,4 @@ mod test {
assert_eq!(*sunk.lock().unwrap(), &[42, 43]);
}
}
#[async_test]
async fn postage_sender_ext() {
use futures::stream::StreamExt;
use futures::FutureExt;
let (mut s, mut r) = postage::watch::channel_with(20);
// Receiver of a fresh watch wakes once, but let's not rely on this
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(20)),
_ = futures::future::ready(()) => { }, // tolerate nothing
};
// Now, not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i);
// Still not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i + 1);
// Ready, with 21
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(21)),
_ = futures::future::ready(()) => panic!(),
};
let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
// Not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
}
#[async_test]
async fn postage_drop() {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct I(i32);
impl DropNotifyEofSignallable for I {
fn eof() -> I {
I(0)
}
fn is_eof(&self) -> bool {
self.0 == 0
}
}
let (s, r) = postage::watch::channel_with(I(20));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(20));
drop(s);
assert_eq!(*r.borrow(), I(0));
let (s, r) = postage::watch::channel_with(I(44));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(44));
drop(s.into_inner());
assert_eq!(*r.borrow(), I(44));
}
}

View File

@ -0,0 +1,207 @@
//! Extension trait for more efficient use of [`postage::watch`].
use std::ops::{Deref, DerefMut};
use void::{ResultVoidExt as _, Void};
/// Extension trait for some `postage::watch::Sender` to provide `maybe_send`
///
/// Ideally these, or something like them, would be upstream:
/// See <https://github.com/austinjones/postage-rs/issues/56>.
///
/// We provide this as an extension trait became the implementation is a bit fiddly.
/// This lets us concentrate on the actual logic, when we use it.
pub trait PostageWatchSenderExt<T> {
/// Update, by calling a fallible function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>;
/// Update, by calling a function, sending only if necessary
///
/// Calls `update` on the current value in the watch, to obtain a new value.
/// If the new value doesn't compare equal, updates the watch, notifying receivers.
fn maybe_send<F>(&mut self, update: F)
where
T: PartialEq,
F: FnOnce(&T) -> T,
{
self.try_maybe_send(|t| Ok::<_, Void>(update(t)))
.void_unwrap();
}
}
impl<T> PostageWatchSenderExt<T> for postage::watch::Sender<T> {
fn try_maybe_send<F, E>(&mut self, update: F) -> Result<(), E>
where
T: PartialEq,
F: FnOnce(&T) -> Result<T, E>,
{
let lock = self.borrow();
let new = update(&*lock)?;
if new != *lock {
// We must drop the lock guard, because otherwise borrow_mut will deadlock.
// There is no race, because we hold &mut self, so no-one else can get a look in.
// (postage::watch::Sender is not one of those facilities which is mereely a
// handle, and Clone.)
drop(lock);
*self.borrow_mut() = new;
}
Ok(())
}
}
#[derive(Debug)]
/// Wrapper for `postage::watch::Sender` that sends `DropNotifyEof::eof()` when dropped
///
/// Derefs to the inner `Sender`.
///
/// Ideally this would be behaviour promised by upstream, or something
/// See <https://github.com/austinjones/postage-rs/issues/57>.
pub struct DropNotifyWatchSender<T: DropNotifyEofSignallable>(Option<postage::watch::Sender<T>>);
/// Values that can signal EOF
///
/// Implemented for `Option`, which is usually what you want to use.
pub trait DropNotifyEofSignallable {
/// Generate the EOF value
fn eof() -> Self;
/// Does this value indicate EOF
fn is_eof(&self) -> bool;
}
impl<T> DropNotifyEofSignallable for Option<T> {
fn eof() -> Self {
None
}
fn is_eof(&self) -> bool {
self.is_none()
}
}
impl<T: DropNotifyEofSignallable> DropNotifyWatchSender<T> {
/// Arrange to send `T::Default` when `inner` is dropped
pub fn new(inner: postage::watch::Sender<T>) -> Self {
DropNotifyWatchSender(Some(inner))
}
/// Unwrap the inner sender, defusing the drop notification
pub fn into_inner(mut self) -> postage::watch::Sender<T> {
self.0.take().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Deref for DropNotifyWatchSender<T> {
type Target = postage::watch::Sender<T>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> DerefMut for DropNotifyWatchSender<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().expect("inner was None")
}
}
impl<T: DropNotifyEofSignallable> Drop for DropNotifyWatchSender<T> {
fn drop(&mut self) {
if let Some(mut inner) = self.0.take() {
// None means into_inner() was called
*inner.borrow_mut() = DropNotifyEofSignallable::eof();
}
}
}
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use futures::select_biased;
use futures_await_test::async_test;
#[derive(Debug, Eq, PartialEq)]
struct TestError(char);
#[async_test]
async fn postage_sender_ext() {
use futures::stream::StreamExt;
use futures::FutureExt;
let (mut s, mut r) = postage::watch::channel_with(20);
// Receiver of a fresh watch wakes once, but let's not rely on this
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(20)),
_ = futures::future::ready(()) => { }, // tolerate nothing
};
// Now, not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i);
// Still not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
s.maybe_send(|i| *i + 1);
// Ready, with 21
select_biased! {
i = r.next().fuse() => assert_eq!(i, Some(21)),
_ = futures::future::ready(()) => panic!(),
};
let () = s.try_maybe_send(|_i| Err(())).unwrap_err();
// Not ready
select_biased! {
_ = r.next().fuse() => panic!(),
_ = futures::future::ready(()) => { },
};
}
#[async_test]
async fn postage_drop() {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct I(i32);
impl DropNotifyEofSignallable for I {
fn eof() -> I {
I(0)
}
fn is_eof(&self) -> bool {
self.0 == 0
}
}
let (s, r) = postage::watch::channel_with(I(20));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(20));
drop(s);
assert_eq!(*r.borrow(), I(0));
let (s, r) = postage::watch::channel_with(I(44));
let s = DropNotifyWatchSender::new(s);
assert_eq!(*r.borrow(), I(44));
drop(s.into_inner());
assert_eq!(*r.borrow(), I(44));
}
}

View File

@ -13,16 +13,12 @@ categories = ["rust-patterns"]
repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
[dependencies]
futures = "0.3.14"
hex = "0.4"
paste = "1"
pin-project = "1"
postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] }
rand = "0.8"
rand_chacha = "0.3"
slab = "0.4.4"
thiserror = "1"
void = "1"
[target.'cfg(unix)'.dependencies]
libc = { version = "0.2", default-features = false }
@ -30,9 +26,7 @@ libc = { version = "0.2", default-features = false }
[dev-dependencies]
derive_more = "0.99.3"
educe = "0.4.6"
futures-await-test = "0.3.0"
serde = { version = "1.0.103", features = ["derive"] }
tokio = { version = "1.7", features = ["macros", "rt", "rt-multi-thread", "time"] }
[package.metadata.docs.rs]
all-features = true

View File

@ -0,0 +1 @@
BREAKING: futures-related functionality moved to tor-async-utils.

View File

@ -42,7 +42,6 @@ use std::collections::BinaryHeap;
use std::fmt;
use std::mem;
pub mod futures;
pub mod iter;
pub mod n_key_set;
pub mod retry;

View File

@ -21,7 +21,7 @@ hs-client = ["hs-common"]
hs-service = ["hs-common"]
hs-common = []
experimental-api = ["send-control-msg"]
send-control-msg = ["visibility"] # send_control_message etc.; TODO HS should this be in hs-client?
send-control-msg = ["visibility"] # send_control_message etc.; TODO HS should this be in hs-client?
# Enable testing-only APIs. APIs under this feature are not
# covered by semver.
testing = []
@ -49,6 +49,7 @@ subtle = "2"
thiserror = "1"
tokio-crate = { package = "tokio", version = "1.7", optional = true }
tokio-util = { version = "0.7.0", features = ["compat"], optional = true }
tor-async-utils = { path = "../tor-async-utils", version = "0.0.1" }
tor-basic-utils = { path = "../tor-basic-utils", version = "0.5.0" }
tor-bytes = { path = "../tor-bytes", version = "0.6.2" }
tor-cell = { path = "../tor-cell", version = "0.9.0" }
@ -75,10 +76,7 @@ itertools = "0.10.1"
regex = { version = "1", default-features = false, features = ["std"] }
statrs = "0.16.0"
tokio-crate = { package = "tokio", version = "1.7", features = ["full"] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = [
"tokio",
"native-tls",
] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.8.1", features = ["tokio", "native-tls"] }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

View File

@ -12,7 +12,7 @@ use crate::channel::OpenChanMsgS2C;
use crate::circuit::halfcirc::HalfCirc;
use crate::util::err::{ChannelClosed, ReactorError};
use crate::{Error, Result};
use tor_basic_utils::futures::SinkExt as _;
use tor_async_utils::SinkExt as _;
use tor_cell::chancell::msg::{Destroy, DestroyReason, PaddingNegotiate};
use tor_cell::chancell::ChanMsg;
use tor_cell::chancell::{msg::AnyChanMsg, AnyChanCell, CircId};