DirMgr: more trace!() logs and information in download/load functions.

My goal here is to make sure that we can't confuse
one download operation and another, and that we actually know
what's going on.  Previously, not all state transitions or
attempts to fetch information actually corresponded to a log.
This commit is contained in:
Nick Mathewson 2023-04-10 10:27:48 -04:00
parent b2486bba1e
commit 8151fa504f
1 changed files with 30 additions and 8 deletions

View File

@ -281,6 +281,7 @@ static CANNED_RESPONSE: Lazy<Mutex<Vec<String>>> = Lazy::new(|| Mutex::new(vec![
/// Don't launch more than `parallelism` requests at once.
async fn fetch_multiple<R: Runtime>(
dirmgr: Arc<DirMgr<R>>,
attempt_id: AttemptId,
missing: &[DocId],
parallelism: usize,
) -> Result<Vec<(ClientRequest, DirResponse)>> {
@ -289,6 +290,9 @@ async fn fetch_multiple<R: Runtime>(
make_requests_for_documents(&dirmgr.runtime, missing, &**store, &dirmgr.config.get())?
};
trace!(attempt=%attempt_id, "Launching {} requests for {} documents",
requests.len(), missing.len());
#[cfg(test)]
{
let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
@ -330,6 +334,8 @@ async fn fetch_multiple<R: Runtime>(
}
}
trace!(attempt=%attempt_id, "received {} useful responses from our requests.", useful_responses.len());
Ok(useful_responses)
}
@ -381,13 +387,14 @@ pub(crate) async fn load<R: Runtime>(
) -> Result<Box<dyn DirState>> {
let mut safety_counter = 0_usize;
loop {
trace!(state=%state.describe(), "Loading from cache");
trace!(attempt=%attempt_id, state=%state.describe(), "Loading from cache");
let mut changed = false;
let outcome = load_once(&dirmgr, &mut state, attempt_id, &mut changed).await;
{
let mut store = dirmgr.store.lock().expect("store lock poisoned");
dirmgr.apply_netdir_changes(&mut state, &mut **store)?;
}
trace!(attempt=%attempt_id, ?outcome, "Load operation completed.");
if let Err(e) = outcome {
match e.bootstrap_action() {
@ -402,11 +409,13 @@ pub(crate) async fn load<R: Runtime>(
if state.can_advance() {
state = state.advance();
trace!(attempt=%attempt_id, state=state.describe(), "State has advanced.");
safety_counter = 0;
} else {
if !changed {
// TODO: Are there more nonfatal errors that mean we should
// break?
trace!(attempt=%attempt_id, state=state.describe(), "No state advancement after load; nothing more to find in the cache.");
break;
}
safety_counter += 1;
@ -432,7 +441,7 @@ async fn download_attempt<R: Runtime>(
attempt_id: AttemptId,
) -> Result<()> {
let missing = state.missing_docs();
let fetched = fetch_multiple(Arc::clone(dirmgr), &missing, parallelism).await?;
let fetched = fetch_multiple(Arc::clone(dirmgr), attempt_id, &missing, parallelism).await?;
let mut n_errors = 0;
for (client_req, dir_response) in fetched {
let source = dir_response.source().map(Clone::clone);
@ -517,6 +526,8 @@ pub(crate) async fn download<R: Runtime>(
) -> Result<()> {
let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
trace!(attempt=%attempt_id, state=%state.describe(), "Trying to download directory material.");
'next_state: loop {
let retry_config = state.dl_config();
let parallelism = retry_config.parallelism();
@ -527,7 +538,9 @@ pub(crate) async fn download<R: Runtime>(
let mut now = {
let dirmgr = upgrade_weak_ref(&dirmgr)?;
let mut changed = false;
trace!(attempt=%attempt_id, state=%state.describe(),"Attempting to load directory information from cache.");
let load_result = load_once(&dirmgr, state, attempt_id, &mut changed).await;
trace!(attempt=%attempt_id, state=%state.describe(), outcome=?load_result, "Load attempt complete.");
if let Err(e) = &load_result {
// If the load failed but the error can be blamed on a directory
// cache, do so.
@ -543,6 +556,7 @@ pub(crate) async fn download<R: Runtime>(
// Skip the downloads if we can...
if state.can_advance() {
advance(state);
trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
continue 'next_state;
}
// Apply any netdir changes that the state gives us.
@ -553,6 +567,7 @@ pub(crate) async fn download<R: Runtime>(
dirmgr.apply_netdir_changes(state, &mut **store)?;
}
if state.is_ready(Readiness::Complete) {
trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
return Ok(());
}
@ -570,23 +585,24 @@ pub(crate) async fn download<R: Runtime>(
// the final attempt.
let next_delay = retry.next_delay(&mut rand::thread_rng());
if let Some(delay) = delay.replace(next_delay) {
debug!("Waiting {:?} for next download attempt...", delay);
let time_until_reset = {
reset_time
.duration_since(now)
.unwrap_or(Duration::from_secs(0))
};
schedule.sleep(delay.min(time_until_reset)).await?;
let real_delay = delay.min(time_until_reset);
debug!(attempt=%attempt_id, "Waiting {:?} for next download attempt...", real_delay);
schedule.sleep(real_delay).await?;
now = upgrade_weak_ref(&dirmgr)?.runtime.wallclock();
if now >= reset_time {
info!("Download attempt timed out completely; resetting download state.");
info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
reset(state);
continue 'next_state;
}
}
info!("{}: {}", attempt + 1, state.describe());
info!(attempt=%attempt_id, "{}: {}", attempt + 1, state.describe());
let reset_time = no_more_than_a_week_from(now, state.reset_time());
now = {
@ -594,9 +610,11 @@ pub(crate) async fn download<R: Runtime>(
futures::select_biased! {
outcome = download_attempt(&dirmgr, state, parallelism.into(), attempt_id).fuse() => {
if let Err(e) = outcome {
warn!("Error while downloading: {}", e.report());
warn!(attempt=%attempt_id, "Error while downloading: {}", e.report());
propagate_fatal_errors!(Err(e));
continue 'next_attempt;
} else {
trace!(attempt=%attempt_id, "Successfully downloaded some information.");
}
}
_ = schedule.sleep_until_wallclock(reset_time).fuse() => {
@ -604,7 +622,8 @@ pub(crate) async fn download<R: Runtime>(
// example) we're downloading the last few
// microdescriptors on a consensus that now
// we're ready to replace.
reset(state);
info!(attempt=%attempt_id, "Directory being fetched is now outdated; resetting download state.");
reset(state);
continue 'next_state;
},
};
@ -622,11 +641,13 @@ pub(crate) async fn download<R: Runtime>(
// Exit if there is nothing more to download.
if state.is_ready(Readiness::Complete) {
trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Complete.");
return Ok(());
}
// Report usable-ness if appropriate.
if on_usable.is_some() && state.is_ready(Readiness::Usable) {
trace!(attempt=%attempt_id, state=%state.describe(), "Directory is now Usable.");
// Unwrap should be safe due to parent `.is_some()` check
#[allow(clippy::unwrap_used)]
let _ = on_usable.take().unwrap().send(());
@ -635,6 +656,7 @@ pub(crate) async fn download<R: Runtime>(
if state.can_advance() {
// We have enough info to advance to another state.
advance(state);
trace!(attempt=%attempt_id, state=%state.describe(), "State has advanced.");
continue 'next_state;
}
}