tor-rtmock: Provide MockExecutor
This commit is contained in:
parent
70fdd92030
commit
d82ed8d793
|
@ -4820,11 +4820,14 @@ name = "tor-rtmock"
|
||||||
version = "0.8.2"
|
version = "0.8.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"educe",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-await-test",
|
"futures-await-test",
|
||||||
"humantime 2.1.0",
|
"humantime 2.1.0",
|
||||||
|
"itertools 0.11.0",
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
|
"slotmap",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tor-basic-utils",
|
"tor-basic-utils",
|
||||||
"tor-rtcompat",
|
"tor-rtcompat",
|
||||||
|
|
|
@ -13,9 +13,12 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = "0.1.54"
|
async-trait = "0.1.54"
|
||||||
|
educe = "0.4.6"
|
||||||
futures = "0.3.14"
|
futures = "0.3.14"
|
||||||
humantime = "2"
|
humantime = "2"
|
||||||
|
itertools = "0.11.0"
|
||||||
pin-project = "1"
|
pin-project = "1"
|
||||||
|
slotmap = "1.0.6"
|
||||||
thiserror = "1"
|
thiserror = "1"
|
||||||
tor-rtcompat = { version = "0.9.1", path = "../tor-rtcompat" }
|
tor-rtcompat = { version = "0.9.1", path = "../tor-rtcompat" }
|
||||||
tracing = "0.1.36"
|
tracing = "0.1.36"
|
||||||
|
|
|
@ -47,6 +47,7 @@ mod util;
|
||||||
|
|
||||||
pub mod io;
|
pub mod io;
|
||||||
pub mod net;
|
pub mod net;
|
||||||
|
pub mod task;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
|
|
||||||
mod net_runtime;
|
mod net_runtime;
|
||||||
|
|
|
@ -0,0 +1,581 @@
|
||||||
|
//! Executor for running tests with mocked environment
|
||||||
|
//!
|
||||||
|
//! See [`MockExecutor`]
|
||||||
|
|
||||||
|
use std::fmt::{self, Debug, Display};
|
||||||
|
use std::future::Future;
|
||||||
|
use std::iter;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::{Arc, Mutex, MutexGuard};
|
||||||
|
use std::task::{Context, Poll, Wake, Waker};
|
||||||
|
|
||||||
|
use futures::pin_mut;
|
||||||
|
use futures::task::{FutureObj, Spawn, SpawnError};
|
||||||
|
use futures::FutureExt as _;
|
||||||
|
|
||||||
|
use educe::Educe;
|
||||||
|
use itertools::{chain, izip};
|
||||||
|
use slotmap::DenseSlotMap;
|
||||||
|
use tracing::trace;
|
||||||
|
|
||||||
|
use tor_rtcompat::BlockOn;
|
||||||
|
|
||||||
|
use Poll::*;
|
||||||
|
use TaskState::*;
|
||||||
|
|
||||||
|
/// Type-erased future, one for each of our (normal) tasks
|
||||||
|
type TaskFuture = FutureObj<'static, ()>;
|
||||||
|
|
||||||
|
/// Future for the argument to `block_on`, which is handled specially
|
||||||
|
type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
|
||||||
|
|
||||||
|
//---------- principal data structures ----------
|
||||||
|
|
||||||
|
/// Executor for running tests with mocked environment
|
||||||
|
///
|
||||||
|
/// For test cases which don't actually wait for anything in the real world.
|
||||||
|
///
|
||||||
|
/// This is the executor.
|
||||||
|
/// It implements [`Spawn`] and [`BlockOn`]
|
||||||
|
///
|
||||||
|
/// It will usually be used as part of a `MockRuntime`.
|
||||||
|
///
|
||||||
|
/// # Restricted environment
|
||||||
|
///
|
||||||
|
/// Tests run with this executor must not attempt to block
|
||||||
|
/// on anything "outside":
|
||||||
|
/// every future that anything awaits must (eventually) be woken directly
|
||||||
|
/// *by some other task* in the same test case.
|
||||||
|
///
|
||||||
|
/// (By directly we mean that the [`Waker::wake`] call is made
|
||||||
|
/// by that waking future, before that future itself awaits anything.)
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// This executor will malfunction or panic if reentered.
|
||||||
|
#[derive(Clone, Default, Educe)]
|
||||||
|
#[educe(Debug)]
|
||||||
|
pub struct MockExecutor {
|
||||||
|
/// Mutable state
|
||||||
|
#[educe(Debug(ignore))]
|
||||||
|
data: ArcMutexData,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mutable state, wrapper type mostly so we can provide `.lock()`
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
struct ArcMutexData(Arc<Mutex<Data>>);
|
||||||
|
|
||||||
|
/// Task id, module to hide `Ti` alias
|
||||||
|
mod task_id {
|
||||||
|
slotmap::new_key_type! {
|
||||||
|
/// Task ID, usually called `TaskId`
|
||||||
|
///
|
||||||
|
/// Short name in special `task_id` module so that [`Debug`] is nice
|
||||||
|
pub(super) struct Ti;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
use task_id::Ti as TaskId;
|
||||||
|
|
||||||
|
/// Executor's state
|
||||||
|
///
|
||||||
|
/// ### Task state machine
|
||||||
|
///
|
||||||
|
/// A task is created in `tasks`, `Awake`, so also in `awake`.
|
||||||
|
///
|
||||||
|
/// When we poll it, we take it out of `awake` and set it to `Asleep`,
|
||||||
|
/// and then call `poll()`.
|
||||||
|
/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
|
||||||
|
/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
|
||||||
|
///
|
||||||
|
/// The task's future is of course also present here in this data structure.
|
||||||
|
/// However, during poll we must release the lock,
|
||||||
|
/// so we cannot borrow the future from `Data`.
|
||||||
|
/// Instead, we move it out. So `Task.fut` is an `Option`.
|
||||||
|
///
|
||||||
|
/// ### "Main" task - the argument to `block_on`
|
||||||
|
///
|
||||||
|
/// The signature of `BlockOn::block_on` accepts a non-`'static` future
|
||||||
|
/// (and a non-`Send`/`Sync` one).
|
||||||
|
///
|
||||||
|
/// So we cannot store that future in `Data` because `Data` is `'static`.
|
||||||
|
/// Instead, this main task future is passed as an argument down the call stack.
|
||||||
|
/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
|
||||||
|
#[derive(Default)]
|
||||||
|
struct Data {
|
||||||
|
/// Tasks
|
||||||
|
///
|
||||||
|
/// Includes tasks spawned with `spawn`,
|
||||||
|
/// and also the future passed to `block_on`.
|
||||||
|
tasks: DenseSlotMap<TaskId, Task>,
|
||||||
|
|
||||||
|
/// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
|
||||||
|
awake: Vec<TaskId>,
|
||||||
|
|
||||||
|
/// If a future from `progress_until_stalled` exists
|
||||||
|
progressing_until_stalled: Option<ProgressingUntilStalled>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Record of a single task
|
||||||
|
///
|
||||||
|
/// Tracks a spawned task, or the main task (the argument to `block_on`).
|
||||||
|
///
|
||||||
|
/// Stored in [`Data`]`.tasks`.
|
||||||
|
struct Task {
|
||||||
|
/// For debugging output
|
||||||
|
desc: String,
|
||||||
|
/// Has this been woken via a waker? (And is it in `Data.awake`?)
|
||||||
|
state: TaskState,
|
||||||
|
/// The actual future (or a placeholder for it)
|
||||||
|
///
|
||||||
|
/// May be `None` because we've temporarily moved it out so we can poll it
|
||||||
|
fut: Option<TaskFutureInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A future as stored in our record of a [`Task`]
|
||||||
|
enum TaskFutureInfo {
|
||||||
|
/// The [`Future`]. All is normal.
|
||||||
|
Normal(TaskFuture),
|
||||||
|
/// The future isn't here because this task is the main future for `block_on`
|
||||||
|
Main,
|
||||||
|
}
|
||||||
|
use TaskFutureInfo as TFI;
|
||||||
|
|
||||||
|
/// State of a task - do we think it needs to be polled?
|
||||||
|
///
|
||||||
|
/// Stored in [`Task`]`.state`.
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum TaskState {
|
||||||
|
/// Awake - needs to be polled
|
||||||
|
///
|
||||||
|
/// Established by [`waker.wake()`](Waker::wake)
|
||||||
|
Awake,
|
||||||
|
/// Asleep - does *not* need to be polled
|
||||||
|
///
|
||||||
|
/// Established each time just before we call the future's [`poll`](Future::poll)
|
||||||
|
Asleep,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Actual implementor of `Wake` for use in a `Waker`
|
||||||
|
///
|
||||||
|
/// Futures (eg, channels from [`futures`]) will use this to wake a task
|
||||||
|
/// when it should be polled.
|
||||||
|
struct ActualWaker {
|
||||||
|
/// Executor state
|
||||||
|
data: ArcMutexData,
|
||||||
|
|
||||||
|
/// Which task this is
|
||||||
|
id: TaskId,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// State used for an in-progress call to
|
||||||
|
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
|
||||||
|
///
|
||||||
|
/// If present in [`Data`], an (async) call to `progress_until_stalled`
|
||||||
|
/// is in progress.
|
||||||
|
///
|
||||||
|
/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`]
|
||||||
|
/// is a normal-ish future.
|
||||||
|
/// It can be polled in the normal way.
|
||||||
|
/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
|
||||||
|
///
|
||||||
|
/// The future is made ready, and woken (via `waker`),
|
||||||
|
/// by bespoke code in the task executor loop.
|
||||||
|
///
|
||||||
|
/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
|
||||||
|
/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct ProgressingUntilStalled {
|
||||||
|
/// Have we, in fact, stalled?
|
||||||
|
///
|
||||||
|
/// Made `Ready` by special code in the executor loop
|
||||||
|
finished: Poll<()>,
|
||||||
|
|
||||||
|
/// Waker
|
||||||
|
///
|
||||||
|
/// Signalled by special code in the executor loop
|
||||||
|
waker: Option<Waker>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future from
|
||||||
|
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
|
||||||
|
///
|
||||||
|
/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
|
||||||
|
///
|
||||||
|
/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
|
||||||
|
/// There can only be one at a time.
|
||||||
|
#[derive(Educe)]
|
||||||
|
#[educe(Debug)]
|
||||||
|
struct ProgressUntilStalledFuture {
|
||||||
|
/// Executor's state; this future's state is in `.progressing_until_stalled`
|
||||||
|
#[educe(Debug(ignore))]
|
||||||
|
data: ArcMutexData,
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------- creation ----------
|
||||||
|
|
||||||
|
impl MockExecutor {
|
||||||
|
/// Make a `MockExecutor` with default parameters
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------- spawning ----------
|
||||||
|
|
||||||
|
impl MockExecutor {
|
||||||
|
/// Spawn a task and return something to identify it
|
||||||
|
///
|
||||||
|
/// `desc` should `Display` as some kind of short string (ideally without spaces)
|
||||||
|
/// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
|
||||||
|
///
|
||||||
|
/// The returned value is an opaque task identifier which is very cheap to clone
|
||||||
|
/// and which can be used by the caller in debug logging,
|
||||||
|
/// if it's desired to correlate with the debug output from `MockExecutor`.
|
||||||
|
/// Most callers will want to ignore it.
|
||||||
|
///
|
||||||
|
/// This method is infalliable. (The `MockExecutor` cannot be shut down.)
|
||||||
|
pub fn spawn_identified(
|
||||||
|
&self,
|
||||||
|
desc: impl Display,
|
||||||
|
fut: impl Future<Output = ()> + Send + Sync + 'static,
|
||||||
|
) -> impl Debug + Clone + Send + Sync + 'static {
|
||||||
|
self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a task and return its `TaskId`
|
||||||
|
///
|
||||||
|
/// Convenience method for use by `spawn_identified` and `spawn_obj`.
|
||||||
|
/// The future passed to `block_on` is not handled here.
|
||||||
|
fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
data.insert_task(desc, TFI::Normal(fut))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Data {
|
||||||
|
/// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
|
||||||
|
fn insert_task(&mut self, desc: String, fut: TaskFutureInfo) -> TaskId {
|
||||||
|
let state = Awake;
|
||||||
|
let id = self.tasks.insert(Task {
|
||||||
|
state,
|
||||||
|
desc,
|
||||||
|
fut: Some(fut),
|
||||||
|
});
|
||||||
|
self.awake.push(id);
|
||||||
|
trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
|
||||||
|
id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Spawn for MockExecutor {
|
||||||
|
fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
|
||||||
|
self.spawn_internal("".into(), future);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------- block_on ----------
|
||||||
|
|
||||||
|
impl BlockOn for MockExecutor {
|
||||||
|
/// Run `fut` to completion, synchronously
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Might malfunction or panic if:
|
||||||
|
///
|
||||||
|
/// * The provided future doesn't complete (without externally blocking),
|
||||||
|
/// but instead waits for something.
|
||||||
|
///
|
||||||
|
/// * The `MockExecutor` is reentered. (Eg, `block_on` is reentered.)
|
||||||
|
fn block_on<F>(&self, fut: F) -> F::Output
|
||||||
|
where
|
||||||
|
F: Future,
|
||||||
|
{
|
||||||
|
let mut value: Option<F::Output> = None;
|
||||||
|
let fut = {
|
||||||
|
let value = &mut value;
|
||||||
|
async move {
|
||||||
|
trace!("MockExecutor block_on future...");
|
||||||
|
let t = fut.await;
|
||||||
|
trace!("MockExecutor block_on future returned...");
|
||||||
|
*value = Some(t);
|
||||||
|
trace!("MockExecutor block_on future exiting.");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
pin_mut!(fut);
|
||||||
|
self.data.lock().insert_task("main".into(), TFI::Main);
|
||||||
|
self.execute_to_completion(fut);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::let_and_return)] // clarity
|
||||||
|
let value = value.take().unwrap_or_else(|| {
|
||||||
|
let data = self.data.lock();
|
||||||
|
panic!(
|
||||||
|
r"
|
||||||
|
all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
|
||||||
|
|
||||||
|
{data:#?}
|
||||||
|
"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------- execution - core implementation ----------
|
||||||
|
|
||||||
|
impl MockExecutor {
|
||||||
|
/// Keep polling tasks until nothing more can be done
|
||||||
|
///
|
||||||
|
/// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
|
||||||
|
fn execute_to_completion(&self, mut main_fut: MainFuture) {
|
||||||
|
trace!("MockExecutor execute_to_completion...");
|
||||||
|
loop {
|
||||||
|
self.execute_until_first_stall(main_fut.as_mut());
|
||||||
|
|
||||||
|
// Handle `progressing_until_stalled`
|
||||||
|
let pus_waker = {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
let pus = &mut data.progressing_until_stalled;
|
||||||
|
trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
|
||||||
|
let Some(pus) = pus else {
|
||||||
|
// No progressing_until_stalled, we're actually done.
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
assert_eq!(
|
||||||
|
pus.finished, Pending,
|
||||||
|
"ProgressingUntilStalled finished twice?!"
|
||||||
|
);
|
||||||
|
pus.finished = Ready(());
|
||||||
|
pus.waker
|
||||||
|
.clone()
|
||||||
|
.expect("ProgressUntilStalledFuture not ever polled!")
|
||||||
|
};
|
||||||
|
pus_waker.wake();
|
||||||
|
}
|
||||||
|
trace!("MockExecutor execute_to_completion done");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Keep polling tasks until `awake` is empty
|
||||||
|
///
|
||||||
|
/// (Ignores `progressing_until_stalled` - so if one is active,
|
||||||
|
/// will return when all other tasks have blocked.)
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Might malfunction or panic if called reentrantly
|
||||||
|
fn execute_until_first_stall(&self, mut main_fut: MainFuture) {
|
||||||
|
trace!("MockExecutor execute_until_first_stall ...");
|
||||||
|
'outer: loop {
|
||||||
|
// Take a `Awake` task off `awake` and make it `Polling`
|
||||||
|
let (id, mut fut) = 'inner: loop {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
let Some(id) = data.awake.pop() else { break 'outer };
|
||||||
|
let Some(task) = data.tasks.get_mut(id) else {
|
||||||
|
trace!("MockExecutor {id:?} vanished");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
task.state = Asleep;
|
||||||
|
let fut = task.fut.take().expect("future missing from task!");
|
||||||
|
break 'inner (id, fut);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Poll the selected task
|
||||||
|
let waker = Waker::from(Arc::new(ActualWaker {
|
||||||
|
data: self.data.clone(),
|
||||||
|
id,
|
||||||
|
}));
|
||||||
|
trace!("MockExecutor {id:?} polling...");
|
||||||
|
let mut cx = Context::from_waker(&waker);
|
||||||
|
let r = match &mut fut {
|
||||||
|
TFI::Normal(fut) => fut.poll_unpin(&mut cx),
|
||||||
|
TFI::Main => main_fut.as_mut().poll(&mut cx),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Deal with the returned `Poll`
|
||||||
|
{
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
let task = data
|
||||||
|
.tasks
|
||||||
|
.get_mut(id)
|
||||||
|
.expect("task vanished while we were polling it");
|
||||||
|
|
||||||
|
match r {
|
||||||
|
Pending => {
|
||||||
|
trace!("MockExecutor {id:?} -> Pending");
|
||||||
|
if task.fut.is_some() {
|
||||||
|
panic!("task reinserted while we polled it?!");
|
||||||
|
}
|
||||||
|
// The task might have been woking *by its own poll method*.
|
||||||
|
// That's why we set it to `Asleep` *earlier* rather than here.
|
||||||
|
// All we need to do is put the future back.
|
||||||
|
task.fut = Some(fut);
|
||||||
|
}
|
||||||
|
Ready(()) => {
|
||||||
|
trace!("MockExecutor {id:?} -> Ready");
|
||||||
|
// Oh, it finished!
|
||||||
|
// It might be in `awake`, but that's allowed to contain stale tasks,
|
||||||
|
// so we *don't* need to scan that list and remove it.
|
||||||
|
data.tasks.remove(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
trace!("MockExecutor execute_until_first_stall done.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Wake for ActualWaker {
|
||||||
|
fn wake(self: Arc<Self>) {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
trace!("MockExecutor {:?} wake", &self.id);
|
||||||
|
let Some(task) = data.tasks.get_mut(self.id) else { return };
|
||||||
|
match task.state {
|
||||||
|
Awake => {}
|
||||||
|
Asleep => {
|
||||||
|
task.state = Awake;
|
||||||
|
data.awake.push(self.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------- "progress until stalled" functionality ----------
|
||||||
|
|
||||||
|
impl MockExecutor {
|
||||||
|
/// Run tasks in the current executor until every task is waiting
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Might malfunction or panic if more than one such call is running at once.
|
||||||
|
///
|
||||||
|
/// (Ie, you must `.await` or drop the returned `Future`
|
||||||
|
/// before calling this method again.)
|
||||||
|
pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
assert!(
|
||||||
|
data.progressing_until_stalled.is_none(),
|
||||||
|
"progress_until_stalled called more than once"
|
||||||
|
);
|
||||||
|
trace!("MockExecutor progress_until_stalled...");
|
||||||
|
data.progressing_until_stalled = Some(ProgressingUntilStalled {
|
||||||
|
finished: Pending,
|
||||||
|
waker: None,
|
||||||
|
});
|
||||||
|
ProgressUntilStalledFuture {
|
||||||
|
data: self.data.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for ProgressUntilStalledFuture {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
|
||||||
|
let mut data = self.data.lock();
|
||||||
|
let pus = data.progressing_until_stalled.as_mut();
|
||||||
|
trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
|
||||||
|
let pus = pus.expect("ProgressingUntilStalled missing");
|
||||||
|
pus.waker = Some(cx.waker().clone());
|
||||||
|
pus.finished
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for ProgressUntilStalledFuture {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.data.lock().progressing_until_stalled = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------- ancillary and convenience functions ----------
|
||||||
|
|
||||||
|
/// Trait to let us assert at compile time that something is nicely `Sync` etc.
|
||||||
|
trait EnsureSyncSend: Sync + Send + 'static {}
|
||||||
|
impl EnsureSyncSend for ActualWaker {}
|
||||||
|
impl EnsureSyncSend for MockExecutor {}
|
||||||
|
|
||||||
|
impl ArcMutexData {
|
||||||
|
/// Lock and obtain the guard
|
||||||
|
///
|
||||||
|
/// Convenience method which panics on poison
|
||||||
|
fn lock(&self) -> MutexGuard<Data> {
|
||||||
|
self.0.lock().expect("data lock poisoned")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//---------- bespoke Debug impls ----------
|
||||||
|
|
||||||
|
// See `impl Debug for Data` for notes on the output
|
||||||
|
impl Debug for Task {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
let Task { desc, state, fut } = self;
|
||||||
|
write!(f, "{:?}", desc)?;
|
||||||
|
write!(f, "=")?;
|
||||||
|
match fut {
|
||||||
|
None => write!(f, "P")?,
|
||||||
|
Some(TFI::Normal(_)) => write!(f, "f")?,
|
||||||
|
Some(TFI::Main) => write!(f, "m")?,
|
||||||
|
}
|
||||||
|
match state {
|
||||||
|
Awake => write!(f, "W")?,
|
||||||
|
Asleep => write!(f, "s")?,
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
|
||||||
|
struct DebugTasks<'d, F>(&'d Data, F);
|
||||||
|
|
||||||
|
// See `impl Debug for Data` for notes on the output
|
||||||
|
impl<F, I> Debug for DebugTasks<'_, F>
|
||||||
|
where
|
||||||
|
F: Fn() -> I,
|
||||||
|
I: Iterator<Item = TaskId>,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
let DebugTasks(data, ids) = self;
|
||||||
|
for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
|
||||||
|
write!(f, "{delim}{id:?}")?;
|
||||||
|
match data.tasks.get(id) {
|
||||||
|
None => write!(f, "-")?,
|
||||||
|
Some(task) => write!(f, "={task:?}")?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `Task`s in `Data` are printed as `Ti(ID)"SPEC"=FLAGS"`.
|
||||||
|
///
|
||||||
|
/// `FLAGS` are:
|
||||||
|
///
|
||||||
|
/// * `P`: this task is being polled (its `TaskFutureInfo` is absent)
|
||||||
|
/// * `f`: this is a normal task with a future and its future is present in `Data`
|
||||||
|
/// * `m`: this is the main task from `block_on`
|
||||||
|
///
|
||||||
|
/// * `W`: the task is awake
|
||||||
|
/// * `s`: the task is asleep
|
||||||
|
//
|
||||||
|
// We do it this way because the naive dump from derive is very expansive
|
||||||
|
// and makes it impossible to see the wood for the trees.
|
||||||
|
// This very compact representation it easier to find a task of interest in the output.
|
||||||
|
//
|
||||||
|
// This is implemented in `impl Debug for Task`.
|
||||||
|
impl Debug for Data {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
let Data {
|
||||||
|
tasks,
|
||||||
|
awake,
|
||||||
|
progressing_until_stalled: pus,
|
||||||
|
} = self;
|
||||||
|
let mut s = f.debug_struct("Data");
|
||||||
|
s.field("tasks", &DebugTasks(self, || tasks.keys()));
|
||||||
|
s.field("awake", &DebugTasks(self, || awake.iter().cloned()));
|
||||||
|
s.field("p.u.s", pus);
|
||||||
|
s.finish()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue