Merge branch 'store-yak-bd' into 'main'
bridgedesc: yaks for persistent storage See merge request tpo/core/arti!827
This commit is contained in:
commit
1be9019def
|
@ -138,12 +138,17 @@ mod mockable {
|
|||
///
|
||||
/// Runs in a task.
|
||||
/// Called by `Manager::download_descriptor`, which handles parsing and validation.
|
||||
///
|
||||
/// If `if_modified_since` is `Some`,
|
||||
/// should tolerate an HTTP 304 Not Modified and return `None` in that case.
|
||||
/// If `if_modified_since` is `None`, returning `Ok(None,)` is forbidden.
|
||||
async fn download(
|
||||
self,
|
||||
runtime: &R,
|
||||
circmgr: &Self::CircMgr,
|
||||
bridge: &BridgeConfig,
|
||||
) -> Result<String, Error>;
|
||||
if_modified_since: Option<SystemTime>,
|
||||
) -> Result<Option<String>, Error>;
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
|
@ -156,7 +161,9 @@ impl<R: Runtime> mockable::MockableAPI<R> for () {
|
|||
runtime: &R,
|
||||
circmgr: &Self::CircMgr,
|
||||
bridge: &BridgeConfig,
|
||||
) -> Result<String, Error> {
|
||||
_if_modified_since: Option<SystemTime>,
|
||||
) -> Result<Option<String>, Error> {
|
||||
// TODO actually support _if_modified_since
|
||||
let circuit = circmgr.get_or_launch_dir_specific(bridge).await?;
|
||||
let mut stream = circuit
|
||||
.begin_dir_stream()
|
||||
|
@ -170,7 +177,7 @@ impl<R: Runtime> mockable::MockableAPI<R> for () {
|
|||
_ => internal!("tor_dirclient::download gave non-RequestFailed {:?}", dce).into(),
|
||||
})?;
|
||||
let output = response.into_output_string()?;
|
||||
Ok(output)
|
||||
Ok(Some(output))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -441,6 +448,20 @@ impl<R: Runtime> BridgeDescManager<R> {
|
|||
}
|
||||
}
|
||||
|
||||
/// If download was successful, what we obtained
|
||||
///
|
||||
/// Generated by `process_document`, from a downloaded textual descriptor.
|
||||
struct Downloaded {
|
||||
/// The bridge descriptor, fully parsed and verified
|
||||
desc: BridgeDesc,
|
||||
|
||||
/// When we should start a refresh for this descriptor
|
||||
///
|
||||
/// This is derived from the expiry time,
|
||||
/// and clamped according to limits in the configuration).
|
||||
refetch: SystemTime,
|
||||
}
|
||||
|
||||
impl<R: Runtime, M: Mockable<R>> BridgeDescManager<R, M> {
|
||||
/// Actual constructor, which takes a mockable
|
||||
//
|
||||
|
@ -819,15 +840,7 @@ impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
|
|||
///
|
||||
/// Final act of the the descriptor download task.
|
||||
/// `got` is from [`download_descriptor`](Manager::download_descriptor).
|
||||
///
|
||||
/// The `SystemTime` in `got.ok()`
|
||||
/// is when we should start to refetch for this descriptor
|
||||
/// (clamped according to limits in the configuration).
|
||||
fn record_download_outcome(
|
||||
&mut self,
|
||||
bridge: BridgeKey,
|
||||
got: Result<(BridgeDesc, SystemTime), Error>,
|
||||
) {
|
||||
fn record_download_outcome(&mut self, bridge: BridgeKey, got: Result<Downloaded, Error>) {
|
||||
let RunningInfo { retry_delay, .. } = match self.running.remove(&bridge) {
|
||||
Some(ri) => ri,
|
||||
None => {
|
||||
|
@ -837,7 +850,7 @@ impl<R: Runtime, M: Mockable<R>> StateGuard<'_, R, M> {
|
|||
};
|
||||
|
||||
let insert = match got {
|
||||
Ok((desc, refetch)) => {
|
||||
Ok(Downloaded { desc, refetch }) => {
|
||||
// Successful download. Schedule the refetch, and we'll insert Ok.
|
||||
|
||||
self.refetch_schedule.push(RefetchEntry {
|
||||
|
@ -899,64 +912,84 @@ impl<R: Runtime, M: Mockable<R>> Manager<R, M> {
|
|||
mockable: M,
|
||||
bridge: &BridgeConfig,
|
||||
config: &BridgeDescDownloadConfig,
|
||||
) -> Result<(BridgeDesc, SystemTime), Error> {
|
||||
) -> Result<Downloaded, Error> {
|
||||
debug!(r#"starting download for "{}""#, bridge);
|
||||
|
||||
let output = mockable
|
||||
// convenience alias, capturing the usual parameters from our variables.
|
||||
let process_document = |output| process_document(&self.runtime, config, output);
|
||||
|
||||
let text = mockable
|
||||
.clone()
|
||||
.download(&self.runtime, &self.circmgr, bridge)
|
||||
.download(&self.runtime, &self.circmgr, bridge, None)
|
||||
.await?;
|
||||
let desc = RouterDesc::parse(&output)?;
|
||||
let text = text.expect("got None but no if_modified_since");
|
||||
|
||||
// We *could* just trust this because we have trustworthy provenance
|
||||
// we know that the channel machinery authenticated the identity keys in `bridge`.
|
||||
// But let's do some cross-checking anyway. `check_signature` checks the self-signature.
|
||||
let desc = desc.check_signature().map_err(Arc::new)?;
|
||||
|
||||
let now = self.runtime.wallclock();
|
||||
desc.is_valid_at(&now)?;
|
||||
|
||||
// Justification that use of "dangerously" is correct:
|
||||
// 1. We have checked this just above, so it is valid now.
|
||||
// 2. We are extracting the timeout and implement our own refetch logic using expires.
|
||||
let (desc, (_, expires)) = desc.dangerously_into_parts();
|
||||
|
||||
// Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
|
||||
// The following situations can result in a nominally-expired descriptor being used:
|
||||
//
|
||||
// 1. We primarily enforce the timeout by looking at the expiry time,
|
||||
// subtracting a configured constant, and scheduling the start of a refetch then.
|
||||
// If it takes us longer to do the retry, than the prefetch constant,
|
||||
// we'll still be providing the old descriptor to consumers in the meantime.
|
||||
//
|
||||
// 2. We apply a minimum time before we will refetch a descriptor.
|
||||
// So if the validity time is unreasonably short, we'll use it beyond that time.
|
||||
//
|
||||
// 3. Clock warping could confuse this algorithm. This is inevitable because we
|
||||
// are relying on calendar times (SystemTime) in the descriptor, and because
|
||||
// we don't have a mechanism for being told about clock warps rather than the
|
||||
// passage of time.
|
||||
//
|
||||
// We think this is all OK given that a bridge descriptor is used for trying to
|
||||
// connect to the bridge itself. In particular, we don't want to completely trust
|
||||
// bridges to control our retry logic.
|
||||
let refetch = match expires {
|
||||
ops::Bound::Included(expires) | ops::Bound::Excluded(expires) => expires
|
||||
.checked_sub(config.prefetch)
|
||||
.ok_or(Error::ExtremeValidityTime)?,
|
||||
|
||||
ops::Bound::Unbounded => now
|
||||
.checked_add(config.max_refetch)
|
||||
.ok_or(Error::ExtremeValidityTime)?,
|
||||
};
|
||||
let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
|
||||
|
||||
let desc = BridgeDesc::new(Arc::new(desc));
|
||||
|
||||
Ok((desc, refetch))
|
||||
process_document(&text)
|
||||
}
|
||||
}
|
||||
|
||||
/// Processes and analyses a textual descriptor document into a `Downloaded`
|
||||
///
|
||||
/// Parses it, checks the signature, checks the document validity times,
|
||||
/// and if that's all good, calculates when will want to refetch it.
|
||||
//
|
||||
// TODO pt-client: This function could usefully have some unit tests.
|
||||
fn process_document<R: Runtime>(
|
||||
runtime: &R,
|
||||
config: &BridgeDescDownloadConfig,
|
||||
text: &str,
|
||||
) -> Result<Downloaded, Error> {
|
||||
let desc = RouterDesc::parse(text)?;
|
||||
|
||||
// We *could* just trust this because we have trustworthy provenance
|
||||
// we know that the channel machinery authenticated the identity keys in `bridge`.
|
||||
// But let's do some cross-checking anyway.
|
||||
// `check_signature` checks the self-signature.
|
||||
let desc = desc.check_signature().map_err(Arc::new)?;
|
||||
|
||||
let now = runtime.wallclock();
|
||||
desc.is_valid_at(&now)?;
|
||||
|
||||
// Justification that use of "dangerously" is correct:
|
||||
// 1. We have checked this just above, so it is valid now.
|
||||
// 2. We are extracting the timeout and implement our own refetch logic using expires.
|
||||
let (desc, (_, expires)) = desc.dangerously_into_parts();
|
||||
|
||||
// Our refetch schedule, and enforcement of descriptor expiry, is somewhat approximate.
|
||||
// The following situations can result in a nominally-expired descriptor being used:
|
||||
//
|
||||
// 1. We primarily enforce the timeout by looking at the expiry time,
|
||||
// subtracting a configured constant, and scheduling the start of a refetch then.
|
||||
// If it takes us longer to do the retry, than the prefetch constant,
|
||||
// we'll still be providing the old descriptor to consumers in the meantime.
|
||||
//
|
||||
// 2. We apply a minimum time before we will refetch a descriptor.
|
||||
// So if the validity time is unreasonably short, we'll use it beyond that time.
|
||||
//
|
||||
// 3. Clock warping could confuse this algorithm. This is inevitable because we
|
||||
// are relying on calendar times (SystemTime) in the descriptor, and because
|
||||
// we don't have a mechanism for being told about clock warps rather than the
|
||||
// passage of time.
|
||||
//
|
||||
// We think this is all OK given that a bridge descriptor is used for trying to
|
||||
// connect to the bridge itself. In particular, we don't want to completely trust
|
||||
// bridges to control our retry logic.
|
||||
let refetch = match expires {
|
||||
ops::Bound::Included(expires) | ops::Bound::Excluded(expires) => expires
|
||||
.checked_sub(config.prefetch)
|
||||
.ok_or(Error::ExtremeValidityTime)?,
|
||||
|
||||
ops::Bound::Unbounded => now
|
||||
.checked_add(config.max_refetch)
|
||||
.ok_or(Error::ExtremeValidityTime)?,
|
||||
};
|
||||
let refetch = refetch.clamp(now + config.min_refetch, now + config.max_refetch);
|
||||
|
||||
let desc = BridgeDesc::new(Arc::new(desc));
|
||||
|
||||
Ok(Downloaded { desc, refetch })
|
||||
}
|
||||
|
||||
/// Task which waits for the timeout, and requeues bridges that need to be refetched
|
||||
///
|
||||
/// This task's job is to execute the wakeup instructions provided via `updates`.
|
||||
|
|
|
@ -80,7 +80,8 @@ impl mockable::MockableAPI<R> for Mock {
|
|||
_runtime: &R,
|
||||
_circmgr: &Self::CircMgr,
|
||||
bridge: &BridgeConfig,
|
||||
) -> Result<String, Error> {
|
||||
_if_modified_since: Option<SystemTime>,
|
||||
) -> Result<Option<String>, Error> {
|
||||
eprint!("download ...");
|
||||
let mut mstate = self.mstate.lock().await;
|
||||
mstate.download_calls += 1;
|
||||
|
@ -93,7 +94,7 @@ impl mockable::MockableAPI<R> for Mock {
|
|||
.docs
|
||||
.get(&addr.port())
|
||||
.ok_or(TE("no document", RT::AfterWaiting))?;
|
||||
doc.clone()
|
||||
doc.clone().map(Some)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue