Merge branch 'channel_map' into 'main'

ChanMgr: Revise code to tolerate multiple identities

See merge request tpo/core/arti!773
This commit is contained in:
Nick Mathewson 2022-10-18 16:21:25 +00:00
commit 2e08775395
12 changed files with 776 additions and 500 deletions

7
Cargo.lock generated
View File

@ -582,6 +582,12 @@ version = "3.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d"
[[package]]
name = "by_address"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e245704f60eb4eb45810d65cf14eb54d2eb50a6f3715fe2d7cd01ee905c2944f"
[[package]]
name = "bytemuck"
version = "1.12.1"
@ -3715,6 +3721,7 @@ name = "tor-linkspec"
version = "0.5.1"
dependencies = [
"base64ct",
"by_address",
"cfg-if",
"derive_builder_fork_arti",
"derive_more",

View File

@ -9,8 +9,7 @@ use crate::{event::ChanMgrEventSender, Error};
use std::time::Duration;
use tor_error::internal;
use tor_linkspec::{HasChanMethod, HasRelayIds, OwnedChanTarget};
use tor_llcrypto::pk;
use tor_linkspec::{HasChanMethod, OwnedChanTarget};
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider};
@ -208,12 +207,6 @@ where
}
impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
type Ident = pk::ed25519::Ed25519Identity;
fn ident(&self) -> &Self::Ident {
self.target()
.ed_identity()
.expect("This channel had an Ed25519 identity when we created it, but now it doesn't!?")
}
fn is_usable(&self) -> bool {
!self.is_closing()
}
@ -221,10 +214,10 @@ impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
self.duration_unused()
}
fn reparameterize(
&mut self,
&self,
updates: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()> {
self.reparameterize(updates)
tor_proto::channel::Channel::reparameterize(self, updates)
}
fn engage_padding_activities(&self) {
self.engage_padding_activities();
@ -239,11 +232,11 @@ mod test {
mgr::{AbstractChannel, AbstractChannelFactory},
Result,
};
use pk::ed25519::Ed25519Identity;
use pk::rsa::RsaIdentity;
use std::net::SocketAddr;
use std::time::{Duration, SystemTime};
use tor_linkspec::ChannelMethod;
use tor_linkspec::{ChannelMethod, HasRelayIds, RelayIdType};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_llcrypto::pk::rsa::RsaIdentity;
use tor_proto::channel::Channel;
use tor_rtcompat::{test_with_one_runtime, TcpListener};
use tor_rtmock::{io::LocalStream, net::MockNetwork, MockSleepRuntime};
@ -316,7 +309,7 @@ mod test {
);
let chan = r1.unwrap();
assert_eq!(chan.ident(), &ed);
assert_eq!(chan.identity(RelayIdType::Ed25519), Some((&ed).into()));
assert!(chan.is_usable());
// In theory, time could pass here, so we can't just use
// "assert_eq!(dur_unused, dur_unused2)".

View File

@ -79,15 +79,28 @@ pub enum Error {
/// A relay did not have the set of identity keys that we expected.
///
/// (Currently, `tor-chanmgr` only works to manage channels with a known
/// expected Ed25519 identity.)
/// (Currently, `tor-chanmgr` only works on relays that have at least
/// one recognized identity key.)
#[error("Could not identify relay by identity key")]
MissingId,
/// A succesful relay channel had one of the identity keys we wanted,
/// but not the other(s).
///
/// This means that (assuming the relay is well behaved), we will not
/// find the ID combination we want.
#[error("Relay identity keys were only a partial match for what we wanted.")]
IdentityConflict,
/// Tried to connnect via a transport that we don't support.
#[error("No plugin available for the transport {0}")]
NoSuchTransport(tor_linkspec::TransportId),
/// An attempt to open a channel failed because it was cancelled or
/// superseded by another request or configuration change.
#[error("Channel request cancelled or superseded")]
RequestCancelled,
/// An internal error of some kind that should never occur.
#[error("Internal error")]
Internal(#[from] tor_error::Bug),
@ -99,6 +112,12 @@ impl<T> From<std::sync::PoisonError<T>> for Error {
}
}
impl From<tor_linkspec::ByRelayIdsError> for Error {
fn from(_: tor_linkspec::ByRelayIdsError) -> Self {
Error::MissingId
}
}
impl tor_error::HasKind for Error {
fn kind(&self) -> ErrorKind {
use tor_proto::Error as ProtoErr;
@ -117,7 +136,9 @@ impl tor_error::HasKind for Error {
E::NoSuchTransport(_) => EK::InvalidConfig,
E::UnusableTarget(_) | E::Internal(_) => EK::Internal,
E::MissingId => EK::BadApiUsage,
Error::ChannelBuild { .. } => EK::TorAccessFailed,
E::IdentityConflict => EK::TorAccessFailed,
E::ChannelBuild { .. } => EK::TorAccessFailed,
E::RequestCancelled => EK::TransientFailure,
}
}
}
@ -147,10 +168,15 @@ impl tor_error::HasRetryTime for Error {
// it won't have addresses in the future.
E::UnusableTarget(_) => RT::Never,
// This can't succeed until the relay is reconfigured.
E::IdentityConflict => RT::Never,
// This one can't succeed until the bridge, or our set of
// transports, is reconfigured.
E::NoSuchTransport(_) => RT::Never,
E::RequestCancelled => RT::Never,
// These aren't recoverable at all.
E::Spawn { .. } | E::MissingId | E::Internal(_) => RT::Never,
}

View File

@ -214,18 +214,9 @@ impl<R: Runtime> ChanMgr<R> {
target: &T,
usage: ChannelUsage,
) -> Result<(Channel, ChanProvenance)> {
// TODO(nickm): We will need to change the way that we index our map
// when we eventually support channels that are _not_ primarily
// identified by their ed25519 key. That could be in the distant future
// when we make Ed25519 keys optional: But more likely it will be when
// we implement bridges.
let ed_identity = target.ed_identity().ok_or(Error::MissingId)?;
let targetinfo = OwnedChanTarget::from_chan_target(target);
let (chan, provenance) = self
.mgr
.get_or_launch(*ed_identity, targetinfo, usage)
.await?;
let (chan, provenance) = self.mgr.get_or_launch(targetinfo, usage).await?;
// Double-check the match to make sure that the RSA identity is
// what we wanted too.
chan.check_match(target)

View File

@ -1,17 +1,17 @@
//! Abstract implementation of a channel manager
use crate::mgr::map::OpenEntry;
use crate::mgr::map::{OpenEntry, PendingEntry};
use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::{FutureExt, Shared};
use rand::Rng;
use std::hash::Hash;
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Duration;
use tor_error::internal;
use tor_linkspec::{HasRelayIds, RelayIds};
use tor_netdir::params::NetParameters;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
@ -20,11 +20,7 @@ mod map;
/// Trait to describe as much of a
/// [`Channel`](tor_proto::channel::Channel) as `AbstractChanMgr`
/// needs to use.
pub(crate) trait AbstractChannel: Clone {
/// Identity type for the other side of the channel.
type Ident: Hash + Eq + Clone;
/// Return this channel's identity.
fn ident(&self) -> &Self::Ident;
pub(crate) trait AbstractChannel: Clone + HasRelayIds {
/// Return true if this channel is usable.
///
/// A channel might be unusable because it is closed, because it has
@ -40,7 +36,7 @@ pub(crate) trait AbstractChannel: Clone {
/// The changed parameters may not be implemented "immediately",
/// but this will be done "reasonably soon".
fn reparameterize(
&mut self,
&self,
updates: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()>;
@ -62,7 +58,7 @@ pub(crate) trait AbstractChannelFactory {
/// The type of channel that this factory can build.
type Channel: AbstractChannel;
/// Type that explains how to build a channel.
type BuildSpec;
type BuildSpec: HasRelayIds;
/// Construct a new channel to the destination described at `target`.
///
@ -91,11 +87,11 @@ pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
/// Type alias for a future that we wait on to see when a pending
/// channel is done or failed.
type Pending<C> = Shared<oneshot::Receiver<Result<C>>>;
type Pending = Shared<oneshot::Receiver<Result<()>>>;
/// Type alias for the sender we notify when we complete a channel (or
/// fail to complete it).
type Sending<C> = oneshot::Sender<Result<C>>;
/// Type alias for the sender we notify when we complete a channel (or fail to
/// complete it).
type Sending = oneshot::Sender<Result<()>>;
impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
/// Make a new empty channel manager.
@ -119,30 +115,34 @@ impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
/// Helper: return the objects used to inform pending tasks
/// about a newly open or failed channel.
fn setup_launch<C: Clone>(&self) -> (map::ChannelState<C>, Sending<C>) {
fn setup_launch<C: Clone>(&self, ids: RelayIds) -> (map::ChannelState<C>, Sending) {
let (snd, rcv) = oneshot::channel();
let shared = rcv.shared();
(map::ChannelState::Building(shared), snd)
let pending = rcv.shared();
(
map::ChannelState::Building(map::PendingEntry { ids, pending }),
snd,
)
}
/// Get a channel whose identity is `ident`.
/// Get a channel corresponding to the identities of `target`.
///
/// If a usable channel exists with that identity, return it.
///
/// If no such channel exists already, and none is in progress,
/// launch a new request using `target`, which must match `ident`.
/// launch a new request using `target`.
///
/// If no such channel exists already, but we have one that's in
/// progress, wait for it to succeed or fail.
pub(crate) async fn get_or_launch(
&self,
ident: <<CF as AbstractChannelFactory>::Channel as AbstractChannel>::Ident,
target: CF::BuildSpec,
usage: ChannelUsage,
) -> Result<(CF::Channel, ChanProvenance)> {
use ChannelUsage as CU;
let chan = self.get_or_launch_internal(ident, target).await?;
// TODO pt-client: This is not yet used.
let chan = self.get_or_launch_internal(target).await?;
match usage {
CU::Dir | CU::UselessCircuit => {}
@ -155,138 +155,278 @@ impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
/// Get a channel whose identity is `ident` - internal implementation
async fn get_or_launch_internal(
&self,
ident: <<CF as AbstractChannelFactory>::Channel as AbstractChannel>::Ident,
target: CF::BuildSpec,
) -> Result<(CF::Channel, ChanProvenance)> {
use map::ChannelState::*;
/// Possible actions that we'll decide to take based on the
/// channel's initial state.
enum Action<C> {
/// We found no channel. We're going to launch a new one,
/// then tell everybody about it.
Launch(Sending<C>),
/// We found an in-progress attempt at making a channel.
/// We're going to wait for it to finish.
Wait(Pending<C>),
/// We found a usable channel. We're going to return it.
Return(Result<(C, ChanProvenance)>),
}
/// How many times do we try?
const N_ATTEMPTS: usize = 2;
let mut attempts_so_far = 0;
let mut final_attempt = false;
let mut provenance = ChanProvenance::Preexisting;
// TODO(nickm): It would be neat to use tor_retry instead.
let mut last_err = None;
for _ in 0..N_ATTEMPTS {
// First, see what state we're in, and what we should do
// about it.
let action = self
.channels
.change_state(&ident, |oldstate| match oldstate {
Some(Open(ref ent)) => {
if ent.channel.is_usable() {
// Good channel. Return it.
let action = Action::Return(Ok((
ent.channel.clone(),
ChanProvenance::Preexisting,
)));
(oldstate, action)
} else {
// Unusable channel. Move to the Building
// state and launch a new channel.
let (newstate, send) = self.setup_launch();
let action = Action::Launch(send);
(Some(newstate), action)
}
}
Some(Building(ref pending)) => {
let action = Action::Wait(pending.clone());
(oldstate, action)
}
Some(Poisoned(_)) => {
// We should never be able to see this state; this
// is a bug.
(
None,
Action::Return(Err(Error::Internal(internal!(
"Found a poisoned entry"
)))),
)
}
None => {
// No channel. Move to the Building
// state and launch a new channel.
let (newstate, send) = self.setup_launch();
let action = Action::Launch(send);
(Some(newstate), action)
}
})?;
while attempts_so_far < N_ATTEMPTS || final_attempt {
attempts_so_far += 1;
// Now we act based on the channel.
// For each attempt, we _first_ look at the state of the channel map
// to decide on an `Action`, and _then_ we execute that action.
// First, see what state we're in, and what we should do about it.
let action = self.choose_action(&target, final_attempt)?;
// We are done deciding on our Action! It's time act based on the
// Action that we chose.
match action {
// If this happens, we were trying to make one final check of our state, but
// we would have had to make additional attempts.
None => {
if !final_attempt {
return Err(Error::Internal(internal!(
"No action returned while not on final attempt"
)));
}
break;
}
// Easy case: we have an error or a channel to return.
Action::Return(v) => {
return v;
Some(Action::Return(v)) => {
return v.map(|chan| (chan, provenance));
}
// There's an in-progress channel. Wait for it.
Action::Wait(pend) => match pend.await {
Ok(Ok(chan)) => return Ok((chan, ChanProvenance::NewlyCreated)),
Ok(Err(e)) => {
last_err = Some(e);
Some(Action::Wait(pend)) => {
match pend.await {
Ok(Ok(())) => {
// We were waiting for a channel, and it succeeded, or it
// got cancelled. But it might have gotten more
// identities while negotiating than it had when it was
// launched, or it might have failed to get all the
// identities we want. Check for this.
final_attempt = true;
provenance = ChanProvenance::NewlyCreated;
last_err.get_or_insert(Error::RequestCancelled);
}
Ok(Err(e)) => {
last_err = Some(e);
}
Err(_) => {
last_err =
Some(Error::Internal(internal!("channel build task disappeared")));
}
}
Err(_) => {
last_err =
Some(Error::Internal(internal!("channel build task disappeared")));
}
},
}
// We need to launch a channel.
Action::Launch(send) => match self.connector.build_channel(&target).await {
Ok(mut chan) => {
// The channel got built: remember it, tell the
// others, and return it.
self.channels
.replace_with_params(ident.clone(), |channels_params| {
// This isn't great. We context switch to the newly-created
// channel just to tell it how and whether to do padding. Ideally
// we would pass the params at some suitable point during
// building. However, that would involve the channel taking a
// copy of the params, and that must happen in the same channel
// manager lock acquisition span as the one where we insert the
// channel into the table so it will receive updates. I.e.,
// here.
let update = channels_params.initial_update();
if let Some(update) = update {
chan.reparameterize(update.into())
.map_err(|_| internal!("failure on new channel"))?;
}
Ok(Open(OpenEntry {
channel: chan.clone(),
max_unused_duration: Duration::from_secs(
rand::thread_rng().gen_range(180..270),
),
}))
})?;
// It's okay if all the receivers went away:
// that means that nobody was waiting for this channel.
let _ignore_err = send.send(Ok(chan.clone()));
return Ok((chan, ChanProvenance::NewlyCreated));
Some(Action::Launch(send)) => {
let outcome = self.connector.build_channel(&target).await;
let status = self.handle_build_outcome(&target, outcome);
// It's okay if all the receivers went away:
// that means that nobody was waiting for this channel.
let _ignore_err = send.send(status.clone().map(|_| ()));
match status {
Ok(Some(chan)) => {
return Ok((chan, ChanProvenance::NewlyCreated));
}
Ok(None) => {
final_attempt = true;
provenance = ChanProvenance::NewlyCreated;
last_err.get_or_insert(Error::RequestCancelled);
}
Err(e) => last_err = Some(e),
}
Err(e) => {
// The channel failed. Make it non-pending, tell the
// others, and set the error.
self.channels.remove(&ident)?;
// (As above)
let _ignore_err = send.send(Err(e.clone()));
last_err = Some(e);
}
},
}
}
// End of this attempt. We will try again...
}
Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
}
/// Helper: based on our internal state, decide which action to take when
/// asked for a channel, and update our internal state accordingly.
///
/// If `final_attempt` is true, then we will not pick any action that does
/// not result in an immediate result. If we would pick such an action, we
/// instead return `Ok(None)`. (We could instead have the caller detect
/// such actions, but it's less efficient to construct them, insert them,
/// and immediately revert them.)
fn choose_action(
&self,
target: &CF::BuildSpec,
final_attempt: bool,
) -> Result<Option<Action<CF::Channel>>> {
use map::ChannelState::*;
self.channels.with_channels(|channel_map| {
match channel_map.by_all_ids(target) {
Some(Open(OpenEntry { channel, .. })) => {
if channel.is_usable() {
// This entry is a perfect match for the target keys:
// we'll return the open entry.
return Ok(Some(Action::Return(Ok(channel.clone()))));
} else if final_attempt {
// We don't launch an attempt in this case.
return Ok(None);
} else {
// This entry was a perfect match for the target, but it
// is no longer usable! We launch a new connection to
// this target, and wait on that.
let (new_state, send) = self.setup_launch(RelayIds::from_relay_ids(target));
channel_map.try_insert(new_state)?;
return Ok(Some(Action::Launch(send)));
}
}
Some(Building(PendingEntry { pending, .. })) => {
// This entry is a perfect match for the target keys: we'll
// return the pending entry. (We don't know for sure if it
// will match once it completes, since we might discover
// additional keys beyond those listed for this pending
// entry.)
if final_attempt {
// We don't launch an attempt in this case.
return Ok(None);
}
return Ok(Some(Action::Wait(pending.clone())));
}
_ => {}
}
// Okay, we don't have an exact match. But we might have one or more _partial_ matches?
let overlapping = channel_map.all_overlapping(target);
if overlapping
.iter()
.any(|entry| matches!(entry, Open(OpenEntry{ channel, ..}) if channel.is_usable()))
{
// At least one *open, usable* channel has been negotiated that
// overlaps only partially with our target: it has proven itself
// to have _one_ of our target identities, but not all.
//
// Because this channel exists, we know that our target cannot
// succeed, since relays are not allowed to share _any_
// identities.
return Ok(Some(Action::Return(Err(Error::IdentityConflict))));
} else if let Some(first_building) = overlapping
.iter()
.find(|entry| matches!(entry, Building(_)))
{
// There is at least one *in-progress* channel that has at least
// one identity in common with our target, but it does not have
// _all_ the identities we want.
//
// If it succeeds, we might find that we can use it; or we might
// find out that it is not suitable. So we'll wait for it, and
// see what happens.
//
// TODO: This approach will _not_ be sufficient once we are
// implementing relays, and some of our channels are created in
// response to client requests.
match first_building {
Open(_) => unreachable!(),
Building(PendingEntry { pending, .. }) => {
if final_attempt {
// We don't wait in this case.
return Ok(None);
}
return Ok(Some(Action::Wait(pending.clone())));
}
}
}
if final_attempt {
// We don't launch an attempt in this case.
return Ok(None);
}
// Great, nothing interfered at all.
let (new_state, send) = self.setup_launch(RelayIds::from_relay_ids(target));
channel_map.try_insert(new_state)?;
Ok(Some(Action::Launch(send)))
})?
}
/// We just tried to build a channel: Handle the outcome and decide what to
/// do.
///
/// Return `Ok(None)` if we have a transient error that we expect will be
/// cleaned up by one final call to `choose_action`.
fn handle_build_outcome(
&self,
target: &CF::BuildSpec,
outcome: Result<CF::Channel>,
) -> Result<Option<CF::Channel>> {
use map::ChannelState::*;
match outcome {
Ok(chan) => {
// The channel got built: remember it, tell the
// others, and return it.
self.channels
.with_channels_and_params(|channel_map, channels_params| {
match channel_map.remove_exact(target) {
Some(Building(_)) => {
// We successfully removed our pending
// action. great! Fall through and add
// the channel we just built.
}
None => {
// Something removed our entry from the list.
// Time to retry.
return Ok(None);
}
Some(ent @ Open(_)) => {
// Oh no. Something else built an entry
// here, and replaced us. Put that
// something back, and retry.
channel_map.insert(ent);
return Ok(None);
}
}
// This isn't great. We context switch to the newly-created
// channel just to tell it how and whether to do padding. Ideally
// we would pass the params at some suitable point during
// building. However, that would involve the channel taking a
// copy of the params, and that must happen in the same channel
// manager lock acquisition span as the one where we insert the
// channel into the table so it will receive updates. I.e.,
// here.
let update = channels_params.initial_update();
if let Some(update) = update {
chan.reparameterize(update.into())
.map_err(|_| internal!("failure on new channel"))?;
}
let new_entry = Open(OpenEntry {
channel: chan.clone(),
max_unused_duration: Duration::from_secs(
rand::thread_rng().gen_range(180..270),
),
});
channel_map.insert(new_entry);
Ok(Some(chan))
})?
}
Err(e) => {
// The channel failed. Make it non-pending, tell the
// others, and set the error.
self.channels.with_channels(|channel_map| {
match channel_map.remove_exact(target) {
Some(Building(_)) | None => {
// We successfully removed our pending
// action, or somebody else did.
}
Some(ent @ Open(_)) => {
// Oh no. Something else built an entry
// here, and replaced us. Put that
// something back.
channel_map.insert(ent);
}
}
})?;
Err(e)
}
}
}
/// Update the netdir
pub(crate) fn update_netparams(
&self,
@ -330,18 +470,32 @@ impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
/// Test only: return the current open usable channel with a given
/// `ident`, if any.
#[cfg(test)]
pub(crate) fn get_nowait(
&self,
ident: &<<CF as AbstractChannelFactory>::Channel as AbstractChannel>::Ident,
) -> Option<CF::Channel> {
pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Option<CF::Channel>
where
T: Into<tor_linkspec::RelayIdRef<'a>>,
{
use map::ChannelState::*;
match self.channels.get(ident) {
Ok(Some(Open(ref ent))) if ent.channel.is_usable() => Some(ent.channel.clone()),
_ => None,
}
self.channels
.with_channels(|channel_map| match channel_map.by_id(ident) {
Some(Open(ref ent)) if ent.channel.is_usable() => Some(ent.channel.clone()),
_ => None,
})
.expect("Poisoned lock")
}
}
/// Possible actions that we'll decide to take when asked for a channel.
enum Action<C> {
/// We found no channel. We're going to launch a new one,
/// then tell everybody about it.
Launch(Sending),
/// We found an in-progress attempt at making a channel.
/// We're going to wait for it to finish.
Wait(Pending),
/// We found a usable channel. We're going to return it.
Return(Result<C>),
}
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
@ -353,6 +507,7 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
use tor_error::bad_api_usage;
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use crate::ChannelUsage as CU;
use tor_rtcompat::{task::yield_now, test_with_one_runtime, Runtime};
@ -363,11 +518,11 @@ mod test {
#[derive(Clone, Debug)]
struct FakeChannel {
ident: u32,
ed_ident: Ed25519Identity,
mood: char,
closing: Arc<AtomicBool>,
detect_reuse: Arc<char>,
last_params: Option<ChannelPaddingInstructionsUpdates>,
// last_params: Option<ChannelPaddingInstructionsUpdates>,
}
impl PartialEq for FakeChannel {
@ -377,10 +532,6 @@ mod test {
}
impl AbstractChannel for FakeChannel {
type Ident = u32;
fn ident(&self) -> &u32 {
&self.ident
}
fn is_usable(&self) -> bool {
!self.closing.load(Ordering::SeqCst)
}
@ -388,15 +539,27 @@ mod test {
None
}
fn reparameterize(
&mut self,
updates: Arc<ChannelPaddingInstructionsUpdates>,
&self,
_updates: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()> {
self.last_params = Some((*updates).clone());
// *self.last_params.lock().unwrap() = Some((*updates).clone());
Ok(())
}
fn engage_padding_activities(&self) {}
}
impl HasRelayIds for FakeChannel {
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match key_type {
tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
_ => None,
}
}
}
impl FakeChannel {
fn start_closing(&self) {
self.closing.store(true, Ordering::SeqCst);
@ -419,14 +582,38 @@ mod test {
)
}
#[derive(Clone, Debug)]
struct FakeBuildSpec(u32, char, Ed25519Identity);
impl HasRelayIds for FakeBuildSpec {
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match key_type {
tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
_ => None,
}
}
}
/// Helper to make a fake Ed identity from a u32.
fn u32_to_ed(n: u32) -> Ed25519Identity {
let mut bytes = [0; 32];
bytes[0..4].copy_from_slice(&n.to_be_bytes());
bytes.into()
}
#[async_trait]
impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
type Channel = FakeChannel;
type BuildSpec = (u32, char);
type BuildSpec = FakeBuildSpec;
async fn build_channel(&self, target: &Self::BuildSpec) -> Result<FakeChannel> {
yield_now().await;
let (ident, mood) = *target;
let FakeBuildSpec(ident, mood, id) = *target;
let ed_ident = u32_to_ed(ident);
assert_eq!(ed_ident, id);
match mood {
// "X" means never connect.
'❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
@ -437,11 +624,11 @@ mod test {
_ => {}
}
Ok(FakeChannel {
ident,
ed_ident,
mood,
closing: Arc::new(AtomicBool::new(false)),
detect_reuse: Default::default(),
last_params: None,
// last_params: None,
})
}
}
@ -450,21 +637,17 @@ mod test {
fn connect_one_ok() {
test_with_one_runtime!(|runtime| async {
let mgr = new_test_abstract_chanmgr(runtime);
let target = (413, '!');
let target = FakeBuildSpec(413, '!', u32_to_ed(413));
let chan1 = mgr
.get_or_launch(413, target, CU::UserTraffic)
.await
.unwrap()
.0;
let chan2 = mgr
.get_or_launch(413, target, CU::UserTraffic)
.get_or_launch(target.clone(), CU::UserTraffic)
.await
.unwrap()
.0;
let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;
assert_eq!(chan1, chan2);
let chan3 = mgr.get_nowait(&413).unwrap();
let chan3 = mgr.get_nowait(&u32_to_ed(413)).unwrap();
assert_eq!(chan1, chan3);
});
}
@ -475,11 +658,11 @@ mod test {
let mgr = new_test_abstract_chanmgr(runtime);
// This is set up to always fail.
let target = (999, '❌');
let res1 = mgr.get_or_launch(999, target, CU::UserTraffic).await;
let target = FakeBuildSpec(999, '❌', u32_to_ed(999));
let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
assert!(matches!(res1, Err(Error::UnusableTarget(_))));
let chan3 = mgr.get_nowait(&999);
let chan3 = mgr.get_nowait(&u32_to_ed(999));
assert!(chan3.is_none());
});
}
@ -493,12 +676,12 @@ mod test {
// concurrently. Right now it seems that they don't actually
// interact.
let (ch3a, ch3b, ch44a, ch44b, ch86a, ch86b) = join!(
mgr.get_or_launch(3, (3, 'a'), CU::UserTraffic),
mgr.get_or_launch(3, (3, 'b'), CU::UserTraffic),
mgr.get_or_launch(44, (44, 'a'), CU::UserTraffic),
mgr.get_or_launch(44, (44, 'b'), CU::UserTraffic),
mgr.get_or_launch(86, (86, '❌'), CU::UserTraffic),
mgr.get_or_launch(86, (86, '🔥'), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44)), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44)), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86)), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86)), CU::UserTraffic),
);
let ch3a = ch3a.unwrap();
let ch3b = ch3b.unwrap();
@ -522,9 +705,9 @@ mod test {
let mgr = new_test_abstract_chanmgr(runtime);
let (ch3, ch4, ch5) = join!(
mgr.get_or_launch(3, (3, 'a'), CU::UserTraffic),
mgr.get_or_launch(4, (4, 'a'), CU::UserTraffic),
mgr.get_or_launch(5, (5, 'a'), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3)), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4)), CU::UserTraffic),
mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5)), CU::UserTraffic),
);
let ch3 = ch3.unwrap().0;
@ -535,7 +718,7 @@ mod test {
ch5.start_closing();
let ch3_new = mgr
.get_or_launch(3, (3, 'b'), CU::UserTraffic)
.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3)), CU::UserTraffic)
.await
.unwrap()
.0;
@ -544,9 +727,9 @@ mod test {
mgr.remove_unusable_entries().unwrap();
assert!(mgr.get_nowait(&3).is_some());
assert!(mgr.get_nowait(&4).is_some());
assert!(mgr.get_nowait(&5).is_none());
assert!(mgr.get_nowait(&u32_to_ed(3)).is_some());
assert!(mgr.get_nowait(&u32_to_ed(4)).is_some());
assert!(mgr.get_nowait(&u32_to_ed(5)).is_none());
});
}
}

