From ee390c423e7df0b678d77a8685f45dc526581910 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 16 May 2023 09:02:09 -0400 Subject: [PATCH] Refactor ClientCirc APIs to use Arc. Now ClientCirc is no longer `Clone`, and the things that need it to be `Clone` instead return and use an Arc We're doing this so that ClientCirc can participate in the RPC system, and so that its semantics are more obvious. Closes #846. Thanks to the type system, this was a much simpler refactoring than I had feared it would be. --- crates/arti-client/src/client.rs | 2 +- crates/tor-circmgr/semver.md | 1 + crates/tor-circmgr/src/build.rs | 24 ++++++++--------- crates/tor-circmgr/src/hspool.rs | 6 ++--- crates/tor-circmgr/src/hspool/pool.rs | 12 ++++----- crates/tor-circmgr/src/impls.rs | 2 +- crates/tor-circmgr/src/lib.rs | 8 +++--- crates/tor-circmgr/src/mgr.rs | 26 +++++++++--------- crates/tor-hsclient/src/connect.rs | 16 ++++++----- crates/tor-hsclient/src/lib.rs | 2 +- crates/tor-hsclient/src/state.rs | 16 +++++------ crates/tor-proto/semver.md | 2 ++ crates/tor-proto/src/circuit.rs | 38 +++++++++++++++------------ crates/tor-proto/src/stream/data.rs | 2 +- 14 files changed, 83 insertions(+), 74 deletions(-) create mode 100644 crates/tor-proto/semver.md diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index 760d0994e..528ac29ba 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -1106,7 +1106,7 @@ impl TorClient { &self, exit_ports: &[TargetPort], prefs: &StreamPrefs, - ) -> StdResult { + ) -> StdResult, ErrorDetail> { // TODO HS probably this netdir ought to be made in connect_with_prefs // like for StreamInstructions::Hs. self.wait_for_bootstrap().await?; diff --git a/crates/tor-circmgr/semver.md b/crates/tor-circmgr/semver.md index 7c1d2c6ea..4f445eb81 100644 --- a/crates/tor-circmgr/semver.md +++ b/crates/tor-circmgr/semver.md @@ -1,2 +1,3 @@ ADDED: Broadened hspool to accept T:CircTarget in place of OwnedCircTarget. +BREAKING: APIs now return and accept Arc diff --git a/crates/tor-circmgr/src/build.rs b/crates/tor-circmgr/src/build.rs index 438b586fa..88008ae40 100644 --- a/crates/tor-circmgr/src/build.rs +++ b/crates/tor-circmgr/src/build.rs @@ -43,7 +43,7 @@ pub(crate) trait Buildable: Sized { ct: &OwnedChanTarget, params: &CircParameters, usage: ChannelUsage, - ) -> Result; + ) -> Result>; /// Launch a new circuit through a given relay, given a circuit target /// `ct` specifying that relay. @@ -54,7 +54,7 @@ pub(crate) trait Buildable: Sized { ct: &OwnedCircTarget, params: &CircParameters, usage: ChannelUsage, - ) -> Result; + ) -> Result>; /// Extend this circuit-like object by one hop, to the location described /// in `ct`. @@ -122,7 +122,7 @@ impl Buildable for ClientCirc { ct: &OwnedChanTarget, params: &CircParameters, usage: ChannelUsage, - ) -> Result { + ) -> Result> { let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?; circ.create_firsthop_fast(params) .await @@ -139,7 +139,7 @@ impl Buildable for ClientCirc { ct: &OwnedCircTarget, params: &CircParameters, usage: ChannelUsage, - ) -> Result { + ) -> Result> { let circ = create_common(chanmgr, rt, ct, guard_status, usage).await?; circ.create_firsthop_ntor(ct, params.clone()) .await @@ -214,7 +214,7 @@ impl Builder { n_hops_built: Arc, guard_status: Arc, usage: ChannelUsage, - ) -> Result { + ) -> Result> { match path { OwnedPath::ChannelOnly(target) => { // If we fail now, it's the guard's fault. @@ -276,7 +276,7 @@ impl Builder { params: &CircParameters, guard_status: Arc, usage: ChannelUsage, - ) -> Result { + ) -> Result> { let action = Action::BuildCircuit { length: path.len() }; let (timeout, abandon_timeout) = self.timeouts.timeouts(&action); let start_time = self.runtime.now(); @@ -420,7 +420,7 @@ impl CircuitBuilder { params: &CircParameters, guard_status: Arc, usage: ChannelUsage, - ) -> Result { + ) -> Result> { self.builder .build_owned(path, params, guard_status, usage) .await @@ -437,7 +437,7 @@ impl CircuitBuilder { path: &TorPath<'_>, params: &CircParameters, usage: ChannelUsage, - ) -> Result { + ) -> Result> { let owned = path.try_into()?; self.build_owned(owned, params, Arc::new(None.into()), usage) .await @@ -710,7 +710,7 @@ mod test { ct: &OwnedChanTarget, _: &CircParameters, _usage: ChannelUsage, - ) -> Result { + ) -> Result> { let (d1, d2) = timeouts_from_chantarget(ct); rt.sleep(d1).await; if !d2.is_zero() { @@ -721,7 +721,7 @@ mod test { hops: vec![RelayIds::from_relay_ids(ct)], onehop: true, }; - Ok(Mutex::new(c)) + Ok(Arc::new(Mutex::new(c))) } async fn create( _: &ChanMgr, @@ -730,7 +730,7 @@ mod test { ct: &OwnedCircTarget, _: &CircParameters, _usage: ChannelUsage, - ) -> Result { + ) -> Result> { let (d1, d2) = timeouts_from_chantarget(ct); rt.sleep(d1).await; if !d2.is_zero() { @@ -741,7 +741,7 @@ mod test { hops: vec![RelayIds::from_relay_ids(ct)], onehop: false, }; - Ok(Mutex::new(c)) + Ok(Arc::new(Mutex::new(c))) } async fn extend( &self, diff --git a/crates/tor-circmgr/src/hspool.rs b/crates/tor-circmgr/src/hspool.rs index cbe40f7ad..f09ec48ee 100644 --- a/crates/tor-circmgr/src/hspool.rs +++ b/crates/tor-circmgr/src/hspool.rs @@ -111,7 +111,7 @@ impl HsCircPool { pub async fn get_or_launch_client_rend<'a>( &self, netdir: &'a NetDir, - ) -> Result<(ClientCirc, Relay<'a>)> { + ) -> Result<(Arc, Relay<'a>)> { // For rendezvous points, clients use 3-hop circuits. let circ = self .take_or_launch_stub_circuit::(netdir, None) @@ -142,7 +142,7 @@ impl HsCircPool { netdir: &NetDir, kind: HsCircKind, target: T, - ) -> Result + ) -> Result> where T: CircTarget, { @@ -218,7 +218,7 @@ impl HsCircPool { &self, netdir: &NetDir, avoid_target: Option<&T>, - ) -> Result + ) -> Result> where T: CircTarget, { diff --git a/crates/tor-circmgr/src/hspool/pool.rs b/crates/tor-circmgr/src/hspool/pool.rs index cd9119772..1da2baa3c 100644 --- a/crates/tor-circmgr/src/hspool/pool.rs +++ b/crates/tor-circmgr/src/hspool/pool.rs @@ -1,6 +1,6 @@ //! An internal pool object that we use to implement HsCircPool. -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use rand::{seq::IteratorRandom, Rng}; use tor_proto::circuit::ClientCirc; @@ -9,7 +9,7 @@ use tor_proto::circuit::ClientCirc; #[derive(Default)] pub(super) struct Pool { /// The collection of circuits themselves, in no particular order. - circuits: Mutex>, + circuits: Mutex>>, } impl Pool { @@ -19,23 +19,23 @@ impl Pool { } /// Add `circ` to this pool - pub(super) fn insert(&self, circ: ClientCirc) { + pub(super) fn insert(&self, circ: Arc) { self.circuits.lock().expect("lock poisoned").push(circ); } /// Remove every circuit from this pool for which `f` returns false. pub(super) fn retain(&self, f: F) where - F: FnMut(&ClientCirc) -> bool, + F: FnMut(&Arc) -> bool, { self.circuits.lock().expect("lock poisoned").retain(f); } /// If there is any circuit in this pool for which `f` returns true, return one such circuit at random, and remove it from the pool. - pub(super) fn take_one_where(&self, rng: &mut R, f: F) -> Option + pub(super) fn take_one_where(&self, rng: &mut R, f: F) -> Option> where R: Rng, - F: Fn(&ClientCirc) -> bool, + F: Fn(&Arc) -> bool, { let mut circuits = self.circuits.lock().expect("lock poisoned"); // TODO HS: This ensures that we take a circuit at random, but at the diff --git a/crates/tor-circmgr/src/impls.rs b/crates/tor-circmgr/src/impls.rs index ba899d79e..d0154e1f4 100644 --- a/crates/tor-circmgr/src/impls.rs +++ b/crates/tor-circmgr/src/impls.rs @@ -78,7 +78,7 @@ impl crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilde Ok((plan, final_spec)) } - async fn build_circuit(&self, plan: Plan) -> Result<(SupportedCircUsage, ClientCirc)> { + async fn build_circuit(&self, plan: Plan) -> Result<(SupportedCircUsage, Arc)> { use crate::build::GuardStatusHandle; use tor_guardmgr::GuardStatus; let Plan { diff --git a/crates/tor-circmgr/src/lib.rs b/crates/tor-circmgr/src/lib.rs index 0fa9b483a..4a5110033 100644 --- a/crates/tor-circmgr/src/lib.rs +++ b/crates/tor-circmgr/src/lib.rs @@ -382,7 +382,7 @@ impl CircMgr { /// Return a circuit suitable for sending one-hop BEGINDIR streams, /// launching it if necessary. - pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result { + pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result> { self.expire_circuits(); let usage = TargetCircUsage::Dir; self.mgr.get_or_launch(&usage, netdir).await.map(|(c, _)| c) @@ -398,7 +398,7 @@ impl CircMgr { netdir: DirInfo<'_>, // TODO: This has to be a NetDir. ports: &[TargetPort], isolation: StreamIsolation, - ) -> Result { + ) -> Result> { self.expire_circuits(); let time = Instant::now(); { @@ -425,7 +425,7 @@ impl CircMgr { pub async fn get_or_launch_dir_specific( &self, target: T, - ) -> Result { + ) -> Result> { self.expire_circuits(); let usage = TargetCircUsage::DirSpecificTarget(target.to_owned()); self.mgr @@ -452,7 +452,7 @@ impl CircMgr { &self, planned_target: Option, dir: &NetDir, - ) -> Result + ) -> Result> where T: IntoOwnedChanTarget, { diff --git a/crates/tor-circmgr/src/mgr.rs b/crates/tor-circmgr/src/mgr.rs index 7d52fe893..81c7cfada 100644 --- a/crates/tor-circmgr/src/mgr.rs +++ b/crates/tor-circmgr/src/mgr.rs @@ -146,7 +146,7 @@ pub(crate) fn abstract_spec_find_supported<'a, 'b, S: AbstractSpec, C: AbstractC /// /// From this module's point of view, circuits are simply objects /// with unique identities, and a possible closed-state. -pub(crate) trait AbstractCirc: Clone + Debug { +pub(crate) trait AbstractCirc: Debug { /// Type for a unique identifier for circuits. type Id: Clone + Debug + Hash + Eq + Send + Sync; /// Return the unique identifier for this circuit. @@ -233,7 +233,7 @@ pub(crate) trait AbstractCircBuilder: Send + Sync { /// that was originally passed to `plan_circuit`. It _must_ also /// contain the spec that was originally returned by /// `plan_circuit`. - async fn build_circuit(&self, plan: Self::Plan) -> Result<(Self::Spec, Self::Circ)>; + async fn build_circuit(&self, plan: Self::Plan) -> Result<(Self::Spec, Arc)>; /// Return a "parallelism factor" with which circuits should be /// constructed for a given purpose. @@ -312,7 +312,7 @@ pub(crate) struct OpenEntry { /// Current AbstractCircSpec for this circuit's permitted usages. spec: S, /// The circuit under management. - circ: C, + circ: Arc, /// When does this circuit expire? /// /// (Note that expired circuits are removed from the manager, @@ -323,7 +323,7 @@ pub(crate) struct OpenEntry { impl OpenEntry { /// Make a new OpenEntry for a given circuit and spec. - fn new(spec: S, circ: C, expiration: ExpirationInfo) -> Self { + fn new(spec: S, circ: Arc, expiration: ExpirationInfo) -> Self { OpenEntry { spec, circ, @@ -744,7 +744,7 @@ pub(crate) struct AbstractCircMgr { /// An action to take in order to satisfy a request for a circuit. enum Action { /// We found an open circuit: return immediately. - Open(B::Circ), + Open(Arc), /// We found one or more pending circuits: wait until one succeeds, /// or all fail. Wait(FuturesUnordered>>>), @@ -795,7 +795,7 @@ impl AbstractCircMgr { self: &Arc, usage: &::Usage, dir: DirInfo<'_>, - ) -> Result<(B::Circ, CircProvenance)> { + ) -> Result<(Arc, CircProvenance)> { /// Return CEIL(a/b). /// /// Requires that a+b is less than usize::MAX. @@ -1024,7 +1024,7 @@ impl AbstractCircMgr { self: Arc, act: Action, usage: &::Usage, - ) -> std::result::Result<(B::Circ, CircProvenance), RetryError>> { + ) -> std::result::Result<(Arc, CircProvenance), RetryError>> { /// Store the error `err` into `retry_err`, as appropriate. fn record_error( retry_err: &mut RetryError>, @@ -1380,7 +1380,7 @@ impl AbstractCircMgr { &self, usage: &::Usage, dir: DirInfo<'_>, - ) -> Result<(::Spec, B::Circ)> { + ) -> Result<(::Spec, Arc)> { let (_, plan) = self.plan_by_usage(dir, usage)?; self.builder.build_circuit(plan.plan).await } @@ -1391,7 +1391,7 @@ impl AbstractCircMgr { /// out to any future requests. /// /// Return None if we have no circuit with the given ID. - pub(crate) fn take_circ(&self, id: &::Id) -> Option { + pub(crate) fn take_circ(&self, id: &::Id) -> Option> { let mut list = self.circs.lock().expect("poisoned lock"); list.take_open(id).map(|e| e.circ) } @@ -1703,14 +1703,14 @@ mod test { Ok((plan, spec.clone())) } - async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, FakeCirc)> { + async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, Arc)> { let op = plan.op; let sl = self.runtime.sleep(FAKE_CIRC_DELAY); self.runtime.allow_one_advance(FAKE_CIRC_DELAY); sl.await; match op { - FakeOp::Succeed => Ok((plan.spec, FakeCirc { id: FakeId::next() })), - FakeOp::WrongSpec(s) => Ok((s, FakeCirc { id: FakeId::next() })), + FakeOp::Succeed => Ok((plan.spec, Arc::new(FakeCirc { id: FakeId::next() }))), + FakeOp::WrongSpec(s) => Ok((s, Arc::new(FakeCirc { id: FakeId::next() }))), FakeOp::Fail => Err(Error::CircTimeout), FakeOp::Delay(d) => { let sl = self.runtime.sleep(d); @@ -2240,7 +2240,7 @@ mod test { #[test] fn test_find_supported() { let (ep_none, ep_web, ep_full) = get_exit_policies(); - let fake_circ = FakeCirc { id: FakeId::next() }; + let fake_circ = Arc::new(FakeCirc { id: FakeId::next() }); let expiration = ExpirationInfo::Unused { use_before: Instant::now() + Duration::from_secs(60 * 60), }; diff --git a/crates/tor-hsclient/src/connect.rs b/crates/tor-hsclient/src/connect.rs index f7522ac13..bbe1fa2e9 100644 --- a/crates/tor-hsclient/src/connect.rs +++ b/crates/tor-hsclient/src/connect.rs @@ -66,7 +66,7 @@ pub(crate) async fn connect( hsid: HsId, data: &mut Data, secret_keys: HsClientSecretKeys, -) -> Result { +) -> Result, ConnError> { Context::new( &connector.runtime, &*connector.circpool, @@ -151,7 +151,7 @@ impl<'c, 'd, R: Runtime, M: MocksForConnect> Context<'c, 'd, R, M> { /// /// This function handles all necessary retrying of fallible operations, /// (and, therefore, must also limit the total work done for a particular call). - async fn connect(&mut self) -> Result { + async fn connect(&mut self) -> Result, ConnError> { // This function must do the following, retrying as appropriate. // - Look up the onion descriptor in the state. // - Download the onion descriptor if one isn't there. @@ -376,7 +376,7 @@ trait MockableCircPool { netdir: &NetDir, kind: HsCircKind, target: OwnedCircTarget, - ) -> tor_circmgr::Result; + ) -> tor_circmgr::Result>; } /// Mock for `ClientCirc` #[async_trait] @@ -402,7 +402,7 @@ impl MockableCircPool for HsCircPool { netdir: &NetDir, kind: HsCircKind, target: OwnedCircTarget, - ) -> tor_circmgr::Result { + ) -> tor_circmgr::Result> { self.get_or_launch_specific(netdir, kind, target).await } } @@ -426,7 +426,7 @@ impl MockableConnectorData for Data { hsid: HsId, data: &mut Self, secret_keys: HsClientSecretKeys, - ) -> Result { + ) -> Result, ConnError> { connect(connector, netdir, hsid, data, secret_keys).await } @@ -502,10 +502,12 @@ mod test { _netdir: &NetDir, kind: HsCircKind, target: OwnedCircTarget, - ) -> tor_circmgr::Result { + ) -> tor_circmgr::Result> { assert_eq!(kind, HsCircKind::ClientHsDir); self.mglobal.lock().unwrap().hsdirs_asked.push(target); - Ok(self.clone()) + // Adding the `Arc` here is a little ugly, but that's what we get + // for using the same Mocks for everything. + Ok(Arc::new(self.clone())) } } #[async_trait] diff --git a/crates/tor-hsclient/src/lib.rs b/crates/tor-hsclient/src/lib.rs index f9fd4258d..c12a3fff3 100644 --- a/crates/tor-hsclient/src/lib.rs +++ b/crates/tor-hsclient/src/lib.rs @@ -118,7 +118,7 @@ impl HsClientConnector { hs_id: HsId, secret_keys: HsClientSecretKeys, isolation: StreamIsolation, - ) -> impl Future> + Send + Sync + 'r { + ) -> impl Future, ConnError>> + Send + Sync + 'r { // As in tor-circmgr, we take `StreamIsolation`, to ensure that callers in // arti-client pass us the final overall isolation, // including the per-TorClient isolation. diff --git a/crates/tor-hsclient/src/state.rs b/crates/tor-hsclient/src/state.rs index bf649e086..58fa6999c 100644 --- a/crates/tor-hsclient/src/state.rs +++ b/crates/tor-hsclient/src/state.rs @@ -102,7 +102,7 @@ enum ServiceState { data: D, /// The circuit #[educe(Debug(ignore))] - circuit: D::ClientCirc, + circuit: Arc, /// Last time we touched this, including reuse last_used: Instant, }, @@ -202,7 +202,7 @@ fn obtain_circuit_or_continuation_info( table_index: TableIndex, rechecks: &mut impl Iterator, mut guard: MutexGuard<'_, Services>, -) -> Result, ConnError> { +) -> Result>, ConnError> { let blank_state = || ServiceState::blank(&connector.runtime); for _recheck in rechecks { @@ -383,7 +383,7 @@ impl Services { hs_id: HsId, isolation: Box, secret_keys: HsClientSecretKeys, - ) -> Result { + ) -> Result, ConnError> { let blank_state = || ServiceState::blank(&connector.runtime); let mut rechecks = 0..MAX_RECHECKS; @@ -466,7 +466,7 @@ impl Services { #[async_trait] pub trait MockableConnectorData: Default + Debug + Send + Sync + 'static { /// Client circuit - type ClientCirc: Clone + Sync + Send + 'static; + type ClientCirc: Sync + Send + 'static; /// Mock state type MockGlobalState: Clone + Sync + Send + 'static; @@ -478,7 +478,7 @@ pub trait MockableConnectorData: Default + Debug + Send + Sync + 'static { hsid: HsId, data: &mut Self, secret_keys: HsClientSecretKeys, - ) -> Result; + ) -> Result, ConnError>; /// Is circuit OK? Ie, not `.is_closing()`. fn circuit_is_ok(circuit: &Self::ClientCirc) -> bool; @@ -550,8 +550,8 @@ pub(crate) mod test { _hsid: HsId, _data: &mut MockData, _secret_keys: HsClientSecretKeys, - ) -> Result { - let make = |()| MockCirc::new(); + ) -> Result, E> { + let make = |()| Arc::new(MockCirc::new()); let mut give = connector.mock_for_state.give.clone(); if let Ready(ret) = &*give.borrow() { return ret.clone().map(make); @@ -630,7 +630,7 @@ pub(crate) mod test { id: u8, secret_keys: &HsClientSecretKeys, isolation: Option, - ) -> Result { + ) -> Result, ConnError> { let netdir = tor_netdir::testnet::construct_netdir() .unwrap_if_sufficient() .unwrap(); diff --git a/crates/tor-proto/semver.md b/crates/tor-proto/semver.md new file mode 100644 index 000000000..b2801097f --- /dev/null +++ b/crates/tor-proto/semver.md @@ -0,0 +1,2 @@ +BREAKING: APIs now return and accept Arc + diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index ec4234f1b..f54e36c65 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -91,7 +91,7 @@ pub const CIRCUIT_BUFFER_SIZE: usize = 128; #[cfg_attr(docsrs, doc(cfg(feature = "send-control-msg")))] pub use {msghandler::MsgHandler, reactor::MetaCellDisposition}; -#[derive(Clone, Debug)] +#[derive(Debug)] /// A circuit that we have constructed over the Tor network. /// /// This struct is the interface used by the rest of the code, It is fairly @@ -140,7 +140,7 @@ pub struct PendingClientCirc { /// or a DESTROY cell. recvcreated: oneshot::Receiver, /// The ClientCirc object that we can expose on success. - circ: ClientCirc, + circ: Arc, } /// Description of the network's current rules for building circuits. @@ -208,7 +208,7 @@ pub(crate) struct StreamTarget { /// Channel to send cells down. tx: mpsc::Sender, /// Reference to the circuit that this stream is on. - circ: ClientCirc, + circ: Arc, } impl ClientCirc { @@ -421,7 +421,7 @@ impl ClientCirc { /// The caller will typically want to see the first cell in response, /// to see whether it is e.g. an END or a CONNECTED. async fn begin_stream_impl( - &self, + self: &Arc, begin_msg: AnyRelayMsg, cmd_checker: AnyCmdChecker, ) -> Result<(StreamReader, StreamTarget)> { @@ -469,7 +469,11 @@ impl ClientCirc { /// Start a DataStream (anonymized connection) to the given /// address and port, using a BEGIN cell. - async fn begin_data_stream(&self, msg: AnyRelayMsg, optimistic: bool) -> Result { + async fn begin_data_stream( + self: &Arc, + msg: AnyRelayMsg, + optimistic: bool, + ) -> Result { let (reader, target) = self .begin_stream_impl(msg, DataCmdChecker::new_any()) .await?; @@ -486,7 +490,7 @@ impl ClientCirc { /// The use of a string for the address is intentional: you should let /// the remote Tor relay do the hostname lookup for you. pub async fn begin_stream( - &self, + self: &Arc, target: &str, port: u16, parameters: Option, @@ -501,7 +505,7 @@ impl ClientCirc { /// Start a new stream to the last relay in the circuit, using /// a BEGIN_DIR cell. - pub async fn begin_dir_stream(&self) -> Result { + pub async fn begin_dir_stream(self: Arc) -> Result { // Note that we always open begindir connections optimistically. // Since they are local to a relay that we've already authenticated // with and built a circuit to, there should be no additional checks @@ -515,7 +519,7 @@ impl ClientCirc { /// /// Note that this function does not check for timeouts; that's /// the caller's responsibility. - pub async fn resolve(&self, hostname: &str) -> Result> { + pub async fn resolve(self: &Arc, hostname: &str) -> Result> { let resolve_msg = Resolve::new(hostname); let resolved_msg = self.try_resolve(resolve_msg).await?; @@ -536,7 +540,7 @@ impl ClientCirc { /// /// Note that this function does not check for timeouts; that's /// the caller's responsibility. - pub async fn resolve_ptr(&self, addr: IpAddr) -> Result> { + pub async fn resolve_ptr(self: &Arc, addr: IpAddr) -> Result> { let resolve_ptr_msg = Resolve::new_reverse(&addr); let resolved_msg = self.try_resolve(resolve_ptr_msg).await?; @@ -557,7 +561,7 @@ impl ClientCirc { /// Helper: Send the resolve message, and read resolved message from /// resolve stream. - async fn try_resolve(&self, msg: Resolve) -> Result { + async fn try_resolve(self: &Arc, msg: Resolve) -> Result { let (reader, _) = self .begin_stream_impl(msg.into(), ResolveCmdChecker::new_any()) .await?; @@ -637,7 +641,7 @@ impl PendingClientCirc { let pending = PendingClientCirc { recvcreated: createdreceiver, - circ: circuit, + circ: Arc::new(circuit), }; (pending, reactor) } @@ -654,7 +658,7 @@ impl PendingClientCirc { /// There's no authentication in CRATE_FAST, /// so we don't need to know whom we're connecting to: we're just /// connecting to whichever relay the channel is for. - pub async fn create_firsthop_fast(self, params: &CircParameters) -> Result { + pub async fn create_firsthop_fast(self, params: &CircParameters) -> Result> { let (tx, rx) = oneshot::channel(); self.circ .control @@ -680,7 +684,7 @@ impl PendingClientCirc { self, target: &Tg, params: CircParameters, - ) -> Result + ) -> Result> where Tg: tor_linkspec::CircTarget, { @@ -799,7 +803,7 @@ impl StreamTarget { /// Return a reference to the circuit that this `StreamTarget` is using. #[cfg(feature = "experimental-api")] - pub(crate) fn circuit(&self) -> &ClientCirc { + pub(crate) fn circuit(&self) -> &Arc { &self.circ } } @@ -1028,7 +1032,7 @@ mod test { rt: &R, chan: Channel, next_msg_from: HopNum, - ) -> (ClientCirc, mpsc::Sender) { + ) -> (Arc, mpsc::Sender) { let circid = 128.into(); let (_created_send, created_recv) = oneshot::channel(); let (circmsg_send, circmsg_recv) = mpsc::channel(64); @@ -1070,7 +1074,7 @@ mod test { async fn newcirc( rt: &R, chan: Channel, - ) -> (ClientCirc, mpsc::Sender) { + ) -> (Arc, mpsc::Sender) { newcirc_ext(rt, chan, 2.into()).await } @@ -1372,7 +1376,7 @@ mod test { rt: &R, n_to_send: usize, ) -> ( - ClientCirc, + Arc, DataStream, mpsc::Sender, StreamId, diff --git a/crates/tor-proto/src/stream/data.rs b/crates/tor-proto/src/stream/data.rs index f2e105a02..4b9196651 100644 --- a/crates/tor-proto/src/stream/data.rs +++ b/crates/tor-proto/src/stream/data.rs @@ -109,7 +109,7 @@ pub struct DataStream { /// DataWriterState, but for now we can't actually access that state all the time, /// since it might be inside a boxed future. #[cfg(feature = "experimental-api")] - circuit: ClientCirc, + circuit: std::sync::Arc, } /// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].