Expire channels that have been unused for too long

This commit is contained in:
Yuan Lyu 2022-02-03 23:36:01 -05:00
parent cf4edcac82
commit f6f732a478
10 changed files with 306 additions and 31 deletions

1
Cargo.lock generated
View File

@ -2931,6 +2931,7 @@ dependencies = [
"futures-await-test",
"hex-literal",
"postage",
"rand 0.8.4",
"thiserror",
"tor-error",
"tor-linkspec",

View File

@ -371,6 +371,11 @@ impl<R: Runtime> TorClient<R> {
Arc::downgrade(&dirmgr),
))?;
runtime.spawn(continually_expire_channels(
runtime.clone(),
Arc::downgrade(&chanmgr),
))?;
let client_isolation = IsolationToken::new();
Ok(TorClient {
@ -819,6 +824,24 @@ async fn continually_preemptively_build_circuits<R: Runtime>(
rt.sleep(Duration::from_secs(10)).await;
}
}
/// Periodically expire any channels that have been unused beyond
/// the maximum duration allowed.
///
/// Exist when we find that `chanmgr` is dropped
///
/// This is a daemon task that runs indefinitely in the background
async fn continually_expire_channels<R: Runtime>(rt: R, chanmgr: Weak<tor_chanmgr::ChanMgr<R>>) {
loop {
let delay = if let Some(cm) = Weak::upgrade(&chanmgr) {
cm.expire_channels()
} else {
// channel manager is closed.
return;
};
// This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
rt.sleep(Duration::from_secs(delay.as_secs())).await;
}
}
impl<R: Runtime> Drop for TorClient<R> {
// TODO: Consider moving this into tor-circmgr after we have more

View File

@ -23,6 +23,7 @@ async-trait = "0.1.2"
derive_more = "0.99"
futures = "0.3.14"
postage = { version = "0.4", default-features = false, features = ["futures-traits"] }
rand = "0.8"
tracing = "0.1.18"
thiserror = "1"

View File

@ -5,6 +5,7 @@ use std::sync::Mutex;
use crate::{event::ChanMgrEventSender, Error};
use std::time::Duration;
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_llcrypto::pk;
use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider};
@ -154,6 +155,9 @@ impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
fn is_usable(&self) -> bool {
!self.is_closing()
}
fn duration_unused(&self) -> Option<Duration> {
self.duration_unused()
}
}
#[cfg(test)]

View File

@ -53,6 +53,7 @@ mod mgr;
#[cfg(test)]
mod testing;
use std::time::Duration;
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_proto::channel::Channel;
@ -115,4 +116,11 @@ impl<R: Runtime> ChanMgr<R> {
pub fn bootstrap_events(&self) -> ConnStatusEvents {
self.bootstrap_status.clone()
}
/// Expire all channels that have been unused for too long.
///
/// Return the duration from now until the next channel could expire,
/// so that the caller knows how long it may wait before calling this
/// method again.
///
/// This simply forwards to the underlying abstract channel manager.
pub fn expire_channels(&self) -> Duration {
self.mgr.expire_channels()
}
}

View File

