chanmgr: Add identities to pending state in map.

This will let us migrate from `HashMap<Ed25519Identity, Entry>` to
`ByRelayIds<Entry>`.
This commit is contained in:
Nick Mathewson 2022-10-14 13:07:39 -04:00
parent e51fdbfb1b
commit 735100455d
2 changed files with 46 additions and 8 deletions

View File

@ -12,7 +12,7 @@ use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Duration;
use tor_error::internal;
use tor_linkspec::HasRelayIds;
use tor_linkspec::{HasRelayIds, RelayIds};
use tor_netdir::params::NetParameters;
use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
@ -120,10 +120,13 @@ impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
/// Helper: return the objects used to inform pending tasks
/// about a newly open or failed channel.
fn setup_launch<C: Clone>(&self) -> (map::ChannelState<C>, Sending<C>) {
fn setup_launch<C: Clone>(&self, ids: RelayIds) -> (map::ChannelState<C>, Sending<C>) {
let (snd, rcv) = oneshot::channel();
let shared = rcv.shared();
(map::ChannelState::Building(shared), snd)
let pending = rcv.shared();
(
map::ChannelState::Building(map::PendingEntry { ids, pending }),
snd,
)
}
/// Get a channel whose identity is `ident`.
@ -143,6 +146,8 @@ impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
) -> Result<(CF::Channel, ChanProvenance)> {
use ChannelUsage as CU;
// TODO pt-client: This is not yet used.
let chan = self.get_or_launch_internal(ident, target).await?;
match usage {
@ -179,6 +184,8 @@ impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
// TODO(nickm): It would be neat to use tor_retry instead.
let mut last_err = None;
let ids = RelayIds::from_relay_ids(&target);
for _ in 0..N_ATTEMPTS {
// First, see what state we're in, and what we should do
// about it.
@ -196,19 +203,19 @@ impl<CF: AbstractChannelFactory> AbstractChanMgr<CF> {
} else {
// Unusable channel. Move to the Building
// state and launch a new channel.
let (newstate, send) = self.setup_launch();
let (newstate, send) = self.setup_launch(ids.clone());
let action = Action::Launch(send);
(Some(newstate), action)
}
}
Some(Building(ref pending)) => {
Some(Building(map::PendingEntry { ref pending, .. })) => {
let action = Action::Wait(pending.clone());
(oldstate, action)
}
None => {
// No channel. Move to the Building
// state and launch a new channel.
let (newstate, send) = self.setup_launch();
let (newstate, send) = self.setup_launch(ids.clone());
let action = Action::Launch(send);
(Some(newstate), action)
}

View File

@ -11,6 +11,8 @@ use std::sync::Arc;
use tor_cell::chancell::msg::PaddingNegotiate;
use tor_config::PaddingLevel;
use tor_error::{internal, into_internal};
use tor_linkspec::HasRelayIds;
use tor_linkspec::RelayIds;
use tor_netdir::{params::NetParameters, params::CHANNEL_PADDING_TIMEOUT_UPPER_BOUND};
use tor_proto::channel::padding::Parameters as PaddingParameters;
use tor_proto::channel::padding::ParametersBuilder as PaddingParametersBuilder;
@ -79,7 +81,7 @@ pub(crate) enum ChannelState<C> {
/// yielding it to the user.
Open(OpenEntry<C>),
/// A channel that's getting built.
Building(Pending<C>),
Building(PendingEntry<C>),
}
/// An open channel entry.
@ -91,6 +93,35 @@ pub(crate) struct OpenEntry<C> {
pub(crate) max_unused_duration: Duration,
}
/// An entry for a not-yet-build channel
#[derive(Clone)]
pub(crate) struct PendingEntry<C> {
/// The keys of the relay to which we're trying to open a channel.
pub(crate) ids: RelayIds,
/// A future we can clone and listen on to learn when this channel attempt
/// is successful or failed.
///
/// This entry will be removed from the map (and possibly replaced with an
/// `OpenEntry`) _before_ this future becomes ready.
pub(crate) pending: Pending<C>,
}
impl<C> HasRelayIds for ChannelState<C>
where
C: HasRelayIds,
{
fn identity(
&self,
key_type: tor_linkspec::RelayIdType,
) -> Option<tor_linkspec::RelayIdRef<'_>> {
match self {
ChannelState::Open(OpenEntry { channel, .. }) => channel.identity(key_type),
ChannelState::Building(PendingEntry { ids, .. }) => ids.identity(key_type),
}
}
}
impl<C: Clone> ChannelState<C> {
/// Create a new shallow copy of this ChannelState.
#[cfg(test)]