Mark consensus as "not-pending" even if its microdescs come from cache.

Previously our code would clear the 'pending' flag on a consensus
only when a _downloaded_ md made it become usable.

Closes #199.
This commit is contained in:
Nick Mathewson 2021-10-20 14:04:54 -04:00
parent 43506601dc
commit fdddb74de4
3 changed files with 48 additions and 8 deletions

View File

@ -99,7 +99,7 @@ async fn load_once<R: Runtime>(
missing.len()
);
let documents = load_all(dirmgr, missing)?;
state.add_from_cache(documents)
state.add_from_cache(documents, dirmgr.store_if_rw())
};
dirmgr.notify().await;
outcome

View File

@ -399,6 +399,21 @@ impl<R: Runtime> DirMgr<R> {
.upgrade_to_readwrite()
}
/// Return a reference to the store, if it is currently read-write.
fn store_if_rw(&self) -> Option<&Mutex<SqliteStore>> {
let rw = !self
.store
.lock()
.expect("Directory storage lock poisoned")
.is_readonly();
// A race-condition is possible here, but I believe it's harmless.
if rw {
Some(&self.store)
} else {
None
}
}
/// Construct a DirMgr from a DirMgrConfig.
fn from_config(
config: DirMgrConfig,
@ -428,8 +443,6 @@ impl<R: Runtime> DirMgr<R> {
///
/// Return false if there is no such consensus.
async fn load_directory(self: &Arc<Self>) -> Result<bool> {
//let store = &self.store;
let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;
@ -690,7 +703,14 @@ trait DirState: Send {
fn can_advance(&self) -> bool;
/// Add one or more documents from our cache; returns 'true' if there
/// was any change in this state.
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool>;
///
/// If `storage` is provided, then we should write any state changes into
/// it.
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool>;
/// Add information that we have just downloaded to this state; returns
/// 'true' if there as any change in this state.

View File

@ -166,7 +166,11 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
Err(Error::ManagerDropped)
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
_storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let text = match docs.into_iter().next() {
None => return Ok(false),
Some((
@ -326,7 +330,11 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
Err(Error::ManagerDropped)
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
_storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let mut changed = false;
// Here we iterate over the documents we want, taking them from
// our input and remembering them.
@ -563,7 +571,11 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
Err(Error::ManagerDropped)
}
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<bool> {
fn add_from_cache(
&mut self,
docs: HashMap<DocId, DocumentText>,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool> {
let mut microdescs = Vec::new();
for (id, text) in docs {
if let DocId::Microdesc(digest) = id {
@ -582,7 +594,15 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
}
let changed = !microdescs.is_empty();
self.register_microdescs(microdescs);
if self.register_microdescs(microdescs) {
if let Some(store) = storage {
let mut store = store.lock().expect("Directory storage lock poisoned");
info!("Marked consensus usable.");
store.mark_consensus_usable(&self.meta)?;
// DOCDOC: explain why we're doing this here.
store.expire_all()?;
}
}
Ok(changed)
}