Merge branch 'sighup' into 'main'

Sighup

See merge request tpo/core/arti!702
This commit is contained in:
Ian Jackson 2022-08-30 11:59:03 +00:00
commit 652ada2a4a
5 changed files with 150 additions and 65 deletions

13
Cargo.lock generated
View File

@ -103,6 +103,7 @@ dependencies = [
"secmem-proc",
"serde",
"serde_json",
"signal-hook-async-std",
"tokio",
"toml",
"tor-config",
@ -2991,6 +2992,18 @@ dependencies = [
"signal-hook-registry",
]
[[package]]
name = "signal-hook-async-std"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4aa94397e2023af5b7cff5b8d4785e935cfb77f0e4aab0cae3b26258ace556"
dependencies = [
"async-io",
"futures-lite",
"libc",
"signal-hook",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"

View File

@ -16,7 +16,13 @@ default = ["tokio", "native-tls", "dns-proxy", "harden"]
full = ["async-std", "tokio", "native-tls", "journald", "arti-client/full", "dns-proxy", "harden"]
async-std = ["arti-client/async-std", "tor-rtcompat/async-std", "async-ctrlc", "once_cell"]
async-std = [
"arti-client/async-std",
"tor-rtcompat/async-std",
"async-ctrlc",
"once_cell",
"signal-hook-async-std",
]
dns-proxy = ["trust-dns-proto"]
experimental-api = ["visibility"]
harden = ["secmem-proc"]
@ -55,6 +61,7 @@ rlimit = "0.8.3"
safelog = { path = "../safelog", version = "0.1.0" }
secmem-proc = { version = "0.1.1", optional = true }
serde = { version = "1.0.103", features = ["derive"] }
signal-hook-async-std = { version = "0.2", optional = true }
tokio-crate = { package = "tokio", version = "1.7", optional = true, features = ["signal"] }
tor-config = { path = "../tor-config", version = "0.5.0" }
tor-error = { path = "../tor-error", version = "0.3.2", default-features = false }

View File

@ -171,9 +171,9 @@ pub mod exit;
#[cfg(feature = "experimental-api")]
pub mod process;
#[cfg(feature = "experimental-api")]
pub mod socks;
pub mod reload_cfg;
#[cfg(feature = "experimental-api")]
pub mod watch_cfg;
pub mod socks;
#[cfg(all(not(feature = "experimental-api"), feature = "dns-proxy"))]
mod dns;
@ -182,9 +182,9 @@ mod exit;
#[cfg(not(feature = "experimental-api"))]
mod process;
#[cfg(not(feature = "experimental-api"))]
mod socks;
mod reload_cfg;
#[cfg(not(feature = "experimental-api"))]
mod watch_cfg;
mod socks;
use std::fmt::Write;
@ -262,9 +262,7 @@ async fn run<R: Runtime>(
.config(client_config)
.bootstrap_behavior(OnDemand);
let client = client_builder.create_unbootstrapped()?;
if arti_config.application().watch_configuration {
watch_cfg::watch_for_config_changes(config_sources, arti_config, client.clone())?;
}
reload_cfg::watch_for_config_changes(config_sources, arti_config, client.clone())?;
let mut proxy: Vec<PinnedFuture<(Result<()>, &str)>> = Vec::new();
if socks_port != 0 {

View File

@ -75,3 +75,25 @@ fn running_as_root() -> bool {
#[cfg(not(target_family = "unix"))]
false
}
/// Return an async stream that reports an event whenever we get a `SIGHUP`
/// signal.
///
/// Note that the signal-handling backend can coalesce signals; this is normal.
pub(crate) fn sighup_stream() -> crate::Result<impl futures::Stream<Item = ()>> {
cfg_if::cfg_if! {
if #[cfg(all(feature="tokio", target_family = "unix"))] {
use tokio_crate::signal::unix as s;
let mut signal = s::signal(s::SignalKind::hangup())?;
Ok(futures::stream::poll_fn(move |ctx| signal.poll_recv(ctx)))
} else if #[cfg(all(feature="async-std", target_family = "unix"))] {
use signal_hook_async_std as s;
use futures::stream::StreamExt as _;
let mut signal = s::Signals::new(&[s::consts::signals::SIGHUP])?;
Ok(signals.map(|i| i*2))
} else {
// Not unix or no backend, so we won't ever get a SIGHUP.
Ok(futures::stream::pending())
}
}
}

View File

@ -1,9 +1,8 @@
//! Code to watch configuration files for any changes.
use std::collections::HashSet;
use std::iter;
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel as std_channel;
use std::sync::mpsc::{channel as std_channel, Sender};
use std::time::Duration;
use arti_client::config::Reconfigure;
@ -13,62 +12,83 @@ use tor_config::{sources::FoundConfigFiles, ConfigurationSource, ConfigurationSo
use tor_rtcompat::Runtime;
use tracing::{debug, error, info, warn};
use futures::task::SpawnExt;
use crate::process::sighup_stream;
use crate::{ArtiCombinedConfig, ArtiConfig};
/// How long to wait after a file is created, before we try to read it.
const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
/// Find the configuration files and prepare a watcher
///
/// For the watching to be reliably effective (race-free), the config must be read
/// *after* this point, using the returned `FoundConfigFiles`.
fn prepare_watcher(
sources: &ConfigurationSources,
) -> anyhow::Result<(FileWatcher, FoundConfigFiles)> {
let mut watcher = FileWatcher::new(DEBOUNCE_INTERVAL)?;
let sources = sources.scan()?;
for source in sources.iter() {
match source {
ConfigurationSource::Dir(dir) => watcher.watch_dir(dir)?,
ConfigurationSource::File(file) => watcher.watch_file(file)?,
/// Unwrap first expression or break with the provided error message
macro_rules! ok_or_break {
($e:expr, $msg:expr $(,)?) => {
match ($e) {
Ok(y) => y,
Err(e) => {
error!($msg, e);
break;
}
}
}
Ok((watcher, sources))
};
}
/// Launch a thread to watch our configuration files.
/// Launch a thread to reload our configuration files.
///
/// Whenever one or more files in `files` changes, try to reload our
/// configuration from them and tell TorClient about it.
/// If current configuration requires it, watch for changes in `sources`
/// and try to reload our configuration. On unix platforms, also watch
/// for SIGHUP and reload configuration then.
#[cfg_attr(feature = "experimental-api", visibility::make(pub))]
pub(crate) fn watch_for_config_changes<R: Runtime>(
sources: ConfigurationSources,
original: ArtiConfig,
client: TorClient<R>,
) -> anyhow::Result<()> {
let (mut watcher, found_files) = prepare_watcher(&sources)?;
let watch_file = original.application().watch_configuration;
// If watching, we must reload the config once right away, because
// we have set up the watcher *after* loading it the first time.
//
// That means we safely drop the found_files without races, since we're going to rescan.
drop(found_files);
let mut first_reload = iter::once(notify::DebouncedEvent::Rescan);
let (tx, rx) = std_channel();
let mut watcher = if watch_file {
// If watching, we must reload the config once right away, because
// we have set up the watcher *after* loading the config.
// ignore send error, rx can't be disconnected if we are here
let _ = tx.send(notify::DebouncedEvent::Rescan);
let (watcher, _) = FileWatcher::new_prepared(tx.clone(), DEBOUNCE_INTERVAL, &sources)?;
Some(watcher)
} else {
None
};
#[cfg(target_family = "unix")]
{
use futures::StreamExt;
let mut sighup_stream = sighup_stream()?;
let tx = tx.clone();
client.runtime().spawn(async move {
while let Some(()) = sighup_stream.next().await {
info!("Received SIGHUP");
if tx.send(notify::DebouncedEvent::Rescan).is_err() {
warn!("Failed to reload configuration");
break;
}
}
})?;
}
std::thread::spawn(move || {
// TODO: If someday we make this facility available outside of the
// `arti` application, we probably don't want to have this thread own
// the FileWatcher.
debug!("Entering FS event loop");
while let Some(event) = first_reload.next().or_else(|| watcher.rx().recv().ok()) {
if !watcher.event_matched(&event) {
while let Ok(event) = rx.recv() {
if !watcher.as_ref().map_or(true, |w| w.event_matched(&event)) {
// NOTE: Sadly, it's not safe to log in this case. If the user
// has put a configuration file and a logfile in the same
// directory, logging about discarded events will make us log
// every time we log, and fill up the filesystem.
continue;
}
while let Ok(_ignore) = watcher.rx().try_recv() {
while let Ok(_ignore) = rx.try_recv() {
// Discard other events, so that we only reload once.
//
// We can afford to treat both error cases from try_recv [Empty
@ -78,23 +98,34 @@ pub(crate) fn watch_for_config_changes<R: Runtime>(
}
debug!("FS event {:?}: reloading configuration.", event);
let (new_watcher, found_files) = match prepare_watcher(&sources) {
Ok(y) => y,
Err(e) => {
error!(
"FS watch: failed to rescan config and re-establish watch: {}",
e
);
break;
}
let found_files = if watcher.is_some() {
let (new_watcher, found_files) = ok_or_break!(
FileWatcher::new_prepared(tx.clone(), DEBOUNCE_INTERVAL, &sources),
"FS watch: failed to rescan config and re-establish watch: {}",
);
watcher = Some(new_watcher);
found_files
} else {
ok_or_break!(sources.scan(), "FS watch: failed to rescan config: {}",)
};
watcher = new_watcher;
match reconfigure(found_files, &original, &client) {
Ok(exit) => {
Ok(watch) => {
info!("Successfully reloaded configuration.");
if exit {
break;
if watch && watcher.is_none() {
info!("Starting watching over configuration.");
// If watching, we must reload the config once right away, because
// we have set up the watcher *after* loading the config.
// ignore send error, rx can't be disconnected if we are here
let _ = tx.send(notify::DebouncedEvent::Rescan);
let (new_watcher, _) = ok_or_break!(
FileWatcher::new_prepared(tx.clone(), DEBOUNCE_INTERVAL, &sources),
"FS watch: failed to rescan config and re-establish watch: {}",
);
watcher = Some(new_watcher);
} else if !watch && watcher.is_some() {
info!("Stopped watching over configuration.");
watcher = None;
}
}
Err(e) => warn!("Couldn't reload configuration: {}", e),
@ -113,7 +144,7 @@ pub(crate) fn watch_for_config_changes<R: Runtime>(
/// Reload the configuration files, apply the runtime configuration, and
/// reconfigure the client as much as we can.
///
/// Return true if we should stop watching for configuration changes.
/// Return true if we should be watching for configuration changes.
fn reconfigure<R: Runtime>(
found_files: FoundConfigFiles<'_>,
original: &ArtiConfig,
@ -137,11 +168,7 @@ fn reconfigure<R: Runtime>(
crate::process::enable_process_hardening()?;
}
if !config.application().watch_configuration {
// Stop watching for configuration changes.
return Ok(true);
}
Ok(false)
Ok(config.application().watch_configuration)
}
/// A wrapper around `notify::RecommendedWatcher` to watch a set of parent
@ -165,8 +192,6 @@ fn reconfigure<R: Runtime>(
/// to mess around with `std::sync::mpsc` and filter out the events they want
/// using `FileWatcher::event_matched`.
struct FileWatcher {
/// The channel we receive events on
rx: std::sync::mpsc::Receiver<notify::DebouncedEvent>,
/// An underlying `notify` watcher that tells us about directory changes.
watcher: notify::RecommendedWatcher,
/// The list of directories that we're currently watching.
@ -177,20 +202,40 @@ struct FileWatcher {
impl FileWatcher {
/// Like `notify::watcher`, but create a FileWatcher instead.
fn new(interval: Duration) -> anyhow::Result<Self> {
let (tx, rx) = std_channel();
fn new(tx: Sender<notify::DebouncedEvent>, interval: Duration) -> anyhow::Result<Self> {
let watcher = notify::watcher(tx, interval)?;
Ok(Self {
rx,
watcher,
watching_dirs: HashSet::new(),
watching_files: HashSet::new(),
})
}
/// Access the channel - use for receiving events
fn rx(&self) -> &std::sync::mpsc::Receiver<notify::DebouncedEvent> {
&self.rx
/// Create a FileWatcher already watching files in `sources`
fn new_prepared(
tx: Sender<notify::DebouncedEvent>,
interval: Duration,
sources: &ConfigurationSources,
) -> anyhow::Result<(Self, FoundConfigFiles)> {
Self::new(tx, interval).and_then(|mut this| this.prepare(sources).map(|cfg| (this, cfg)))
}
/// Find the configuration files and prepare the watcher
///
/// For the watching to be reliably effective (race-free), the config must be read
/// *after* this point, using the returned `FoundConfigFiles`.
fn prepare<'a>(
&mut self,
sources: &'a ConfigurationSources,
) -> anyhow::Result<FoundConfigFiles<'a>> {
let sources = sources.scan()?;
for source in sources.iter() {
match source {
ConfigurationSource::Dir(dir) => self.watch_dir(dir)?,
ConfigurationSource::File(file) => self.watch_file(file)?,
}
}
Ok(sources)
}
/// Watch a single file (not a directory).