replace Arc with Box and use dyn-clone

this also removes JoinResult
This commit is contained in:
trinity-1686a 2022-03-12 18:33:25 +01:00
parent 43abd119cf
commit cb00ac677b
4 changed files with 69 additions and 95 deletions

7
Cargo.lock generated
View File

@ -986,6 +986,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650"
[[package]]
name = "dyn-clone"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2626afccd7561a06cf1367e2950c4718ea04565e20fb5029b6c7d8ad09abcf"
[[package]]
name = "easy-parallel"
version = "3.2.0"
@ -3231,6 +3237,7 @@ dependencies = [
"bounded-vec-deque",
"derive_builder",
"downcast-rs",
"dyn-clone",
"educe",
"futures",
"futures-await-test",

View File

@ -7,6 +7,7 @@
use crate::address::IntoTorAddr;
use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig};
use tor_circmgr::isolation::Isolation;
use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort};
use tor_config::MutCfg;
use tor_dirmgr::DirEvent;
@ -16,7 +17,6 @@ use tor_proto::stream::{DataStream, IpVersionPreference, StreamParameters};
use tor_rtcompat::{PreferredRuntime, Runtime, SleepProviderExt};
use educe::Educe;
use futures::future::Either;
use futures::lock::Mutex as AsyncMutex;
use futures::stream::StreamExt;
use futures::task::SpawnExt;
@ -104,41 +104,31 @@ pub enum BootstrapBehavior {
}
/// Preferences for how to route a stream over the Tor network.
#[derive(Debug, Clone)]
pub struct StreamPrefs<T = IsolationToken> {
#[derive(Debug, Default, Clone)]
pub struct StreamPrefs {
/// What kind of IPv6/IPv4 we'd prefer, and how strongly.
ip_ver_pref: IpVersionPreference,
/// How should we isolate connection(s) ?
isolation: StreamIsolationPreference<T>,
isolation: StreamIsolationPreference,
/// Whether to return the stream optimistically.
optimistic_stream: bool,
}
impl<T> Default for StreamPrefs<T> {
fn default() -> Self {
StreamPrefs {
ip_ver_pref: Default::default(),
isolation: Default::default(),
optimistic_stream: Default::default(),
}
}
}
/// Record of how we are isolating connections
#[derive(Debug, Clone, Educe)]
#[educe(Default)]
enum StreamIsolationPreference<T> {
enum StreamIsolationPreference {
/// No additional isolation
#[educe(Default)]
None,
/// Id of the isolation group the connection should be part of
/// TODO
Explicit(Arc<T>),
Explicit(Box<dyn Isolation>),
/// Isolate every connection!
EveryStream,
}
impl<T> StreamPrefs<T> {
impl StreamPrefs {
/// Construct a new StreamPrefs.
pub fn new() -> Self {
Self::default()
@ -218,6 +208,20 @@ impl<T> StreamPrefs<T> {
params
}
/// Indicate that connections with these preferences should have their own isolation group
///
/// This is a convenience method which creates a fresh [`IsolationToken`]
/// and sets it for these preferences.
///
/// This connection preference is orthogonal to isolation established by
/// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its
/// clones) will not share circuits with the original client, even if the same
/// `isolation_group` is specified via the `ConnectionPrefs` in force.
pub fn new_isolation_group(&mut self) -> &mut Self {
self.isolation = StreamIsolationPreference::Explicit(Box::new(IsolationToken::new()));
self
}
/// Indicate which other connections might use the same circuit
/// as this one.
///
@ -228,8 +232,8 @@ impl<T> StreamPrefs<T> {
/// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its
/// clones) will not share circuits with the original client, even if the same
/// `isolation_group` is specified via the `ConnectionPrefs` in force.
pub fn set_isolation_group(&mut self, isolation_group: T) -> &mut Self {
self.isolation = StreamIsolationPreference::Explicit(Arc::new(isolation_group));
pub fn set_isolation_group<T: Isolation>(&mut self, isolation_group: T) -> &mut Self {
self.isolation = StreamIsolationPreference::Explicit(Box::new(isolation_group));
self
}
@ -253,34 +257,18 @@ impl<T> StreamPrefs<T> {
/// Return a token to describe which connections might use
/// the same circuit as this one.
fn isolation_group(&self) -> Option<Either<Arc<T>, IsolationToken>> {
fn isolation_group(&self) -> Option<Box<dyn Isolation>> {
use StreamIsolationPreference as SIP;
match self.isolation {
SIP::None => None,
SIP::Explicit(ref ig) => Some(Either::Left(ig.clone())),
SIP::EveryStream => Some(Either::Right(IsolationToken::new())),
SIP::Explicit(ref ig) => Some(ig.clone()),
SIP::EveryStream => Some(Box::new(IsolationToken::new())),
}
}
// TODO: Add some way to be IPFlexible, and require exit to support both.
}
impl StreamPrefs<IsolationToken> {
/// Indicate that connections with these preferences should have their own isolation group
///
/// This is a convenience method which creates a fresh [`IsolationToken`]
/// and sets it for these preferences.
///
/// This connection preference is orthogonal to isolation established by
/// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its
/// clones) will not share circuits with the original client, even if the same
/// `isolation_group` is specified via the `ConnectionPrefs` in force.
pub fn new_isolation_group(&mut self) -> &mut Self {
self.isolation = StreamIsolationPreference::Explicit(Arc::new(IsolationToken::new()));
self
}
}
#[cfg(feature = "tokio")]
impl TorClient<PreferredRuntime> {
/// Bootstrap a connection to the Tor network, using the provided `config`.
@ -674,10 +662,10 @@ impl<R: Runtime> TorClient<R> {
/// Note that because Tor prefers to do DNS resolution on the remote
/// side of the network, this function takes its address as a string.
/// (See [`TorClient::connect()`] for more information.)
pub async fn connect_with_prefs<A: IntoTorAddr, T: tor_circmgr::isolation::Isolation>(
pub async fn connect_with_prefs<A: IntoTorAddr>(
&self,
target: A,
prefs: &StreamPrefs<T>,
prefs: &StreamPrefs,
) -> crate::Result<DataStream> {
let addr = target.into_tor_addr().map_err(wrap_err)?;
addr.enforce_config(&self.addrcfg.get())?;
@ -713,7 +701,7 @@ impl<R: Runtime> TorClient<R> {
//
// This function is private just because we're not sure we want to provide this API.
// https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/250#note_2771238
fn set_stream_prefs(&mut self, connect_prefs: StreamPrefs<IsolationToken>) {
fn set_stream_prefs(&mut self, connect_prefs: StreamPrefs) {
self.connect_prefs = connect_prefs;
}
@ -722,7 +710,7 @@ impl<R: Runtime> TorClient<R> {
/// Connections made with e.g. [`connect`](TorClient::connect) on the returned handle will use
/// `connect_prefs`. This is a convenience wrapper for `clone` and `set_connect_prefs`.
#[must_use]
pub fn clone_with_prefs(&self, connect_prefs: StreamPrefs<IsolationToken>) -> Self {
pub fn clone_with_prefs(&self, connect_prefs: StreamPrefs) -> Self {
let mut result = self.clone();
result.set_stream_prefs(connect_prefs);
result
@ -734,10 +722,10 @@ impl<R: Runtime> TorClient<R> {
}
/// On success, return a list of IP addresses, but use prefs.
pub async fn resolve_with_prefs<T: tor_circmgr::isolation::Isolation>(
pub async fn resolve_with_prefs(
&self,
hostname: &str,
prefs: &StreamPrefs<T>,
prefs: &StreamPrefs,
) -> crate::Result<Vec<IpAddr>> {
let addr = (hostname, 1).into_tor_addr().map_err(wrap_err)?;
addr.enforce_config(&self.addrcfg.get()).map_err(wrap_err)?;
@ -765,10 +753,10 @@ impl<R: Runtime> TorClient<R> {
/// Perform a remote DNS reverse lookup with the provided IP address.
///
/// On success, return a list of hostnames.
pub async fn resolve_ptr_with_prefs<T: tor_circmgr::isolation::Isolation>(
pub async fn resolve_ptr_with_prefs(
&self,
addr: IpAddr,
prefs: &StreamPrefs<T>,
prefs: &StreamPrefs,
) -> crate::Result<Vec<String>> {
let circ = self.get_or_launch_exit_circ(&[], prefs).await?;
@ -817,10 +805,10 @@ impl<R: Runtime> TorClient<R> {
/// Get or launch an exit-suitable circuit with a given set of
/// exit ports.
async fn get_or_launch_exit_circ<T: tor_circmgr::isolation::Isolation>(
async fn get_or_launch_exit_circ(
&self,
exit_ports: &[TargetPort],
prefs: &StreamPrefs<T>,
prefs: &StreamPrefs,
) -> StdResult<ClientCirc, ErrorDetail> {
self.wait_for_bootstrap().await?;
let dir = self
@ -836,10 +824,7 @@ impl<R: Runtime> TorClient<R> {
b.owner_token(self.client_isolation);
// Consider stream isolation too, if it's set.
if let Some(tok) = prefs.isolation_group() {
match tok {
Either::Left(tok) => b.stream_token(tok),
Either::Right(tok) => b.stream_token(Arc::new(tok)),
};
b.stream_token(tok);
}
// Failure should be impossible with this builder.
b.build().expect("Failed to construct StreamIsolation")

View File

@ -35,6 +35,7 @@ async-trait = "0.1.2"
bounded-vec-deque = "0.1"
derive_builder = "0.11"
downcast-rs = "1.2.0"
dyn-clone = "1.0.4"
educe = "0.4.6"
futures = "0.3.14"
humantime-serde = "1.1.1"

View File

@ -1,6 +1,7 @@
//! Code related to tracking what activities a circuit can be used for.
use downcast_rs::{impl_downcast, Downcast};
use dyn_clone::{clone_trait_object, DynClone};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display};
@ -106,15 +107,16 @@ impl Display for TargetPorts {
/// This trait is intended to be used in dyn context. To implement it for your own types,
/// implement [`IsolationHelper`] instead.
// TODO this trait should probably be sealed so the same-type requirement can't be bypassed
pub trait Isolation: Downcast + std::fmt::Debug + Send + Sync + 'static {
pub trait Isolation: Downcast + DynClone + std::fmt::Debug + Send + Sync + 'static {
/// Returns if two [`Isolation`] are compatible.
fn compatible(&self, other: &dyn Isolation) -> bool;
/// Join two [`Isolation`] into the intersection of what each allows.
fn join(&self, other: &dyn Isolation) -> JoinResult;
fn join(&self, other: &dyn Isolation) -> Option<Box<dyn Isolation>>;
}
impl_downcast!(Isolation);
clone_trait_object!(Isolation);
impl<T: IsolationHelper + std::fmt::Debug + Send + Sync + 'static> Isolation for T {
impl<T: IsolationHelper + Clone + std::fmt::Debug + Send + Sync + 'static> Isolation for T {
fn compatible(&self, other: &dyn Isolation) -> bool {
if let Some(other) = other.as_any().downcast_ref() {
self.compatible_same_type(other)
@ -122,39 +124,26 @@ impl<T: IsolationHelper + std::fmt::Debug + Send + Sync + 'static> Isolation for
false
}
}
fn join(&self, other: &dyn Isolation) -> JoinResult {
fn join(&self, other: &dyn Isolation) -> Option<Box<dyn Isolation>> {
if let Some(other) = other.as_any().downcast_ref() {
self.join_same_type(other)
.map(|res| Box::new(res) as Box<dyn Isolation>)
} else {
JoinResult::NoJoin
None
}
}
}
/// Result of an [`Isolation::join`] operation
// rational behind this type: it's not possible to take a `self: &Arc<Self>`, so if the merge would
// result in something identical to `self`, we would need to allocate a new Arc instead of clonning
// the old one.
pub enum JoinResult {
/// The intersection is a new object
New(Arc<dyn Isolation>),
/// The intersection is equals to the `self` param
UseLeft,
/// The intersection is equals to the `other` param
UseRight,
/// No intersection is empty
NoJoin,
}
/// Trait to help implement [`Isolation`]
///
/// This trait is essentially the same as [`Isolation`] with static types. You should
/// implement this trait for types that can represent isolation between streams.
pub trait IsolationHelper {
pub trait IsolationHelper: Sized {
/// Returns whether self and other are compatible.
fn compatible_same_type(&self, other: &Self) -> bool;
/// Join self and other into the intersection of what they allows.
fn join_same_type(&self, other: &Self) -> JoinResult;
fn join_same_type(&self, other: &Self) -> Option<Self>;
}
/// A token used to isolate unrelated streams on different circuits.
@ -242,15 +231,11 @@ impl IsolationHelper for IsolationToken {
fn compatible_same_type(&self, other: &Self) -> bool {
self == other
}
fn join_same_type(&self, other: &Self) -> JoinResult {
fn join_same_type(&self, other: &Self) -> Option<Self> {
if self.compatible_same_type(other) {
// for IsolationToken, any of the three would be correct, but the last one is probably
// slower.
JoinResult::UseLeft
// JoinResult::UseRight
// JoinResult::New(Arc::new(*self))
Some(*self)
} else {
JoinResult::NoJoin
None
}
}
}
@ -262,8 +247,8 @@ impl IsolationHelper for IsolationToken {
#[derive(Clone, Debug, derive_builder::Builder)]
pub struct StreamIsolation {
/// Any isolation token set on the stream.
#[builder(default = "Arc::new(IsolationToken::no_isolation())")]
stream_token: Arc<dyn Isolation>,
#[builder(default = "Box::new(IsolationToken::no_isolation())")]
stream_token: Box<dyn Isolation>,
/// Any additional isolation token set on an object that "owns" this
/// stream. This is typically owned by a `TorClient`.
#[builder(default = "IsolationToken::no_isolation()")]
@ -297,19 +282,15 @@ impl StreamIsolation {
if self.owner_token != other.owner_token {
return None;
}
match self.stream_token.join(other.stream_token.as_ref()) {
JoinResult::New(isolation) => Some(StreamIsolation {
stream_token: isolation,
self.stream_token
.join(other.stream_token.as_ref())
.map(|stream_token| StreamIsolation {
stream_token,
owner_token: self.owner_token,
}),
JoinResult::UseLeft => Some(self.clone()),
JoinResult::UseRight => Some(other.clone()),
JoinResult::NoJoin => None,
}
})
}
}
impl Eq for StreamIsolation {}
impl PartialEq for StreamIsolation {
fn eq(&self, other: &Self) -> bool {
self.may_share_circuit(other)
@ -348,7 +329,7 @@ impl ExitPolicy {
///
/// This type should stay internal to the circmgr crate for now: we'll probably
/// want to refactor it a lot.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum TargetCircUsage {
/// Use for BEGINDIR-based non-anonymous directory connections
Dir,
@ -922,7 +903,7 @@ mod test {
let no_isolation = StreamIsolation::no_isolation();
let no_isolation2 = StreamIsolation::builder()
.owner_token(IsolationToken::no_isolation())
.stream_token(Arc::new(IsolationToken::no_isolation()))
.stream_token(Box::new(IsolationToken::no_isolation()))
.build()
.unwrap();
assert_eq!(no_isolation.owner_token, no_isolation2.owner_token);
@ -943,7 +924,7 @@ mod test {
let tok = IsolationToken::new();
let some_isolation = StreamIsolation::builder().owner_token(tok).build().unwrap();
let some_isolation2 = StreamIsolation::builder()
.stream_token(Arc::new(tok))
.stream_token(Box::new(tok))
.build()
.unwrap();
assert!(!no_isolation.may_share_circuit(&some_isolation));