|
|
|
@ -0,0 +1,774 @@
|
|
|
|
|
//! IPT Manager
|
|
|
|
|
//!
|
|
|
|
|
//! Maintains introduction points and publishes descriptors.
|
|
|
|
|
//! Provides a stream of rendezvous requests.
|
|
|
|
|
|
|
|
|
|
use std::any::Any;
|
|
|
|
|
use std::fmt::Debug;
|
|
|
|
|
use std::marker::PhantomData;
|
|
|
|
|
use std::ops::RangeInclusive;
|
|
|
|
|
use std::panic::AssertUnwindSafe;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::{Duration, Instant, SystemTime};
|
|
|
|
|
|
|
|
|
|
use futures::channel::{mpsc, oneshot};
|
|
|
|
|
use futures::select_biased;
|
|
|
|
|
use futures::task::SpawnExt as _;
|
|
|
|
|
use futures::{FutureExt as _, SinkExt as _, StreamExt as _};
|
|
|
|
|
|
|
|
|
|
use educe::Educe;
|
|
|
|
|
use postage::watch;
|
|
|
|
|
use rand::Rng as _;
|
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
use tracing::{error, trace, warn};
|
|
|
|
|
use void::{ResultVoidErrExt as _, Void};
|
|
|
|
|
|
|
|
|
|
use tor_circmgr::hspool::HsCircPool;
|
|
|
|
|
use tor_error::internal;
|
|
|
|
|
use tor_linkspec::RelayIds;
|
|
|
|
|
use tor_netdir::NetDirProvider;
|
|
|
|
|
use tor_rtcompat::Runtime;
|
|
|
|
|
|
|
|
|
|
use crate::svc::ipt_establish;
|
|
|
|
|
use crate::timeout_track::{TrackingInstantOffsetNow, TrackingNow};
|
|
|
|
|
use crate::{FatalError, OnionServiceConfig, RendRequest, StartupError};
|
|
|
|
|
use ipt_establish::{IptEstablisher, IptStatus, IptStatusStatus, IptWantsToRetire};
|
|
|
|
|
|
|
|
|
|
use IptStatusStatus as ISS;
|
|
|
|
|
use TrackedStatus as TS;
|
|
|
|
|
|
|
|
|
|
/// Time for which we'll use an IPT relay before selecting a new relay to be our IPT
|
|
|
|
|
// TODO HSS IPT_RELAY_ROTATION_TIME should be tuneable. And, is default correct?
|
|
|
|
|
const IPT_RELAY_ROTATION_TIME: RangeInclusive<Duration> = {
|
|
|
|
|
/// gosh this is clumsy
|
|
|
|
|
const DAY: u64 = 86400;
|
|
|
|
|
Duration::from_secs(DAY * 4)..=Duration::from_secs(DAY * 7)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// TODO HSS make HsNickname a newtype, somewhere more central, and document it properly
|
|
|
|
|
type HsNickname = String;
|
|
|
|
|
|
|
|
|
|
/// Persistent local identifier for an introduction point
|
|
|
|
|
///
|
|
|
|
|
/// Changes when the IPT relay changes, or the IPT key material changes.
|
|
|
|
|
/// (Different for different `.onion` services, obviously)
|
|
|
|
|
///
|
|
|
|
|
/// Is a randomly-generated byte string, currently 32 long.
|
|
|
|
|
//
|
|
|
|
|
// TODO HSS move IptLocalId somewhere more central?
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
|
|
#[serde(transparent)]
|
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
|
|
|
|
|
struct IptLocalId([u8; 32]);
|
|
|
|
|
|
|
|
|
|
/// IPT Manager (for one hidden service)
|
|
|
|
|
#[derive(Educe)]
|
|
|
|
|
#[educe(Debug(bound))]
|
|
|
|
|
pub(crate) struct IptManager<R, M> {
|
|
|
|
|
/// Immutable contents
|
|
|
|
|
imm: Immutable<R>,
|
|
|
|
|
|
|
|
|
|
/// Mutable state
|
|
|
|
|
state: State<R, M>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Immutable contents of an IPT Manager
|
|
|
|
|
///
|
|
|
|
|
/// Contains things inherent to our identity, and
|
|
|
|
|
/// handles to services that we'll be using.
|
|
|
|
|
#[derive(Educe)]
|
|
|
|
|
#[educe(Debug(bound))]
|
|
|
|
|
pub(crate) struct Immutable<R> {
|
|
|
|
|
/// Runtime
|
|
|
|
|
#[educe(Debug(ignore))]
|
|
|
|
|
runtime: R,
|
|
|
|
|
|
|
|
|
|
/// Netdir provider
|
|
|
|
|
#[educe(Debug(ignore))]
|
|
|
|
|
dirprovider: Arc<dyn NetDirProvider>,
|
|
|
|
|
|
|
|
|
|
/// Nickname
|
|
|
|
|
nick: HsNickname,
|
|
|
|
|
|
|
|
|
|
/// Output MPSC for rendezvous requests
|
|
|
|
|
///
|
|
|
|
|
/// Passed to IPT Establishers we create
|
|
|
|
|
output_rend_reqs: mpsc::Sender<RendRequest>,
|
|
|
|
|
|
|
|
|
|
/// Internal channel for updates from IPT Establishers (sender)
|
|
|
|
|
///
|
|
|
|
|
/// When we make a new `IptEstablisher` we use this arrange for
|
|
|
|
|
/// its status updates to arrive, appropriately tagged, via `status_recv`
|
|
|
|
|
status_send: mpsc::Sender<(IptLocalId, IptStatus)>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// State of an IPT Manager
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub(crate) struct State<R, M> {
|
|
|
|
|
/// Configuration
|
|
|
|
|
config: Arc<OnionServiceConfig>,
|
|
|
|
|
|
|
|
|
|
/// Channel for updates from IPT Establishers (receiver)
|
|
|
|
|
///
|
|
|
|
|
/// We arrange for all the updates to be multiplexed,
|
|
|
|
|
/// as that makes handling them easy in our event loop.
|
|
|
|
|
status_recv: mpsc::Receiver<(IptLocalId, IptStatus)>,
|
|
|
|
|
|
|
|
|
|
/// State: selected relays
|
|
|
|
|
relays: Vec<IptRelay>,
|
|
|
|
|
|
|
|
|
|
/// Signal for us to shut down
|
|
|
|
|
shutdown: oneshot::Receiver<Void>,
|
|
|
|
|
|
|
|
|
|
/// Mockable state, normally [`Real`]
|
|
|
|
|
///
|
|
|
|
|
/// This is in `State` so it can be passed mutably to tests,
|
|
|
|
|
/// even though the main code doesn't need `mut`
|
|
|
|
|
/// since `HsCircPool` is a service with interior mutability.
|
|
|
|
|
mockable: M,
|
|
|
|
|
|
|
|
|
|
/// Runtime (to placate compiler)
|
|
|
|
|
runtime: PhantomData<R>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Mockable state in an IPT Manager - real version
|
|
|
|
|
#[derive(Educe)]
|
|
|
|
|
#[educe(Debug)]
|
|
|
|
|
pub(crate) struct Real<R: Runtime> {
|
|
|
|
|
/// Circuit pool for circuits we need to make
|
|
|
|
|
///
|
|
|
|
|
/// Passed to the each new Establisher
|
|
|
|
|
#[educe(Debug(ignore))]
|
|
|
|
|
pub(crate) circ_pool: Arc<HsCircPool<R>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// One selected relay, at which we are establishing (or relavantly advertised) IPTs
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
struct IptRelay {
|
|
|
|
|
/// The actual relay
|
|
|
|
|
relay: RelayIds,
|
|
|
|
|
|
|
|
|
|
/// The retirement time we selected for this relay
|
|
|
|
|
///
|
|
|
|
|
/// We use `SystemTime`, not `Instant`, because we will want to save it to disk.
|
|
|
|
|
planned_retirement: SystemTime,
|
|
|
|
|
|
|
|
|
|
/// IPTs at this relay
|
|
|
|
|
///
|
|
|
|
|
/// At most one will have [`IsCurrent`].
|
|
|
|
|
ipts: Vec<Ipt>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// One introduction point, representation in memory
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
struct Ipt {
|
|
|
|
|
/// Local persistent identifier
|
|
|
|
|
lid: IptLocalId,
|
|
|
|
|
|
|
|
|
|
/// Handle for the establisher; we keep this here just for its `Drop` action
|
|
|
|
|
///
|
|
|
|
|
/// The real type is `M::IptEstablisher`.
|
|
|
|
|
/// We use `Box<dyn Any>` to avoid propagating the `M` type parameter to `Ipt` etc.
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
|
establisher: Box<dyn Any + Send + Sync + 'static>,
|
|
|
|
|
|
|
|
|
|
/// Last information about how it's doing including timing info
|
|
|
|
|
status_last: TrackedStatus,
|
|
|
|
|
|
|
|
|
|
/// Until when ought we to try to maintain it
|
|
|
|
|
last_descriptor_expiry_including_slop: SystemTime,
|
|
|
|
|
|
|
|
|
|
/// Is this IPT current - should we include it in descriptors ?
|
|
|
|
|
///
|
|
|
|
|
/// `None` might mean:
|
|
|
|
|
/// * WantsToRetire
|
|
|
|
|
/// * We have >N IPTs and we have been using this IPT so long we want to rotate it out
|
|
|
|
|
is_current: Option<IsCurrent>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Last information from establisher about an IPT, with timing info added by us
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
enum TrackedStatus {
|
|
|
|
|
/// Corresponds to [`IptStatusStatus::Faulty`]
|
|
|
|
|
Faulty,
|
|
|
|
|
|
|
|
|
|
/// Corresponds to [`IptStatusStatus::Establishing`]
|
|
|
|
|
Establishing {
|
|
|
|
|
/// When we were told we started to establish, for calculating `time_to_establish`
|
|
|
|
|
started: Instant,
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
/// Corresponds to [`IptStatusStatus::Good`]
|
|
|
|
|
Good {
|
|
|
|
|
/// How long it took to establish (if we could determine that information)
|
|
|
|
|
///
|
|
|
|
|
/// Can only be `Err` in strange situations.
|
|
|
|
|
time_to_establish: Result<Duration, ()>,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Token indicating that this introduction point is current (not Retiring)
|
|
|
|
|
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
|
|
|
|
|
struct IsCurrent;
|
|
|
|
|
|
|
|
|
|
/// Record of intro point establisher state, as stored on disk
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
|
|
#[allow(dead_code)] // TODO HSS remove
|
|
|
|
|
struct StateRecord {
|
|
|
|
|
/// Relays
|
|
|
|
|
ipt_relays: Vec<RelayRecord>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Record of a selected intro point relay, as stored on disk
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
|
|
#[allow(dead_code)] // TODO HSS remove
|
|
|
|
|
struct RelayRecord {
|
|
|
|
|
/// Which relay?
|
|
|
|
|
relay: RelayIds,
|
|
|
|
|
/// The IPTs, including the current one and any still-wanted old ones
|
|
|
|
|
ipts: Vec<IptRecord>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Record of a single intro point, as stored on disk
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
|
|
#[allow(dead_code)] // TODO HSS remove
|
|
|
|
|
struct IptRecord {
|
|
|
|
|
/// Used to find the cryptographic keys, amongst other things
|
|
|
|
|
lid: IptLocalId,
|
|
|
|
|
// TODO HSS other fields need to be here!
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Instructions to the publisher
|
|
|
|
|
/// TODO HSS reconcile IptSetStatus with publish.rs
|
|
|
|
|
enum IptSetStatus {
|
|
|
|
|
/// We have no idea which IPTs to publish.
|
|
|
|
|
Unknown,
|
|
|
|
|
/// We have some IPTs we could publish, but we're not confident about them.
|
|
|
|
|
Uncertain(IptSetToPublish),
|
|
|
|
|
/// We are sure of which IPTs we want to publish.
|
|
|
|
|
Certain(IptSetToPublish),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A set of introduction points as told to the publisher
|
|
|
|
|
/// TODO HSS reconcile IptSetStatus with publish.rs
|
|
|
|
|
struct IptSetToPublish {
|
|
|
|
|
/// The actual introduction points
|
|
|
|
|
ipts: Vec<()>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Return value from one call to the main loop iteration
|
|
|
|
|
enum ShutdownStatus {
|
|
|
|
|
/// We should continue to operate this IPT manager
|
|
|
|
|
Continue,
|
|
|
|
|
/// We should shut down: the service, or maybe the whole process, is shutting down
|
|
|
|
|
Terminate,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl From<oneshot::Canceled> for ShutdownStatus {
|
|
|
|
|
fn from(cancelled: oneshot::Canceled) -> ShutdownStatus {
|
|
|
|
|
ShutdownStatus::Terminate
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl rand::distributions::Distribution<IptLocalId> for rand::distributions::Standard {
|
|
|
|
|
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> IptLocalId {
|
|
|
|
|
IptLocalId(rng.gen())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl IptRelay {
|
|
|
|
|
/// Get a reference to this IPT relay's current intro point state (if any)
|
|
|
|
|
///
|
|
|
|
|
/// `None` means this IPT has no current introduction points.
|
|
|
|
|
/// That might be, briefly, because a new intro point needs to be created;
|
|
|
|
|
/// or it might be because we are retiring the relay.
|
|
|
|
|
fn current_ipt(&self) -> Option<&Ipt> {
|
|
|
|
|
self.ipts
|
|
|
|
|
.iter()
|
|
|
|
|
.find(|ipt| ipt.is_current == Some(IsCurrent))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get a mutable reference to this IPT relay's current intro point state (if any)
|
|
|
|
|
fn current_ipt_mut(&mut self) -> Option<&mut Ipt> {
|
|
|
|
|
self.ipts
|
|
|
|
|
.iter_mut()
|
|
|
|
|
.find(|ipt| ipt.is_current == Some(IsCurrent))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Should this IPT Relay be retired ?
|
|
|
|
|
///
|
|
|
|
|
/// This is determined by our IPT relay rotation time.
|
|
|
|
|
fn should_retire(&self, now: &TrackingNow) -> bool {
|
|
|
|
|
now > &self.planned_retirement
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Make a new introduction point at this relay
|
|
|
|
|
///
|
|
|
|
|
/// It becomes the current IPT.
|
|
|
|
|
fn make_new_ipt<R: Runtime, M: Mockable<R>>(
|
|
|
|
|
&mut self,
|
|
|
|
|
imm: &Immutable<R>,
|
|
|
|
|
mockable: &mut M,
|
|
|
|
|
) -> Result<(), FatalError> {
|
|
|
|
|
let (establisher, mut watch_rx) = mockable.make_new_ipt(imm, self.relay.clone())?;
|
|
|
|
|
|
|
|
|
|
// we'll treat it as Establishing until we find otherwise
|
|
|
|
|
let status_last = TS::Establishing {
|
|
|
|
|
started: imm.runtime.now(),
|
|
|
|
|
};
|
|
|
|
|
let lid: IptLocalId = mockable.thread_rng().gen();
|
|
|
|
|
|
|
|
|
|
imm.runtime
|
|
|
|
|
.spawn({
|
|
|
|
|
let mut status_send = imm.status_send.clone();
|
|
|
|
|
async move {
|
|
|
|
|
loop {
|
|
|
|
|
let Some(status) = watch_rx.next().await else {
|
|
|
|
|
trace!("HS service IPT status task: establisher went away");
|
|
|
|
|
break;
|
|
|
|
|
};
|
|
|
|
|
match status_send.send((lid, status)).await {
|
|
|
|
|
Ok(()) => {}
|
|
|
|
|
Err::<_, mpsc::SendError>(e) => {
|
|
|
|
|
// Not using trace_report because SendError isn't HasKind
|
|
|
|
|
trace!("HS service IPT status task: manager went away: {e}");
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.map_err(|cause| FatalError::Spawn {
|
|
|
|
|
spawning: "IPT establisher watch status task",
|
|
|
|
|
cause: cause.into(),
|
|
|
|
|
})?;
|
|
|
|
|
|
|
|
|
|
let ipt = Ipt {
|
|
|
|
|
lid,
|
|
|
|
|
establisher: Box::new(establisher),
|
|
|
|
|
status_last,
|
|
|
|
|
last_descriptor_expiry_including_slop: imm.runtime.wallclock(), // this'll do
|
|
|
|
|
is_current: Some(IsCurrent),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.ipts.push(ipt);
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<R: Runtime, M: Mockable<R>> IptManager<R, M> {
|
|
|
|
|
/// Create a new IptManager
|
|
|
|
|
#[allow(clippy::unnecessary_wraps)] // TODO HSS remove
|
|
|
|
|
pub(crate) fn new(
|
|
|
|
|
runtime: R,
|
|
|
|
|
dirprovider: Arc<dyn NetDirProvider>,
|
|
|
|
|
nick: HsNickname,
|
|
|
|
|
config: Arc<OnionServiceConfig>,
|
|
|
|
|
output_rend_reqs: mpsc::Sender<RendRequest>,
|
|
|
|
|
shutdown: oneshot::Receiver<Void>,
|
|
|
|
|
mockable: M,
|
|
|
|
|
) -> Result<Self, StartupError> {
|
|
|
|
|
// TODO HSS load persistent state
|
|
|
|
|
|
|
|
|
|
// We don't need buffering; since this is written to by dedicated tasks which
|
|
|
|
|
// are reading watches.
|
|
|
|
|
let (status_send, status_recv) = mpsc::channel(0);
|
|
|
|
|
|
|
|
|
|
let imm = Immutable {
|
|
|
|
|
runtime,
|
|
|
|
|
dirprovider,
|
|
|
|
|
nick,
|
|
|
|
|
status_send,
|
|
|
|
|
output_rend_reqs,
|
|
|
|
|
};
|
|
|
|
|
let state = State {
|
|
|
|
|
config,
|
|
|
|
|
status_recv,
|
|
|
|
|
mockable,
|
|
|
|
|
shutdown,
|
|
|
|
|
relays: vec![],
|
|
|
|
|
runtime: PhantomData,
|
|
|
|
|
};
|
|
|
|
|
let mgr = IptManager { imm, state };
|
|
|
|
|
|
|
|
|
|
Ok(mgr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Send the IPT manager off to run and establish intro points
|
|
|
|
|
pub(crate) fn launch_background_tasks(self) -> Result<(), StartupError> {
|
|
|
|
|
let runtime = self.imm.runtime.clone();
|
|
|
|
|
runtime
|
|
|
|
|
.spawn(self.main_loop_task())
|
|
|
|
|
.map_err(|cause| StartupError::Spawn {
|
|
|
|
|
spawning: "ipt manager",
|
|
|
|
|
cause: cause.into(),
|
|
|
|
|
})?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Iterate over the current IPTs
|
|
|
|
|
///
|
|
|
|
|
/// Yields each `IptRelay` at most once.
|
|
|
|
|
fn current_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
|
|
|
|
|
self.state
|
|
|
|
|
.relays
|
|
|
|
|
.iter()
|
|
|
|
|
.filter_map(|ir| Some((ir, ir.current_ipt()?)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Iterate over the current IPTs in `Good` state
|
|
|
|
|
fn good_ipts(&self) -> impl Iterator<Item = (&IptRelay, &Ipt)> {
|
|
|
|
|
self.current_ipts()
|
|
|
|
|
.filter(|(_ir, ipt)| match ipt.status_last {
|
|
|
|
|
TS::Good { .. } => true,
|
|
|
|
|
TS::Establishing { .. } | TS::Faulty => false,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<R: Runtime, M: Mockable<R>> State<R, M> {
|
|
|
|
|
/// Find the `Ipt` with persistent local id `lid`
|
|
|
|
|
fn ipt_by_lid_mut(&mut self, needle: IptLocalId) -> Option<&mut Ipt> {
|
|
|
|
|
self.relays
|
|
|
|
|
.iter_mut()
|
|
|
|
|
.find_map(|ir| ir.ipts.iter_mut().find(|ipt| ipt.lid == needle))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Choose a new relay to use for IPTs
|
|
|
|
|
fn choose_new_ipt_relay(&mut self, imm: &Immutable<R>) {
|
|
|
|
|
/*
|
|
|
|
|
pick a random suitable relay
|
|
|
|
|
set retirement to something from now.checked_add(IPT_RELAY_ROTATION_TIME)
|
|
|
|
|
*/
|
|
|
|
|
todo!()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Update `self`'s status tracking for one introduction point
|
|
|
|
|
fn handle_ipt_status_update(&mut self, imm: &Immutable<R>, lid: IptLocalId, update: IptStatus) {
|
|
|
|
|
let Some(ipt) = self.ipt_by_lid_mut(lid) else {
|
|
|
|
|
// update from now-withdrawn IPT, ignore it (can happen due to the IPT being a task)
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let IptStatus {
|
|
|
|
|
status: update,
|
|
|
|
|
wants_to_retire,
|
|
|
|
|
n_faults: _,
|
|
|
|
|
} = update;
|
|
|
|
|
|
|
|
|
|
#[allow(clippy::single_match)] // want to be explicit about the Ok type
|
|
|
|
|
match wants_to_retire {
|
|
|
|
|
Err(IptWantsToRetire) => ipt.is_current = None,
|
|
|
|
|
Ok(()) => {}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let now = || imm.runtime.now();
|
|
|
|
|
|
|
|
|
|
ipt.status_last = match update {
|
|
|
|
|
ISS::Establishing => TS::Establishing { started: now() },
|
|
|
|
|
ISS::Good => {
|
|
|
|
|
let time_to_establish = match &ipt.status_last {
|
|
|
|
|
TS::Establishing { started, .. } => {
|
|
|
|
|
// return () at end of ok_or_else closure, for clarity
|
|
|
|
|
#[allow(clippy::unused_unit, clippy::semicolon_if_nothing_returned)]
|
|
|
|
|
now().checked_duration_since(*started).ok_or_else(|| {
|
|
|
|
|
warn!("monotonic clock went backwards! (HS IPT)");
|
|
|
|
|
()
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
other => {
|
|
|
|
|
error!("internal error: HS IPT went from {:?} to Good", &other);
|
|
|
|
|
Err(())
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
TS::Good { time_to_establish }
|
|
|
|
|
}
|
|
|
|
|
ISS::Faulty => TS::Faulty,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<R: Runtime, M: Mockable<R>> IptManager<R, M> {
|
|
|
|
|
/// Make some progress, if possible, and say when to wake up again
|
|
|
|
|
///
|
|
|
|
|
/// Examines the current state and attempts to improve it.
|
|
|
|
|
///
|
|
|
|
|
/// If `idempotently_progress_things_now` makes any changes,
|
|
|
|
|
/// it will return `None`.
|
|
|
|
|
/// It should then be called again immediately.
|
|
|
|
|
///
|
|
|
|
|
/// Otherwise, it returns the time in the future when further work ought to be done:
|
|
|
|
|
/// i.e., the time of the earliest timeout or planned future state change -
|
|
|
|
|
/// as a [`TrackingNow`].
|
|
|
|
|
fn idempotently_progress_things_now(&mut self) -> Result<Option<TrackingNow>, FatalError> {
|
|
|
|
|
/// Return value which means "we changed something, please run me again"
|
|
|
|
|
///
|
|
|
|
|
/// In each case, if we make any changes which indicate we might
|
|
|
|
|
/// want to restart, , we `return CONTINUE`, and
|
|
|
|
|
/// our caller will just call us again.
|
|
|
|
|
///
|
|
|
|
|
/// This approach simplifies the logic: everything here is idempotent.
|
|
|
|
|
/// (It does mean the algorithm can be quadratic in the number of intro points,
|
|
|
|
|
/// but that number is reasonably small for a modern computer and the constant
|
|
|
|
|
/// factor is small too.)
|
|
|
|
|
const CONTINUE: Result<Option<TrackingNow>, FatalError> = Ok(None);
|
|
|
|
|
|
|
|
|
|
// This tracks everything we compare it to, using interior mutability,
|
|
|
|
|
// so that if there is no work to do and no timeouts have expired,
|
|
|
|
|
// we know when we will want to wake up.
|
|
|
|
|
let now = TrackingNow::now(&self.imm.runtime);
|
|
|
|
|
|
|
|
|
|
// ---------- collect garbage ----------
|
|
|
|
|
|
|
|
|
|
// Rotate out an old IPT if we have >N good IPTs
|
|
|
|
|
if self.good_ipts().count() >= self.target_n_intro_points() {
|
|
|
|
|
for ir in &mut self.state.relays {
|
|
|
|
|
if ir.should_retire(&now) {
|
|
|
|
|
if let Some(ipt) = ir.current_ipt_mut() {
|
|
|
|
|
ipt.is_current = None;
|
|
|
|
|
return CONTINUE;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Forget old IPTs (after the last descriptor mentioning them has expired)
|
|
|
|
|
for ir in &mut self.state.relays {
|
|
|
|
|
// When we drop the Ipt we drop the IptEstablisher, withdrawing the intro point
|
|
|
|
|
ir.ipts.retain(|ipt| {
|
|
|
|
|
ipt.is_current.is_some() || now < ipt.last_descriptor_expiry_including_slop
|
|
|
|
|
});
|
|
|
|
|
// No need to return CONTINUE, since there is no other future work implied
|
|
|
|
|
// by discarding a non-current IPT.
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Forget retired IPT relays (all their IPTs are gone)
|
|
|
|
|
self.state
|
|
|
|
|
.relays
|
|
|
|
|
.retain(|ir| !(ir.should_retire(&now) && ir.ipts.is_empty()));
|
|
|
|
|
// If we deleted relays, we might want to select new ones. That happens below.
|
|
|
|
|
|
|
|
|
|
// ---------- make progress ----------
|
|
|
|
|
//
|
|
|
|
|
// Consider selecting new relays and setting up new IPTs.
|
|
|
|
|
|
|
|
|
|
// Create new IPTs at already-chosen relays
|
|
|
|
|
for ir in &mut self.state.relays {
|
|
|
|
|
if !ir.should_retire(&now) && ir.current_ipt_mut().is_none() {
|
|
|
|
|
// We don't have a current IPT at this relay, but we should.
|
|
|
|
|
ir.make_new_ipt(&self.imm, &mut self.state.mockable)?;
|
|
|
|
|
return CONTINUE;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Consider choosing a new IPT relay
|
|
|
|
|
{
|
|
|
|
|
// block {} prevents use of `n_good_ish_relays` for other (wrong) purposes
|
|
|
|
|
|
|
|
|
|
// We optimistically count an Establishing IPT as good-ish;
|
|
|
|
|
// specifically, for the purposes of deciding whether to select a new
|
|
|
|
|
// relay because we don't have enough good-looking ones.
|
|
|
|
|
let n_good_ish_relays = self
|
|
|
|
|
.current_ipts()
|
|
|
|
|
.filter(|(_ir, ipt)| match ipt.status_last {
|
|
|
|
|
TS::Good { .. } | TS::Establishing { .. } => true,
|
|
|
|
|
TS::Faulty => false,
|
|
|
|
|
})
|
|
|
|
|
.count();
|
|
|
|
|
|
|
|
|
|
if n_good_ish_relays < self.target_n_intro_points()
|
|
|
|
|
&& self.state.relays.len() < self.max_n_intro_relays()
|
|
|
|
|
{
|
|
|
|
|
self.state.choose_new_ipt_relay(&self.imm);
|
|
|
|
|
return CONTINUE;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//---------- tell the publisher what to announce ----------
|
|
|
|
|
|
|
|
|
|
let very_recently: Option<TrackingInstantOffsetNow> = (|| {
|
|
|
|
|
// on time overflow, don't treat any as started establishing very recently
|
|
|
|
|
|
|
|
|
|
let fastest_good_establish_time = self
|
|
|
|
|
.current_ipts()
|
|
|
|
|
.filter_map(|(_ir, ipt)| match ipt.status_last {
|
|
|
|
|
TS::Good {
|
|
|
|
|
time_to_establish, ..
|
|
|
|
|
} => Some(time_to_establish.ok()?),
|
|
|
|
|
TS::Establishing { .. } | TS::Faulty => None,
|
|
|
|
|
})
|
|
|
|
|
.min()?;
|
|
|
|
|
|
|
|
|
|
// TODO HSS is this the right guess for IPT establishment?
|
|
|
|
|
// we could use circuit timings etc., but arguably the actual time to establish
|
|
|
|
|
// our fastest IPT is a better estimator here (and we want an optimistic,
|
|
|
|
|
// rather than pessimistic estimate).
|
|
|
|
|
//
|
|
|
|
|
// TODO HSS fastest_good_establish_time factor 2 should be tuneable
|
|
|
|
|
let very_recently = fastest_good_establish_time.checked_mul(2)?;
|
|
|
|
|
|
|
|
|
|
now.checked_sub(very_recently)
|
|
|
|
|
})();
|
|
|
|
|
|
|
|
|
|
let started_establishing_very_recently = || {
|
|
|
|
|
self.current_ipts()
|
|
|
|
|
.filter_map(|(_ir, ipt)| {
|
|
|
|
|
let started = match ipt.status_last {
|
|
|
|
|
TS::Establishing { started } => Some(started),
|
|
|
|
|
TS::Good { .. } | TS::Faulty => None,
|
|
|
|
|
}?;
|
|
|
|
|
|
|
|
|
|
(&started > very_recently.as_ref()?).then_some(())
|
|
|
|
|
})
|
|
|
|
|
.next()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let publish_set = || {
|
|
|
|
|
// TODO HSS calculate set of ipts to publish
|
|
|
|
|
// TODO HSS update each Ipt's last_descriptor_expiry_including_slop
|
|
|
|
|
IptSetToPublish { ipts: vec![] }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let publish_status = if self.good_ipts().count() >= self.target_n_intro_points() {
|
|
|
|
|
IptSetStatus::Certain(publish_set())
|
|
|
|
|
} else if self.good_ipts().next().is_none()
|
|
|
|
|
/* !... .is_empty() */
|
|
|
|
|
{
|
|
|
|
|
IptSetStatus::Unknown
|
|
|
|
|
} else {
|
|
|
|
|
IptSetStatus::Uncertain(publish_set())
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// TODO HSS tell all the being-published IPTs to start accepting introductions
|
|
|
|
|
|
|
|
|
|
let _ = publish_set; // TODO HSS send this to the IPT publisher
|
|
|
|
|
|
|
|
|
|
//---------- store persistent state ----------
|
|
|
|
|
|
|
|
|
|
// TODO HSS store persistent state
|
|
|
|
|
|
|
|
|
|
//---------- we didn't do anything - wait for something to happen ----------
|
|
|
|
|
|
|
|
|
|
Ok(Some(now))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Run one iteration of the loop
|
|
|
|
|
///
|
|
|
|
|
/// Either do some work, making changes to our state,
|
|
|
|
|
/// or, if there's nothing to be done, wait until there *is* something to do.
|
|
|
|
|
async fn run_once(&mut self) -> Result<ShutdownStatus, FatalError> {
|
|
|
|
|
if let Some(now) = self.idempotently_progress_things_now()? {
|
|
|
|
|
select_biased! {
|
|
|
|
|
() = now.wait_for_earliest(&self.imm.runtime).fuse() => {},
|
|
|
|
|
shutdown = &mut self.state.shutdown => return Ok(shutdown.void_unwrap_err().into()),
|
|
|
|
|
|
|
|
|
|
update = self.state.status_recv.next() => {
|
|
|
|
|
let (lid, update) = update.ok_or_else(|| internal!("update mpsc ended!"))?;
|
|
|
|
|
self.state.handle_ipt_status_update(&self.imm, lid, update);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(ShutdownStatus::Continue)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// IPT Manager main loop, runs as a task
|
|
|
|
|
///
|
|
|
|
|
/// Contains the error handling, including catching panics.
|
|
|
|
|
async fn main_loop_task(mut self) {
|
|
|
|
|
loop {
|
|
|
|
|
match async {
|
|
|
|
|
AssertUnwindSafe(self.run_once())
|
|
|
|
|
.catch_unwind()
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|_: Box<dyn Any + Send>| internal!("IPT manager crashed"))?
|
|
|
|
|
}
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Err(crash) => {
|
|
|
|
|
error!("HS service {} crashed! {}", &self.imm.nick, crash);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
Ok(ShutdownStatus::Continue) => continue,
|
|
|
|
|
Ok(ShutdownStatus::Terminate) => break,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Target number of intro points
|
|
|
|
|
pub(crate) fn target_n_intro_points(&self) -> usize {
|
|
|
|
|
self.state
|
|
|
|
|
.config
|
|
|
|
|
.num_intro_points
|
|
|
|
|
.unwrap_or(3 /* TODO HSS should be a const */)
|
|
|
|
|
.into()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Maximum number of concurrent intro point relays
|
|
|
|
|
pub(crate) fn max_n_intro_relays(&self) -> usize {
|
|
|
|
|
// TODO HSS max_n_intro_relays should be configurable
|
|
|
|
|
// TODO HSS consider default, in context of intro point forcing attacks
|
|
|
|
|
self.target_n_intro_points() * 2
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Mockable state for the IPT Manager
|
|
|
|
|
///
|
|
|
|
|
/// This allows us to use a fake IPT Establisher and IPT Publisher,
|
|
|
|
|
/// so that we can unit test the Manager.
|
|
|
|
|
pub(crate) trait Mockable<R>: Debug + Send + Sync + Sized + 'static {
|
|
|
|
|
/// IPT establisher type
|
|
|
|
|
type IptEstablisher: Send + Sync + 'static;
|
|
|
|
|
|
|
|
|
|
/// A random number generator
|
|
|
|
|
type Rng: rand::Rng + rand::CryptoRng;
|
|
|
|
|
|
|
|
|
|
/// Return a random number generator
|
|
|
|
|
fn thread_rng(&self) -> Self::Rng;
|
|
|
|
|
|
|
|
|
|
/// Call `IptEstablisher::new`
|
|
|
|
|
fn make_new_ipt(
|
|
|
|
|
&mut self,
|
|
|
|
|
imm: &Immutable<R>,
|
|
|
|
|
relay: RelayIds,
|
|
|
|
|
) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError>;
|
|
|
|
|
|
|
|
|
|
/// Call `Publisher::new_intro_points`
|
|
|
|
|
fn new_intro_points(&mut self, ipts: ()) {
|
|
|
|
|
todo!() // TODO HSS there should be no default impl; code should be in Real's impl
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<R: Runtime> Mockable<R> for Real<R> {
|
|
|
|
|
type IptEstablisher = IptEstablisher;
|
|
|
|
|
|
|
|
|
|
/// A random number generator
|
|
|
|
|
type Rng = rand::rngs::ThreadRng;
|
|
|
|
|
|
|
|
|
|
/// Return a random number generator
|
|
|
|
|
fn thread_rng(&self) -> Self::Rng {
|
|
|
|
|
rand::thread_rng()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[allow(unreachable_code)] // TODO HSS remove
|
|
|
|
|
fn make_new_ipt(
|
|
|
|
|
&mut self,
|
|
|
|
|
imm: &Immutable<R>,
|
|
|
|
|
relay: RelayIds,
|
|
|
|
|
) -> Result<(Self::IptEstablisher, watch::Receiver<IptStatus>), FatalError> {
|
|
|
|
|
IptEstablisher::new(
|
|
|
|
|
imm.runtime.clone(),
|
|
|
|
|
self.circ_pool.clone(),
|
|
|
|
|
imm.dirprovider.clone(),
|
|
|
|
|
imm.output_rend_reqs.clone(),
|
|
|
|
|
todo!(), // TODO HSS IntroPointId lacks a constructor and maybe should change anyway
|
|
|
|
|
relay,
|
|
|
|
|
todo!(), // TODO HSS keypair ought to be provided by our caller
|
|
|
|
|
todo!(), // TODO HSS RequestDisposition ought to be provided by our caller
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO HSS add unit tests for IptManager
|
|
|
|
|
// Especially, we want to exercise all code paths in idempotently_progress_things_now
|