Merge branch 'circ_self_by_ref'

This commit is contained in:
Nick Mathewson 2022-01-07 14:48:21 -05:00
commit e9a507af67
5 changed files with 66 additions and 48 deletions

View File

@ -417,7 +417,7 @@ impl<R: Runtime> TorClient<R> {
&self,
exit_ports: &[TargetPort],
flags: &ConnectPrefs,
) -> Result<Arc<ClientCirc>> {
) -> Result<ClientCirc> {
let dir = self.dirmgr.netdir();
let isolation = {

View File

@ -72,7 +72,7 @@ impl<R: Runtime> crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilde
Ok((plan, final_spec))
}
async fn build_circuit(&self, plan: Plan) -> Result<(SupportedCircUsage, Arc<ClientCirc>)> {
async fn build_circuit(&self, plan: Plan) -> Result<(SupportedCircUsage, ClientCirc)> {
use crate::build::GuardStatusHandle;
use tor_guardmgr::GuardStatus;
let Plan {
@ -114,7 +114,7 @@ impl<R: Runtime> crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilde
return Err(Error::Internal("Guard usability status cancelled".into()))
}
}
Ok((final_spec, Arc::new(circuit)))
Ok((final_spec, circuit))
}
Err(e) => {
// The attempt failed; the builder should have set the

View File

@ -281,7 +281,7 @@ impl<R: Runtime> CircMgr<R> {
/// Return a circuit suitable for sending one-hop BEGINDIR streams,
/// launching it if necessary.
pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<Arc<ClientCirc>> {
pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientCirc> {
self.expire_circuits();
let usage = TargetCircUsage::Dir;
self.mgr.get_or_launch(&usage, netdir).await
@ -297,7 +297,7 @@ impl<R: Runtime> CircMgr<R> {
netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
ports: &[TargetPort],
isolation: StreamIsolation,
) -> Result<Arc<ClientCirc>> {
) -> Result<ClientCirc> {
self.expire_circuits();
let time = Instant::now();
{

View File

@ -119,7 +119,7 @@ pub(crate) fn abstract_spec_find_supported<'a, 'b, S: AbstractSpec, C: AbstractC
///
/// From this module's point of view, circuits are simply objects
/// with unique identities, and a possible closed-state.
pub(crate) trait AbstractCirc: Debug {
pub(crate) trait AbstractCirc: Clone + Debug {
/// Type for a unique identifier for circuits.
type Id: Clone + Debug + Hash + Eq + Send + Sync;
/// Return the unique identifier for this circuit.
@ -206,7 +206,7 @@ pub(crate) trait AbstractCircBuilder: Send + Sync {
/// that was originally passed to `plan_circuit`. It _must_ also
/// contain the spec that was originally returned by
/// `plan_circuit`.
async fn build_circuit(&self, plan: Self::Plan) -> Result<(Self::Spec, Arc<Self::Circ>)>;
async fn build_circuit(&self, plan: Self::Plan) -> Result<(Self::Spec, Self::Circ)>;
/// Return a "parallelism factor" with which circuits should be
/// constructed for a given purpose.
@ -285,7 +285,7 @@ pub(crate) struct OpenEntry<S, C> {
/// Current AbstractCircSpec for this circuit's permitted usages.
spec: S,
/// The circuit under management.
circ: Arc<C>,
circ: C,
/// When does this circuit expire?
///
/// (Note that expired circuits are removed from the manager,
@ -296,7 +296,7 @@ pub(crate) struct OpenEntry<S, C> {
impl<S: AbstractSpec, C: AbstractCirc> OpenEntry<S, C> {
/// Make a new OpenEntry for a given circuit and spec.
fn new(spec: S, circ: Arc<C>, expiration: ExpirationInfo) -> Self {
fn new(spec: S, circ: C, expiration: ExpirationInfo) -> Self {
OpenEntry {
spec,
circ,
@ -684,7 +684,7 @@ pub(crate) struct AbstractCircMgr<B: AbstractCircBuilder, R: Runtime> {
/// An action to take in order to satisfy a request for a circuit.
enum Action<B: AbstractCircBuilder> {
/// We found an open circuit: return immediately.
Open(Arc<B::Circ>),
Open(B::Circ),
/// We found one or more pending circuits: wait until one succeeds,
/// or all fail.
Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B>>>>),
@ -735,7 +735,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
self: &Arc<Self>,
usage: &<B::Spec as AbstractSpec>::Usage,
dir: DirInfo<'_>,
) -> Result<Arc<B::Circ>> {
) -> Result<B::Circ> {
let circuit_timing = self.circuit_timing();
let wait_for_circ = circuit_timing.request_timeout;
let timeout_at = self.runtime.now() + wait_for_circ;
@ -838,7 +838,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
// TODO: If we have fewer circuits here than our select
// parallelism, perhaps we should launch more?
return Ok(Action::Open(Arc::clone(&best.circ)));
return Ok(Action::Open(best.circ.clone()));
}
if let Some(pending) = list.find_pending_circs(usage) {
@ -878,7 +878,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
self: Arc<Self>,
act: Action<B>,
usage: &<B::Spec as AbstractSpec>::Usage,
) -> std::result::Result<Arc<B::Circ>, RetryError<Box<Error>>> {
) -> std::result::Result<B::Circ, RetryError<Box<Error>>> {
// Get or make a stream of futures to wait on.
let wait_on_stream = match act {
Action::Open(c) => {
@ -944,7 +944,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
now + self.circuit_timing().max_dirtiness,
);
}
return Ok(Arc::clone(&ent.circ));
return Ok(ent.circ.clone());
}
Err(e) => {
// TODO: as below, improve this log message.
@ -1138,7 +1138,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
/// out to any future requests.
///
/// Return None if we have no circuit with the given ID.
pub(crate) fn take_circ(&self, id: &<B::Circ as AbstractCirc>::Id) -> Option<Arc<B::Circ>> {
pub(crate) fn take_circ(&self, id: &<B::Circ as AbstractCirc>::Id) -> Option<B::Circ> {
let mut list = self.circs.lock().expect("poisoned lock");
list.take_open(id).map(|e| e.circ)
}
@ -1290,6 +1290,12 @@ mod test {
id: FakeId,
}
impl FakeCirc {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl AbstractCirc for FakeCirc {
type Id = FakeId;
fn id(&self) -> FakeId {
@ -1410,14 +1416,14 @@ mod test {
Ok((plan, spec.clone()))
}
async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, Arc<FakeCirc>)> {
async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, FakeCirc)> {
let op = plan.op;
let sl = self.runtime.sleep(FAKE_CIRC_DELAY);
self.runtime.allow_one_advance(FAKE_CIRC_DELAY);
sl.await;
match op {
FakeOp::Succeed => Ok((plan.spec, Arc::new(FakeCirc { id: FakeId::next() }))),
FakeOp::WrongSpec(s) => Ok((s, Arc::new(FakeCirc { id: FakeId::next() }))),
FakeOp::Succeed => Ok((plan.spec, FakeCirc { id: FakeId::next() })),
FakeOp::WrongSpec(s) => Ok((s, FakeCirc { id: FakeId::next() })),
FakeOp::Fail => Err(Error::PendingFailed),
FakeOp::Delay(d) => {
let sl = self.runtime.sleep(d);
@ -1499,7 +1505,7 @@ mod test {
let c2 = mgr.get_or_launch(&port80, di()).await;
let c2 = c2.unwrap();
assert!(Arc::ptr_eq(&c1, &c2));
assert!(FakeCirc::eq(&c1, &c2));
assert_eq!(mgr.n_circs(), 1);
// Now try launching two circuits "at once" to make sure that our
@ -1517,8 +1523,8 @@ mod test {
let c3 = c3.unwrap();
let c4 = c4.unwrap();
assert!(!Arc::ptr_eq(&c1, &c3));
assert!(Arc::ptr_eq(&c3, &c4));
assert!(!FakeCirc::eq(&c1, &c3));
assert!(FakeCirc::eq(&c3, &c4));
assert_eq!(c3.id(), c4.id());
assert_eq!(mgr.n_circs(), 2);
@ -1526,7 +1532,7 @@ mod test {
// same as c4, so removing c4 will give us None.
let c3_taken = mgr.take_circ(&c3.id()).unwrap();
let now_its_gone = mgr.take_circ(&c4.id());
assert!(Arc::ptr_eq(&c3_taken, &c3));
assert!(FakeCirc::eq(&c3_taken, &c3));
assert!(now_its_gone.is_none());
assert_eq!(mgr.n_circs(), 1);
@ -1534,8 +1540,8 @@ mod test {
// sure we get a different circuit.
let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await;
let c5 = c5.unwrap();
assert!(!Arc::ptr_eq(&c3, &c5));
assert!(!Arc::ptr_eq(&c4, &c5));
assert!(!FakeCirc::eq(&c3, &c5));
assert!(!FakeCirc::eq(&c4, &c5));
assert_eq!(mgr.n_circs(), 2);
// Now try launch_by_usage.
@ -1705,7 +1711,7 @@ mod test {
let c1 = c1.unwrap();
let c2 = c2.unwrap();
assert!(Arc::ptr_eq(&c1, &c2));
assert!(FakeCirc::eq(&c1, &c2));
});
}
@ -1760,8 +1766,8 @@ mod test {
let c_iso2 = c_iso2.unwrap();
let c_none = c_none.unwrap();
assert!(!Arc::ptr_eq(&c_iso1, &c_iso2));
assert!(Arc::ptr_eq(&c_iso1, &c_none) || Arc::ptr_eq(&c_iso2, &c_none));
assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
assert!(FakeCirc::eq(&c_iso1, &c_none) || FakeCirc::eq(&c_iso2, &c_none));
}
});
}
@ -1800,7 +1806,7 @@ mod test {
.await;
if let (Ok(c1), Ok(c2)) = (c1, c2) {
assert!(Arc::ptr_eq(&c1, &c2));
assert!(FakeCirc::eq(&c1, &c2));
} else {
panic!();
};
@ -1845,7 +1851,7 @@ mod test {
// If we had launched these separately, they wouldn't share
// a circuit.
assert!(Arc::ptr_eq(&c1, &c2));
assert!(FakeCirc::eq(&c1, &c2));
});
}
@ -1900,8 +1906,8 @@ mod test {
let pop2 = pop2.unwrap();
let imap2 = imap2.unwrap();
assert!(!Arc::ptr_eq(&pop2, &pop1));
assert!(Arc::ptr_eq(&imap2, &imap1));
assert!(!FakeCirc::eq(&pop2, &pop1));
assert!(FakeCirc::eq(&imap2, &imap1));
});
}
@ -1935,7 +1941,7 @@ mod test {
#[test]
fn test_find_supported() {
let (ep_none, ep_web, ep_full) = get_exit_policies();
let fake_circ = Arc::new(FakeCirc { id: FakeId::next() });
let fake_circ = FakeCirc { id: FakeId::next() };
let expiration = ExpirationInfo::Unused {
use_before: Instant::now() + Duration::from_secs(60 * 60),
};
@ -1945,7 +1951,7 @@ mod test {
policy: ep_none,
isolation: None,
},
Arc::clone(&fake_circ),
fake_circ.clone(),
expiration.clone(),
);
let mut entry_none_c = entry_none.clone();
@ -1954,7 +1960,7 @@ mod test {
policy: ep_web,
isolation: None,
},
Arc::clone(&fake_circ),
fake_circ.clone(),
expiration.clone(),
);
let mut entry_web_c = entry_web.clone();
@ -1963,7 +1969,7 @@ mod test {
policy: ep_full,
isolation: None,
},
Arc::clone(&fake_circ),
fake_circ,
expiration,
);
let mut entry_full_c = entry_full.clone();

View File

@ -79,6 +79,23 @@ pub const CIRCUIT_BUFFER_SIZE: usize = 128;
#[derive(Clone, Debug)]
/// A circuit that we have constructed over the Tor network.
///
/// This struct is the interface used by the rest of the code, It is fairly
/// cheaply cloneable. None of the public methods need mutable access, since
/// they all actually communicate with the Reactor which contains the primary
/// mutable state, and does the actual work.
//
// Effectively, this struct contains two Arcs: one for `hops` and one for
// `control` (which surely has soemthing Arc-like in it). We cannot unify
// these by putting a single Arc around the whole struct, and passing
// an Arc strong reference to the `Reactor`, because then `control` would
// not be dropped when the last user of the circuit goes away. We could
// make the reactor have a weak reference but weak references are more
// expensive to dereference.
//
// Because of the above, cloning this struct is always going to involve
// two atomic refcount changes/checks. Wrapping it in another Arc would
// be overkill.
pub struct ClientCirc {
/// Number of hops on this circuit.
///
@ -263,11 +280,7 @@ impl ClientCirc {
/// Start a DataStream (anonymized connection) to the given
/// address and port, using a BEGIN cell.
async fn begin_data_stream(
self: Arc<Self>,
msg: RelayMsg,
optimistic: bool,
) -> Result<DataStream> {
async fn begin_data_stream(&self, msg: RelayMsg, optimistic: bool) -> Result<DataStream> {
let (reader, target) = self.begin_stream_impl(msg).await?;
let mut stream = DataStream::new(reader, target);
if !optimistic {
@ -282,7 +295,7 @@ impl ClientCirc {
/// The use of a string for the address is intentional: you should let
/// the remote Tor relay do the hostname lookup for you.
pub async fn begin_stream(
self: Arc<Self>,
&self,
target: &str,
port: u16,
parameters: Option<StreamParameters>,
@ -296,8 +309,7 @@ impl ClientCirc {
/// Start a new stream to the last relay in the circuit, using
/// a BEGIN_DIR cell.
// FIXME(eta): get rid of Arc here!!!
pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
pub async fn begin_dir_stream(&self) -> Result<DataStream> {
// Note that we always open begindir connections optimistically.
// Since they are local to a relay that we've already authenticated
// with and built a circuit to, there should be no additional checks
@ -310,7 +322,7 @@ impl ClientCirc {
///
/// Note that this function does not check for timeouts; that's
/// the caller's responsibility.
pub async fn resolve(self: Arc<Self>, hostname: &str) -> Result<Vec<IpAddr>> {
pub async fn resolve(&self, hostname: &str) -> Result<Vec<IpAddr>> {
let resolve_msg = Resolve::new(hostname);
let resolved_msg = self.try_resolve(resolve_msg).await?;
@ -331,7 +343,7 @@ impl ClientCirc {
///
/// Note that this function does not check for timeouts; that's
/// the caller's responsibility.
pub async fn resolve_ptr(self: Arc<Self>, addr: IpAddr) -> Result<Vec<String>> {
pub async fn resolve_ptr(&self, addr: IpAddr) -> Result<Vec<String>> {
let resolve_ptr_msg = Resolve::new_reverse(&addr);
let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
@ -352,7 +364,7 @@ impl ClientCirc {
/// Helper: Send the resolve message, and read resolved message from
/// resolve stream.
async fn try_resolve(self: &Arc<Self>, msg: Resolve) -> Result<Resolved> {
async fn try_resolve(&self, msg: Resolve) -> Result<Resolved> {
let (reader, _) = self.begin_stream_impl(msg.into()).await?;
let mut resolve_stream = ResolveStream::new(reader);
resolve_stream.read_msg().await
@ -1107,7 +1119,7 @@ mod test {
let begin_and_send_fut = async move {
// Here we'll say we've got a circuit, and we want to
// make a simple BEGINDIR request with it.
let mut stream = Arc::new(circ).begin_dir_stream().await.unwrap();
let mut stream = circ.begin_dir_stream().await.unwrap();
stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
stream.flush().await.unwrap();
let mut buf = [0_u8; 1024];
@ -1181,7 +1193,7 @@ mod test {
let (chan, mut rx, sink2) = working_fake_channel(rt);
let (circ, mut sink) = newcirc(rt, chan).await;
let circ_clone = Arc::new(circ.clone());
let circ_clone = circ.clone();
let begin_and_send_fut = async move {
// Take our circuit and make a stream on it.
let mut stream = circ_clone