From f6f732a47818f789db76ad692d0025de1051072f Mon Sep 17 00:00:00 2001 From: Yuan Lyu Date: Thu, 3 Feb 2022 23:36:01 -0500 Subject: [PATCH] Expire channels that have been unused for too long --- Cargo.lock | 1 + crates/arti-client/src/client.rs | 23 ++++ crates/tor-chanmgr/Cargo.toml | 1 + crates/tor-chanmgr/src/builder.rs | 4 + crates/tor-chanmgr/src/lib.rs | 8 ++ crates/tor-chanmgr/src/mgr.rs | 39 ++++++- crates/tor-chanmgr/src/mgr/map.rs | 148 ++++++++++++++++++++++-- crates/tor-proto/src/channel.rs | 67 ++++++++--- crates/tor-proto/src/channel/circmap.rs | 21 ++++ crates/tor-proto/src/channel/reactor.rs | 25 ++++ 10 files changed, 306 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 52321cac1..3e385033c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2931,6 +2931,7 @@ dependencies = [ "futures-await-test", "hex-literal", "postage", + "rand 0.8.4", "thiserror", "tor-error", "tor-linkspec", diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index 6396a8423..e5e30e798 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -371,6 +371,11 @@ impl TorClient { Arc::downgrade(&dirmgr), ))?; + runtime.spawn(continually_expire_channels( + runtime.clone(), + Arc::downgrade(&chanmgr), + ))?; + let client_isolation = IsolationToken::new(); Ok(TorClient { @@ -819,6 +824,24 @@ async fn continually_preemptively_build_circuits( rt.sleep(Duration::from_secs(10)).await; } } +/// Periodically expire any channels that have been unused beyond +/// the maximum duration allowed. +/// +/// Exit when we find that `chanmgr` is dropped +/// +/// This is a daemon task that runs indefinitely in the background +async fn continually_expire_channels(rt: R, chanmgr: Weak>) { + loop { + let delay = if let Some(cm) = Weak::upgrade(&chanmgr) { + cm.expire_channels() + } else { + // channel manager is closed. 
+ return; + }; + // This will sometimes be an underestimate, but it's no big deal; we just sleep some more. + rt.sleep(Duration::from_secs(delay.as_secs())).await; + } +} impl Drop for TorClient { // TODO: Consider moving this into tor-circmgr after we have more diff --git a/crates/tor-chanmgr/Cargo.toml b/crates/tor-chanmgr/Cargo.toml index 392921bbf..56a15abad 100644 --- a/crates/tor-chanmgr/Cargo.toml +++ b/crates/tor-chanmgr/Cargo.toml @@ -23,6 +23,7 @@ async-trait = "0.1.2" derive_more = "0.99" futures = "0.3.14" postage = { version = "0.4", default-features = false, features = ["futures-traits"] } +rand = "0.8" tracing = "0.1.18" thiserror = "1" diff --git a/crates/tor-chanmgr/src/builder.rs b/crates/tor-chanmgr/src/builder.rs index c5a035e89..31b83c40d 100644 --- a/crates/tor-chanmgr/src/builder.rs +++ b/crates/tor-chanmgr/src/builder.rs @@ -5,6 +5,7 @@ use std::sync::Mutex; use crate::{event::ChanMgrEventSender, Error}; +use std::time::Duration; use tor_linkspec::{ChanTarget, OwnedChanTarget}; use tor_llcrypto::pk; use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider}; @@ -154,6 +155,9 @@ impl crate::mgr::AbstractChannel for tor_proto::channel::Channel { fn is_usable(&self) -> bool { !self.is_closing() } + fn duration_unused(&self) -> Option { + self.duration_unused() + } } #[cfg(test)] diff --git a/crates/tor-chanmgr/src/lib.rs b/crates/tor-chanmgr/src/lib.rs index ee7a927f7..b3d58c86c 100644 --- a/crates/tor-chanmgr/src/lib.rs +++ b/crates/tor-chanmgr/src/lib.rs @@ -53,6 +53,7 @@ mod mgr; #[cfg(test)] mod testing; +use std::time::Duration; use tor_linkspec::{ChanTarget, OwnedChanTarget}; use tor_proto::channel::Channel; @@ -115,4 +116,11 @@ impl ChanMgr { pub fn bootstrap_events(&self) -> ConnStatusEvents { self.bootstrap_status.clone() } + + /// Expire all channels that have been unused for too long. + /// + /// Return the duration from now until next channel expires. 
+ pub fn expire_channels(&self) -> Duration { + self.mgr.expire_channels() + } } diff --git a/crates/tor-chanmgr/src/mgr.rs b/crates/tor-chanmgr/src/mgr.rs index c42b7b0ad..cb22b7440 100644 --- a/crates/tor-chanmgr/src/mgr.rs +++ b/crates/tor-chanmgr/src/mgr.rs @@ -1,11 +1,14 @@ //! Abstract implementation of a channel manager +use crate::mgr::map::OpenEntry; use crate::{Error, Result}; use async_trait::async_trait; use futures::channel::oneshot; use futures::future::{FutureExt, Shared}; +use rand::Rng; use std::hash::Hash; +use std::time::Duration; mod map; @@ -23,6 +26,9 @@ pub(crate) trait AbstractChannel: Clone { /// hit a bug, or for some other reason. We don't return unusable /// channels back to the user. fn is_usable(&self) -> bool; + /// Return the amount of time a channel has not been in use. + /// Return None if the channel is currently in use. + fn duration_unused(&self) -> Option; } /// Trait to describe how channels are created. @@ -130,10 +136,10 @@ impl AbstractChanMgr { let action = self .channels .change_state(&ident, |oldstate| match oldstate { - Some(Open(ref ch)) => { - if ch.is_usable() { + Some(Open(ref ent)) => { + if ent.channel.is_usable() { // Good channel. Return it. - let action = Action::Return(Ok(ch.clone())); + let action = Action::Return(Ok(ent.channel.clone())); (oldstate, action) } else { // Unusable channel. Move to the Building @@ -185,7 +191,15 @@ impl AbstractChanMgr { Ok(chan) => { // The channel got built: remember it, tell the // others, and return it. - self.channels.replace(ident.clone(), Open(chan.clone()))?; + self.channels.replace( + ident.clone(), + Open(OpenEntry { + channel: chan.clone(), + max_unused_duration: Duration::from_secs( + rand::thread_rng().gen_range(180..270), + ), + }), + )?; // It's okay if all the receivers went away: // that means that nobody was waiting for this channel. 
let _ignore_err = send.send(Ok(chan.clone())); @@ -206,6 +220,18 @@ impl AbstractChanMgr { last_err } + /// Expire any channels that have been unused longer than + /// their maximum unused duration assigned during creation. + /// + /// Return a duration from now until next channel expires. + /// + /// If all channels are in use or there are no open channels, + /// return 180 seconds which is the minimum value of + /// max_unused_duration. + pub(crate) fn expire_channels(&self) -> Duration { + self.channels.expire_channels() + } + /// Test only: return the current open usable channel with a given /// `ident`, if any. #[cfg(test)] @@ -215,7 +241,7 @@ impl AbstractChanMgr { ) -> Option { use map::ChannelState::*; match self.channels.get(ident) { - Ok(Some(Open(ref ch))) if ch.is_usable() => Some(ch.clone()), + Ok(Some(Open(ref ent))) if ent.channel.is_usable() => Some(ent.channel.clone()), _ => None, } } @@ -260,6 +286,9 @@ mod test { fn is_usable(&self) -> bool { !self.closing.load(Ordering::SeqCst) } + fn duration_unused(&self) -> Option { + None + } } impl FakeChannel { diff --git a/crates/tor-chanmgr/src/mgr/map.rs b/crates/tor-chanmgr/src/mgr/map.rs index ec47b302c..7aed760d8 100644 --- a/crates/tor-chanmgr/src/mgr/map.rs +++ b/crates/tor-chanmgr/src/mgr/map.rs @@ -1,5 +1,7 @@ //! Simple implementation for the internal map state of a ChanMgr. +use std::time::Duration; + use super::{AbstractChannel, Pending}; use crate::{Error, Result}; @@ -33,7 +35,7 @@ pub(crate) enum ChannelState { /// This channel might not be usable: it might be closing or /// broken. We need to check its is_usable() method before /// yielding it to the user. - Open(C), + Open(OpenEntry), /// A channel that's getting built. Building(Pending), /// A temporary invalid state. @@ -43,13 +45,22 @@ pub(crate) enum ChannelState { Poisoned(Priv), } +/// An open channel entry. +#[derive(Clone)] +pub(crate) struct OpenEntry { + /// The underlying open channel. 
+ pub(crate) channel: C, + /// The maximum unused duration allowed for this channel. + pub(crate) max_unused_duration: Duration, +} + impl ChannelState { /// Create a new shallow copy of this ChannelState. #[cfg(test)] fn clone_ref(&self) -> Result { use ChannelState::*; match self { - Open(chan) => Ok(Open(chan.clone())), + Open(ent) => Ok(Open(ent.clone())), Building(pending) => Ok(Building(pending.clone())), Poisoned(_) => Err(Error::Internal("Poisoned state in channel map")), } @@ -60,7 +71,7 @@ impl ChannelState { #[cfg(test)] fn unwrap_open(&self) -> C { match self { - ChannelState::Open(chan) => chan.clone(), + ChannelState::Open(ent) => ent.clone().channel, _ => panic!("Not an open channel"), } } @@ -71,8 +82,8 @@ impl ChannelState { /// matching identity for this state. fn check_ident(&self, ident: &C::Ident) -> Result<()> { match self { - ChannelState::Open(chan) => { - if chan.ident() == ident { + ChannelState::Open(ent) => { + if ent.channel.ident() == ident { Ok(()) } else { Err(Error::Internal("Identity mismatch")) @@ -82,6 +93,31 @@ impl ChannelState { ChannelState::Building(_) => Ok(()), } } + + /// Return false if this channel should be expired now, and true if it should be kept. + /// Update `expire_after` if a smaller duration than + /// the given value is required to expire this channel. 
+ fn ready_to_expire(&self, expire_after: &mut Duration) -> bool { + if let ChannelState::Open(ent) = self { + let unused_duration = ent.channel.duration_unused(); + if let Some(unused_duration) = unused_duration { + let max_unused_duration = ent.max_unused_duration; + + if unused_duration < max_unused_duration { + *expire_after = + std::cmp::min(*expire_after, max_unused_duration - unused_duration); + true + } else { + false + } + } else { + // still in use + true + } + } else { + true + } + } } impl ChannelMap { @@ -123,7 +159,7 @@ impl ChannelMap { let mut map = self.channels.lock()?; map.retain(|_, state| match state { ChannelState::Poisoned(_) => false, - ChannelState::Open(ch) => ch.is_usable(), + ChannelState::Open(ent) => ent.channel.is_usable(), ChannelState::Building(_) => true, }); Ok(()) @@ -178,6 +214,19 @@ impl ChannelMap { } } } + + /// Expire all channels that have been unused for too long. + /// + /// Return a Duration until the next time at which + /// a channel _could_ expire. 
+ pub(crate) fn expire_channels(&self) -> Duration { + let mut ret = Duration::from_secs(180); + self.channels + .lock() + .expect("Poisoned lock") + .retain(|_id, chan| chan.ready_to_expire(&mut ret)); + ret + } } #[cfg(test)] @@ -188,6 +237,7 @@ mod test { struct FakeChannel { ident: &'static str, usable: bool, + unused_duration: Option, } impl AbstractChannel for FakeChannel { type Ident = u8; @@ -197,17 +247,45 @@ mod test { fn is_usable(&self) -> bool { self.usable } + fn duration_unused(&self) -> Option { + self.unused_duration.map(Duration::from_secs) + } } fn ch(ident: &'static str) -> ChannelState { - ChannelState::Open(FakeChannel { + let channel = FakeChannel { ident, usable: true, + unused_duration: None, + }; + ChannelState::Open(OpenEntry { + channel, + max_unused_duration: Duration::from_secs(180), + }) + } + fn ch_with_details( + ident: &'static str, + max_unused_duration: Duration, + unused_duration: Option, + ) -> ChannelState { + let channel = FakeChannel { + ident, + usable: true, + unused_duration, + }; + ChannelState::Open(OpenEntry { + channel, + max_unused_duration, }) } fn closed(ident: &'static str) -> ChannelState { - ChannelState::Open(FakeChannel { + let channel = FakeChannel { ident, usable: false, + unused_duration: None, + }; + ChannelState::Open(OpenEntry { + channel, + max_unused_duration: Duration::from_secs(180), }) } @@ -220,20 +298,20 @@ mod test { assert!(map.replace(b'w', ch("wello")).unwrap().is_none()); match map.get(&b'h') { - Ok(Some(Open(chan))) if chan.ident == "hello" => {} + Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {} _ => panic!(), } assert!(map.get(&b'W').unwrap().is_none()); match map.replace(b'h', ch("hebbo")) { - Ok(Some(Open(chan))) if chan.ident == "hello" => {} + Ok(Some(Open(ent))) if ent.channel.ident == "hello" => {} _ => panic!(), } assert!(map.remove(&b'Z').unwrap().is_none()); match map.remove(&b'h') { - Ok(Some(Open(chan))) if chan.ident == "hebbo" => {} + Ok(Some(Open(ent))) if 
ent.channel.ident == "hebbo" => {} _ => panic!(), } } @@ -309,4 +387,52 @@ mod test { assert!(matches!(e, Err(Error::Internal(_)))); assert!(matches!(map.get(&b'G'), Err(Error::Internal(_)))); } + + #[test] + fn expire_channels() { + let map = ChannelMap::new(); + + // Channel that has been unused beyond max duration allowed is expired + map.replace( + b'w', + ch_with_details("wello", Duration::from_secs(180), Some(181)), + ) + .unwrap(); + + // Minimum value of max unused duration is 180 seconds + assert_eq!(180, map.expire_channels().as_secs()); + assert!(map.get(&b'w').unwrap().is_none()); + + let map = ChannelMap::new(); + + // Channel that has been unused for shorter than max unused duration + map.replace( + b'w', + ch_with_details("wello", Duration::from_secs(180), Some(120)), + ) + .unwrap(); + + map.replace( + b'y', + ch_with_details("yello", Duration::from_secs(180), Some(170)), + ) + .unwrap(); + + // Channel that has been unused beyond max duration allowed is expired + map.replace( + b'g', + ch_with_details("gello", Duration::from_secs(180), Some(181)), + ) + .unwrap(); + + // Closed channel should be retained + map.replace(b'h', closed("hello")).unwrap(); + + // Return duration until next channel expires + assert_eq!(10, map.expire_channels().as_secs()); + assert!(map.get(&b'w').unwrap().is_some()); + assert!(map.get(&b'y').unwrap().is_some()); + assert!(map.get(&b'h').unwrap().is_some()); + assert!(map.get(&b'g').unwrap().is_none()); + } } diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index cdbebe5bf..ebfa083bf 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -66,6 +66,7 @@ use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, CtrlMsg, Rea pub use crate::channel::unique_id::UniqId; use crate::circuit; use crate::circuit::celltypes::CreateResponse; +use crate::util::ts::Timestamp; use crate::{Error, Result}; use std::pin::Pin; use tor_cell::chancell::{msg, ChanCell, 
CircId}; @@ -79,7 +80,7 @@ use futures::io::{AsyncRead, AsyncWrite}; use futures::{Sink, SinkExt}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use tracing::trace; @@ -128,6 +129,9 @@ pub(crate) struct ChannelDetails { rsa_id: RsaIdentity, /// If true, this channel is closing. closed: AtomicBool, + /// Since when the channel became unused. + /// If None, this channel is still in use by at least one circuit. + unused_since: Mutex>, } impl Sink for Channel { @@ -245,12 +249,16 @@ impl Channel { let (control_tx, control_rx) = mpsc::unbounded(); let (cell_tx, cell_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); let closed = AtomicBool::new(false); + let ts = Timestamp::new(); + ts.update(); + let unused_since = Mutex::new(Some(ts)); let details = ChannelDetails { unique_id, ed25519_id, rsa_id, closed, + unused_since, }; let details = Arc::new(details); @@ -316,6 +324,19 @@ impl Channel { self.details.closed.load(Ordering::SeqCst) } + /// If the channel is not in use, return the amount of time + /// it has had with no circuits. + /// + /// Return None if the channel is currently in use. + pub fn duration_unused(&self) -> Option { + self.details + .unused_since + .lock() + .expect("Poisoned lock") + .as_ref() + .map(|t| t.time_since_update().into()) + } + /// Check whether a cell type is permissible to be _sent_ on an /// open client channel. fn check_cell(&self, cell: &ChanCell) -> Result<()> { @@ -423,16 +444,7 @@ pub(crate) mod test { use tor_cell::chancell::{msg, ChanCell}; /// Make a new fake reactor-less channel. For testing only, obviously. 
- pub(crate) fn fake_channel() -> Channel { - let unique_id = UniqId::new(); - - let details = Arc::new(ChannelDetails { - unique_id, - ed25519_id: [6_u8; 32].into(), - rsa_id: [10_u8; 20].into(), - closed: AtomicBool::new(false), - }); - + pub(crate) fn fake_channel(details: Arc) -> Channel { Channel { control: mpsc::unbounded().0, cell_tx: mpsc::channel(CHANNEL_BUFFER_SIZE).0, @@ -440,10 +452,23 @@ pub(crate) mod test { } } + fn fake_channel_details() -> Arc { + let unique_id = UniqId::new(); + let unused_since = Mutex::new(None); + + Arc::new(ChannelDetails { + unique_id, + ed25519_id: [6_u8; 32].into(), + rsa_id: [10_u8; 20].into(), + closed: AtomicBool::new(false), + unused_since, + }) + } + #[test] fn send_bad() { tor_rtcompat::test_with_all_runtimes!(|_rt| async move { - let chan = fake_channel(); + let chan = fake_channel(fake_channel_details()); let cell = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into()); let e = chan.check_cell(&cell); @@ -480,7 +505,7 @@ pub(crate) mod test { #[test] fn check_match() { use std::net::SocketAddr; - let chan = fake_channel(); + let chan = fake_channel(fake_channel_details()); struct ChanT { ed_id: Ed25519Identity, @@ -519,8 +544,20 @@ pub(crate) mod test { #[test] fn unique_id() { - let ch1 = fake_channel(); - let ch2 = fake_channel(); + let ch1 = fake_channel(fake_channel_details()); + let ch2 = fake_channel(fake_channel_details()); assert_ne!(ch1.unique_id(), ch2.unique_id()); } + + #[test] + fn duration_unused_at() { + let details = fake_channel_details(); + let ch = fake_channel(Arc::clone(&details)); + let now = Timestamp::new(); + now.update(); + let mut unused_since = details.unused_since.lock().unwrap(); + *unused_since = Some(now); + drop(unused_since); // release it so that next line doesn't hang + assert!(ch.duration_unused().is_some()); + } } diff --git a/crates/tor-proto/src/channel/circmap.rs b/crates/tor-proto/src/channel/circmap.rs index dd2d1c604..d6e79e2bd 100644 --- 
a/crates/tor-proto/src/channel/circmap.rs +++ b/crates/tor-proto/src/channel/circmap.rs @@ -168,6 +168,18 @@ impl CircMap { pub(super) fn remove(&mut self, id: CircId) -> Option { self.m.remove(&id) } + + /// Return the total number of open and opening entries in the map + pub(super) fn open_ent_count(&self) -> usize { + // TODO: We want to change this from O(n) back to O(1). + // Maybe we should have the CircMap keep track of + // the open-or-opening entries count. + self.m + .iter() + .filter(|(_id, ent)| matches!(ent, &&CircEnt::Open(_) | &&CircEnt::Opening(_, _))) + .count() + } + // TODO: Eventually if we want relay support, we'll need to support // circuit IDs chosen by somebody else. But for now, we don't need those. } @@ -210,10 +222,19 @@ mod test { ids_high.push(id_high); } + // Test open / opening entry counting + assert_eq!(128, map_low.open_ent_count()); + assert_eq!(128, map_high.open_ent_count()); + // Test remove assert!(map_low.get_mut(ids_low[0]).is_some()); map_low.remove(ids_low[0]); assert!(map_low.get_mut(ids_low[0]).is_none()); + assert_eq!(127, map_low.open_ent_count()); + + // Test DestroySent doesn't count + map_low.destroy_sent(CircId::from(256), HalfCirc::new(1)); + assert_eq!(127, map_low.open_ent_count()); // Test advance_from_opening. 
diff --git a/crates/tor-proto/src/channel/reactor.rs b/crates/tor-proto/src/channel/reactor.rs index 402d9c7d6..fd7c7618f 100644 --- a/crates/tor-proto/src/channel/reactor.rs +++ b/crates/tor-proto/src/channel/reactor.rs @@ -9,6 +9,7 @@ use super::circmap::{CircEnt, CircMap}; use crate::circuit::halfcirc::HalfCirc; use crate::util::err::ReactorError; +use crate::util::ts::Timestamp; use crate::{Error, Result}; use tor_cell::chancell::msg::{Destroy, DestroyReason}; use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId}; @@ -229,6 +230,7 @@ impl Reactor { .add_ent(&mut rng, created_sender, sender) .map(|id| (id, circ_unique_id)); let _ = tx.send(ret); // don't care about other side going away + self.update_disused_since(); } } Ok(()) @@ -320,6 +322,7 @@ impl Reactor { async fn deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> { // Remove the circuit from the map: nothing more can be done with it. let entry = self.circs.remove(circid); + self.update_disused_since(); match entry { // If the circuit is waiting for CREATED, tell it that it // won't get one. @@ -371,12 +374,30 @@ impl Reactor { // TODO: It would be great to have a tighter upper bound for // the number of relay cells we'll receive. 
self.circs.destroy_sent(id, HalfCirc::new(3000)); + self.update_disused_since(); let destroy = Destroy::new(DestroyReason::NONE).into(); let cell = ChanCell::new(id, destroy); self.send_cell(cell).await?; Ok(()) } + + /// Update disused timestamp with current time if this channel is no longer used + fn update_disused_since(&self) { + if self.circs.open_ent_count() == 0 { + // Update disused_since if it is still `None` + let mut disused_since = self.details.unused_since.lock().expect("Poisoned lock"); + if disused_since.is_none() { + let now = Timestamp::new(); + now.update(); + *disused_since = Some(now); + } + } else { + // Mark this channel as in use + let mut disused_since = self.details.unused_since.lock().expect("Poisoned lock"); + *disused_since = None; + } + } } #[cfg(test)] @@ -462,6 +483,7 @@ pub(crate) mod test { fn new_circ_closed() { tor_rtcompat::test_with_all_runtimes!(|rt| async move { let (chan, mut reactor, mut output, _input) = new_reactor(); + assert!(chan.duration_unused().is_some()); // unused yet let (ret, reac) = futures::join!(chan.new_circ(), reactor.run_once()); let (pending, circr) = ret.unwrap(); @@ -475,6 +497,8 @@ pub(crate) mod test { let ent = reactor.circs.get_mut(id); assert!(matches!(ent, Some(CircEnt::Opening(_, _)))); + assert!(chan.duration_unused().is_none()); // in use + // Now drop the circuit; this should tell the reactor to remove // the circuit from the map. drop(pending); @@ -485,6 +509,7 @@ pub(crate) mod test { let cell = output.next().await.unwrap(); assert_eq!(cell.circid(), id); assert!(matches!(cell.msg(), ChanMsg::Destroy(_))); + assert!(chan.duration_unused().is_some()); // unused again }); }