circmgr: Improve retry-and-or-delay logic.
Use the new RetryTime type and its associates to decide how long to wait (if at all) between attempts to build a circuit. Closes #421. Part of #329.
This commit is contained in:
parent
c3b2bcc91e
commit
6d8a6b42e7
|
@ -26,8 +26,9 @@ use crate::config::CircuitTiming;
|
|||
use crate::{DirInfo, Error, Result};
|
||||
|
||||
use retry_error::RetryError;
|
||||
use tor_basic_utils::retry::RetryDelay;
|
||||
use tor_config::MutCfg;
|
||||
use tor_error::internal;
|
||||
use tor_error::{internal, AbsRetryTime, HasRetryTime};
|
||||
use tor_rtcompat::{Runtime, SleepProviderExt};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
@ -777,6 +778,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
|
|||
std::cmp::max(1, self.builder.launch_parallelism(usage)),
|
||||
);
|
||||
|
||||
let mut retry_schedule = RetryDelay::from_msec(100);
|
||||
let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a circuit");
|
||||
|
||||
for n in 1..(max_iterations + 1) {
|
||||
|
@ -789,7 +791,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
|
|||
Some(t) => t,
|
||||
};
|
||||
|
||||
match self.prepare_action(usage, dir, true) {
|
||||
let error = match self.prepare_action(usage, dir, true) {
|
||||
Ok(action) => {
|
||||
// We successfully found an action: Take that action.
|
||||
let outcome = self
|
||||
|
@ -801,9 +803,11 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
|
|||
Ok(Ok(circ)) => return Ok(circ),
|
||||
Ok(Err(e)) => {
|
||||
info!("Circuit attempt {} failed.", n);
|
||||
retry_err.extend(e);
|
||||
Error::RequestFailed(e)
|
||||
}
|
||||
Err(_) => {
|
||||
// We ran out of "remaining" time; there is nothing
|
||||
// more to be done.
|
||||
retry_err.push(Error::RequestTimeout);
|
||||
break;
|
||||
}
|
||||
|
@ -812,30 +816,36 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
|
|||
Err(e) => {
|
||||
// We couldn't pick the action!
|
||||
info!("Couldn't pick action for circuit attempt {}: {}", n, &e);
|
||||
let small_delay = Duration::from_millis(50);
|
||||
let wait_for_action = match &e {
|
||||
Error::Guard(tor_guardmgr::PickGuardError::AllGuardsDown {
|
||||
retry_at: Some(instant),
|
||||
}) => {
|
||||
// If we failed because all guards are down, that's fine: we just wait until
|
||||
// the next guard is retriable.
|
||||
instant.saturating_duration_since(self.runtime.now()) + small_delay
|
||||
}
|
||||
_ => {
|
||||
// Any other errors are pretty unusual; wait a little while, then try again.
|
||||
small_delay
|
||||
}
|
||||
};
|
||||
retry_err.push(e);
|
||||
if remaining < wait_for_action {
|
||||
// We can't wait long enough. Call this failed now.
|
||||
break;
|
||||
}
|
||||
self.runtime
|
||||
.sleep(std::cmp::min(remaining, wait_for_action))
|
||||
.await;
|
||||
e
|
||||
}
|
||||
};
|
||||
|
||||
// There's been an error. See how long we wait before we retry.
|
||||
let now = self.runtime.now();
|
||||
let retry_time =
|
||||
error.abs_retry_time(now, || retry_schedule.next_delay(&mut rand::thread_rng()));
|
||||
|
||||
// Record the error, flattening it if needed.
|
||||
match error {
|
||||
Error::RequestFailed(e) => retry_err.extend(e),
|
||||
e => retry_err.push(e),
|
||||
}
|
||||
|
||||
// If this is the last iteration, don't bother waiting, since we won't retry.
|
||||
if n == max_iterations {
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait, or not, as appropriate.
|
||||
match retry_time {
|
||||
AbsRetryTime::Immediate => {}
|
||||
AbsRetryTime::Never => break,
|
||||
AbsRetryTime::At(t) => {
|
||||
let remaining = timeout_at.saturating_duration_since(now);
|
||||
let delay = t.saturating_duration_since(now);
|
||||
self.runtime.sleep(std::cmp::min(delay, remaining)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(Error::RequestFailed(retry_err))
|
||||
|
@ -1569,7 +1579,7 @@ mod test {
|
|||
match op {
|
||||
FakeOp::Succeed => Ok((plan.spec, FakeCirc { id: FakeId::next() })),
|
||||
FakeOp::WrongSpec(s) => Ok((s, FakeCirc { id: FakeId::next() })),
|
||||
FakeOp::Fail => Err(Error::PendingCanceled),
|
||||
FakeOp::Fail => Err(Error::CircTimeout),
|
||||
FakeOp::Delay(d) => {
|
||||
let sl = self.runtime.sleep(d);
|
||||
self.runtime.allow_one_advance(d);
|
||||
|
|
Loading…
Reference in New Issue