View File

@ -3,14 +3,16 @@
use std::time::Duration;
use super::{AbstractChannel, Pending};
use crate::{ChannelConfig, Dormancy, Error, Result};
use crate::{ChannelConfig, Dormancy, Result};
use std::collections::{hash_map, HashMap};
use std::result::Result as StdResult;
use std::sync::Arc;
use tor_cell::chancell::msg::PaddingNegotiate;
use tor_config::PaddingLevel;
use tor_error::{internal, into_internal};
use tor_linkspec::ByRelayIds;
use tor_linkspec::HasRelayIds;
use tor_linkspec::RelayIds;
use tor_netdir::{params::NetParameters, params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND};
use tor_proto::channel::padding::Parameters as PaddingParameters;
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
@ -23,14 +25,6 @@ use void::{ResultVoidExt as _, Void};
#[cfg(test)]
mod padding_test;
// TODO pt-client:
//
// This map code will no longer work in the bridge setting. We need to update
// our internal map of channels, so that we can look them up not only by Ed25519
// identity, but by RSA identity too. We also need a way to be able to get a
// channel only if it matches a specific ChanTarget in its address and transport
// and keys.
/// A map from channel id to channel state, plus necessary auxiliary state
///
/// We make this a separate type instead of just using
@ -48,8 +42,12 @@ struct Inner<C: AbstractChannel> {
///
/// (Danger: this uses a blocking mutex close to async code. This mutex
/// must never be held while an await is happening.)
channels: HashMap<C::Ident, ChannelState<C>>,
channels: ByRelayIds<ChannelState<C>>,
// TODO: The subsequent fields really do not belong in structure called
// `ChannelMap`. These, plus the actual map, should probably be in a
// structure called "MgrState", and that structure should be the thing that
// is put behind a Mutex.
/// Parameters for channels that we create, and that all existing channels are using
///
/// Will be updated by a background task, which also notifies all existing
@ -70,14 +68,17 @@ struct Inner<C: AbstractChannel> {
dormancy: Dormancy,
}
/// Structure that can only be constructed from within this module.
/// Used to make sure that only we can construct ChannelState::Poisoned.
pub(crate) struct Priv {
/// (This field is private)
_unused: (),
}
/// The state of a channel (or channel build attempt) within a map.
///
/// A ChannelState can be Open (representing a fully negotiated channel) or
/// Building (representing a pending attempt to build a channel). Both states
/// have a set of RelayIds, but these RelayIds represent slightly different
/// things:
/// * On a Building channel, the set of RelayIds is all the identities that we
/// require the peer to have. (The peer may turn out to have _more_
/// identities than this.)
/// * On an Open channel, the set of RelayIds is all the identities that
/// we were able to successfully authenticate for the peer.
pub(crate) enum ChannelState<C> {
/// An open channel.
///
@ -86,12 +87,7 @@ pub(crate) enum ChannelState<C> {
/// yielding it to the user.
Open(OpenEntry<C>),
/// A channel that's getting built.
Building(Pending<C>),
/// A temporary invalid state.
///
/// We insert this into the map temporarily as a placeholder in
/// `change_state()`.
Poisoned(Priv),
Building(PendingEntry),
}
/// An open channel entry.
@ -103,24 +99,42 @@ pub(crate) struct OpenEntry<C> {
pub(crate) max_unused_duration: Duration,
}
impl<C: Clone> ChannelState<C> {
/// Create a new shallow copy of this ChannelState.
#[cfg(test)]
fn clone_ref(&self) -> Result<Self> {
use ChannelState::*;
/// An entry for a not-yet-build channel
#[derive(Clone)]
pub(crate) struct PendingEntry {
/// The keys of the relay to which we're trying to open a channel.
pub(crate) ids: RelayIds,
/// A future we can clone and listen on to learn when this channel attempt
/// is successful or failed.
///
/// This entry will be removed from the map (and possibly replaced with an
/// `OpenEntry`) _before_ this future becomes ready.
pub(crate) pending: Pending,
}
impl<C> HasRelayIds for ChannelState<C>
where
C: HasRelayIds,
{
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match self {
Open(ent) => Ok(Open(ent.clone())),
Building(pending) => Ok(Building(pending.clone())),
Poisoned(_) => Err(Error::Internal(internal!("Poisoned state in channel map"))),
ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
}
}
}
impl<C: Clone> ChannelState<C> {
/// For testing: either give the Open channel inside this state,
/// or panic if there is none.
#[cfg(test)]
fn unwrap_open(&mut self) -> &mut C {
fn unwrap_open(&self) -> &C {
match self {
ChannelState::Open(ent) => &mut ent.channel,
ChannelState::Open(ent) => &ent.channel,
_ => panic!("Not an open channel"),
}
}
@ -182,24 +196,6 @@ impl NetParamsExtract {
}
impl<C: AbstractChannel> ChannelState<C> {
/// Return an error if `ident`is definitely not a matching
/// matching identity for this state.
fn check_ident(&self, ident: &C::Ident) -> Result<()> {
match self {
ChannelState::Open(ent) => {
if ent.channel.ident() == ident {
Ok(())
} else {
Err(Error::Internal(internal!("Identity mismatch")))
}
}
ChannelState::Poisoned(_) => {
Err(Error::Internal(internal!("Poisoned state in channel map")))
}
ChannelState::Building(_) => Ok(()),
}
}
/// Return true if a channel is ready to expire.
/// Update `expire_after` if a smaller duration than
/// the given value is required to expire this channel.
@ -240,7 +236,7 @@ impl<C: AbstractChannel> ChannelMap<C> {
ChannelMap {
inner: std::sync::Mutex::new(Inner {
channels: HashMap::new(),
channels: ByRelayIds::new(),
config,
channels_params,
dormancy,
@ -248,119 +244,52 @@ impl<C: AbstractChannel> ChannelMap<C> {
}
}
/// Return the channel state for the given identity, if any.
#[cfg(test)]
pub(crate) fn get(&self, ident: &C::Ident) -> Result<Option<ChannelState<C>>> {
let inner = self.inner.lock()?;
inner
.channels
.get(ident)
.map(ChannelState::clone_ref)
.transpose()
}
/// Replace the channel state for `ident` with `newval`, and return the
/// previous value if any.
#[cfg(test)]
pub(crate) fn replace(
&self,
ident: C::Ident,
newval: ChannelState<C>,
) -> Result<Option<ChannelState<C>>> {
newval.check_ident(&ident)?;
let mut inner = self.inner.lock()?;
Ok(inner.channels.insert(ident, newval))
}
/// Replace the channel state for `ident` with the return value from `func`,
/// and return the previous value if any.
/// Run a function on the `ByRelayIds` that implements this map.
///
/// Passes a snapshot of the current global channels parameters to `func`.
/// If those parameters are copied by `func` into an [`AbstractChannel`]
/// `func` must ensure that that `AbstractChannel` is returned,
/// so that it will be properly registered and receive params updates.
pub(crate) fn replace_with_params<F>(
&self,
ident: C::Ident,
func: F,
) -> Result<Option<ChannelState<C>>>
/// This function grabs a mutex over the map: do not provide slow function.
///
/// We provide this function rather than exposing the channels set directly,
/// to make sure that the calling code doesn't await while holding the lock.
pub(crate) fn with_channels<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(&ChannelPaddingInstructions) -> Result<ChannelState<C>>,
F: FnOnce(&mut ByRelayIds<ChannelState<C>>) -> T,
{
let mut inner = self.inner.lock()?;
let newval = func(&inner.channels_params)?;
newval.check_ident(&ident)?;
Ok(inner.channels.insert(ident, newval))
Ok(func(&mut inner.channels))
}
/// Remove and return the state for `ident`, if any.
pub(crate) fn remove(&self, ident: &C::Ident) -> Result<Option<ChannelState<C>>> {
/// Run a function on the `ByRelayIds` that implements this map.
///
/// This function grabs a mutex over the map: do not provide slow function.
///
/// We provide this function rather than exposing the channels set directly,
/// to make sure that the calling code doesn't await while holding the lock.
pub(crate) fn with_channels_and_params<F, T>(&self, func: F) -> Result<T>
where
F: FnOnce(&mut ByRelayIds<ChannelState<C>>, &ChannelPaddingInstructions) -> T,
{
let mut inner = self.inner.lock()?;
Ok(inner.channels.remove(ident))
// We need this silly destructuring syntax so that we don't seem to be
// borrowing the structure mutably and immutably at the same time.
let Inner {
ref mut channels,
ref channels_params,
..
} = &mut *inner;
Ok(func(channels, channels_params))
}
/// Remove every unusable state from the map.
#[cfg(test)]
pub(crate) fn remove_unusable(&self) -> Result<()> {
let mut inner = self.inner.lock()?;
inner.channels.retain(|_, state| match state {
ChannelState::Poisoned(_) => false,
inner.channels.retain(|state| match state {
ChannelState::Open(ent) => ent.channel.is_usable(),
ChannelState::Building(_) => true,
});
Ok(())
}
/// Replace the state whose identity is `ident` with a new state.
///
/// The provided function `func` is invoked on the old state (if
/// any), and must return a tuple containing an optional new
/// state, and an arbitrary return value for this function.
///
/// Because `func` is run while holding the lock on this object,
/// it should be fast and nonblocking. In return, you can be sure
/// that it's running atomically with respect to other accessors
/// of this map.
///
/// If `func` panics, or if it returns a channel with a different
/// identity, this position in the map will be become unusable and
/// future accesses to that position may fail.
pub(crate) fn change_state<F, V>(&self, ident: &C::Ident, func: F) -> Result<V>
where
F: FnOnce(Option<ChannelState<C>>) -> (Option<ChannelState<C>>, V),
{
use hash_map::Entry::*;
let mut inner = self.inner.lock()?;
let entry = inner.channels.entry(ident.clone());
match entry {
Occupied(mut occupied) => {
// Temporarily replace the entry for this identity with
// a poisoned entry.
let mut oldent = ChannelState::Poisoned(Priv { _unused: () });
std::mem::swap(occupied.get_mut(), &mut oldent);
let (newval, output) = func(Some(oldent));
match newval {
Some(mut newent) => {
newent.check_ident(ident)?;
std::mem::swap(occupied.get_mut(), &mut newent);
}
None => {
occupied.remove();
}
};
Ok(output)
}
Vacant(vacant) => {
let (newval, output) = func(None);
if let Some(newent) = newval {
newent.check_ident(ident)?;
vacant.insert(newent);
}
Ok(output)
}
}
}
/// Reconfigure all channels as necessary
///
/// (By reparameterising channels as needed)
@ -413,10 +342,10 @@ impl<C: AbstractChannel> ChannelMap<C> {
};
let update = Arc::new(update);
for channel in inner.channels.values_mut() {
for channel in inner.channels.values() {
let channel = match channel {
CS::Open(OpenEntry { channel, .. }) => channel,
CS::Building(_) | CS::Poisoned(_) => continue,
CS::Building(_) => continue,
};
// Ignore error (which simply means the channel is closed or gone)
let _ = channel.reparameterize(update.clone());
@ -434,7 +363,7 @@ impl<C: AbstractChannel> ChannelMap<C> {
.lock()
.expect("Poisoned lock")
.channels
.retain(|_id, chan| !chan.ready_to_expire(&mut ret));
.retain(|chan| !chan.ready_to_expire(&mut ret));
ret
}
}
@ -584,7 +513,8 @@ mod test {
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tor_llcrypto::pk::ed25519::Ed25519Identity;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
fn new_test_channel_map<C: AbstractChannel>() -> ChannelMap<C> {
@ -595,18 +525,14 @@ mod test {
)
}
#[derive(Eq, PartialEq, Clone, Debug)]
#[derive(Clone, Debug)]
struct FakeChannel {
ident: &'static str,
ed_ident: Ed25519Identity,
usable: bool,
unused_duration: Option<u64>,
params_update: Option<Arc<ChannelPaddingInstructionsUpdates>>,
params_update: Arc<Mutex<Option<Arc<ChannelPaddingInstructionsUpdates>>>>,
}
impl AbstractChannel for FakeChannel {
type Ident = u8;
fn ident(&self) -> &Self::Ident {
&self.ident.as_bytes()[0]
}
fn is_usable(&self) -> bool {
self.usable
}
@ -614,20 +540,36 @@ mod test {
self.unused_duration.map(Duration::from_secs)
}
fn reparameterize(
&mut self,
&self,
update: Arc<ChannelPaddingInstructionsUpdates>,
) -> tor_proto::Result<()> {
self.params_update = Some(update);
*self.params_update.lock().unwrap() = Some(update);
Ok(())
}
fn engage_padding_activities(&self) {}
}
impl tor_linkspec::HasRelayIds for FakeChannel {
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match key_type {
tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
_ => None,
}
}
}
/// Get a fake ed25519 identity from the first byte of a string.
fn str_to_ed(s: &str) -> Ed25519Identity {
let byte = s.as_bytes()[0];
[byte; 32].into()
}
fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ident,
ed_ident: str_to_ed(ident),
usable: true,
unused_duration: None,
params_update: None,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel,
@ -640,10 +582,10 @@ mod test {
unused_duration: Option<u64>,
) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ident,
ed_ident: str_to_ed(ident),
usable: true,
unused_duration,
params_update: None,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel,
@ -652,10 +594,10 @@ mod test {
}
fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
let channel = FakeChannel {
ident,
ed_ident: str_to_ed(ident),
usable: false,
unused_duration: None,
params_update: None,
params_update: Arc::new(Mutex::new(None)),
};
ChannelState::Open(OpenEntry {
channel,
@ -664,106 +606,30 @@ mod test {
}
#[test]
fn simple_ops() {
let map = new_test_channel_map();
use ChannelState::Open;
assert!(map.replace(b'h', ch("hello")).unwrap().is_none());
assert!(map.replace(b'w', ch("wello")).unwrap().is_none());
match map.get(&b'h') {
Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {}
_ => panic!(),
}
assert!(map.get(&b'W').unwrap().is_none());
match map.replace(b'h', ch("hebbo")) {
Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {}
_ => panic!(),
}
assert!(map.remove(&b'Z').unwrap().is_none());
match map.remove(&b'h') {
Ok(Some(Open(ent))) if ent.channel.ident == "hebbo" => {}
_ => panic!(),
}
}
#[test]
fn rmv_unusable() {
fn rmv_unusable() -> Result<()> {
let map = new_test_channel_map();
map.replace(b'm', closed("machen")).unwrap();
map.replace(b'f', ch("feinen")).unwrap();
map.replace(b'w', closed("wir")).unwrap();
map.replace(b'F', ch("Fug")).unwrap();
map.with_channels(|map| {
map.insert(closed("machen"));
map.insert(ch("feinen"));
map.insert(closed("wir"));
map.insert(ch("Fug"));
})?;
map.remove_unusable().unwrap();
assert!(map.get(&b'm').unwrap().is_none());
assert!(map.get(&b'w').unwrap().is_none());
assert!(map.get(&b'f').unwrap().is_some());
assert!(map.get(&b'F').unwrap().is_some());
map.with_channels(|map| {
assert!(map.by_id(&str_to_ed("m")).is_none());
assert!(map.by_id(&str_to_ed("w")).is_none());
assert!(map.by_id(&str_to_ed("f")).is_some());
assert!(map.by_id(&str_to_ed("F")).is_some());
})?;
Ok(())
}
#[test]
fn change() {
let map = new_test_channel_map();
map.replace(b'w', ch("wir")).unwrap();
map.replace(b'm', ch("machen")).unwrap();
map.replace(b'f', ch("feinen")).unwrap();
map.replace(b'F', ch("Fug")).unwrap();
// Replace Some with Some.
let (old, v) = map
.change_state(&b'F', |state| (Some(ch("FUG")), (state, 99_u8)))
.unwrap();
assert_eq!(old.unwrap().unwrap_open().ident, "Fug");
assert_eq!(v, 99);
assert_eq!(map.get(&b'F').unwrap().unwrap().unwrap_open().ident, "FUG");
// Replace Some with None.
let (old, v) = map
.change_state(&b'f', |state| (None, (state, 123_u8)))
.unwrap();
assert_eq!(old.unwrap().unwrap_open().ident, "feinen");
assert_eq!(v, 123);
assert!(map.get(&b'f').unwrap().is_none());
// Replace None with Some.
let (old, v) = map
.change_state(&b'G', |state| (Some(ch("Geheimnisse")), (state, "Hi")))
.unwrap();
assert!(old.is_none());
assert_eq!(v, "Hi");
assert_eq!(
map.get(&b'G').unwrap().unwrap().unwrap_open().ident,
"Geheimnisse"
);
// Replace None with None
let (old, v) = map
.change_state(&b'Q', |state| (None, (state, "---")))
.unwrap();
assert!(old.is_none());
assert_eq!(v, "---");
assert!(map.get(&b'Q').unwrap().is_none());
// Try replacing None with invalid entry (with mismatched ID)
let e = map.change_state(&b'P', |state| (Some(ch("Geheimnisse")), (state, "Hi")));
assert!(matches!(e, Err(Error::Internal(_))));
assert!(matches!(map.get(&b'P'), Ok(None)));
// Try replacing Some with invalid entry (mismatched ID)
let e = map.change_state(&b'G', |state| (Some(ch("Wobbledy")), (state, "Hi")));
assert!(matches!(e, Err(Error::Internal(_))));
assert!(matches!(map.get(&b'G'), Err(Error::Internal(_))));
}
#[test]
fn reparameterise_via_netdir() {
fn reparameterise_via_netdir() -> Result<()> {
let map = new_test_channel_map();
// Set some non-default parameters so that we can tell when an update happens
@ -781,16 +647,19 @@ mod test {
)
.finish();
assert!(map.replace(b't', ch("track")).unwrap().is_none());
map.with_channels(|map| {
map.insert(ch("track"));
})?;
let netdir = tor_netdir::testnet::construct_netdir()
.unwrap_if_sufficient()
.unwrap();
let netdir = Arc::new(netdir);
let with_ch = |f: &dyn Fn(&mut FakeChannel)| {
let mut inner = map.inner.lock().unwrap();
let ch = inner.channels.get_mut(&b't').unwrap().unwrap_open();
let with_ch = |f: &dyn Fn(&FakeChannel)| {
let inner = map.inner.lock().unwrap();
let ch = inner.channels.by_ed25519(&str_to_ed("t"));
let ch = ch.unwrap().unwrap_open();
f(ch);
};
@ -798,7 +667,7 @@ mod test {
map.reconfigure_general(None, None, netdir.clone()).unwrap();
with_ch(&|ch| {
assert_eq!(
format!("{:?}", ch.params_update.take().unwrap()),
format!("{:?}", ch.params_update.lock().unwrap().take().unwrap()),
// evade field visibility by (ab)using Debug impl
"ChannelPaddingInstructionsUpdates { padding_enable: None, \
padding_parameters: Some(Parameters { \
@ -811,54 +680,65 @@ mod test {
eprintln!("-- process a default netdir again, which should *not* send an update --");
map.reconfigure_general(None, None, netdir).unwrap();
with_ch(&|ch| assert_eq!(ch.params_update, None));
with_ch(&|ch| assert!(ch.params_update.lock().unwrap().is_none()));
Ok(())
}
#[test]
fn expire_channels() {
fn expire_channels() -> Result<()> {
let map = new_test_channel_map();
// Channel that has been unused beyond max duration allowed is expired
map.replace(
b'w',
ch_with_details("wello", Duration::from_secs(180), Some(181)),
)
.unwrap();
map.with_channels(|map| {
map.insert(ch_with_details(
"wello",
Duration::from_secs(180),
Some(181),
))
})?;
// Minimum value of max unused duration is 180 seconds
assert_eq!(180, map.expire_channels().as_secs());
assert!(map.get(&b'w').unwrap().is_none());
map.with_channels(|map| {
assert!(map.by_ed25519(&str_to_ed("w")).is_none());
})?;
let map = new_test_channel_map();
// Channel that has been unused for shorter than max unused duration
map.replace(
b'w',
ch_with_details("wello", Duration::from_secs(180), Some(120)),
)
.unwrap();
map.with_channels(|map| {
map.insert(ch_with_details(
"wello",
Duration::from_secs(180),
Some(120),
));
map.replace(
b'y',
ch_with_details("yello", Duration::from_secs(180), Some(170)),
)
.unwrap();
map.insert(ch_with_details(
"yello",
Duration::from_secs(180),
Some(170),
));
// Channel that has been unused beyond max duration allowed is expired
map.replace(
b'g',
ch_with_details("gello", Duration::from_secs(180), Some(181)),
)
.unwrap();
// Channel that has been unused beyond max duration allowed is expired
map.insert(ch_with_details(
"gello",
Duration::from_secs(180),
Some(181),
));
// Closed channel should be retained
map.replace(b'h', closed("hello")).unwrap();
// Closed channel should be retained
map.insert(closed("hello"));
})?;
// Return duration until next channel expires
assert_eq!(10, map.expire_channels().as_secs());
assert!(map.get(&b'w').unwrap().is_some());
assert!(map.get(&b'y').unwrap().is_some());
assert!(map.get(&b'h').unwrap().is_some());
assert!(map.get(&b'g').unwrap().is_none());
map.with_channels(|map| {
assert!(map.by_ed25519(&str_to_ed("w")).is_some());
assert!(map.by_ed25519(&str_to_ed("y")).is_some());
assert!(map.by_ed25519(&str_to_ed("h")).is_some());
assert!(map.by_ed25519(&str_to_ed("g")).is_none());
})?;
Ok(())
}
}

View File

@ -21,7 +21,7 @@ use itertools::{zip_eq, Itertools};
use tor_cell::chancell::msg::PaddingNegotiateCmd;
use tor_config::PaddingLevel;
use tor_linkspec::HasRelayIds;
use tor_linkspec::{HasRelayIds, RelayIds};
use tor_netdir::NetDir;
use tor_proto::channel::{Channel, CtrlMsg};
@ -121,7 +121,7 @@ struct FakeChannelFactory {
#[async_trait]
impl AbstractChannelFactory for FakeChannelFactory {
type Channel = Channel;
type BuildSpec = ();
type BuildSpec = tor_linkspec::RelayIds;
async fn build_channel(&self, _target: &Self::BuildSpec) -> Result<Self::Channel> {
Ok(self.channel.clone())
@ -152,13 +152,17 @@ async fn case(level: PaddingLevel, dormancy: Dormancy, usage: ChannelUsage) -> C
let (channel, recv) = Channel::new_fake();
let peer_id = channel.target().ed_identity().unwrap().clone();
let relay_ids = RelayIds::builder()
.ed_identity(peer_id.clone())
.build()
.unwrap();
let factory = FakeChannelFactory { channel };
let netparams = Arc::new(NetParameters::default());
let chanmgr = AbstractChanMgr::new(factory, &cconfig, dormancy, &netparams);
let (channel, _prov) = chanmgr.get_or_launch(peer_id, (), usage).await.unwrap();
let (channel, _prov) = chanmgr.get_or_launch(relay_ids, usage).await.unwrap();
CaseContext {
channel,

View File

@ -18,6 +18,7 @@ pt-client = []
[dependencies]
base64ct = "1.5.1"
by_address = "1"
cfg-if = "1.0.0"
derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" }
derive_more = "0.99"

View File

@ -40,6 +40,17 @@ impl<H: HasRelayIds> ByRelayIds<H> {
}
}
/// Return the value in this set (if any) that has the key `key`.
pub fn remove_by_id<'a, T>(&mut self, key: T) -> Option<H>
where
T: Into<RelayIdRef<'a>>,
{
match key.into() {
RelayIdRef::Ed25519(ed) => self.remove_by_ed25519(ed),
RelayIdRef::Rsa(rsa) => self.remove_by_rsa(rsa),
}
}
/// Return the value in this set (if any) that has _all_ the relay IDs
/// that `key` does.
///
@ -52,9 +63,175 @@ impl<H: HasRelayIds> ByRelayIds<H> {
self.by_id(any_id)
.filter(|val| val.has_all_relay_ids_from(key))
}
/// Remove the single value in this set (if any) that has _exactly the same_
/// relay IDs that `key` does
pub fn remove_exact<T>(&mut self, key: &T) -> Option<H>
where
T: HasRelayIds,
{
let any_id = key.identities().next()?;
if self
.by_id(any_id)
.filter(|ent| ent.same_relay_ids(key))
.is_some()
{
self.remove_by_id(any_id)
} else {
None
}
}
/// Return a reference to every element in this set that shares _any_ ID
/// with `key`.
///
/// No element is returned more than once.
pub fn all_overlapping<T>(&self, key: &T) -> Vec<&H>
where
T: HasRelayIds,
{
use by_address::ByAddress;
use std::collections::HashSet;
let mut items: HashSet<ByAddress<&H>> = HashSet::new();
for ident in key.identities() {
if let Some(found) = self.by_id(ident) {
items.insert(ByAddress(found));
}
}
items.into_iter().map(|by_addr| by_addr.0).collect()
}
}
// TODO MSRV: Remove this `allow` once we no longer get a false positive
// for it on our MSRV. 1.56 is affected; 1.60 is not.
#[allow(unreachable_pub)]
pub use tor_basic_utils::n_key_set::Error as ByRelayIdsError;
#[cfg(test)]
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::{RelayIds, RelayIdsBuilder};
#[test]
fn lookup() {
let rsa1: RsaIdentity = (*b"12345678901234567890").into();
let rsa2: RsaIdentity = (*b"abcefghijklmnopqrstu").into();
let rsa3: RsaIdentity = (*b"abcefghijklmnopQRSTU").into();
let ed1: Ed25519Identity = (*b"12345678901234567890123456789012").into();
let ed2: Ed25519Identity = (*b"abcefghijklmnopqrstuvwxyzABCDEFG").into();
let ed3: Ed25519Identity = (*b"abcefghijklmnopqrstuvwxyz1234567").into();
let keys1 = RelayIdsBuilder::default()
.rsa_identity(rsa1)
.ed_identity(ed1)
.build()
.unwrap();
let keys2 = RelayIdsBuilder::default()
.rsa_identity(rsa2)
.ed_identity(ed2)
.build()
.unwrap();
let mut set = ByRelayIds::new();
set.insert(keys1.clone());
set.insert(keys2.clone());
// Try by_id
assert_eq!(set.by_id(&rsa1), Some(&keys1));
assert_eq!(set.by_id(&ed1), Some(&keys1));
assert_eq!(set.by_id(&rsa2), Some(&keys2));
assert_eq!(set.by_id(&ed2), Some(&keys2));
assert_eq!(set.by_id(&rsa3), None);
assert_eq!(set.by_id(&ed3), None);
// Try exact lookup
assert_eq!(set.by_all_ids(&keys1), Some(&keys1));
assert_eq!(set.by_all_ids(&keys2), Some(&keys2));
{
let search = RelayIdsBuilder::default()
.rsa_identity(rsa1)
.build()
.unwrap();
assert_eq!(set.by_all_ids(&search), Some(&keys1));
}
{
let search = RelayIdsBuilder::default()
.rsa_identity(rsa1)
.ed_identity(ed2)
.build()
.unwrap();
assert_eq!(set.by_all_ids(&search), None);
}
// Try looking for overlap
assert_eq!(set.all_overlapping(&keys1), vec![&keys1]);
assert_eq!(set.all_overlapping(&keys2), vec![&keys2]);
{
let search = RelayIdsBuilder::default()
.rsa_identity(rsa1)
.ed_identity(ed2)
.build()
.unwrap();
let answer = set.all_overlapping(&search);
assert_eq!(answer.len(), 2);
assert!(answer.contains(&&keys1));
assert!(answer.contains(&&keys2));
}
{
let search = RelayIdsBuilder::default()
.rsa_identity(rsa2)
.build()
.unwrap();
assert_eq!(set.all_overlapping(&search), vec![&keys2]);
}
{
let search = RelayIdsBuilder::default()
.rsa_identity(rsa3)
.build()
.unwrap();
assert_eq!(set.all_overlapping(&search), Vec::<&RelayIds>::new());
}
}
#[test]
fn remove_exact() {
let rsa1: RsaIdentity = (*b"12345678901234567890").into();
let rsa2: RsaIdentity = (*b"abcefghijklmnopqrstu").into();
let ed1: Ed25519Identity = (*b"12345678901234567890123456789012").into();
let ed2: Ed25519Identity = (*b"abcefghijklmnopqrstuvwxyzABCDEFG").into();
let keys1 = RelayIdsBuilder::default()
.rsa_identity(rsa1)
.ed_identity(ed1)
.build()
.unwrap();
let keys2 = RelayIdsBuilder::default()
.rsa_identity(rsa2)
.ed_identity(ed2)
.build()
.unwrap();
let mut set = ByRelayIds::new();
set.insert(keys1.clone());
set.insert(keys2);
assert_eq!(set.len(), 2);
let removed = set.remove_exact(&keys1);
assert_eq!(removed, Some(keys1));
assert_eq!(set.len(), 1);
{
let search = RelayIdsBuilder::default().ed_identity(ed2).build().unwrap();
let removed = set.remove_exact(&search);
assert_eq!(removed, None);
assert_eq!(set.len(), 1);
}
}
}

View File

@ -48,7 +48,10 @@ pub use ids::{
RelayId, RelayIdError, RelayIdRef, RelayIdType, RelayIdTypeIter,
};
pub use ls::LinkSpec;
pub use owned::{OwnedChanTarget, OwnedChanTargetBuilder, OwnedCircTarget, RelayIds};
pub use owned::{
OwnedChanTarget, OwnedChanTargetBuilder, OwnedCircTarget, OwnedCircTargetBuilder, RelayIds,
RelayIdsBuilder,
};
pub use traits::{
ChanTarget, CircTarget, DirectChanMethodsHelper, HasAddrs, HasChanMethod, HasRelayIds,
HasRelayIdsLegacy,

View File

@ -1 +1,3 @@
MODIFIED: New Channel::set_declared_method, deprecated set_declared_addr.
MODIFIED: Channel implements HasRelayIds.
MODIFIED: Channel::reparameterize now take immutable &self

View File

@ -476,7 +476,7 @@ impl Channel {
/// Reparameterise (update parameters; reconfigure)
///
/// Returns `Err` if the channel was closed earlier
pub fn reparameterize(&mut self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
pub fn reparameterize(&self, params: Arc<ChannelPaddingInstructionsUpdates>) -> Result<()> {
let mut mutable = self
.details
.mutable
@ -664,6 +664,15 @@ where
Ok(())
}
impl HasRelayIds for Channel {
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
self.details.peer_id.identity(key_type)
}
}
/// Make some fake channel details (for testing only!)
#[cfg(any(test, feature = "testing"))]
fn fake_channel_details() -> Arc<ChannelDetails> {