diff --git a/Cargo.lock b/Cargo.lock index 8aed041db..a1a25d2f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -582,6 +582,12 @@ version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +[[package]] +name = "by_address" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e245704f60eb4eb45810d65cf14eb54d2eb50a6f3715fe2d7cd01ee905c2944f" + [[package]] name = "bytemuck" version = "1.12.1" @@ -3715,6 +3721,7 @@ name = "tor-linkspec" version = "0.5.1" dependencies = [ "base64ct", + "by_address", "cfg-if", "derive_builder_fork_arti", "derive_more", diff --git a/crates/tor-chanmgr/src/builder.rs b/crates/tor-chanmgr/src/builder.rs index 499b4ffe8..8ce0dc5a2 100644 --- a/crates/tor-chanmgr/src/builder.rs +++ b/crates/tor-chanmgr/src/builder.rs @@ -9,8 +9,7 @@ use crate::{event::ChanMgrEventSender, Error}; use std::time::Duration; use tor_error::internal; -use tor_linkspec::{HasChanMethod, HasRelayIds, OwnedChanTarget}; -use tor_llcrypto::pk; +use tor_linkspec::{HasChanMethod, OwnedChanTarget}; use tor_proto::channel::params::ChannelPaddingInstructionsUpdates; use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider}; @@ -208,12 +207,6 @@ where } impl crate::mgr::AbstractChannel for tor_proto::channel::Channel { - type Ident = pk::ed25519::Ed25519Identity; - fn ident(&self) -> &Self::Ident { - self.target() - .ed_identity() - .expect("This channel had an Ed25519 identity when we created it, but now it doesn't!?") - } fn is_usable(&self) -> bool { !self.is_closing() } @@ -221,10 +214,10 @@ impl crate::mgr::AbstractChannel for tor_proto::channel::Channel { self.duration_unused() } fn reparameterize( - &mut self, + &self, updates: Arc, ) -> tor_proto::Result<()> { - self.reparameterize(updates) + tor_proto::channel::Channel::reparameterize(self, updates) } fn engage_padding_activities(&self) { self.engage_padding_activities(); @@ -239,11 +232,11 @@ mod test { mgr::{AbstractChannel, AbstractChannelFactory}, Result, }; - use pk::ed25519::Ed25519Identity; - use pk::rsa::RsaIdentity; use std::net::SocketAddr; use std::time::{Duration, SystemTime}; - use tor_linkspec::ChannelMethod; + use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType}; + use tor_llcrypto::pk::ed25519::Ed25519Identity; + use tor_llcrypto::pk::rsa::RsaIdentity; use tor_proto::channel::Channel; use tor_rtcompat::{test_with_one_runtime, TcpListener}; use tor_rtmock::{io::LocalStream, net::MockNetwork, MockSleepRuntime}; @@ -316,7 +309,7 @@ mod test { ); let chan = r1.unwrap(); - assert_eq!(chan.ident(), &ed); + assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into())); assert!(chan.is_usable()); // In theory, time could pass here, so we can't just use // "assert_eq!(dur_unused, dur_unused2)". diff --git a/crates/tor-chanmgr/src/err.rs b/crates/tor-chanmgr/src/err.rs index b7991402b..93798516c 100644 --- a/crates/tor-chanmgr/src/err.rs +++ b/crates/tor-chanmgr/src/err.rs @@ -79,15 +79,28 @@ pub enum Error { /// A relay did not have the set of identity keys that we expected. /// - /// (Currently, `tor-chanmgr` only works to manage channels with a known - /// expected Ed25519 identity.) + /// (Currently, `tor-chanmgr` only works on relays that have at least + /// one recognized identity key.) #[error("Could not identify relay by identity key")] MissingId, + /// A succesful relay channel had one of the identity keys we wanted, + /// but not the other(s). + /// + /// This means that (assuming the relay is well behaved), we will not + /// find the ID combination we want. + #[error("Relay identity keys were only a partial match for what we wanted.")] + IdentityConflict, + /// Tried to connnect via a transport that we don't support. #[error("No plugin available for the transport {0}")] NoSuchTransport(tor_linkspec::TransportId), + /// An attempt to open a channel failed because it was cancelled or + /// superseded by another request or configuration change. + #[error("Channel request cancelled or superseded")] + RequestCancelled, + /// An internal error of some kind that should never occur. #[error("Internal error")] Internal(#[from] tor_error::Bug), @@ -99,6 +112,12 @@ impl From> for Error { } } +impl From for Error { + fn from(_: tor_linkspec::ByRelayIdsError) -> Self { + Error::MissingId + } +} + impl tor_error::HasKind for Error { fn kind(&self) -> ErrorKind { use tor_proto::Error as ProtoErr; @@ -117,7 +136,9 @@ impl tor_error::HasKind for Error { E::NoSuchTransport(_) => EK::InvalidConfig, E::UnusableTarget(_) | E::Internal(_) => EK::Internal, E::MissingId => EK::BadApiUsage, - Error::ChannelBuild { .. } => EK::TorAccessFailed, + E::IdentityConflict => EK::TorAccessFailed, + E::ChannelBuild { .. } => EK::TorAccessFailed, + E::RequestCancelled => EK::TransientFailure, } } } @@ -147,10 +168,15 @@ impl tor_error::HasRetryTime for Error { // it won't have addresses in the future. E::UnusableTarget(_) => RT::Never, + // This can't succeed until the relay is reconfigured. + E::IdentityConflict => RT::Never, + // This one can't succeed until the bridge, or our set of // transports, is reconfigured. E::NoSuchTransport(_) => RT::Never, + E::RequestCancelled => RT::Never, + // These aren't recoverable at all. E::Spawn { .. } | E::MissingId | E::Internal(_) => RT::Never, } diff --git a/crates/tor-chanmgr/src/lib.rs b/crates/tor-chanmgr/src/lib.rs index d077e35a1..6aab268b6 100644 --- a/crates/tor-chanmgr/src/lib.rs +++ b/crates/tor-chanmgr/src/lib.rs @@ -214,18 +214,9 @@ impl ChanMgr { target: &T, usage: ChannelUsage, ) -> Result<(Channel, ChanProvenance)> { - // TODO(nickm): We will need to change the way that we index our map - // when we eventually support channels that are _not_ primarily - // identified by their ed25519 key. That could be in the distant future - // when we make Ed25519 keys optional: But more likely it will be when - // we implement bridges. - let ed_identity = target.ed_identity().ok_or(Error::MissingId)?; let targetinfo = OwnedChanTarget::from_chan_target(target); - let (chan, provenance) = self - .mgr - .get_or_launch(*ed_identity, targetinfo, usage) - .await?; + let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?; // Double-check the match to make sure that the RSA identity is // what we wanted too. chan.check_match(target) diff --git a/crates/tor-chanmgr/src/mgr.rs b/crates/tor-chanmgr/src/mgr.rs index 99d36d530..13c2339d5 100644 --- a/crates/tor-chanmgr/src/mgr.rs +++ b/crates/tor-chanmgr/src/mgr.rs @@ -1,17 +1,17 @@ //! Abstract implementation of a channel manager -use crate::mgr::map::OpenEntry; +use crate::mgr::map::{OpenEntry, PendingEntry}; use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result}; use async_trait::async_trait; use futures::channel::oneshot; use futures::future::{FutureExt, Shared}; use rand::Rng; -use std::hash::Hash; use std::result::Result as StdResult; use std::sync::Arc; use std::time::Duration; use tor_error::internal; +use tor_linkspec::{HasRelayIds, RelayIds}; use tor_netdir::params::NetParameters; use tor_proto::channel::params::ChannelPaddingInstructionsUpdates; @@ -20,11 +20,7 @@ mod map; /// Trait to describe as much of a /// [`Channel`](tor_proto::channel::Channel) as `AbstractChanMgr` /// needs to use. -pub(crate) trait AbstractChannel: Clone { - /// Identity type for the other side of the channel. - type Ident: Hash + Eq + Clone; - /// Return this channel's identity. - fn ident(&self) -> &Self::Ident; +pub(crate) trait AbstractChannel: Clone + HasRelayIds { /// Return true if this channel is usable. /// /// A channel might be unusable because it is closed, because it has @@ -40,7 +36,7 @@ pub(crate) trait AbstractChannel: Clone { /// The changed parameters may not be implemented "immediately", /// but this will be done "reasonably soon". fn reparameterize( - &mut self, + &self, updates: Arc, ) -> tor_proto::Result<()>; @@ -62,7 +58,7 @@ pub(crate) trait AbstractChannelFactory { /// The type of channel that this factory can build. type Channel: AbstractChannel; /// Type that explains how to build a channel. - type BuildSpec; + type BuildSpec: HasRelayIds; /// Construct a new channel to the destination described at `target`. /// @@ -91,11 +87,11 @@ pub(crate) struct AbstractChanMgr { /// Type alias for a future that we wait on to see when a pending /// channel is done or failed. -type Pending = Shared>>; +type Pending = Shared>>; -/// Type alias for the sender we notify when we complete a channel (or -/// fail to complete it). -type Sending = oneshot::Sender>; +/// Type alias for the sender we notify when we complete a channel (or fail to +/// complete it). +type Sending = oneshot::Sender>; impl AbstractChanMgr { /// Make a new empty channel manager. @@ -119,30 +115,34 @@ impl AbstractChanMgr { /// Helper: return the objects used to inform pending tasks /// about a newly open or failed channel. - fn setup_launch(&self) -> (map::ChannelState, Sending) { + fn setup_launch(&self, ids: RelayIds) -> (map::ChannelState, Sending) { let (snd, rcv) = oneshot::channel(); - let shared = rcv.shared(); - (map::ChannelState::Building(shared), snd) + let pending = rcv.shared(); + ( + map::ChannelState::Building(map::PendingEntry { ids, pending }), + snd, + ) } - /// Get a channel whose identity is `ident`. + /// Get a channel corresponding to the identities of `target`. /// /// If a usable channel exists with that identity, return it. /// /// If no such channel exists already, and none is in progress, - /// launch a new request using `target`, which must match `ident`. + /// launch a new request using `target`. /// /// If no such channel exists already, but we have one that's in /// progress, wait for it to succeed or fail. pub(crate) async fn get_or_launch( &self, - ident: <::Channel as AbstractChannel>::Ident, target: CF::BuildSpec, usage: ChannelUsage, ) -> Result<(CF::Channel, ChanProvenance)> { use ChannelUsage as CU; - let chan = self.get_or_launch_internal(ident, target).await?; + // TODO pt-client: This is not yet used. + + let chan = self.get_or_launch_internal(target).await?; match usage { CU::Dir | CU::UselessCircuit => {} @@ -155,138 +155,278 @@ impl AbstractChanMgr { /// Get a channel whose identity is `ident` - internal implementation async fn get_or_launch_internal( &self, - ident: <::Channel as AbstractChannel>::Ident, target: CF::BuildSpec, ) -> Result<(CF::Channel, ChanProvenance)> { - use map::ChannelState::*; - - /// Possible actions that we'll decide to take based on the - /// channel's initial state. - enum Action { - /// We found no channel. We're going to launch a new one, - /// then tell everybody about it. - Launch(Sending), - /// We found an in-progress attempt at making a channel. - /// We're going to wait for it to finish. - Wait(Pending), - /// We found a usable channel. We're going to return it. - Return(Result<(C, ChanProvenance)>), - } /// How many times do we try? const N_ATTEMPTS: usize = 2; + let mut attempts_so_far = 0; + let mut final_attempt = false; + let mut provenance = ChanProvenance::Preexisting; // TODO(nickm): It would be neat to use tor_retry instead. let mut last_err = None; - for _ in 0..N_ATTEMPTS { - // First, see what state we're in, and what we should do - // about it. - let action = self - .channels - .change_state(&ident, |oldstate| match oldstate { - Some(Open(ref ent)) => { - if ent.channel.is_usable() { - // Good channel. Return it. - let action = Action::Return(Ok(( - ent.channel.clone(), - ChanProvenance::Preexisting, - ))); - (oldstate, action) - } else { - // Unusable channel. Move to the Building - // state and launch a new channel. - let (newstate, send) = self.setup_launch(); - let action = Action::Launch(send); - (Some(newstate), action) - } - } - Some(Building(ref pending)) => { - let action = Action::Wait(pending.clone()); - (oldstate, action) - } - Some(Poisoned(_)) => { - // We should never be able to see this state; this - // is a bug. - ( - None, - Action::Return(Err(Error::Internal(internal!( - "Found a poisoned entry" - )))), - ) - } - None => { - // No channel. Move to the Building - // state and launch a new channel. - let (newstate, send) = self.setup_launch(); - let action = Action::Launch(send); - (Some(newstate), action) - } - })?; + while attempts_so_far < N_ATTEMPTS || final_attempt { + attempts_so_far += 1; - // Now we act based on the channel. + // For each attempt, we _first_ look at the state of the channel map + // to decide on an `Action`, and _then_ we execute that action. + + // First, see what state we're in, and what we should do about it. + let action = self.choose_action(&target, final_attempt)?; + + // We are done deciding on our Action! It's time act based on the + // Action that we chose. match action { + // If this happens, we were trying to make one final check of our state, but + // we would have had to make additional attempts. + None => { + if !final_attempt { + return Err(Error::Internal(internal!( + "No action returned while not on final attempt" + ))); + } + break; + } // Easy case: we have an error or a channel to return. - Action::Return(v) => { - return v; + Some(Action::Return(v)) => { + return v.map(|chan| (chan, provenance)); } // There's an in-progress channel. Wait for it. - Action::Wait(pend) => match pend.await { - Ok(Ok(chan)) => return Ok((chan, ChanProvenance::NewlyCreated)), - Ok(Err(e)) => { - last_err = Some(e); + Some(Action::Wait(pend)) => { + match pend.await { + Ok(Ok(())) => { + // We were waiting for a channel, and it succeeded, or it + // got cancelled. But it might have gotten more + // identities while negotiating than it had when it was + // launched, or it might have failed to get all the + // identities we want. Check for this. + final_attempt = true; + provenance = ChanProvenance::NewlyCreated; + last_err.get_or_insert(Error::RequestCancelled); + } + Ok(Err(e)) => { + last_err = Some(e); + } + Err(_) => { + last_err = + Some(Error::Internal(internal!("channel build task disappeared"))); + } } - Err(_) => { - last_err = - Some(Error::Internal(internal!("channel build task disappeared"))); - } - }, + } // We need to launch a channel. - Action::Launch(send) => match self.connector.build_channel(&target).await { - Ok(mut chan) => { - // The channel got built: remember it, tell the - // others, and return it. - self.channels - .replace_with_params(ident.clone(), |channels_params| { - // This isn't great. We context switch to the newly-created - // channel just to tell it how and whether to do padding. Ideally - // we would pass the params at some suitable point during - // building. However, that would involve the channel taking a - // copy of the params, and that must happen in the same channel - // manager lock acquisition span as the one where we insert the - // channel into the table so it will receive updates. I.e., - // here. - let update = channels_params.initial_update(); - if let Some(update) = update { - chan.reparameterize(update.into()) - .map_err(|_| internal!("failure on new channel"))?; - } - Ok(Open(OpenEntry { - channel: chan.clone(), - max_unused_duration: Duration::from_secs( - rand::thread_rng().gen_range(180..270), - ), - })) - })?; - // It's okay if all the receivers went away: - // that means that nobody was waiting for this channel. - let _ignore_err = send.send(Ok(chan.clone())); - return Ok((chan, ChanProvenance::NewlyCreated)); + Some(Action::Launch(send)) => { + let outcome = self.connector.build_channel(&target).await; + let status = self.handle_build_outcome(&target, outcome); + + // It's okay if all the receivers went away: + // that means that nobody was waiting for this channel. + let _ignore_err = send.send(status.clone().map(|_| ())); + + match status { + Ok(Some(chan)) => { + return Ok((chan, ChanProvenance::NewlyCreated)); + } + Ok(None) => { + final_attempt = true; + provenance = ChanProvenance::NewlyCreated; + last_err.get_or_insert(Error::RequestCancelled); + } + Err(e) => last_err = Some(e), } - Err(e) => { - // The channel failed. Make it non-pending, tell the - // others, and set the error. - self.channels.remove(&ident)?; - // (As above) - let _ignore_err = send.send(Err(e.clone())); - last_err = Some(e); - } - }, + } } + + // End of this attempt. We will try again... } Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?")))) } + /// Helper: based on our internal state, decide which action to take when + /// asked for a channel, and update our internal state accordingly. + /// + /// If `final_attempt` is true, then we will not pick any action that does + /// not result in an immediate result. If we would pick such an action, we + /// instead return `Ok(None)`. (We could instead have the caller detect + /// such actions, but it's less efficient to construct them, insert them, + /// and immediately revert them.) + fn choose_action( + &self, + target: &CF::BuildSpec, + final_attempt: bool, + ) -> Result>> { + use map::ChannelState::*; + self.channels.with_channels(|channel_map| { + match channel_map.by_all_ids(target) { + Some(Open(OpenEntry { channel, .. })) => { + if channel.is_usable() { + // This entry is a perfect match for the target keys: + // we'll return the open entry. + return Ok(Some(Action::Return(Ok(channel.clone())))); + } else if final_attempt { + // We don't launch an attempt in this case. + return Ok(None); + } else { + // This entry was a perfect match for the target, but it + // is no longer usable! We launch a new connection to + // this target, and wait on that. + let (new_state, send) = self.setup_launch(RelayIds::from_relay_ids(target)); + channel_map.try_insert(new_state)?; + + return Ok(Some(Action::Launch(send))); + } + } + Some(Building(PendingEntry { pending, .. })) => { + // This entry is a perfect match for the target keys: we'll + // return the pending entry. (We don't know for sure if it + // will match once it completes, since we might discover + // additional keys beyond those listed for this pending + // entry.) + if final_attempt { + // We don't launch an attempt in this case. + return Ok(None); + } + return Ok(Some(Action::Wait(pending.clone()))); + } + _ => {} + } + // Okay, we don't have an exact match. But we might have one or more _partial_ matches? + let overlapping = channel_map.all_overlapping(target); + if overlapping + .iter() + .any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable())) + { + // At least one *open, usable* channel has been negotiated that + // overlaps only partially with our target: it has proven itself + // to have _one_ of our target identities, but not all. + // + // Because this channel exists, we know that our target cannot + // succeed, since relays are not allowed to share _any_ + // identities. + return Ok(Some(Action::Return(Err(Error::IdentityConflict)))); + } else if let Some(first_building) = overlapping + .iter() + .find(|entry| matches!(entry, Building(_))) + { + // There is at least one *in-progress* channel that has at least + // one identity in common with our target, but it does not have + // _all_ the identities we want. + // + // If it succeeds, we might find that we can use it; or we might + // find out that it is not suitable. So we'll wait for it, and + // see what happens. + // + // TODO: This approach will _not_ be sufficient once we are + // implementing relays, and some of our channels are created in + // response to client requests. + match first_building { + Open(_) => unreachable!(), + Building(PendingEntry { pending, .. }) => { + if final_attempt { + // We don't wait in this case. + return Ok(None); + } + return Ok(Some(Action::Wait(pending.clone()))); + } + } + } + + if final_attempt { + // We don't launch an attempt in this case. + return Ok(None); + } + + // Great, nothing interfered at all. + let (new_state, send) = self.setup_launch(RelayIds::from_relay_ids(target)); + channel_map.try_insert(new_state)?; + Ok(Some(Action::Launch(send))) + })? + } + + /// We just tried to build a channel: Handle the outcome and decide what to + /// do. + /// + /// Return `Ok(None)` if we have a transient error that we expect will be + /// cleaned up by one final call to `choose_action`. + fn handle_build_outcome( + &self, + target: &CF::BuildSpec, + outcome: Result, + ) -> Result> { + use map::ChannelState::*; + match outcome { + Ok(chan) => { + // The channel got built: remember it, tell the + // others, and return it. + self.channels + .with_channels_and_params(|channel_map, channels_params| { + match channel_map.remove_exact(target) { + Some(Building(_)) => { + // We successfully removed our pending + // action. great! Fall through and add + // the channel we just built. + } + None => { + // Something removed our entry from the list. + // Time to retry. + return Ok(None); + } + Some(ent @ Open(_)) => { + // Oh no. Something else built an entry + // here, and replaced us. Put that + // something back, and retry. + + channel_map.insert(ent); + return Ok(None); + } + } + + // This isn't great. We context switch to the newly-created + // channel just to tell it how and whether to do padding. Ideally + // we would pass the params at some suitable point during + // building. However, that would involve the channel taking a + // copy of the params, and that must happen in the same channel + // manager lock acquisition span as the one where we insert the + // channel into the table so it will receive updates. I.e., + // here. + let update = channels_params.initial_update(); + if let Some(update) = update { + chan.reparameterize(update.into()) + .map_err(|_| internal!("failure on new channel"))?; + } + let new_entry = Open(OpenEntry { + channel: chan.clone(), + max_unused_duration: Duration::from_secs( + rand::thread_rng().gen_range(180..270), + ), + }); + channel_map.insert(new_entry); + Ok(Some(chan)) + })? + } + Err(e) => { + // The channel failed. Make it non-pending, tell the + // others, and set the error. + self.channels.with_channels(|channel_map| { + match channel_map.remove_exact(target) { + Some(Building(_)) | None => { + // We successfully removed our pending + // action, or somebody else did. + } + Some(ent @ Open(_)) => { + // Oh no. Something else built an entry + // here, and replaced us. Put that + // something back. + channel_map.insert(ent); + } + } + })?; + Err(e) + } + } + } + /// Update the netdir pub(crate) fn update_netparams( &self, @@ -330,18 +470,32 @@ impl AbstractChanMgr { /// Test only: return the current open usable channel with a given /// `ident`, if any. #[cfg(test)] - pub(crate) fn get_nowait( - &self, - ident: &<::Channel as AbstractChannel>::Ident, - ) -> Option { + pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Option + where + T: Into>, + { use map::ChannelState::*; - match self.channels.get(ident) { - Ok(Some(Open(ref ent))) if ent.channel.is_usable() => Some(ent.channel.clone()), - _ => None, - } + self.channels + .with_channels(|channel_map| match channel_map.by_id(ident) { + Some(Open(ref ent)) if ent.channel.is_usable() => Some(ent.channel.clone()), + _ => None, + }) + .expect("Poisoned lock") } } +/// Possible actions that we'll decide to take when asked for a channel. +enum Action { + /// We found no channel. We're going to launch a new one, + /// then tell everybody about it. + Launch(Sending), + /// We found an in-progress attempt at making a channel. + /// We're going to wait for it to finish. + Wait(Pending), + /// We found a usable channel. We're going to return it. + Return(Result), +} + #[cfg(test)] mod test { #![allow(clippy::unwrap_used)] @@ -353,6 +507,7 @@ mod test { use std::sync::Arc; use std::time::Duration; use tor_error::bad_api_usage; + use tor_llcrypto::pk::ed25519::Ed25519Identity; use crate::ChannelUsage as CU; use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime}; @@ -363,11 +518,11 @@ mod test { #[derive(Clone, Debug)] struct FakeChannel { - ident: u32, + ed_ident: Ed25519Identity, mood: char, closing: Arc, detect_reuse: Arc, - last_params: Option, + // last_params: Option, } impl PartialEq for FakeChannel { @@ -377,10 +532,6 @@ mod test { } impl AbstractChannel for FakeChannel { - type Ident = u32; - fn ident(&self) -> &u32 { - &self.ident - } fn is_usable(&self) -> bool { !self.closing.load(Ordering::SeqCst) } @@ -388,15 +539,27 @@ mod test { None } fn reparameterize( - &mut self, - updates: Arc, + &self, + _updates: Arc, ) -> tor_proto::Result<()> { - self.last_params = Some((*updates).clone()); + // *self.last_params.lock().unwrap() = Some((*updates).clone()); Ok(()) } fn engage_padding_activities(&self) {} } + impl HasRelayIds for FakeChannel { + fn identity( + &self, + key_type: tor_linkspec::RelayIdType, + ) -> Option> { + match key_type { + tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()), + _ => None, + } + } + } + impl FakeChannel { fn start_closing(&self) { self.closing.store(true, Ordering::SeqCst); @@ -419,14 +582,38 @@ mod test { ) } + #[derive(Clone, Debug)] + struct FakeBuildSpec(u32, char, Ed25519Identity); + + impl HasRelayIds for FakeBuildSpec { + fn identity( + &self, + key_type: tor_linkspec::RelayIdType, + ) -> Option> { + match key_type { + tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()), + _ => None, + } + } + } + + /// Helper to make a fake Ed identity from a u32. + fn u32_to_ed(n: u32) -> Ed25519Identity { + let mut bytes = [0; 32]; + bytes[0..4].copy_from_slice(&n.to_be_bytes()); + bytes.into() + } + #[async_trait] impl AbstractChannelFactory for FakeChannelFactory { type Channel = FakeChannel; - type BuildSpec = (u32, char); + type BuildSpec = FakeBuildSpec; async fn build_channel(&self, target: &Self::BuildSpec) -> Result { yield_now().await; - let (ident, mood) = *target; + let FakeBuildSpec(ident, mood, id) = *target; + let ed_ident = u32_to_ed(ident); + assert_eq!(ed_ident, id); match mood { // "X" means never connect. '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))), @@ -437,11 +624,11 @@ mod test { _ => {} } Ok(FakeChannel { - ident, + ed_ident, mood, closing: Arc::new(AtomicBool::new(false)), detect_reuse: Default::default(), - last_params: None, + // last_params: None, }) } } @@ -450,21 +637,17 @@ mod test { fn connect_one_ok() { test_with_one_runtime!(|runtime| async { let mgr = new_test_abstract_chanmgr(runtime); - let target = (413, '!'); + let target = FakeBuildSpec(413, '!', u32_to_ed(413)); let chan1 = mgr - .get_or_launch(413, target, CU::UserTraffic) - .await - .unwrap() - .0; - let chan2 = mgr - .get_or_launch(413, target, CU::UserTraffic) + .get_or_launch(target.clone(), CU::UserTraffic) .await .unwrap() .0; + let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0; assert_eq!(chan1, chan2); - let chan3 = mgr.get_nowait(&413).unwrap(); + let chan3 = mgr.get_nowait(&u32_to_ed(413)).unwrap(); assert_eq!(chan1, chan3); }); } @@ -475,11 +658,11 @@ mod test { let mgr = new_test_abstract_chanmgr(runtime); // This is set up to always fail. - let target = (999, '❌'); - let res1 = mgr.get_or_launch(999, target, CU::UserTraffic).await; + let target = FakeBuildSpec(999, '❌', u32_to_ed(999)); + let res1 = mgr.get_or_launch(target, CU::UserTraffic).await; assert!(matches!(res1, Err(Error::UnusableTarget(_)))); - let chan3 = mgr.get_nowait(&999); + let chan3 = mgr.get_nowait(&u32_to_ed(999)); assert!(chan3.is_none()); }); } @@ -493,12 +676,12 @@ mod test { // concurrently. Right now it seems that they don't actually // interact. let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!( - mgr.get_or_launch(3, (3, 'a'), CU::UserTraffic), - mgr.get_or_launch(3, (3, 'b'), CU::UserTraffic), - mgr.get_or_launch(44, (44, 'a'), CU::UserTraffic), - mgr.get_or_launch(44, (44, 'b'), CU::UserTraffic), - mgr.get_or_launch(86, (86, '❌'), CU::UserTraffic), - mgr.get_or_launch(86, (86, '🔥'), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44)), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44)), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86)), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86)), CU::UserTraffic), ); let ch3a = ch3a.unwrap(); let ch3b = ch3b.unwrap(); @@ -522,9 +705,9 @@ mod test { let mgr = new_test_abstract_chanmgr(runtime); let (ch3, ch4, ch5) = join!( - mgr.get_or_launch(3, (3, 'a'), CU::UserTraffic), - mgr.get_or_launch(4, (4, 'a'), CU::UserTraffic), - mgr.get_or_launch(5, (5, 'a'), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic), + mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic), ); let ch3 = ch3.unwrap().0; @@ -535,7 +718,7 @@ mod test { ch5.start_closing(); let ch3_new = mgr - .get_or_launch(3, (3, 'b'), CU::UserTraffic) + .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic) .await .unwrap() .0; @@ -544,9 +727,9 @@ mod test { mgr.remove_unusable_entries().unwrap(); - assert!(mgr.get_nowait(&3).is_some()); - assert!(mgr.get_nowait(&4).is_some()); - assert!(mgr.get_nowait(&5).is_none()); + assert!(mgr.get_nowait(&u32_to_ed(3)).is_some()); + assert!(mgr.get_nowait(&u32_to_ed(4)).is_some()); + assert!(mgr.get_nowait(&u32_to_ed(5)).is_none()); }); } } diff --git a/crates/tor-chanmgr/src/mgr/map.rs b/crates/tor-chanmgr/src/mgr/map.rs index fb0097cf4..ec208cf8a 100644 --- a/crates/tor-chanmgr/src/mgr/map.rs +++ b/crates/tor-chanmgr/src/mgr/map.rs @@ -3,14 +3,16 @@ use std::time::Duration; use super::{AbstractChannel, Pending}; -use crate::{ChannelConfig, Dormancy, Error, Result}; +use crate::{ChannelConfig, Dormancy, Result}; -use std::collections::{hash_map, HashMap}; use std::result::Result as StdResult; use std::sync::Arc; use tor_cell::chancell::msg::PaddingNegotiate; use tor_config::PaddingLevel; use tor_error::{internal, into_internal}; +use tor_linkspec::ByRelayIds; +use tor_linkspec::HasRelayIds; +use tor_linkspec::RelayIds; use tor_netdir::{params::NetParameters, params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND}; use tor_proto::channel::padding::Parameters as PaddingParameters; use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder; @@ -23,14 +25,6 @@ use void::{ResultVoidExt as _, Void}; #[cfg(test)] mod padding_test; -// TODO pt-client: -// -// This map code will no longer work in the bridge setting. We need to update -// our internal map of channels, so that we can look them up not only by Ed25519 -// identity, but by RSA identity too. We also need a way to be able to get a -// channel only if it matches a specific ChanTarget in its address and transport -// and keys. - /// A map from channel id to channel state, plus necessary auxiliary state /// /// We make this a separate type instead of just using @@ -48,8 +42,12 @@ struct Inner { /// /// (Danger: this uses a blocking mutex close to async code. This mutex /// must never be held while an await is happening.) - channels: HashMap>, + channels: ByRelayIds>, + // TODO: The subsequent fields really do not belong in structure called + // `ChannelMap`. These, plus the actual map, should probably be in a + // structure called "MgrState", and that structure should be the thing that + // is put behind a Mutex. /// Parameters for channels that we create, and that all existing channels are using /// /// Will be updated by a background task, which also notifies all existing @@ -70,14 +68,17 @@ struct Inner { dormancy: Dormancy, } -/// Structure that can only be constructed from within this module. -/// Used to make sure that only we can construct ChannelState::Poisoned. -pub(crate) struct Priv { - /// (This field is private) - _unused: (), -} - /// The state of a channel (or channel build attempt) within a map. +/// +/// A ChannelState can be Open (representing a fully negotiated channel) or +/// Building (representing a pending attempt to build a channel). Both states +/// have a set of RelayIds, but these RelayIds represent slightly different +/// things: +/// * On a Building channel, the set of RelayIds is all the identities that we +/// require the peer to have. (The peer may turn out to have _more_ +/// identities than this.) +/// * On an Open channel, the set of RelayIds is all the identities that +/// we were able to successfully authenticate for the peer. pub(crate) enum ChannelState { /// An open channel. /// @@ -86,12 +87,7 @@ pub(crate) enum ChannelState { /// yielding it to the user. Open(OpenEntry), /// A channel that's getting built. - Building(Pending), - /// A temporary invalid state. - /// - /// We insert this into the map temporarily as a placeholder in - /// `change_state()`. - Poisoned(Priv), + Building(PendingEntry), } /// An open channel entry. @@ -103,24 +99,42 @@ pub(crate) struct OpenEntry { pub(crate) max_unused_duration: Duration, } -impl ChannelState { - /// Create a new shallow copy of this ChannelState. - #[cfg(test)] - fn clone_ref(&self) -> Result { - use ChannelState::*; +/// An entry for a not-yet-build channel +#[derive(Clone)] +pub(crate) struct PendingEntry { + /// The keys of the relay to which we're trying to open a channel. + pub(crate) ids: RelayIds, + + /// A future we can clone and listen on to learn when this channel attempt + /// is successful or failed. + /// + /// This entry will be removed from the map (and possibly replaced with an + /// `OpenEntry`) _before_ this future becomes ready. + pub(crate) pending: Pending, +} + +impl HasRelayIds for ChannelState +where + C: HasRelayIds, +{ + fn identity( + &self, + key_type: tor_linkspec::RelayIdType, + ) -> Option> { match self { - Open(ent) => Ok(Open(ent.clone())), - Building(pending) => Ok(Building(pending.clone())), - Poisoned(_) => Err(Error::Internal(internal!("Poisoned state in channel map"))), + ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type), + ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type), } } +} +impl ChannelState { /// For testing: either give the Open channel inside this state, /// or panic if there is none. #[cfg(test)] - fn unwrap_open(&mut self) -> &mut C { + fn unwrap_open(&self) -> &C { match self { - ChannelState::Open(ent) => &mut ent.channel, + ChannelState::Open(ent) => &ent.channel, _ => panic!("Not an open channel"), } } @@ -182,24 +196,6 @@ impl NetParamsExtract { } impl ChannelState { - /// Return an error if `ident`is definitely not a matching - /// matching identity for this state. - fn check_ident(&self, ident: &C::Ident) -> Result<()> { - match self { - ChannelState::Open(ent) => { - if ent.channel.ident() == ident { - Ok(()) - } else { - Err(Error::Internal(internal!("Identity mismatch"))) - } - } - ChannelState::Poisoned(_) => { - Err(Error::Internal(internal!("Poisoned state in channel map"))) - } - ChannelState::Building(_) => Ok(()), - } - } - /// Return true if a channel is ready to expire. /// Update `expire_after` if a smaller duration than /// the given value is required to expire this channel. @@ -240,7 +236,7 @@ impl ChannelMap { ChannelMap { inner: std::sync::Mutex::new(Inner { - channels: HashMap::new(), + channels: ByRelayIds::new(), config, channels_params, dormancy, @@ -248,119 +244,52 @@ impl ChannelMap { } } - /// Return the channel state for the given identity, if any. - #[cfg(test)] - pub(crate) fn get(&self, ident: &C::Ident) -> Result>> { - let inner = self.inner.lock()?; - inner - .channels - .get(ident) - .map(ChannelState::clone_ref) - .transpose() - } - - /// Replace the channel state for `ident` with `newval`, and return the - /// previous value if any. - #[cfg(test)] - pub(crate) fn replace( - &self, - ident: C::Ident, - newval: ChannelState, - ) -> Result>> { - newval.check_ident(&ident)?; - let mut inner = self.inner.lock()?; - Ok(inner.channels.insert(ident, newval)) - } - - /// Replace the channel state for `ident` with the return value from `func`, - /// and return the previous value if any. + /// Run a function on the `ByRelayIds` that implements this map. /// - /// Passes a snapshot of the current global channels parameters to `func`. - /// If those parameters are copied by `func` into an [`AbstractChannel`] - /// `func` must ensure that that `AbstractChannel` is returned, - /// so that it will be properly registered and receive params updates. - pub(crate) fn replace_with_params( - &self, - ident: C::Ident, - func: F, - ) -> Result>> + /// This function grabs a mutex over the map: do not provide slow function. + /// + /// We provide this function rather than exposing the channels set directly, + /// to make sure that the calling code doesn't await while holding the lock. + pub(crate) fn with_channels(&self, func: F) -> Result where - F: FnOnce(&ChannelPaddingInstructions) -> Result>, + F: FnOnce(&mut ByRelayIds>) -> T, { let mut inner = self.inner.lock()?; - let newval = func(&inner.channels_params)?; - newval.check_ident(&ident)?; - Ok(inner.channels.insert(ident, newval)) + Ok(func(&mut inner.channels)) } - /// Remove and return the state for `ident`, if any. - pub(crate) fn remove(&self, ident: &C::Ident) -> Result>> { + /// Run a function on the `ByRelayIds` that implements this map. + /// + /// This function grabs a mutex over the map: do not provide slow function. + /// + /// We provide this function rather than exposing the channels set directly, + /// to make sure that the calling code doesn't await while holding the lock. + pub(crate) fn with_channels_and_params(&self, func: F) -> Result + where + F: FnOnce(&mut ByRelayIds>, &ChannelPaddingInstructions) -> T, + { let mut inner = self.inner.lock()?; - Ok(inner.channels.remove(ident)) + // We need this silly destructuring syntax so that we don't seem to be + // borrowing the structure mutably and immutably at the same time. + let Inner { + ref mut channels, + ref channels_params, + .. + } = &mut *inner; + Ok(func(channels, channels_params)) } /// Remove every unusable state from the map. #[cfg(test)] pub(crate) fn remove_unusable(&self) -> Result<()> { let mut inner = self.inner.lock()?; - inner.channels.retain(|_, state| match state { - ChannelState::Poisoned(_) => false, + inner.channels.retain(|state| match state { ChannelState::Open(ent) => ent.channel.is_usable(), ChannelState::Building(_) => true, }); Ok(()) } - /// Replace the state whose identity is `ident` with a new state. - /// - /// The provided function `func` is invoked on the old state (if - /// any), and must return a tuple containing an optional new - /// state, and an arbitrary return value for this function. - /// - /// Because `func` is run while holding the lock on this object, - /// it should be fast and nonblocking. In return, you can be sure - /// that it's running atomically with respect to other accessors - /// of this map. - /// - /// If `func` panics, or if it returns a channel with a different - /// identity, this position in the map will be become unusable and - /// future accesses to that position may fail. - pub(crate) fn change_state(&self, ident: &C::Ident, func: F) -> Result - where - F: FnOnce(Option>) -> (Option>, V), - { - use hash_map::Entry::*; - let mut inner = self.inner.lock()?; - let entry = inner.channels.entry(ident.clone()); - match entry { - Occupied(mut occupied) => { - // Temporarily replace the entry for this identity with - // a poisoned entry. - let mut oldent = ChannelState::Poisoned(Priv { _unused: () }); - std::mem::swap(occupied.get_mut(), &mut oldent); - let (newval, output) = func(Some(oldent)); - match newval { - Some(mut newent) => { - newent.check_ident(ident)?; - std::mem::swap(occupied.get_mut(), &mut newent); - } - None => { - occupied.remove(); - } - }; - Ok(output) - } - Vacant(vacant) => { - let (newval, output) = func(None); - if let Some(newent) = newval { - newent.check_ident(ident)?; - vacant.insert(newent); - } - Ok(output) - } - } - } - /// Reconfigure all channels as necessary /// /// (By reparameterising channels as needed) @@ -413,10 +342,10 @@ impl ChannelMap { }; let update = Arc::new(update); - for channel in inner.channels.values_mut() { + for channel in inner.channels.values() { let channel = match channel { CS::Open(OpenEntry { channel, .. }) => channel, - CS::Building(_) | CS::Poisoned(_) => continue, + CS::Building(_) => continue, }; // Ignore error (which simply means the channel is closed or gone) let _ = channel.reparameterize(update.clone()); @@ -434,7 +363,7 @@ impl ChannelMap { .lock() .expect("Poisoned lock") .channels - .retain(|_id, chan| !chan.ready_to_expire(&mut ret)); + .retain(|chan| !chan.ready_to_expire(&mut ret)); ret } } @@ -584,7 +513,8 @@ mod test { //! use super::*; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; + use tor_llcrypto::pk::ed25519::Ed25519Identity; use tor_proto::channel::params::ChannelPaddingInstructionsUpdates; fn new_test_channel_map() -> ChannelMap { @@ -595,18 +525,14 @@ mod test { ) } - #[derive(Eq, PartialEq, Clone, Debug)] + #[derive(Clone, Debug)] struct FakeChannel { - ident: &'static str, + ed_ident: Ed25519Identity, usable: bool, unused_duration: Option, - params_update: Option>, + params_update: Arc>>>, } impl AbstractChannel for FakeChannel { - type Ident = u8; - fn ident(&self) -> &Self::Ident { - &self.ident.as_bytes()[0] - } fn is_usable(&self) -> bool { self.usable } @@ -614,20 +540,36 @@ mod test { self.unused_duration.map(Duration::from_secs) } fn reparameterize( - &mut self, + &self, update: Arc, ) -> tor_proto::Result<()> { - self.params_update = Some(update); + *self.params_update.lock().unwrap() = Some(update); Ok(()) } fn engage_padding_activities(&self) {} } + impl tor_linkspec::HasRelayIds for FakeChannel { + fn identity( + &self, + key_type: tor_linkspec::RelayIdType, + ) -> Option> { + match key_type { + tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()), + _ => None, + } + } + } + /// Get a fake ed25519 identity from the first byte of a string. + fn str_to_ed(s: &str) -> Ed25519Identity { + let byte = s.as_bytes()[0]; + [byte; 32].into() + } fn ch(ident: &'static str) -> ChannelState { let channel = FakeChannel { - ident, + ed_ident: str_to_ed(ident), usable: true, unused_duration: None, - params_update: None, + params_update: Arc::new(Mutex::new(None)), }; ChannelState::Open(OpenEntry { channel, @@ -640,10 +582,10 @@ mod test { unused_duration: Option, ) -> ChannelState { let channel = FakeChannel { - ident, + ed_ident: str_to_ed(ident), usable: true, unused_duration, - params_update: None, + params_update: Arc::new(Mutex::new(None)), }; ChannelState::Open(OpenEntry { channel, @@ -652,10 +594,10 @@ mod test { } fn closed(ident: &'static str) -> ChannelState { let channel = FakeChannel { - ident, + ed_ident: str_to_ed(ident), usable: false, unused_duration: None, - params_update: None, + params_update: Arc::new(Mutex::new(None)), }; ChannelState::Open(OpenEntry { channel, @@ -664,106 +606,30 @@ mod test { } #[test] - fn simple_ops() { - let map = new_test_channel_map(); - use ChannelState::Open; - - assert!(map.replace(b'h', ch("hello")).unwrap().is_none()); - assert!(map.replace(b'w', ch("wello")).unwrap().is_none()); - - match map.get(&b'h') { - Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {} - _ => panic!(), - } - - assert!(map.get(&b'W').unwrap().is_none()); - - match map.replace(b'h', ch("hebbo")) { - Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {} - _ => panic!(), - } - - assert!(map.remove(&b'Z').unwrap().is_none()); - match map.remove(&b'h') { - Ok(Some(Open(ent))) if ent.channel.ident == "hebbo" => {} - _ => panic!(), - } - } - - #[test] - fn rmv_unusable() { + fn rmv_unusable() -> Result<()> { let map = new_test_channel_map(); - map.replace(b'm', closed("machen")).unwrap(); - map.replace(b'f', ch("feinen")).unwrap(); - map.replace(b'w', closed("wir")).unwrap(); - map.replace(b'F', ch("Fug")).unwrap(); + map.with_channels(|map| { + map.insert(closed("machen")); + map.insert(ch("feinen")); + map.insert(closed("wir")); + map.insert(ch("Fug")); + })?; map.remove_unusable().unwrap(); - assert!(map.get(&b'm').unwrap().is_none()); - assert!(map.get(&b'w').unwrap().is_none()); - assert!(map.get(&b'f').unwrap().is_some()); - assert!(map.get(&b'F').unwrap().is_some()); + map.with_channels(|map| { + assert!(map.by_id(&str_to_ed("m")).is_none()); + assert!(map.by_id(&str_to_ed("w")).is_none()); + assert!(map.by_id(&str_to_ed("f")).is_some()); + assert!(map.by_id(&str_to_ed("F")).is_some()); + })?; + + Ok(()) } #[test] - fn change() { - let map = new_test_channel_map(); - - map.replace(b'w', ch("wir")).unwrap(); - map.replace(b'm', ch("machen")).unwrap(); - map.replace(b'f', ch("feinen")).unwrap(); - map.replace(b'F', ch("Fug")).unwrap(); - - // Replace Some with Some. - let (old, v) = map - .change_state(&b'F', |state| (Some(ch("FUG")), (state, 99_u8))) - .unwrap(); - assert_eq!(old.unwrap().unwrap_open().ident, "Fug"); - assert_eq!(v, 99); - assert_eq!(map.get(&b'F').unwrap().unwrap().unwrap_open().ident, "FUG"); - - // Replace Some with None. - let (old, v) = map - .change_state(&b'f', |state| (None, (state, 123_u8))) - .unwrap(); - assert_eq!(old.unwrap().unwrap_open().ident, "feinen"); - assert_eq!(v, 123); - assert!(map.get(&b'f').unwrap().is_none()); - - // Replace None with Some. - let (old, v) = map - .change_state(&b'G', |state| (Some(ch("Geheimnisse")), (state, "Hi"))) - .unwrap(); - assert!(old.is_none()); - assert_eq!(v, "Hi"); - assert_eq!( - map.get(&b'G').unwrap().unwrap().unwrap_open().ident, - "Geheimnisse" - ); - - // Replace None with None - let (old, v) = map - .change_state(&b'Q', |state| (None, (state, "---"))) - .unwrap(); - assert!(old.is_none()); - assert_eq!(v, "---"); - assert!(map.get(&b'Q').unwrap().is_none()); - - // Try replacing None with invalid entry (with mismatched ID) - let e = map.change_state(&b'P', |state| (Some(ch("Geheimnisse")), (state, "Hi"))); - assert!(matches!(e, Err(Error::Internal(_)))); - assert!(matches!(map.get(&b'P'), Ok(None))); - - // Try replacing Some with invalid entry (mismatched ID) - let e = map.change_state(&b'G', |state| (Some(ch("Wobbledy")), (state, "Hi"))); - assert!(matches!(e, Err(Error::Internal(_)))); - assert!(matches!(map.get(&b'G'), Err(Error::Internal(_)))); - } - - #[test] - fn reparameterise_via_netdir() { + fn reparameterise_via_netdir() -> Result<()> { let map = new_test_channel_map(); // Set some non-default parameters so that we can tell when an update happens @@ -781,16 +647,19 @@ mod test { ) .finish(); - assert!(map.replace(b't', ch("track")).unwrap().is_none()); + map.with_channels(|map| { + map.insert(ch("track")); + })?; let netdir = tor_netdir::testnet::construct_netdir() .unwrap_if_sufficient() .unwrap(); let netdir = Arc::new(netdir); - let with_ch = |f: &dyn Fn(&mut FakeChannel)| { - let mut inner = map.inner.lock().unwrap(); - let ch = inner.channels.get_mut(&b't').unwrap().unwrap_open(); + let with_ch = |f: &dyn Fn(&FakeChannel)| { + let inner = map.inner.lock().unwrap(); + let ch = inner.channels.by_ed25519(&str_to_ed("t")); + let ch = ch.unwrap().unwrap_open(); f(ch); }; @@ -798,7 +667,7 @@ mod test { map.reconfigure_general(None, None, netdir.clone()).unwrap(); with_ch(&|ch| { assert_eq!( - format!("{:?}", ch.params_update.take().unwrap()), + format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()), // evade field visibility by (ab)using Debug impl "ChannelPaddingInstructionsUpdates { padding_enable: None, \ padding_parameters: Some(Parameters { \ @@ -811,54 +680,65 @@ mod test { eprintln!("-- process a default netdir again, which should *not* send an update --"); map.reconfigure_general(None, None, netdir).unwrap(); - with_ch(&|ch| assert_eq!(ch.params_update, None)); + with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none())); + + Ok(()) } #[test] - fn expire_channels() { + fn expire_channels() -> Result<()> { let map = new_test_channel_map(); // Channel that has been unused beyond max duration allowed is expired - map.replace( - b'w', - ch_with_details("wello", Duration::from_secs(180), Some(181)), - ) - .unwrap(); + map.with_channels(|map| { + map.insert(ch_with_details( + "wello", + Duration::from_secs(180), + Some(181), + )) + })?; // Minimum value of max unused duration is 180 seconds assert_eq!(180, map.expire_channels().as_secs()); - assert!(map.get(&b'w').unwrap().is_none()); + map.with_channels(|map| { + assert!(map.by_ed25519(&str_to_ed("w")).is_none()); + })?; let map = new_test_channel_map(); // Channel that has been unused for shorter than max unused duration - map.replace( - b'w', - ch_with_details("wello", Duration::from_secs(180), Some(120)), - ) - .unwrap(); + map.with_channels(|map| { + map.insert(ch_with_details( + "wello", + Duration::from_secs(180), + Some(120), + )); - map.replace( - b'y', - ch_with_details("yello", Duration::from_secs(180), Some(170)), - ) - .unwrap(); + map.insert(ch_with_details( + "yello", + Duration::from_secs(180), + Some(170), + )); - // Channel that has been unused beyond max duration allowed is expired - map.replace( - b'g', - ch_with_details("gello", Duration::from_secs(180), Some(181)), - ) - .unwrap(); + // Channel that has been unused beyond max duration allowed is expired + map.insert(ch_with_details( + "gello", + Duration::from_secs(180), + Some(181), + )); - // Closed channel should be retained - map.replace(b'h', closed("hello")).unwrap(); + // Closed channel should be retained + map.insert(closed("hello")); + })?; // Return duration until next channel expires assert_eq!(10, map.expire_channels().as_secs()); - assert!(map.get(&b'w').unwrap().is_some()); - assert!(map.get(&b'y').unwrap().is_some()); - assert!(map.get(&b'h').unwrap().is_some()); - assert!(map.get(&b'g').unwrap().is_none()); + map.with_channels(|map| { + assert!(map.by_ed25519(&str_to_ed("w")).is_some()); + assert!(map.by_ed25519(&str_to_ed("y")).is_some()); + assert!(map.by_ed25519(&str_to_ed("h")).is_some()); + assert!(map.by_ed25519(&str_to_ed("g")).is_none()); + })?; + Ok(()) } } diff --git a/crates/tor-chanmgr/src/mgr/map/padding_test.rs b/crates/tor-chanmgr/src/mgr/map/padding_test.rs index 2f5533d33..f0624341c 100644 --- a/crates/tor-chanmgr/src/mgr/map/padding_test.rs +++ b/crates/tor-chanmgr/src/mgr/map/padding_test.rs @@ -21,7 +21,7 @@ use itertools::{zip_eq, Itertools}; use tor_cell::chancell::msg::PaddingNegotiateCmd; use tor_config::PaddingLevel; -use tor_linkspec::HasRelayIds; +use tor_linkspec::{HasRelayIds, RelayIds}; use tor_netdir::NetDir; use tor_proto::channel::{Channel, CtrlMsg}; @@ -121,7 +121,7 @@ struct FakeChannelFactory { #[async_trait] impl AbstractChannelFactory for FakeChannelFactory { type Channel = Channel; - type BuildSpec = (); + type BuildSpec = tor_linkspec::RelayIds; async fn build_channel(&self, _target: &Self::BuildSpec) -> Result { Ok(self.channel.clone()) @@ -152,13 +152,17 @@ async fn case(level: PaddingLevel, dormancy: Dormancy, usage: ChannelUsage) -> C let (channel, recv) = Channel::new_fake(); let peer_id = channel.target().ed_identity().unwrap().clone(); + let relay_ids = RelayIds::builder() + .ed_identity(peer_id.clone()) + .build() + .unwrap(); let factory = FakeChannelFactory { channel }; let netparams = Arc::new(NetParameters::default()); let chanmgr = AbstractChanMgr::new(factory, &cconfig, dormancy, &netparams); - let (channel, _prov) = chanmgr.get_or_launch(peer_id, (), usage).await.unwrap(); + let (channel, _prov) = chanmgr.get_or_launch(relay_ids, usage).await.unwrap(); CaseContext { channel, diff --git a/crates/tor-linkspec/Cargo.toml b/crates/tor-linkspec/Cargo.toml index 39828bcd8..fc2653bd5 100644 --- a/crates/tor-linkspec/Cargo.toml +++ b/crates/tor-linkspec/Cargo.toml @@ -18,6 +18,7 @@ pt-client = [] [dependencies] base64ct = "1.5.1" +by_address = "1" cfg-if = "1.0.0" derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" } derive_more = "0.99" diff --git a/crates/tor-linkspec/src/ids/by_id.rs b/crates/tor-linkspec/src/ids/by_id.rs index 899e86397..e486c02d5 100644 --- a/crates/tor-linkspec/src/ids/by_id.rs +++ b/crates/tor-linkspec/src/ids/by_id.rs @@ -40,6 +40,17 @@ impl ByRelayIds { } } + /// Return the value in this set (if any) that has the key `key`. + pub fn remove_by_id<'a, T>(&mut self, key: T) -> Option + where + T: Into>, + { + match key.into() { + RelayIdRef::Ed25519(ed) => self.remove_by_ed25519(ed), + RelayIdRef::Rsa(rsa) => self.remove_by_rsa(rsa), + } + } + /// Return the value in this set (if any) that has _all_ the relay IDs /// that `key` does. /// @@ -52,9 +63,175 @@ impl ByRelayIds { self.by_id(any_id) .filter(|val| val.has_all_relay_ids_from(key)) } + + /// Remove the single value in this set (if any) that has _exactly the same_ + /// relay IDs that `key` does + pub fn remove_exact(&mut self, key: &T) -> Option + where + T: HasRelayIds, + { + let any_id = key.identities().next()?; + if self + .by_id(any_id) + .filter(|ent| ent.same_relay_ids(key)) + .is_some() + { + self.remove_by_id(any_id) + } else { + None + } + } + + /// Return a reference to every element in this set that shares _any_ ID + /// with `key`. + /// + /// No element is returned more than once. + pub fn all_overlapping(&self, key: &T) -> Vec<&H> + where + T: HasRelayIds, + { + use by_address::ByAddress; + use std::collections::HashSet; + + let mut items: HashSet> = HashSet::new(); + + for ident in key.identities() { + if let Some(found) = self.by_id(ident) { + items.insert(ByAddress(found)); + } + } + + items.into_iter().map(|by_addr| by_addr.0).collect() + } } // TODO MSRV: Remove this `allow` once we no longer get a false positive // for it on our MSRV. 1.56 is affected; 1.60 is not. #[allow(unreachable_pub)] pub use tor_basic_utils::n_key_set::Error as ByRelayIdsError; + +#[cfg(test)] +mod test { + #![allow(clippy::unwrap_used)] + + use super::*; + use crate::{RelayIds, RelayIdsBuilder}; + + #[test] + fn lookup() { + let rsa1: RsaIdentity = (*b"12345678901234567890").into(); + let rsa2: RsaIdentity = (*b"abcefghijklmnopqrstu").into(); + let rsa3: RsaIdentity = (*b"abcefghijklmnopQRSTU").into(); + let ed1: Ed25519Identity = (*b"12345678901234567890123456789012").into(); + let ed2: Ed25519Identity = (*b"abcefghijklmnopqrstuvwxyzABCDEFG").into(); + let ed3: Ed25519Identity = (*b"abcefghijklmnopqrstuvwxyz1234567").into(); + + let keys1 = RelayIdsBuilder::default() + .rsa_identity(rsa1) + .ed_identity(ed1) + .build() + .unwrap(); + + let keys2 = RelayIdsBuilder::default() + .rsa_identity(rsa2) + .ed_identity(ed2) + .build() + .unwrap(); + + let mut set = ByRelayIds::new(); + set.insert(keys1.clone()); + set.insert(keys2.clone()); + + // Try by_id + assert_eq!(set.by_id(&rsa1), Some(&keys1)); + assert_eq!(set.by_id(&ed1), Some(&keys1)); + assert_eq!(set.by_id(&rsa2), Some(&keys2)); + assert_eq!(set.by_id(&ed2), Some(&keys2)); + assert_eq!(set.by_id(&rsa3), None); + assert_eq!(set.by_id(&ed3), None); + + // Try exact lookup + assert_eq!(set.by_all_ids(&keys1), Some(&keys1)); + assert_eq!(set.by_all_ids(&keys2), Some(&keys2)); + { + let search = RelayIdsBuilder::default() + .rsa_identity(rsa1) + .build() + .unwrap(); + assert_eq!(set.by_all_ids(&search), Some(&keys1)); + } + { + let search = RelayIdsBuilder::default() + .rsa_identity(rsa1) + .ed_identity(ed2) + .build() + .unwrap(); + assert_eq!(set.by_all_ids(&search), None); + } + + // Try looking for overlap + assert_eq!(set.all_overlapping(&keys1), vec![&keys1]); + assert_eq!(set.all_overlapping(&keys2), vec![&keys2]); + { + let search = RelayIdsBuilder::default() + .rsa_identity(rsa1) + .ed_identity(ed2) + .build() + .unwrap(); + let answer = set.all_overlapping(&search); + assert_eq!(answer.len(), 2); + assert!(answer.contains(&&keys1)); + assert!(answer.contains(&&keys2)); + } + { + let search = RelayIdsBuilder::default() + .rsa_identity(rsa2) + .build() + .unwrap(); + assert_eq!(set.all_overlapping(&search), vec![&keys2]); + } + { + let search = RelayIdsBuilder::default() + .rsa_identity(rsa3) + .build() + .unwrap(); + assert_eq!(set.all_overlapping(&search), Vec::<&RelayIds>::new()); + } + } + + #[test] + fn remove_exact() { + let rsa1: RsaIdentity = (*b"12345678901234567890").into(); + let rsa2: RsaIdentity = (*b"abcefghijklmnopqrstu").into(); + let ed1: Ed25519Identity = (*b"12345678901234567890123456789012").into(); + let ed2: Ed25519Identity = (*b"abcefghijklmnopqrstuvwxyzABCDEFG").into(); + + let keys1 = RelayIdsBuilder::default() + .rsa_identity(rsa1) + .ed_identity(ed1) + .build() + .unwrap(); + + let keys2 = RelayIdsBuilder::default() + .rsa_identity(rsa2) + .ed_identity(ed2) + .build() + .unwrap(); + + let mut set = ByRelayIds::new(); + set.insert(keys1.clone()); + set.insert(keys2); + assert_eq!(set.len(), 2); + + let removed = set.remove_exact(&keys1); + assert_eq!(removed, Some(keys1)); + assert_eq!(set.len(), 1); + + { + let search = RelayIdsBuilder::default().ed_identity(ed2).build().unwrap(); + let removed = set.remove_exact(&search); + assert_eq!(removed, None); + assert_eq!(set.len(), 1); + } + } +} diff --git a/crates/tor-linkspec/src/lib.rs b/crates/tor-linkspec/src/lib.rs index 1c121733a..8ae929523 100644 --- a/crates/tor-linkspec/src/lib.rs +++ b/crates/tor-linkspec/src/lib.rs @@ -48,7 +48,10 @@ pub use ids::{ RelayId, RelayIdError, RelayIdRef, RelayIdType, RelayIdTypeIter, }; pub use ls::LinkSpec; -pub use owned::{OwnedChanTarget, OwnedChanTargetBuilder, OwnedCircTarget, RelayIds}; +pub use owned::{ + OwnedChanTarget, OwnedChanTargetBuilder, OwnedCircTarget, OwnedCircTargetBuilder, RelayIds, + RelayIdsBuilder, +}; pub use traits::{ ChanTarget, CircTarget, DirectChanMethodsHelper, HasAddrs, HasChanMethod, HasRelayIds, HasRelayIdsLegacy, diff --git a/crates/tor-proto/semver.md b/crates/tor-proto/semver.md index 9e7f859e8..4127b67ce 100644 --- a/crates/tor-proto/semver.md +++ b/crates/tor-proto/semver.md @@ -1 +1,3 @@ MODIFIED: New Channel::set_declared_method, deprecated set_declared_addr. +MODIFIED: Channel implements HasRelayIds. +MODIFIED: Channel::reparameterize now take immutable &self diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index 76ca4075f..c7b44cca7 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -476,7 +476,7 @@ impl Channel { /// Reparameterise (update parameters; reconfigure) /// /// Returns `Err` if the channel was closed earlier - pub fn reparameterize(&mut self, params: Arc) -> Result<()> { + pub fn reparameterize(&self, params: Arc) -> Result<()> { let mut mutable = self .details .mutable @@ -664,6 +664,15 @@ where Ok(()) } +impl HasRelayIds for Channel { + fn identity( + &self, + key_type: tor_linkspec::RelayIdType, + ) -> Option> { + self.details.peer_id.identity(key_type) + } +} + /// Make some fake channel details (for testing only!) #[cfg(any(test, feature = "testing"))] fn fake_channel_details() -> Arc {