Initial work on periodically reloading state.

We can use this in the case where we don't get the lock on the
state file, because another process is running.
This commit is contained in:
Nick Mathewson 2021-10-19 11:26:22 -04:00
parent 6b26ae20a1
commit 89d1fb1767
4 changed files with 54 additions and 28 deletions

View File

@ -313,7 +313,6 @@ impl<R: Runtime> CircuitBuilder<R> {
pub fn save_state(&self) -> Result<()> {
// TODO: someday we'll want to only do this if there is something
// changed.
let _ignore = self.storage.try_lock()?; // XXXX don't ignore.
let state = self.builder.timeouts.build_state();
self.storage.store(&state)?;
Ok(())

View File

@ -201,7 +201,17 @@ impl<R: Runtime> CircMgr<R> {
Ok(circmgr)
}
/// Flush state to the state manager, if there is any unsaved state.
/// Reload state from the state manager.
///
/// We only call this method if we _don't_ have the lock on the state
/// files. If we have the lock, we only want to save.
pub fn reload_persistent_state(&self) -> Result<()> {
warn!("reload_persistent_state isn't implemented.");
Ok(())
}
/// Flush state to the state manager, if there is any unsaved state and
/// we have the lock.
pub fn update_persistent_state(&self) -> Result<()> {
self.mgr.peek_builder().save_state()?;
self.mgr

View File

@ -160,7 +160,7 @@ impl<R: Runtime> TorClient<R> {
}
let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(runtime.clone()));
let circmgr =
tor_circmgr::CircMgr::new(circ_cfg, statemgr, &runtime, Arc::clone(&chanmgr))?;
tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))?;
let dirmgr = tor_dirmgr::DirMgr::bootstrap_from_config(
dir_cfg,
runtime.clone(),
@ -179,9 +179,10 @@ impl<R: Runtime> TorClient<R> {
Arc::downgrade(&dirmgr),
))?;
runtime.spawn(flush_state_to_disk(
runtime.spawn(update_persistent_state(
runtime.clone(),
Arc::downgrade(&circmgr),
statemgr,
))?;
runtime.spawn(continually_launch_timeout_testing_circuits(
@ -312,12 +313,6 @@ impl<R: Runtime> TorClient<R> {
Ok(circ)
}
/// Try to flush persistent state into storage.
fn update_persistent_state(&self) -> Result<()> {
self.circmgr.update_persistent_state()?;
Ok(())
}
}
/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
@ -365,16 +360,40 @@ async fn keep_circmgr_params_updated<R: Runtime>(
/// Exit when we notice that `circmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn flush_state_to_disk<R: Runtime>(runtime: R, circmgr: Weak<tor_circmgr::CircMgr<R>>) {
async fn update_persistent_state<R: Runtime>(
runtime: R,
circmgr: Weak<tor_circmgr::CircMgr<R>>,
statemgr: FsStateMgr,
) {
#![allow(clippy::collapsible_else_if)]
// TODO: Consider moving this into tor-circmgr after we have more
// experience with the state system.
loop {
if let Some(circmgr) = Weak::upgrade(&circmgr) {
if let Err(e) = circmgr.update_persistent_state() {
error!("Unable to flush circmgr state: {}", e);
let had_lock = statemgr.can_store();
let lock_acquired = match statemgr.try_lock() {
Ok(v) => v,
Err(e) => {
error!("Unable to check lock status: {}", e);
break;
}
};
if !had_lock && lock_acquired {
info!("We now own the lock on our state files.");
}
if let Some(circmgr) = Weak::upgrade(&circmgr) {
if !had_lock {
if let Err(e) = circmgr.reload_persistent_state() {
error!("Unable to reload circmgr state: {}", e);
break;
}
} else {
if let Err(e) = circmgr.update_persistent_state() {
error!("Unable to flush circmgr state: {}", e);
break;
}
}
} else {
debug!("Circmgr has disappeared; task exiting.");
break;
@ -430,9 +449,12 @@ impl<R: Runtime> Drop for TorClient<R> {
// TODO: Consider moving this into tor-circmgr after we have more
// experience with the state system.
fn drop(&mut self) {
info!("Flushing persistent state at exit.");
if let Err(e) = self.update_persistent_state() {
error!("Unable to flush state on client exit: {}", e);
match self.circmgr.update_persistent_state() {
Ok(()) => info!("Flushed persistent state at exit."),
Err(tor_circmgr::Error::State(tor_persist::Error::NoLock)) => {
debug!("Lock not held; no state to flush.")
}
Err(e) => error!("Unable to flush state on client exit: {}", e),
}
}
}

View File

@ -276,18 +276,11 @@ impl<R: Runtime> GuardMgr<R> {
/// Flush our current guard state to the state manager, if there
/// is any unsaved state.
///
/// Return true if we were able to save, and false if we couldn't
/// get the lock.
pub fn update_persistent_state(&self) -> Result<bool, GuardMgrError> {
pub fn update_persistent_state(&self) -> Result<(), GuardMgrError> {
let inner = self.inner.lock().expect("Poisoned lock");
if inner.default_storage.try_lock()? {
trace!("Flushing guard state to disk.");
inner.default_storage.store(&inner.active_guards)?;
Ok(true)
} else {
Ok(false)
}
trace!("Flushing guard state to disk.");
inner.default_storage.store(&inner.active_guards)?;
Ok(())
}
/// Update the state of this [`GuardMgr`] based on a new or modified
@ -909,6 +902,8 @@ mod test {
fn init<R: Runtime>(rt: R) -> (GuardMgr<R>, TestingStateMgr, NetDir) {
use tor_netdir::{testnet, MdReceiver, PartialNetDir};
let statemgr = TestingStateMgr::new();
let have_lock = statemgr.try_lock().unwrap();
assert!(have_lock);
let guardmgr = GuardMgr::new(rt, statemgr.clone()).unwrap();
let (con, mds) = testnet::construct_network().unwrap();
let override_p = "guard-min-filtered-sample-size=5 guard-n-primary-guards=2"