@ -1,11 +1,14 @@
//! Abstract implementation of a channel manager
use crate::mgr::map::OpenEntry;
use crate::{Error, Result};
use async_trait::async_trait;
use futures::channel::oneshot;
use futures::future::{FutureExt, Shared};
use rand::Rng;
use std::hash::Hash;
use std::time::Duration;
mod map;
@ -23,6 +26,9 @@ pub(crate) trait AbstractChannel: Clone {
/// hit a bug, or for some other reason. We don't return unusable
/// channels back to the user.
fn is_usable(&self) -> bool;
/// Return the amount of time this channel has gone without being used.
///
/// Return `None` if the channel is currently in use.
fn duration_unused(&self) -> Option<Duration>;
}
/// Trait to describe how channels are created.
@ -130,10 +136,10 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> {
let action = self
.channels
.change_state(&ident, |oldstate| match oldstate {
Some(Open(ref ch)) => {
if ch.is_usable() {
Some(Open(ref ent)) => {
if ent.channel.is_usable() {
// Good channel. Return it.
let action = Action::Return(Ok(ch.clone()));
let action = Action::Return(Ok(ent.channel.clone()));
(oldstate, action)
} else {
// Unusable channel. Move to the Building
@ -185,7 +191,15 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> {
Ok(chan) => {
// The channel got built: remember it, tell the
// others, and return it.
self.channels.replace(ident.clone(), Open(chan.clone()))?;
self.channels.replace(
ident.clone(),
Open(OpenEntry {
channel: chan.clone(),
max_unused_duration: Duration::from_secs(
rand::thread_rng().gen_range(180..270),
),
}),
)?;
// It's okay if all the receivers went away:
// that means that nobody was waiting for this channel.
let _ignore_err = send.send(Ok(chan.clone()));
@ -206,6 +220,18 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> {
last_err
}
/// Expire any channels that have been unused longer than
/// their maximum unused duration assigned during creation.
///
/// Return the duration from now until the next channel expires.
///
/// If all channels are in use or there are no open channels,
/// return 180 seconds, which is the minimum value of
/// `max_unused_duration`.
pub(crate) fn expire_channels(&self) -> Duration {
self.channels.expire_channels()
}
/// Test only: return the current open usable channel with a given
/// `ident`, if any.
#[cfg(test)]
@ -215,7 +241,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> {
) -> Option<CF::Channel> {
use map::ChannelState::*;
match self.channels.get(ident) {
Ok(Some(Open(ref ch))) if ch.is_usable() => Some(ch.clone()),
Ok(Some(Open(ref ent))) if ent.channel.is_usable() => Some(ent.channel.clone()),
_ => None,
}
}
@ -260,6 +286,9 @@ mod test {
fn is_usable(&self) -> bool {
!self.closing.load(Ordering::SeqCst)
}
// This fake never reports any idle time: `None` means "currently in use".
fn duration_unused(&self) -> Option<Duration> {
None
}
}
impl FakeChannel {

View File

@ -1,5 +1,7 @@
//! Simple implementation for the internal map state of a ChanMgr.
use std::time::Duration;
use super::{AbstractChannel, Pending};
use crate::{Error, Result};
@ -33,7 +35,7 @@ pub(crate) enum ChannelState<C> {
/// This channel might not be usable: it might be closing or
/// broken. We need to check its is_usable() method before
/// yielding it to the user.
Open(C),
Open(OpenEntry<C>),
/// A channel that's getting built.
Building(Pending<C>),
/// A temporary invalid state.
@ -43,13 +45,22 @@ pub(crate) enum ChannelState<C> {
Poisoned(Priv),
}
/// An open channel entry: the channel itself plus the bookkeeping
/// needed to decide when it may be expired.
#[derive(Clone)]
pub(crate) struct OpenEntry<C> {
/// The underlying open channel.
pub(crate) channel: C,
/// The maximum unused duration allowed for this channel.
///
/// Once the channel reports having been unused for at least this long,
/// it is dropped from the map the next time channels are expired.
pub(crate) max_unused_duration: Duration,
}
impl<C: Clone> ChannelState<C> {
/// Create a new shallow copy of this ChannelState.
#[cfg(test)]
fn clone_ref(&self) -> Result<Self> {
use ChannelState::*;
match self {
Open(chan) => Ok(Open(chan.clone())),
Open(ent) => Ok(Open(ent.clone())),
Building(pending) => Ok(Building(pending.clone())),
Poisoned(_) => Err(Error::Internal("Poisoned state in channel map")),
}
@ -60,7 +71,7 @@ impl<C: Clone> ChannelState<C> {
#[cfg(test)]
fn unwrap_open(&self) -> C {
match self {
ChannelState::Open(chan) => chan.clone(),
ChannelState::Open(ent) => ent.clone().channel,
_ => panic!("Not an open channel"),
}
}
@ -71,8 +82,8 @@ impl<C: AbstractChannel> ChannelState<C> {
/// matching identity for this state.
fn check_ident(&self, ident: &C::Ident) -> Result<()> {
match self {
ChannelState::Open(chan) => {
if chan.ident() == ident {
ChannelState::Open(ent) => {
if ent.channel.ident() == ident {
Ok(())
} else {
Err(Error::Internal("Identity mismatch"))
@ -82,6 +93,31 @@ impl<C: AbstractChannel> ChannelState<C> {
ChannelState::Building(_) => Ok(()),
}
}
/// Decide whether this entry should be kept when expiring channels.
///
/// Returns `false` only for an open channel that has been unused for at
/// least its `max_unused_duration`.  Every other entry (an open channel
/// that is in use or has not been idle long enough, or any non-open
/// state) yields `true`.
///
/// NOTE: despite the name, `true` means "keep this entry", i.e. it is
/// *not* ready to expire; the result is used directly as a `retain`
/// predicate.
///
/// For an idle channel that is kept, `expire_after` is lowered to the
/// time remaining before that channel expires, if that is smaller than
/// its current value.
fn ready_to_expire(&self, expire_after: &mut Duration) -> bool {
    let entry = match self {
        ChannelState::Open(entry) => entry,
        // Only open channels can be expired.
        _ => return true,
    };
    match entry.channel.duration_unused() {
        Some(idle) if idle < entry.max_unused_duration => {
            let remaining = entry.max_unused_duration - idle;
            *expire_after = std::cmp::min(*expire_after, remaining);
            true
        }
        // Idle for at least the allowed maximum: drop it.
        Some(_) => false,
        // still in use
        None => true,
    }
}
}
impl<C: AbstractChannel> ChannelMap<C> {
@ -123,7 +159,7 @@ impl<C: AbstractChannel> ChannelMap<C> {
let mut map = self.channels.lock()?;
map.retain(|_, state| match state {
ChannelState::Poisoned(_) => false,
ChannelState::Open(ch) => ch.is_usable(),
ChannelState::Open(ent) => ent.channel.is_usable(),
ChannelState::Building(_) => true,
});
Ok(())
@ -178,6 +214,19 @@ impl<C: AbstractChannel> ChannelMap<C> {
}
}
}
/// Drop every open channel that has been unused for too long.
///
/// Returns a `Duration` until the next time at which a channel
/// _could_ expire: the smallest remaining idle allowance among the
/// channels that were kept, capped at 180 seconds.  180 seconds is also
/// the answer when no kept channel is currently idle.
///
/// # Panics
///
/// Panics if the lock protecting the channel map is poisoned.
pub(crate) fn expire_channels(&self) -> Duration {
    let mut next_expiry = Duration::from_secs(180);
    let mut channels = self.channels.lock().expect("Poisoned lock");
    channels.retain(|_id, state| state.ready_to_expire(&mut next_expiry));
    next_expiry
}
}
#[cfg(test)]
@ -188,6 +237,7 @@ mod test {
struct FakeChannel {
ident: &'static str,
usable: bool,
unused_duration: Option<u64>,
}
impl AbstractChannel for FakeChannel {
type Ident = u8;
@ -197,17 +247,45 @@ mod test {
fn is_usable(&self) -> bool {
self.usable
}
// The fake stores its idle time as whole seconds; `None` means "in use".
fn duration_unused(&self) -> Option<Duration> {
self.unused_duration.map(Duration::from_secs)
}
}
fn ch(ident: &'static str) -> ChannelState<FakeChannel> {
ChannelState::Open(FakeChannel {
let channel = FakeChannel {
ident,
usable: true,
unused_duration: None,
};
ChannelState::Open(OpenEntry {
channel,
max_unused_duration: Duration::from_secs(180),
})
}
/// Build an open, usable fake channel entry with the given idle allowance
/// and the given reported idle time (in seconds; `None` means "in use").
fn ch_with_details(
    ident: &'static str,
    max_unused_duration: Duration,
    unused_duration: Option<u64>,
) -> ChannelState<FakeChannel> {
    ChannelState::Open(OpenEntry {
        channel: FakeChannel {
            ident,
            usable: true,
            unused_duration,
        },
        max_unused_duration,
    })
}
fn closed(ident: &'static str) -> ChannelState<FakeChannel> {
ChannelState::Open(FakeChannel {
let channel = FakeChannel {
ident,
usable: false,
unused_duration: None,
};
ChannelState::Open(OpenEntry {
channel,
max_unused_duration: Duration::from_secs(180),
})
}
@ -220,20 +298,20 @@ mod test {
assert!(map.replace(b'w', ch("wello")).unwrap().is_none());
match map.get(&b'h') {
Ok(Some(Open(chan))) if chan.ident == "hello" => {}
Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {}
_ => panic!(),
}
assert!(map.get(&b'W').unwrap().is_none());
match map.replace(b'h', ch("hebbo")) {
Ok(Some(Open(chan))) if chan.ident == "hello" => {}
Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {}
_ => panic!(),
}
assert!(map.remove(&b'Z').unwrap().is_none());
match map.remove(&b'h') {
Ok(Some(Open(chan))) if chan.ident == "hebbo" => {}
Ok(Some(Open(ent))) if ent.channel.ident == "hebbo" => {}
_ => panic!(),
}
}
@ -309,4 +387,52 @@ mod test {
assert!(matches!(e, Err(Error::Internal(_))));
assert!(matches!(map.get(&b'G'), Err(Error::Internal(_))));
}
#[test]
fn expire_channels() {
    let limit = Duration::from_secs(180);

    // Scenario 1: a single channel that has idled past its limit is expired.
    let map = ChannelMap::new();
    map.replace(b'w', ch_with_details("wello", limit, Some(181)))
        .unwrap();
    // Nothing idle is left, so we get the 180-second default, which is
    // the minimum value of the max unused duration.
    assert_eq!(180, map.expire_channels().as_secs());
    assert!(map.get(&b'w').unwrap().is_none());

    // Scenario 2: a mix of channels.
    let map = ChannelMap::new();
    let idle_channels = [
        // Unused for less than the limit: kept.
        (b'w', "wello", 120),
        (b'y', "yello", 170),
        // Unused beyond the limit: expired.
        (b'g', "gello", 181),
    ];
    for &(id, name, idle_secs) in &idle_channels {
        map.replace(id, ch_with_details(name, limit, Some(idle_secs)))
            .unwrap();
    }
    // Closed channel should be retained
    map.replace(b'h', closed("hello")).unwrap();

    // Returns the duration until the next channel expires (180 - 170).
    assert_eq!(10, map.expire_channels().as_secs());
    assert!(map.get(&b'w').unwrap().is_some());
    assert!(map.get(&b'y').unwrap().is_some());
    assert!(map.get(&b'h').unwrap().is_some());
    assert!(map.get(&b'g').unwrap().is_none());
}
}

View File

@ -66,6 +66,7 @@ use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, CtrlMsg, Rea
pub use crate::channel::unique_id::UniqId;
use crate::circuit;
use crate::circuit::celltypes::CreateResponse;
use crate::util::ts::Timestamp;
use crate::{Error, Result};
use std::pin::Pin;
use tor_cell::chancell::{msg, ChanCell, CircId};
@ -79,7 +80,7 @@ use futures::io::{AsyncRead, AsyncWrite};
use futures::{Sink, SinkExt};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tracing::trace;
@ -128,6 +129,9 @@ pub(crate) struct ChannelDetails {
rsa_id: RsaIdentity,
/// If true, this channel is closing.
closed: AtomicBool,
/// Since when the channel became unused.
/// If None, this channel is still in use by at least one circuit.
unused_since: Mutex<Option<Timestamp>>,
}
impl Sink<ChanCell> for Channel {
@ -245,12 +249,16 @@ impl Channel {
let (control_tx, control_rx) = mpsc::unbounded();
let (cell_tx, cell_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let closed = AtomicBool::new(false);
let ts = Timestamp::new();
ts.update();
let unused_since = Mutex::new(Some(ts));
let details = ChannelDetails {
unique_id,
ed25519_id,
rsa_id,
closed,
unused_since,
};
let details = Arc::new(details);
@ -316,6 +324,19 @@ impl Channel {
self.details.closed.load(Ordering::SeqCst)
}
/// Report how long this channel has gone without any circuits.
///
/// Returns `None` if the channel is currently in use; otherwise returns
/// the time elapsed since the channel was last marked as unused.
///
/// # Panics
///
/// Panics if the lock guarding the "unused since" timestamp is poisoned.
pub fn duration_unused(&self) -> Option<std::time::Duration> {
    let unused_since = self.details.unused_since.lock().expect("Poisoned lock");
    unused_since
        .as_ref()
        .map(|since| since.time_since_update().into())
}
/// Check whether a cell type is permissible to be _sent_ on an
/// open client channel.
fn check_cell(&self, cell: &ChanCell) -> Result<()> {
@ -423,16 +444,7 @@ pub(crate) mod test {
use tor_cell::chancell::{msg, ChanCell};
/// Make a new fake reactor-less channel. For testing only, obviously.
pub(crate) fn fake_channel() -> Channel {
let unique_id = UniqId::new();
let details = Arc::new(ChannelDetails {
unique_id,
ed25519_id: [6_u8; 32].into(),
rsa_id: [10_u8; 20].into(),
closed: AtomicBool::new(false),
});
pub(crate) fn fake_channel(details: Arc<ChannelDetails>) -> Channel {
Channel {
control: mpsc::unbounded().0,
cell_tx: mpsc::channel(CHANNEL_BUFFER_SIZE).0,
@ -440,10 +452,23 @@ pub(crate) mod test {
}
}
/// Make fake channel details with fixed identities for testing.
///
/// The details start out with `unused_since` set to `None`, i.e. the
/// channel is reported as being in use.
fn fake_channel_details() -> Arc<ChannelDetails> {
    Arc::new(ChannelDetails {
        unique_id: UniqId::new(),
        ed25519_id: [6_u8; 32].into(),
        rsa_id: [10_u8; 20].into(),
        closed: AtomicBool::new(false),
        unused_since: Mutex::new(None),
    })
}
#[test]
fn send_bad() {
tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
let chan = fake_channel();
let chan = fake_channel(fake_channel_details());
let cell = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into());
let e = chan.check_cell(&cell);
@ -480,7 +505,7 @@ pub(crate) mod test {
#[test]
fn check_match() {
use std::net::SocketAddr;
let chan = fake_channel();
let chan = fake_channel(fake_channel_details());
struct ChanT {
ed_id: Ed25519Identity,
@ -519,8 +544,20 @@ pub(crate) mod test {
#[test]
fn unique_id() {
let ch1 = fake_channel();
let ch2 = fake_channel();
let ch1 = fake_channel(fake_channel_details());
let ch2 = fake_channel(fake_channel_details());
assert_ne!(ch1.unique_id(), ch2.unique_id());
}
#[test]
fn duration_unused_at() {
    let details = fake_channel_details();
    let ch = fake_channel(Arc::clone(&details));

    let now = Timestamp::new();
    now.update();
    // The lock guard is a temporary that is released at the end of this
    // statement, so the `duration_unused()` call below cannot hang on it.
    *details.unused_since.lock().unwrap() = Some(now);

    assert!(ch.duration_unused().is_some());
}
}

View File

@ -168,6 +168,18 @@ impl CircMap {
pub(super) fn remove(&mut self, id: CircId) -> Option<CircEnt> {
self.m.remove(&id)
}
/// Count the entries in the map that are either open or still opening.
///
/// Entries in any other state are not counted.
pub(super) fn open_ent_count(&self) -> usize {
    // TODO: We want to change this from O(n) back to O(1).
    // Maybe we should have the CircMap keep track of
    // the open-or-opening entries count.
    let live_entries = self
        .m
        .iter()
        .filter(|(_, ent)| matches!(ent, CircEnt::Open(_) | CircEnt::Opening(_, _)));
    live_entries.count()
}
// TODO: Eventually if we want relay support, we'll need to support
// circuit IDs chosen by somebody else. But for now, we don't need those.
}
@ -210,10 +222,19 @@ mod test {
ids_high.push(id_high);
}
// Test open / opening entry counting
assert_eq!(128, map_low.open_ent_count());
assert_eq!(128, map_high.open_ent_count());
// Test remove
assert!(map_low.get_mut(ids_low[0]).is_some());
map_low.remove(ids_low[0]);
assert!(map_low.get_mut(ids_low[0]).is_none());
assert_eq!(127, map_low.open_ent_count());
// Test DestroySent doesn't count
map_low.destroy_sent(CircId::from(256), HalfCirc::new(1));
assert_eq!(127, map_low.open_ent_count());
// Test advance_from_opening.

View File

@ -9,6 +9,7 @@
use super::circmap::{CircEnt, CircMap};
use crate::circuit::halfcirc::HalfCirc;
use crate::util::err::ReactorError;
use crate::util::ts::Timestamp;
use crate::{Error, Result};
use tor_cell::chancell::msg::{Destroy, DestroyReason};
use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId};
@ -229,6 +230,7 @@ impl Reactor {
.add_ent(&mut rng, created_sender, sender)
.map(|id| (id, circ_unique_id));
let _ = tx.send(ret); // don't care about other side going away
self.update_disused_since();
}
}
Ok(())
@ -320,6 +322,7 @@ impl Reactor {
async fn deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
// Remove the circuit from the map: nothing more can be done with it.
let entry = self.circs.remove(circid);
self.update_disused_since();
match entry {
// If the circuit is waiting for CREATED, tell it that it
// won't get one.
@ -371,12 +374,30 @@ impl Reactor {
// TODO: It would be great to have a tighter upper bound for
// the number of relay cells we'll receive.
self.circs.destroy_sent(id, HalfCirc::new(3000));
self.update_disused_since();
let destroy = Destroy::new(DestroyReason::NONE).into();
let cell = ChanCell::new(id, destroy);
self.send_cell(cell).await?;
Ok(())
}
/// Refresh the channel's "unused since" marker from the circuit map.
///
/// If the channel has at least one open or opening circuit, the marker is
/// cleared to show that the channel is in use.  Otherwise the marker is set
/// to the current time, unless it is already set, so that an idle channel
/// keeps its original idle start time.
///
/// # Panics
///
/// Panics if the lock guarding the marker is poisoned.
fn update_disused_since(&self) {
    let in_use = self.circs.open_ent_count() != 0;
    let mut unused_since = self.details.unused_since.lock().expect("Poisoned lock");
    if in_use {
        // Mark this channel as in use
        *unused_since = None;
    } else if unused_since.is_none() {
        // Only start the idle clock if it is not already running.
        let now = Timestamp::new();
        now.update();
        *unused_since = Some(now);
    }
}
}
#[cfg(test)]
@ -462,6 +483,7 @@ pub(crate) mod test {
fn new_circ_closed() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
let (chan, mut reactor, mut output, _input) = new_reactor();
assert!(chan.duration_unused().is_some()); // unused yet
let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once());
let (pending, circr) = ret.unwrap();
@ -475,6 +497,8 @@ pub(crate) mod test {
let ent = reactor.circs.get_mut(id);
assert!(matches!(ent, Some(CircEnt::Opening(_, _))));
assert!(chan.duration_unused().is_none()); // in use
// Now drop the circuit; this should tell the reactor to remove
// the circuit from the map.
drop(pending);
@ -485,6 +509,7 @@ pub(crate) mod test {
let cell = output.next().await.unwrap();
assert_eq!(cell.circid(), id);
assert!(matches!(cell.msg(), ChanMsg::Destroy(_)));
assert!(chan.duration_unused().is_some()); // unused again
});
}