tor-rtcompat/scheduler: add unit tests, FireIn -> FireAt

Addressing review comments: added some unit tests for the new scheduler
type, and made FireIn use an Instant instead (making it FireAt).
This commit is contained in:
eta 2022-03-24 14:07:40 +00:00
parent 1ca79ff988
commit ee47a16697
2 changed files with 142 additions and 7 deletions

View File

@ -406,6 +406,9 @@ pub mod cond {
///
/// (This is a macro so that it can repeat the closure as multiple separate
/// expressions, so it can take on two different types, if needed.)
//
// NOTE(eta): changing this #[cfg] can affect tests inside this crate that use
// this macro, like in scheduler.rs
#[macro_export]
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),

View File

@ -7,7 +7,7 @@ use futures::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::time::{Duration, Instant};
use pin_project::pin_project;
@ -16,8 +16,8 @@ use pin_project::pin_project;
enum SchedulerCommand {
/// Run the task now.
Fire,
/// Run the task after the provided duration.
FireIn(Duration),
/// Run the task at the provided `Instant`.
FireAt(Instant),
/// Cancel a pending execution, if there is one.
Cancel,
}
@ -78,12 +78,12 @@ impl TaskHandle {
pub fn fire(&self) -> bool {
self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
}
/// Trigger this handle's corresponding schedule after `dur`.
/// Trigger this handle's corresponding schedule at `instant`.
///
/// Returns `true` if the schedule still exists, and `false` otherwise.
pub fn fire_in(&self, dur: Duration) -> bool {
pub fn fire_at(&self, instant: Instant) -> bool {
self.tx
.unbounded_send(SchedulerCommand::FireIn(dur))
.unbounded_send(SchedulerCommand::FireAt(instant))
.is_ok()
}
/// Cancel a pending firing of the handle's corresponding schedule.
@ -104,7 +104,9 @@ impl<R: SleepProvider> TaskScheduleP<'_, R> {
*self.instant_fire = true;
*self.sleep = None;
}
SchedulerCommand::FireIn(dur) => {
SchedulerCommand::FireAt(instant) => {
let now = self.rt.now();
let dur = instant.saturating_duration_since(now);
*self.instant_fire = false;
*self.sleep = Some(Box::pin(self.rt.sleep(dur)));
}
@ -146,3 +148,133 @@ impl<R: SleepProvider> Stream for TaskSchedule<R> {
Poll::Pending
}
}
// test_with_all_runtimes! only exists if these features are satisfied.
#[cfg(all(
test,
any(feature = "native-tls", feature = "rustls"),
any(feature = "tokio", feature = "async-std"),
))]
mod test {
use crate::scheduler::TaskSchedule;
use crate::{test_with_all_runtimes, SleepProvider};
use futures::FutureExt;
use futures::StreamExt;
use std::time::{Duration, Instant};
#[test]
fn it_fires_immediately() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, _hdl) = TaskSchedule::new(rt);
assert!(sch.next().now_or_never().is_some());
});
}
#[test]
#[allow(clippy::unwrap_used)]
fn it_dies_if_dropped() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt);
drop(hdl);
assert!(sch.next().now_or_never().unwrap().is_none());
});
}
#[test]
fn it_fires_on_demand() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt);
assert!(sch.next().now_or_never().is_some());
assert!(sch.next().now_or_never().is_none());
assert!(hdl.fire());
assert!(sch.next().now_or_never().is_some());
assert!(sch.next().now_or_never().is_none());
});
}
#[test]
fn it_cancels_instant_firings() {
// NOTE(eta): this test very much assumes that unbounded channels will
// transmit things instantly. If it breaks, that's probably why.
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt);
assert!(sch.next().now_or_never().is_some());
assert!(sch.next().now_or_never().is_none());
assert!(hdl.fire());
assert!(hdl.cancel());
assert!(sch.next().now_or_never().is_none());
});
}
#[test]
fn it_fires_after_self_reschedule() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, _hdl) = TaskSchedule::new(rt);
assert!(sch.next().now_or_never().is_some());
sch.fire_in(Duration::from_millis(100));
assert!(sch.next().now_or_never().is_none());
assert!(sch.next().await.is_some());
assert!(sch.next().now_or_never().is_none());
});
}
#[test]
fn it_fires_after_external_reschedule() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt);
assert!(sch.next().now_or_never().is_some());
hdl.fire_at(Instant::now() + Duration::from_millis(100));
assert!(sch.next().now_or_never().is_none());
assert!(sch.next().await.is_some());
assert!(sch.next().now_or_never().is_none());
});
}
#[test]
fn it_cancels_delayed_firings() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
assert!(sch.next().now_or_never().is_some());
hdl.fire_at(Instant::now() + Duration::from_millis(100));
assert!(sch.next().now_or_never().is_none());
rt.sleep(Duration::from_millis(50)).await;
assert!(sch.next().now_or_never().is_none());
hdl.cancel();
assert!(sch.next().now_or_never().is_none());
rt.sleep(Duration::from_millis(100)).await;
assert!(sch.next().now_or_never().is_none());
});
}
#[test]
fn last_fire_wins() {
test_with_all_runtimes!(|rt| async move {
let (mut sch, hdl) = TaskSchedule::new(rt.clone());
assert!(sch.next().now_or_never().is_some());
hdl.fire_at(Instant::now() + Duration::from_millis(100));
hdl.fire();
assert!(sch.next().now_or_never().is_some());
assert!(sch.next().now_or_never().is_none());
rt.sleep(Duration::from_millis(150)).await;
assert!(sch.next().now_or_never().is_none());
});
}
}