From 2c4a176e664f33537b7dea35051da3dacf81e991 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 25 Aug 2022 08:34:41 -0400 Subject: [PATCH 1/4] Add functionality to listen for SIGHUPs. --- Cargo.lock | 13 +++++++++++++ crates/arti/Cargo.toml | 9 ++++++++- crates/arti/src/process.rs | 22 ++++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 17f83d6dd..4e0e4a9f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,7 @@ dependencies = [ "secmem-proc", "serde", "serde_json", + "signal-hook-async-std", "tokio", "toml", "tor-config", @@ -2991,6 +2992,18 @@ dependencies = [ "signal-hook-registry", ] +[[package]] +name = "signal-hook-async-std" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4aa94397e2023af5b7cff5b8d4785e935cfb77f0e4aab0cae3b26258ace556" +dependencies = [ + "async-io", + "futures-lite", + "libc", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" diff --git a/crates/arti/Cargo.toml b/crates/arti/Cargo.toml index ee913c4b9..ec4f8d5c6 100644 --- a/crates/arti/Cargo.toml +++ b/crates/arti/Cargo.toml @@ -16,7 +16,13 @@ default = ["tokio", "native-tls", "dns-proxy", "harden"] full = ["async-std", "tokio", "native-tls", "journald", "arti-client/full", "dns-proxy", "harden"] -async-std = ["arti-client/async-std", "tor-rtcompat/async-std", "async-ctrlc", "once_cell"] +async-std = [ + "arti-client/async-std", + "tor-rtcompat/async-std", + "async-ctrlc", + "once_cell", + "signal-hook-async-std", +] dns-proxy = ["trust-dns-proto"] experimental-api = ["visibility"] harden = ["secmem-proc"] @@ -55,6 +61,7 @@ rlimit = "0.8.3" safelog = { path = "../safelog", version = "0.1.0" } secmem-proc = { version = "0.1.1", optional = true } serde = { version = "1.0.103", features = ["derive"] } +signal-hook-async-std = { version = "0.2", optional = true } tokio-crate = { package = "tokio", version = "1.7", optional = true, features = ["signal"] } tor-config = { path = "../tor-config", version = "0.5.0" } tor-error = { path = "../tor-error", version = "0.3.2", default-features = false } diff --git a/crates/arti/src/process.rs b/crates/arti/src/process.rs index 6c4f8f6d7..2fd5b16de 100644 --- a/crates/arti/src/process.rs +++ b/crates/arti/src/process.rs @@ -75,3 +75,25 @@ fn running_as_root() -> bool { #[cfg(not(target_family = "unix"))] false } + +/// Return an async stream that reports an event whenever we get a `SIGHUP` +/// signal. +/// +/// Note that the signal-handling backend can coalesce signals; this is normal. +pub(crate) fn sighup_stream() -> crate::Result> { + cfg_if::cfg_if! { + if #[cfg(all(feature="tokio", target_family = "unix"))] { + use tokio_crate::signal::unix as s; + let mut signal = s::signal(s::SignalKind::hangup())?; + Ok(futures::stream::poll_fn(move |ctx| signal.poll_recv(ctx))) + } else if #[cfg(all(feature="async-std", target_family = "unix"))] { + use signal_hook_async_std as s; + use futures::stream::StreamExt as _; + let mut signal = s::Signals::new(&[s::consts::signals::SIGHUP])?; + Ok(signals.map(|i| i*2)) + } else { + // Not unix or no backend, so we won't ever get a SIGHUP. + Ok(futures::stream::pending()) + } + } +} From 7a3fec6fecc816114a138ad0b07102d7c0b398e3 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 25 Aug 2022 08:34:57 -0400 Subject: [PATCH 2/4] WIP: listen for sighups and reconfigure? --- crates/arti/src/lib.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/crates/arti/src/lib.rs b/crates/arti/src/lib.rs index 01b746dbf..201c00673 100644 --- a/crates/arti/src/lib.rs +++ b/crates/arti/src/lib.rs @@ -192,6 +192,8 @@ pub use cfg::{ ApplicationConfig, ApplicationConfigBuilder, ArtiCombinedConfig, ArtiConfig, ArtiConfigBuilder, ProxyConfig, ProxyConfigBuilder, SystemConfig, SystemConfigBuilder, ARTI_EXAMPLE_CONFIG, }; +use futures::stream::StreamExt; +use futures::task::SpawnExt; pub use logging::{LoggingConfig, LoggingConfigBuilder}; use arti_client::config::default_config_files; @@ -266,6 +268,17 @@ async fn run( watch_cfg::watch_for_config_changes(config_sources, arti_config, client.clone())?; } + #[cfg(target_family = "unix")] + { + // let client = client.clone(); + let mut sighup_stream = process::sighup_stream()?; + runtime.spawn(async move { + while let Some(()) = sighup_stream.next().await { + info!("SIGHUP!"); + } + })?; + } + let mut proxy: Vec, &str)>> = Vec::new(); if socks_port != 0 { let runtime = runtime.clone(); From 748d5aea483bdce24024fe9f1567d3ab727735a8 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 27 Aug 2022 12:26:26 +0200 Subject: [PATCH 3/4] connect SIGHUP to watch_cfg --- crates/arti/src/lib.rs | 17 +---- crates/arti/src/watch_cfg.rs | 135 +++++++++++++++++++++++------------ 2 files changed, 90 insertions(+), 62 deletions(-) diff --git a/crates/arti/src/lib.rs b/crates/arti/src/lib.rs index 201c00673..8add94ef4 100644 --- a/crates/arti/src/lib.rs +++ b/crates/arti/src/lib.rs @@ -192,8 +192,6 @@ pub use cfg::{ ApplicationConfig, ApplicationConfigBuilder, ArtiCombinedConfig, ArtiConfig, ArtiConfigBuilder, ProxyConfig, ProxyConfigBuilder, SystemConfig, SystemConfigBuilder, ARTI_EXAMPLE_CONFIG, }; -use futures::stream::StreamExt; -use futures::task::SpawnExt; pub use logging::{LoggingConfig, LoggingConfigBuilder}; use arti_client::config::default_config_files; @@ -264,20 +262,7 @@ async fn run( .config(client_config) .bootstrap_behavior(OnDemand); let client = client_builder.create_unbootstrapped()?; - if arti_config.application().watch_configuration { - watch_cfg::watch_for_config_changes(config_sources, arti_config, client.clone())?; - } - - #[cfg(target_family = "unix")] - { - // let client = client.clone(); - let mut sighup_stream = process::sighup_stream()?; - runtime.spawn(async move { - while let Some(()) = sighup_stream.next().await { - info!("SIGHUP!"); - } - })?; - } + watch_cfg::watch_for_config_changes(config_sources, arti_config, client.clone())?; let mut proxy: Vec, &str)>> = Vec::new(); if socks_port != 0 { diff --git a/crates/arti/src/watch_cfg.rs b/crates/arti/src/watch_cfg.rs index d24ac4584..0125eb4ad 100644 --- a/crates/arti/src/watch_cfg.rs +++ b/crates/arti/src/watch_cfg.rs @@ -1,9 +1,8 @@ //! Code to watch configuration files for any changes. use std::collections::HashSet; -use std::iter; use std::path::{Path, PathBuf}; -use std::sync::mpsc::channel as std_channel; +use std::sync::mpsc::{channel as std_channel, Sender}; use std::time::Duration; use arti_client::config::Reconfigure; @@ -13,6 +12,9 @@ use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSo use tor_rtcompat::Runtime; use tracing::{debug, error, info, warn}; +use futures::task::SpawnExt; + +use crate::process::sighup_stream; use crate::{ArtiCombinedConfig, ArtiConfig}; /// How long to wait after a file is created, before we try to read it. @@ -22,10 +24,10 @@ const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1); /// /// For the watching to be reliably effective (race-free), the config must be read /// *after* this point, using the returned `FoundConfigFiles`. -fn prepare_watcher( - sources: &ConfigurationSources, -) -> anyhow::Result<(FileWatcher, FoundConfigFiles)> { - let mut watcher = FileWatcher::new(DEBOUNCE_INTERVAL)?; +fn prepare_watcher<'a>( + watcher: &mut FileWatcher, + sources: &'a ConfigurationSources, +) -> anyhow::Result> { let sources = sources.scan()?; for source in sources.iter() { match source { @@ -33,7 +35,20 @@ fn prepare_watcher( ConfigurationSource::File(file) => watcher.watch_file(file)?, } } - Ok((watcher, sources)) + Ok(sources) +} + +/// Unwrap first expression or break with the provided error message +macro_rules! ok_or_break { + ($e:expr, $msg:expr $(,)?) => { + match ($e) { + Ok(y) => y, + Err(e) => { + error!($msg, e); + break; + } + } + }; } /// Launch a thread to watch our configuration files. @@ -46,29 +61,52 @@ pub(crate) fn watch_for_config_changes( original: ArtiConfig, client: TorClient, ) -> anyhow::Result<()> { - let (mut watcher, found_files) = prepare_watcher(&sources)?; + let watch_file = original.application().watch_configuration; - // If watching, we must reload the config once right away, because - // we have set up the watcher *after* loading it the first time. - // - // That means we safely drop the found_files without races, since we're going to rescan. - drop(found_files); - let mut first_reload = iter::once(notify::DebouncedEvent::Rescan); + let (tx, rx) = std_channel(); + let mut watcher = if watch_file { + // If watching, we must reload the config once right away, because + // we have set up the watcher *after* loading the config. + // ignore send error, rx can't be disconnected if we are here + let _ = tx.send(notify::DebouncedEvent::Rescan); + let mut watcher = FileWatcher::new(tx.clone(), DEBOUNCE_INTERVAL)?; + prepare_watcher(&mut watcher, &sources)?; + Some(watcher) + } else { + None + }; + + #[cfg(target_family = "unix")] + { + use futures::StreamExt; + + let mut sighup_stream = sighup_stream()?; + let tx = tx.clone(); + client.runtime().spawn(async move { + while let Some(()) = sighup_stream.next().await { + info!("Received SIGHUP"); + if tx.send(notify::DebouncedEvent::Rescan).is_err() { + warn!("Failed to reload configuration"); + break; + } + } + })?; + } std::thread::spawn(move || { // TODO: If someday we make this facility available outside of the // `arti` application, we probably don't want to have this thread own // the FileWatcher. debug!("Entering FS event loop"); - while let Some(event) = first_reload.next().or_else(|| watcher.rx().recv().ok()) { - if !watcher.event_matched(&event) { + while let Ok(event) = rx.recv() { + if !watcher.as_ref().map_or(true, |w| w.event_matched(&event)) { // NOTE: Sadly, it's not safe to log in this case. If the user // has put a configuration file and a logfile in the same // directory, logging about discarded events will make us log // every time we log, and fill up the filesystem. continue; } - while let Ok(_ignore) = watcher.rx().try_recv() { + while let Ok(_ignore) = rx.try_recv() { // Discard other events, so that we only reload once. // // We can afford to treat both error cases from try_recv [Empty @@ -78,23 +116,41 @@ pub(crate) fn watch_for_config_changes( } debug!("FS event {:?}: reloading configuration.", event); - let (new_watcher, found_files) = match prepare_watcher(&sources) { - Ok(y) => y, - Err(e) => { - error!( - "FS watch: failed to rescan config and re-establish watch: {}", - e - ); - break; - } + let found_files = if watcher.is_some() { + let mut new_watcher = ok_or_break!( + FileWatcher::new(tx.clone(), DEBOUNCE_INTERVAL), + "FS watch: failed to create new watcher: {}", + ); + let found_files = ok_or_break!( + prepare_watcher(&mut new_watcher, &sources), + "FS watch: failed to rescan config and re-establish watch: {}", + ); + + watcher = Some(new_watcher); + found_files + } else { + ok_or_break!(sources.scan(), "FS watch: failed to rescan config: {}",) }; - watcher = new_watcher; match reconfigure(found_files, &original, &client) { - Ok(exit) => { + Ok(watch) => { info!("Successfully reloaded configuration."); - if exit { - break; + if watch && watcher.is_none() { + // If watching, we must reload the config once right away, because + // we have set up the watcher *after* loading the config. + // ignore send error, rx can't be disconnected if we are here + let _ = tx.send(notify::DebouncedEvent::Rescan); + let mut new_watcher = ok_or_break!( + FileWatcher::new(tx.clone(), DEBOUNCE_INTERVAL), + "FS watch: failed to restart watching: {}", + ); + ok_or_break!( + prepare_watcher(&mut new_watcher, &sources), + "FS watch: failed to rescan configuration: {}", + ); + watcher = Some(new_watcher); + } else if !watch && watcher.is_some() { + watcher = None; } } Err(e) => warn!("Couldn't reload configuration: {}", e), @@ -113,7 +169,7 @@ pub(crate) fn watch_for_config_changes( /// Reload the configuration files, apply the runtime configuration, and /// reconfigure the client as much as we can. /// -/// Return true if we should stop watching for configuration changes. +/// Return true if we should be watching for configuration changes. fn reconfigure( found_files: FoundConfigFiles<'_>, original: &ArtiConfig, @@ -137,11 +193,7 @@ fn reconfigure( crate::process::enable_process_hardening()?; } - if !config.application().watch_configuration { - // Stop watching for configuration changes. - return Ok(true); - } - Ok(false) + Ok(config.application().watch_configuration) } /// A wrapper around `notify::RecommendedWatcher` to watch a set of parent @@ -165,8 +217,6 @@ fn reconfigure( /// to mess around with `std::sync::mpsc` and filter out the events they want /// using `FileWatcher::event_matched`. struct FileWatcher { - /// The channel we receive events on - rx: std::sync::mpsc::Receiver, /// An underlying `notify` watcher that tells us about directory changes. watcher: notify::RecommendedWatcher, /// The list of directories that we're currently watching. @@ -177,22 +227,15 @@ struct FileWatcher { impl FileWatcher { /// Like `notify::watcher`, but create a FileWatcher instead. - fn new(interval: Duration) -> anyhow::Result { - let (tx, rx) = std_channel(); + fn new(tx: Sender, interval: Duration) -> anyhow::Result { let watcher = notify::watcher(tx, interval)?; Ok(Self { - rx, watcher, watching_dirs: HashSet::new(), watching_files: HashSet::new(), }) } - /// Access the channel - use for receiving events - fn rx(&self) -> &std::sync::mpsc::Receiver { - &self.rx - } - /// Watch a single file (not a directory). /// /// Idempotent: does nothing if we're already watching that file. From 8510ba534d8ba54c6d7f72ff3c28adfe409aa627 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Sat, 27 Aug 2022 12:44:11 +0200 Subject: [PATCH 4/4] small refactoring to reduce duplicaiton of config reloading --- crates/arti/src/lib.rs | 10 +-- .../arti/src/{watch_cfg.rs => reload_cfg.rs} | 76 ++++++++++--------- 2 files changed, 44 insertions(+), 42 deletions(-) rename crates/arti/src/{watch_cfg.rs => reload_cfg.rs} (85%) diff --git a/crates/arti/src/lib.rs b/crates/arti/src/lib.rs index 8add94ef4..23e2546ea 100644 --- a/crates/arti/src/lib.rs +++ b/crates/arti/src/lib.rs @@ -171,9 +171,9 @@ pub mod exit; #[cfg(feature = "experimental-api")] pub mod process; #[cfg(feature = "experimental-api")] -pub mod socks; +pub mod reload_cfg; #[cfg(feature = "experimental-api")] -pub mod watch_cfg; +pub mod socks; #[cfg(all(not(feature = "experimental-api"), feature = "dns-proxy"))] mod dns; @@ -182,9 +182,9 @@ mod exit; #[cfg(not(feature = "experimental-api"))] mod process; #[cfg(not(feature = "experimental-api"))] -mod socks; +mod reload_cfg; #[cfg(not(feature = "experimental-api"))] -mod watch_cfg; +mod socks; use std::fmt::Write; @@ -262,7 +262,7 @@ async fn run( .config(client_config) .bootstrap_behavior(OnDemand); let client = client_builder.create_unbootstrapped()?; - watch_cfg::watch_for_config_changes(config_sources, arti_config, client.clone())?; + reload_cfg::watch_for_config_changes(config_sources, arti_config, client.clone())?; let mut proxy: Vec, &str)>> = Vec::new(); if socks_port != 0 { diff --git a/crates/arti/src/watch_cfg.rs b/crates/arti/src/reload_cfg.rs similarity index 85% rename from crates/arti/src/watch_cfg.rs rename to crates/arti/src/reload_cfg.rs index 0125eb4ad..cbddbc453 100644 --- a/crates/arti/src/watch_cfg.rs +++ b/crates/arti/src/reload_cfg.rs @@ -20,24 +20,6 @@ use crate::{ArtiCombinedConfig, ArtiConfig}; /// How long to wait after a file is created, before we try to read it. const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1); -/// Find the configuration files and prepare a watcher -/// -/// For the watching to be reliably effective (race-free), the config must be read -/// *after* this point, using the returned `FoundConfigFiles`. -fn prepare_watcher<'a>( - watcher: &mut FileWatcher, - sources: &'a ConfigurationSources, -) -> anyhow::Result> { - let sources = sources.scan()?; - for source in sources.iter() { - match source { - ConfigurationSource::Dir(dir) => watcher.watch_dir(dir)?, - ConfigurationSource::File(file) => watcher.watch_file(file)?, - } - } - Ok(sources) -} - /// Unwrap first expression or break with the provided error message macro_rules! ok_or_break { ($e:expr, $msg:expr $(,)?) => { @@ -51,10 +33,11 @@ macro_rules! ok_or_break { }; } -/// Launch a thread to watch our configuration files. +/// Launch a thread to reload our configuration files. /// -/// Whenever one or more files in `files` changes, try to reload our -/// configuration from them and tell TorClient about it. +/// If current configuration requires it, watch for changes in `sources` +/// and try to reload our configuration. On unix platforms, also watch +/// for SIGHUP and reload configuration then. #[cfg_attr(feature = "experimental-api", visibility::make(pub))] pub(crate) fn watch_for_config_changes( sources: ConfigurationSources, @@ -69,8 +52,7 @@ pub(crate) fn watch_for_config_changes( // we have set up the watcher *after* loading the config. // ignore send error, rx can't be disconnected if we are here let _ = tx.send(notify::DebouncedEvent::Rescan); - let mut watcher = FileWatcher::new(tx.clone(), DEBOUNCE_INTERVAL)?; - prepare_watcher(&mut watcher, &sources)?; + let (watcher, _) = FileWatcher::new_prepared(tx.clone(), DEBOUNCE_INTERVAL, &sources)?; Some(watcher) } else { None @@ -117,15 +99,10 @@ pub(crate) fn watch_for_config_changes( debug!("FS event {:?}: reloading configuration.", event); let found_files = if watcher.is_some() { - let mut new_watcher = ok_or_break!( - FileWatcher::new(tx.clone(), DEBOUNCE_INTERVAL), - "FS watch: failed to create new watcher: {}", - ); - let found_files = ok_or_break!( - prepare_watcher(&mut new_watcher, &sources), + let (new_watcher, found_files) = ok_or_break!( + FileWatcher::new_prepared(tx.clone(), DEBOUNCE_INTERVAL, &sources), "FS watch: failed to rescan config and re-establish watch: {}", ); - watcher = Some(new_watcher); found_files } else { @@ -136,20 +113,18 @@ pub(crate) fn watch_for_config_changes( Ok(watch) => { info!("Successfully reloaded configuration."); if watch && watcher.is_none() { + info!("Starting watching over configuration."); // If watching, we must reload the config once right away, because // we have set up the watcher *after* loading the config. // ignore send error, rx can't be disconnected if we are here let _ = tx.send(notify::DebouncedEvent::Rescan); - let mut new_watcher = ok_or_break!( - FileWatcher::new(tx.clone(), DEBOUNCE_INTERVAL), - "FS watch: failed to restart watching: {}", - ); - ok_or_break!( - prepare_watcher(&mut new_watcher, &sources), - "FS watch: failed to rescan configuration: {}", + let (new_watcher, _) = ok_or_break!( + FileWatcher::new_prepared(tx.clone(), DEBOUNCE_INTERVAL, &sources), + "FS watch: failed to rescan config and re-establish watch: {}", ); watcher = Some(new_watcher); } else if !watch && watcher.is_some() { + info!("Stopped watching over configuration."); watcher = None; } } @@ -236,6 +211,33 @@ impl FileWatcher { }) } + /// Create a FileWatcher already watching files in `sources` + fn new_prepared( + tx: Sender, + interval: Duration, + sources: &ConfigurationSources, + ) -> anyhow::Result<(Self, FoundConfigFiles)> { + Self::new(tx, interval).and_then(|mut this| this.prepare(sources).map(|cfg| (this, cfg))) + } + + /// Find the configuration files and prepare the watcher + /// + /// For the watching to be reliably effective (race-free), the config must be read + /// *after* this point, using the returned `FoundConfigFiles`. + fn prepare<'a>( + &mut self, + sources: &'a ConfigurationSources, + ) -> anyhow::Result> { + let sources = sources.scan()?; + for source in sources.iter() { + match source { + ConfigurationSource::Dir(dir) => self.watch_dir(dir)?, + ConfigurationSource::File(file) => self.watch_file(file)?, + } + } + Ok(sources) + } + /// Watch a single file (not a directory). /// /// Idempotent: does nothing if we're already watching that file.