From d82ed8d793f5ba3a895fc30b8c2f61a273845198 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Mon, 3 Jul 2023 13:11:18 +0100 Subject: [PATCH] tor-rtmock: Provide MockExecutor --- Cargo.lock | 3 + crates/tor-rtmock/Cargo.toml | 3 + crates/tor-rtmock/src/lib.rs | 1 + crates/tor-rtmock/src/task.rs | 581 ++++++++++++++++++++++++++++++++++ 4 files changed, 588 insertions(+) create mode 100644 crates/tor-rtmock/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index 5042b2400..4da90f524 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4820,11 +4820,14 @@ name = "tor-rtmock" version = "0.8.2" dependencies = [ "async-trait", + "educe", "futures", "futures-await-test", "humantime 2.1.0", + "itertools 0.11.0", "pin-project", "rand 0.8.5", + "slotmap", "thiserror", "tor-basic-utils", "tor-rtcompat", diff --git a/crates/tor-rtmock/Cargo.toml b/crates/tor-rtmock/Cargo.toml index 99d498f06..262e1a2e8 100644 --- a/crates/tor-rtmock/Cargo.toml +++ b/crates/tor-rtmock/Cargo.toml @@ -13,9 +13,12 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/" [dependencies] async-trait = "0.1.54" +educe = "0.4.6" futures = "0.3.14" humantime = "2" +itertools = "0.11.0" pin-project = "1" +slotmap = "1.0.6" thiserror = "1" tor-rtcompat = { version = "0.9.1", path = "../tor-rtcompat" } tracing = "0.1.36" diff --git a/crates/tor-rtmock/src/lib.rs b/crates/tor-rtmock/src/lib.rs index 845f8e1cf..a85d5946f 100644 --- a/crates/tor-rtmock/src/lib.rs +++ b/crates/tor-rtmock/src/lib.rs @@ -47,6 +47,7 @@ mod util; pub mod io; pub mod net; +pub mod task; pub mod time; mod net_runtime; diff --git a/crates/tor-rtmock/src/task.rs b/crates/tor-rtmock/src/task.rs new file mode 100644 index 000000000..4702b8c07 --- /dev/null +++ b/crates/tor-rtmock/src/task.rs @@ -0,0 +1,581 @@ +//! Executor for running tests with mocked environment +//! +//! 
See [`MockExecutor`] + +use std::fmt::{self, Debug, Display}; +use std::future::Future; +use std::iter; +use std::pin::Pin; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::task::{Context, Poll, Wake, Waker}; + +use futures::pin_mut; +use futures::task::{FutureObj, Spawn, SpawnError}; +use futures::FutureExt as _; + +use educe::Educe; +use itertools::{chain, izip}; +use slotmap::DenseSlotMap; +use tracing::trace; + +use tor_rtcompat::BlockOn; + +use Poll::*; +use TaskState::*; + +/// Type-erased future, one for each of our (normal) tasks +type TaskFuture = FutureObj<'static, ()>; + +/// Future for the argument to `block_on`, which is handled specially +type MainFuture<'m> = Pin<&'m mut dyn Future>; + +//---------- principal data structures ---------- + +/// Executor for running tests with mocked environment +/// +/// For test cases which don't actually wait for anything in the real world. +/// +/// This is the executor. +/// It implements [`Spawn`] and [`BlockOn`] +/// +/// It will usually be used as part of a `MockRuntime`. +/// +/// # Restricted environment +/// +/// Tests run with this executor must not attempt to block +/// on anything "outside": +/// every future that anything awaits must (eventually) be woken directly +/// *by some other task* in the same test case. +/// +/// (By directly we mean that the [`Waker::wake`] call is made +/// by that waking future, before that future itself awaits anything.) +/// +/// # Panics +/// +/// This executor will malfunction or panic if reentered. +#[derive(Clone, Default, Educe)] +#[educe(Debug)] +pub struct MockExecutor { + /// Mutable state + #[educe(Debug(ignore))] + data: ArcMutexData, +} + +/// Mutable state, wrapper type mostly so we can provide `.lock()` +#[derive(Clone, Default)] +struct ArcMutexData(Arc>); + +/// Task id, module to hide `Ti` alias +mod task_id { + slotmap::new_key_type! 
{ + /// Task ID, usually called `TaskId` + /// + /// Short name in special `task_id` module so that [`Debug`] is nice + pub(super) struct Ti; + } +} +use task_id::Ti as TaskId; + +/// Executor's state +/// +/// ### Task state machine +/// +/// A task is created in `tasks`, `Awake`, so also in `awake`. +/// +/// When we poll it, we take it out of `awake` and set it to `Asleep`, +/// and then call `poll()`. +/// Any time after that, it can be made `Awake` again (and put back onto `awake`) +/// by the waker ([`ActualWaker`], wrapped in [`Waker`]). +/// +/// The task's future is of course also present here in this data structure. +/// However, during poll we must release the lock, +/// so we cannot borrow the future from `Data`. +/// Instead, we move it out. So `Task.fut` is an `Option`. +/// +/// ### "Main" task - the argument to `block_on` +/// +/// The signature of `BlockOn::block_on` accepts a non-`'static` future +/// (and a non-`Send`/`Sync` one). +/// +/// So we cannot store that future in `Data` because `Data` is `'static`. +/// Instead, this main task future is passed as an argument down the call stack. +/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`. +#[derive(Default)] +struct Data { + /// Tasks + /// + /// Includes tasks spawned with `spawn`, + /// and also the future passed to `block_on`. + tasks: DenseSlotMap, + + /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s + awake: Vec, + + /// If a future from `progress_until_stalled` exists + progressing_until_stalled: Option, +} + +/// Record of a single task +/// +/// Tracks a spawned task, or the main task (the argument to `block_on`). +/// +/// Stored in [`Data`]`.tasks`. +struct Task { + /// For debugging output + desc: String, + /// Has this been woken via a waker? (And is it in `Data.awake`?) 
+ state: TaskState, + /// The actual future (or a placeholder for it) + /// + /// May be `None` because we've temporarily moved it out so we can poll it + fut: Option, +} + +/// A future as stored in our record of a [`Task`] +enum TaskFutureInfo { + /// The [`Future`]. All is normal. + Normal(TaskFuture), + /// The future isn't here because this task is the main future for `block_on` + Main, +} +use TaskFutureInfo as TFI; + +/// State of a task - do we think it needs to be polled? +/// +/// Stored in [`Task`]`.state`. +#[derive(Debug)] +enum TaskState { + /// Awake - needs to be polled + /// + /// Established by [`waker.wake()`](Waker::wake) + Awake, + /// Asleep - does *not* need to be polled + /// + /// Established each time just before we call the future's [`poll`](Future::poll) + Asleep, +} + +/// Actual implementor of `Wake` for use in a `Waker` +/// +/// Futures (eg, channels from [`futures`]) will use this to wake a task +/// when it should be polled. +struct ActualWaker { + /// Executor state + data: ArcMutexData, + + /// Which task this is + id: TaskId, +} + +/// State used for an in-progress call to +/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`] +/// +/// If present in [`Data`], an (async) call to `progress_until_stalled` +/// is in progress. +/// +/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`] +/// is a normal-ish future. +/// It can be polled in the normal way. +/// When it is polled, it looks here, in `finished`, to see if it's `Ready`. +/// +/// The future is made ready, and woken (via `waker`), +/// by bespoke code in the task executor loop. +/// +/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped, +/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`. +#[derive(Debug)] +struct ProgressingUntilStalled { + /// Have we, in fact, stalled? 
+ /// + /// Made `Ready` by special code in the executor loop + finished: Poll<()>, + + /// Waker + /// + /// Signalled by special code in the executor loop + waker: Option, +} + +/// Future from +/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`] +/// +/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption. +/// +/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`. +/// There can only be one at a time. +#[derive(Educe)] +#[educe(Debug)] +struct ProgressUntilStalledFuture { + /// Executor's state; this future's state is in `.progressing_until_stalled` + #[educe(Debug(ignore))] + data: ArcMutexData, +} + +//---------- creation ---------- + +impl MockExecutor { + /// Make a `MockExecutor` with default parameters + pub fn new() -> Self { + Self::default() + } +} + +//---------- spawning ---------- + +impl MockExecutor { + /// Spawn a task and return something to identify it + /// + /// `desc` should `Display` as some kind of short string (ideally without spaces) + /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`. + /// + /// The returned value is an opaque task identifier which is very cheap to clone + /// and which can be used by the caller in debug logging, + /// if it's desired to correlate with the debug output from `MockExecutor`. + /// Most callers will want to ignore it. + /// + /// This method is infallible. (The `MockExecutor` cannot be shut down.) + pub fn spawn_identified( + &self, + desc: impl Display, + fut: impl Future + Send + Sync + 'static, + ) -> impl Debug + Clone + Send + Sync + 'static { + self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut))) + } + + /// Spawn a task and return its `TaskId` + /// + /// Convenience method for use by `spawn_identified` and `spawn_obj`. + /// The future passed to `block_on` is not handled here. 
+ fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId { + let mut data = self.data.lock(); + data.insert_task(desc, TFI::Normal(fut)) + } +} + +impl Data { + /// Insert a task given its `TaskFutureInfo` and return its `TaskId`. + fn insert_task(&mut self, desc: String, fut: TaskFutureInfo) -> TaskId { + let state = Awake; + let id = self.tasks.insert(Task { + state, + desc, + fut: Some(fut), + }); + self.awake.push(id); + trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]); + id + } +} + +impl Spawn for MockExecutor { + fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> { + self.spawn_internal("".into(), future); + Ok(()) + } +} + +//---------- block_on ---------- + +impl BlockOn for MockExecutor { + /// Run `fut` to completion, synchronously + /// + /// # Panics + /// + /// Might malfunction or panic if: + /// + /// * The provided future doesn't complete (without externally blocking), + /// but instead waits for something. + /// + /// * The `MockExecutor` is reentered. (Eg, `block_on` is reentered.) + fn block_on(&self, fut: F) -> F::Output + where + F: Future, + { + let mut value: Option = None; + let fut = { + let value = &mut value; + async move { + trace!("MockExecutor block_on future..."); + let t = fut.await; + trace!("MockExecutor block_on future returned..."); + *value = Some(t); + trace!("MockExecutor block_on future exiting."); + } + }; + + { + pin_mut!(fut); + self.data.lock().insert_task("main".into(), TFI::Main); + self.execute_to_completion(fut); + } + + #[allow(clippy::let_and_return)] // clarity + let value = value.take().unwrap_or_else(|| { + let data = self.data.lock(); + panic!( + r" +all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ? 
+ +{data:#?} +" + ); + }); + + value + } +} + +//---------- execution - core implementation ---------- + +impl MockExecutor { + /// Keep polling tasks until nothing more can be done + /// + /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`. + fn execute_to_completion(&self, mut main_fut: MainFuture) { + trace!("MockExecutor execute_to_completion..."); + loop { + self.execute_until_first_stall(main_fut.as_mut()); + + // Handle `progressing_until_stalled` + let pus_waker = { + let mut data = self.data.lock(); + let pus = &mut data.progressing_until_stalled; + trace!("MockExecutor execute_to_completion PUS={:?}", &pus); + let Some(pus) = pus else { + // No progressing_until_stalled, we're actually done. + break; + }; + assert_eq!( + pus.finished, Pending, + "ProgressingUntilStalled finished twice?!" + ); + pus.finished = Ready(()); + pus.waker + .clone() + .expect("ProgressUntilStalledFuture not ever polled!") + }; + pus_waker.wake(); + } + trace!("MockExecutor execute_to_completion done"); + } + + /// Keep polling tasks until `awake` is empty + /// + /// (Ignores `progressing_until_stalled` - so if one is active, + /// will return when all other tasks have blocked.) 
+ /// + /// # Panics + /// + /// Might malfunction or panic if called reentrantly + fn execute_until_first_stall(&self, mut main_fut: MainFuture) { + trace!("MockExecutor execute_until_first_stall ..."); + 'outer: loop { + // Take an `Awake` task off `awake` and make it `Asleep` + let (id, mut fut) = 'inner: loop { + let mut data = self.data.lock(); + let Some(id) = data.awake.pop() else { break 'outer }; + let Some(task) = data.tasks.get_mut(id) else { + trace!("MockExecutor {id:?} vanished"); + continue; + }; + task.state = Asleep; + let fut = task.fut.take().expect("future missing from task!"); + break 'inner (id, fut); + }; + + // Poll the selected task + let waker = Waker::from(Arc::new(ActualWaker { + data: self.data.clone(), + id, + })); + trace!("MockExecutor {id:?} polling..."); + let mut cx = Context::from_waker(&waker); + let r = match &mut fut { + TFI::Normal(fut) => fut.poll_unpin(&mut cx), + TFI::Main => main_fut.as_mut().poll(&mut cx), + }; + + // Deal with the returned `Poll` + { + let mut data = self.data.lock(); + let task = data + .tasks + .get_mut(id) + .expect("task vanished while we were polling it"); + + match r { + Pending => { + trace!("MockExecutor {id:?} -> Pending"); + if task.fut.is_some() { + panic!("task reinserted while we polled it?!"); + } + // The task might have been woken *by its own poll method*. + // That's why we set it to `Asleep` *earlier* rather than here. + // All we need to do is put the future back. + task.fut = Some(fut); + } + Ready(()) => { + trace!("MockExecutor {id:?} -> Ready"); + // Oh, it finished! + // It might be in `awake`, but that's allowed to contain stale tasks, + // so we *don't* need to scan that list and remove it. 
+ data.tasks.remove(id); + } + } + } + } + trace!("MockExecutor execute_until_first_stall done."); + } +} + +impl Wake for ActualWaker { + fn wake(self: Arc) { + let mut data = self.data.lock(); + trace!("MockExecutor {:?} wake", &self.id); + let Some(task) = data.tasks.get_mut(self.id) else { return }; + match task.state { + Awake => {} + Asleep => { + task.state = Awake; + data.awake.push(self.id); + } + } + } +} + +//---------- "progress until stalled" functionality ---------- + +impl MockExecutor { + /// Run tasks in the current executor until every task is waiting + /// + /// # Panics + /// + /// Might malfunction or panic if more than one such call is running at once. + /// + /// (Ie, you must `.await` or drop the returned `Future` + /// before calling this method again.) + pub fn progress_until_stalled(&self) -> impl Future { + let mut data = self.data.lock(); + assert!( + data.progressing_until_stalled.is_none(), + "progress_until_stalled called more than once" + ); + trace!("MockExecutor progress_until_stalled..."); + data.progressing_until_stalled = Some(ProgressingUntilStalled { + finished: Pending, + waker: None, + }); + ProgressUntilStalledFuture { + data: self.data.clone(), + } + } +} + +impl Future for ProgressUntilStalledFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + let mut data = self.data.lock(); + let pus = data.progressing_until_stalled.as_mut(); + trace!("MockExecutor progress_until_stalled polling... {:?}", &pus); + let pus = pus.expect("ProgressingUntilStalled missing"); + pus.waker = Some(cx.waker().clone()); + pus.finished + } +} + +impl Drop for ProgressUntilStalledFuture { + fn drop(&mut self) { + self.data.lock().progressing_until_stalled = None; + } +} + +//---------- ancillary and convenience functions ---------- + +/// Trait to let us assert at compile time that something is nicely `Sync` etc. 
+trait EnsureSyncSend: Sync + Send + 'static {} +impl EnsureSyncSend for ActualWaker {} +impl EnsureSyncSend for MockExecutor {} + +impl ArcMutexData { + /// Lock and obtain the guard + /// + /// Convenience method which panics on poison + fn lock(&self) -> MutexGuard { + self.0.lock().expect("data lock poisoned") + } +} + +//---------- bespoke Debug impls ---------- + +// See `impl Debug for Data` for notes on the output +impl Debug for Task { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let Task { desc, state, fut } = self; + write!(f, "{:?}", desc)?; + write!(f, "=")?; + match fut { + None => write!(f, "P")?, + Some(TFI::Normal(_)) => write!(f, "f")?, + Some(TFI::Main) => write!(f, "m")?, + } + match state { + Awake => write!(f, "W")?, + Asleep => write!(f, "s")?, + }; + Ok(()) + } +} + +/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids +struct DebugTasks<'d, F>(&'d Data, F); + +// See `impl Debug for Data` for notes on the output +impl Debug for DebugTasks<'_, F> +where + F: Fn() -> I, + I: Iterator, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let DebugTasks(data, ids) = self; + for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) { + write!(f, "{delim}{id:?}")?; + match data.tasks.get(id) { + None => write!(f, "-")?, + Some(task) => write!(f, "={task:?}")?, + } + } + Ok(()) + } +} + +/// `Task`s in `Data` are printed as `Ti(ID)="SPEC"=FLAGS`. +/// +/// `FLAGS` are: +/// +/// * `P`: this task is being polled (its `TaskFutureInfo` is absent) +/// * `f`: this is a normal task with a future and its future is present in `Data` +/// * `m`: this is the main task from `block_on` +/// +/// * `W`: the task is awake +/// * `s`: the task is asleep +// +// We do it this way because the naive dump from derive is very expansive +// and makes it impossible to see the wood for the trees. +// This very compact representation makes it easier to find a task of interest in the output. 
+// +// This is implemented in `impl Debug for Task`. +impl Debug for Data { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let Data { + tasks, + awake, + progressing_until_stalled: pus, + } = self; + let mut s = f.debug_struct("Data"); + s.field("tasks", &DebugTasks(self, || tasks.keys())); + s.field("awake", &DebugTasks(self, || awake.iter().cloned())); + s.field("p.u.s", pus); + s.finish() + } +}