diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index 29a6e7bcf..84ddcec5c 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -417,7 +417,7 @@ impl TorClient { &self, exit_ports: &[TargetPort], flags: &ConnectPrefs, - ) -> Result> { + ) -> Result { let dir = self.dirmgr.netdir(); let isolation = { diff --git a/crates/tor-circmgr/src/impls.rs b/crates/tor-circmgr/src/impls.rs index aaee57c42..0ff09dc8e 100644 --- a/crates/tor-circmgr/src/impls.rs +++ b/crates/tor-circmgr/src/impls.rs @@ -72,7 +72,7 @@ impl crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilde Ok((plan, final_spec)) } - async fn build_circuit(&self, plan: Plan) -> Result<(SupportedCircUsage, Arc)> { + async fn build_circuit(&self, plan: Plan) -> Result<(SupportedCircUsage, ClientCirc)> { use crate::build::GuardStatusHandle; use tor_guardmgr::GuardStatus; let Plan { @@ -114,7 +114,7 @@ impl crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilde return Err(Error::Internal("Guard usability status cancelled".into())) } } - Ok((final_spec, Arc::new(circuit))) + Ok((final_spec, circuit)) } Err(e) => { // The attempt failed; the builder should have set the diff --git a/crates/tor-circmgr/src/lib.rs b/crates/tor-circmgr/src/lib.rs index 167aaba21..e03691f99 100644 --- a/crates/tor-circmgr/src/lib.rs +++ b/crates/tor-circmgr/src/lib.rs @@ -281,7 +281,7 @@ impl CircMgr { /// Return a circuit suitable for sending one-hop BEGINDIR streams, /// launching it if necessary. - pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result> { + pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result { self.expire_circuits(); let usage = TargetCircUsage::Dir; self.mgr.get_or_launch(&usage, netdir).await @@ -297,7 +297,7 @@ impl CircMgr { netdir: DirInfo<'_>, // TODO: This has to be a NetDir. ports: &[TargetPort], isolation: StreamIsolation, - ) -> Result> { + ) -> Result { self.expire_circuits(); let time = Instant::now(); { diff --git a/crates/tor-circmgr/src/mgr.rs b/crates/tor-circmgr/src/mgr.rs index af3f0046b..db845f59d 100644 --- a/crates/tor-circmgr/src/mgr.rs +++ b/crates/tor-circmgr/src/mgr.rs @@ -119,7 +119,7 @@ pub(crate) fn abstract_spec_find_supported<'a, 'b, S: AbstractSpec, C: AbstractC /// /// From this module's point of view, circuits are simply objects /// with unique identities, and a possible closed-state. -pub(crate) trait AbstractCirc: Debug { +pub(crate) trait AbstractCirc: Clone + Debug { /// Type for a unique identifier for circuits. type Id: Clone + Debug + Hash + Eq + Send + Sync; /// Return the unique identifier for this circuit. @@ -206,7 +206,7 @@ pub(crate) trait AbstractCircBuilder: Send + Sync { /// that was originally passed to `plan_circuit`. It _must_ also /// contain the spec that was originally returned by /// `plan_circuit`. - async fn build_circuit(&self, plan: Self::Plan) -> Result<(Self::Spec, Arc)>; + async fn build_circuit(&self, plan: Self::Plan) -> Result<(Self::Spec, Self::Circ)>; /// Return a "parallelism factor" with which circuits should be /// constructed for a given purpose. @@ -285,7 +285,7 @@ pub(crate) struct OpenEntry { /// Current AbstractCircSpec for this circuit's permitted usages. spec: S, /// The circuit under management. - circ: Arc, + circ: C, /// When does this circuit expire? /// /// (Note that expired circuits are removed from the manager, @@ -296,7 +296,7 @@ pub(crate) struct OpenEntry { impl OpenEntry { /// Make a new OpenEntry for a given circuit and spec. - fn new(spec: S, circ: Arc, expiration: ExpirationInfo) -> Self { + fn new(spec: S, circ: C, expiration: ExpirationInfo) -> Self { OpenEntry { spec, circ, @@ -684,7 +684,7 @@ pub(crate) struct AbstractCircMgr { /// An action to take in order to satisfy a request for a circuit. enum Action { /// We found an open circuit: return immediately. - Open(Arc), + Open(B::Circ), /// We found one or more pending circuits: wait until one succeeds, /// or all fail. Wait(FuturesUnordered>>>), @@ -735,7 +735,7 @@ impl AbstractCircMgr { self: &Arc, usage: &::Usage, dir: DirInfo<'_>, - ) -> Result> { + ) -> Result { let circuit_timing = self.circuit_timing(); let wait_for_circ = circuit_timing.request_timeout; let timeout_at = self.runtime.now() + wait_for_circ; @@ -838,7 +838,7 @@ impl AbstractCircMgr { // TODO: If we have fewer circuits here than our select // parallelism, perhaps we should launch more? - return Ok(Action::Open(Arc::clone(&best.circ))); + return Ok(Action::Open(best.circ.clone())); } if let Some(pending) = list.find_pending_circs(usage) { @@ -878,7 +878,7 @@ impl AbstractCircMgr { self: Arc, act: Action, usage: &::Usage, - ) -> std::result::Result, RetryError>> { + ) -> std::result::Result>> { // Get or make a stream of futures to wait on. let wait_on_stream = match act { Action::Open(c) => { @@ -944,7 +944,7 @@ impl AbstractCircMgr { now + self.circuit_timing().max_dirtiness, ); } - return Ok(Arc::clone(&ent.circ)); + return Ok(ent.circ.clone()); } Err(e) => { // TODO: as below, improve this log message. @@ -1138,7 +1138,7 @@ impl AbstractCircMgr { /// out to any future requests. /// /// Return None if we have no circuit with the given ID. - pub(crate) fn take_circ(&self, id: &::Id) -> Option> { + pub(crate) fn take_circ(&self, id: &::Id) -> Option { let mut list = self.circs.lock().expect("poisoned lock"); list.take_open(id).map(|e| e.circ) } @@ -1290,6 +1290,12 @@ mod test { id: FakeId, } + impl FakeCirc { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } + } + impl AbstractCirc for FakeCirc { type Id = FakeId; fn id(&self) -> FakeId { @@ -1410,14 +1416,14 @@ mod test { Ok((plan, spec.clone())) } - async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, Arc)> { + async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, FakeCirc)> { let op = plan.op; let sl = self.runtime.sleep(FAKE_CIRC_DELAY); self.runtime.allow_one_advance(FAKE_CIRC_DELAY); sl.await; match op { - FakeOp::Succeed => Ok((plan.spec, Arc::new(FakeCirc { id: FakeId::next() }))), - FakeOp::WrongSpec(s) => Ok((s, Arc::new(FakeCirc { id: FakeId::next() }))), + FakeOp::Succeed => Ok((plan.spec, FakeCirc { id: FakeId::next() })), + FakeOp::WrongSpec(s) => Ok((s, FakeCirc { id: FakeId::next() })), FakeOp::Fail => Err(Error::PendingFailed), FakeOp::Delay(d) => { let sl = self.runtime.sleep(d); @@ -1499,7 +1505,7 @@ mod test { let c2 = mgr.get_or_launch(&port80, di()).await; let c2 = c2.unwrap(); - assert!(Arc::ptr_eq(&c1, &c2)); + assert!(FakeCirc::eq(&c1, &c2)); assert_eq!(mgr.n_circs(), 1); // Now try launching two circuits "at once" to make sure that our @@ -1517,8 +1523,8 @@ mod test { let c3 = c3.unwrap(); let c4 = c4.unwrap(); - assert!(!Arc::ptr_eq(&c1, &c3)); - assert!(Arc::ptr_eq(&c3, &c4)); + assert!(!FakeCirc::eq(&c1, &c3)); + assert!(FakeCirc::eq(&c3, &c4)); assert_eq!(c3.id(), c4.id()); assert_eq!(mgr.n_circs(), 2); @@ -1526,7 +1532,7 @@ mod test { // same as c4, so removing c4 will give us None. let c3_taken = mgr.take_circ(&c3.id()).unwrap(); let now_its_gone = mgr.take_circ(&c4.id()); - assert!(Arc::ptr_eq(&c3_taken, &c3)); + assert!(FakeCirc::eq(&c3_taken, &c3)); assert!(now_its_gone.is_none()); assert_eq!(mgr.n_circs(), 1); @@ -1534,8 +1540,8 @@ mod test { // sure we get a different circuit. let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await; let c5 = c5.unwrap(); - assert!(!Arc::ptr_eq(&c3, &c5)); - assert!(!Arc::ptr_eq(&c4, &c5)); + assert!(!FakeCirc::eq(&c3, &c5)); + assert!(!FakeCirc::eq(&c4, &c5)); assert_eq!(mgr.n_circs(), 2); // Now try launch_by_usage. @@ -1705,7 +1711,7 @@ mod test { let c1 = c1.unwrap(); let c2 = c2.unwrap(); - assert!(Arc::ptr_eq(&c1, &c2)); + assert!(FakeCirc::eq(&c1, &c2)); }); } @@ -1760,8 +1766,8 @@ mod test { let c_iso2 = c_iso2.unwrap(); let c_none = c_none.unwrap(); - assert!(!Arc::ptr_eq(&c_iso1, &c_iso2)); - assert!(Arc::ptr_eq(&c_iso1, &c_none) || Arc::ptr_eq(&c_iso2, &c_none)); + assert!(!FakeCirc::eq(&c_iso1, &c_iso2)); + assert!(FakeCirc::eq(&c_iso1, &c_none) || FakeCirc::eq(&c_iso2, &c_none)); } }); } @@ -1800,7 +1806,7 @@ mod test { .await; if let (Ok(c1), Ok(c2)) = (c1, c2) { - assert!(Arc::ptr_eq(&c1, &c2)); + assert!(FakeCirc::eq(&c1, &c2)); } else { panic!(); }; @@ -1845,7 +1851,7 @@ mod test { // If we had launched these separately, they wouldn't share // a circuit. - assert!(Arc::ptr_eq(&c1, &c2)); + assert!(FakeCirc::eq(&c1, &c2)); }); } @@ -1900,8 +1906,8 @@ mod test { let pop2 = pop2.unwrap(); let imap2 = imap2.unwrap(); - assert!(!Arc::ptr_eq(&pop2, &pop1)); - assert!(Arc::ptr_eq(&imap2, &imap1)); + assert!(!FakeCirc::eq(&pop2, &pop1)); + assert!(FakeCirc::eq(&imap2, &imap1)); }); } @@ -1935,7 +1941,7 @@ mod test { #[test] fn test_find_supported() { let (ep_none, ep_web, ep_full) = get_exit_policies(); - let fake_circ = Arc::new(FakeCirc { id: FakeId::next() }); + let fake_circ = FakeCirc { id: FakeId::next() }; let expiration = ExpirationInfo::Unused { use_before: Instant::now() + Duration::from_secs(60 * 60), }; @@ -1945,7 +1951,7 @@ mod test { policy: ep_none, isolation: None, }, - Arc::clone(&fake_circ), + fake_circ.clone(), expiration.clone(), ); let mut entry_none_c = entry_none.clone(); @@ -1954,7 +1960,7 @@ mod test { policy: ep_web, isolation: None, }, - Arc::clone(&fake_circ), + fake_circ.clone(), expiration.clone(), ); let mut entry_web_c = entry_web.clone(); @@ -1963,7 +1969,7 @@ mod test { policy: ep_full, isolation: None, }, - Arc::clone(&fake_circ), + fake_circ, expiration, ); let mut entry_full_c = entry_full.clone(); diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index c9b1993d1..72d4c8bba 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -79,6 +79,23 @@ pub const CIRCUIT_BUFFER_SIZE: usize = 128; #[derive(Clone, Debug)] /// A circuit that we have constructed over the Tor network. +/// +/// This struct is the interface used by the rest of the code, It is fairly +/// cheaply cloneable. None of the public methods need mutable access, since +/// they all actually communicate with the Reactor which contains the primary +/// mutable state, and does the actual work. +// +// Effectively, this struct contains two Arcs: one for `hops` and one for +// `control` (which surely has soemthing Arc-like in it). We cannot unify +// these by putting a single Arc around the whole struct, and passing +// an Arc strong reference to the `Reactor`, because then `control` would +// not be dropped when the last user of the circuit goes away. We could +// make the reactor have a weak reference but weak references are more +// expensive to dereference. +// +// Because of the above, cloning this struct is always going to involve +// two atomic refcount changes/checks. Wrapping it in another Arc would +// be overkill. pub struct ClientCirc { /// Number of hops on this circuit. /// @@ -263,11 +280,7 @@ impl ClientCirc { /// Start a DataStream (anonymized connection) to the given /// address and port, using a BEGIN cell. - async fn begin_data_stream( - self: Arc, - msg: RelayMsg, - optimistic: bool, - ) -> Result { + async fn begin_data_stream(&self, msg: RelayMsg, optimistic: bool) -> Result { let (reader, target) = self.begin_stream_impl(msg).await?; let mut stream = DataStream::new(reader, target); if !optimistic { @@ -282,7 +295,7 @@ impl ClientCirc { /// The use of a string for the address is intentional: you should let /// the remote Tor relay do the hostname lookup for you. pub async fn begin_stream( - self: Arc, + &self, target: &str, port: u16, parameters: Option, @@ -296,8 +309,7 @@ impl ClientCirc { /// Start a new stream to the last relay in the circuit, using /// a BEGIN_DIR cell. - // FIXME(eta): get rid of Arc here!!! - pub async fn begin_dir_stream(self: Arc) -> Result { + pub async fn begin_dir_stream(&self) -> Result { // Note that we always open begindir connections optimistically. // Since they are local to a relay that we've already authenticated // with and built a circuit to, there should be no additional checks @@ -310,7 +322,7 @@ impl ClientCirc { /// /// Note that this function does not check for timeouts; that's /// the caller's responsibility. - pub async fn resolve(self: Arc, hostname: &str) -> Result> { + pub async fn resolve(&self, hostname: &str) -> Result> { let resolve_msg = Resolve::new(hostname); let resolved_msg = self.try_resolve(resolve_msg).await?; @@ -331,7 +343,7 @@ impl ClientCirc { /// /// Note that this function does not check for timeouts; that's /// the caller's responsibility. - pub async fn resolve_ptr(self: Arc, addr: IpAddr) -> Result> { + pub async fn resolve_ptr(&self, addr: IpAddr) -> Result> { let resolve_ptr_msg = Resolve::new_reverse(&addr); let resolved_msg = self.try_resolve(resolve_ptr_msg).await?; @@ -352,7 +364,7 @@ impl ClientCirc { /// Helper: Send the resolve message, and read resolved message from /// resolve stream. - async fn try_resolve(self: &Arc, msg: Resolve) -> Result { + async fn try_resolve(&self, msg: Resolve) -> Result { let (reader, _) = self.begin_stream_impl(msg.into()).await?; let mut resolve_stream = ResolveStream::new(reader); resolve_stream.read_msg().await @@ -1107,7 +1119,7 @@ mod test { let begin_and_send_fut = async move { // Here we'll say we've got a circuit, and we want to // make a simple BEGINDIR request with it. - let mut stream = Arc::new(circ).begin_dir_stream().await.unwrap(); + let mut stream = circ.begin_dir_stream().await.unwrap(); stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap(); stream.flush().await.unwrap(); let mut buf = [0_u8; 1024]; @@ -1181,7 +1193,7 @@ mod test { let (chan, mut rx, sink2) = working_fake_channel(rt); let (circ, mut sink) = newcirc(rt, chan).await; - let circ_clone = Arc::new(circ.clone()); + let circ_clone = circ.clone(); let begin_and_send_fut = async move { // Take our circuit and make a stream on it. let mut stream = circ_clone