tor_rtcompat: Documentation and cleanups

This commit is contained in:
Nick Mathewson 2021-04-17 14:01:01 -04:00
parent a774cdde80
commit 342a98dccd
5 changed files with 240 additions and 71 deletions

View File

@ -142,6 +142,7 @@ use std::time::Duration;
use crate::traits::*;
/// Create and return a new `async_std` runtime.
pub fn create_runtime() -> async_executors::AsyncStd {
async_executors::AsyncStd::new()
}
@ -155,9 +156,9 @@ impl SleepProvider for async_executors::AsyncStd {
#[async_trait]
impl TcpListener for net::TcpListener {
type Stream = net::TcpStream;
type TcpStream = net::TcpStream;
type Incoming = net::IncomingStreams;
async fn accept(&self) -> IoResult<(Self::Stream, SocketAddr)> {
async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
net::TcpListener::accept(self).await
}
fn incoming(self) -> net::IncomingStreams {

View File

@ -95,42 +95,17 @@ mod net {
}
/// Try to accept a socket on this listener.
pub async fn accept(&self) -> IoResult<(TcpStream, SocketAddr)> {
pub(crate) async fn accept(&self) -> IoResult<(TcpStream, SocketAddr)> {
let (stream, addr) = self.lis.accept().await?;
Ok((stream.into(), addr))
}
/// Return a stream that yields incoming
pub fn incoming(&self) -> Incoming<'_> {
Incoming { lis: &self.lis }
}
pub(crate) fn incoming2(self) -> IncomingTcpStreams {
/// Wrap this listener up as a Stream.
pub(crate) fn incoming_streams(self) -> IncomingTcpStreams {
IncomingTcpStreams { lis: self.lis }
}
}
/// Asynchronous stream that yields incoming connections from a
/// TcpListener.
///
/// This is analogous to async_std::net::Incoming.
#[pin_project]
pub struct Incoming<'a> {
/// Reference to the underlying listener.
lis: &'a TokioTcpListener,
}
impl<'a> futures::stream::Stream for Incoming<'a> {
type Item = IoResult<TcpStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let p = self.project();
let (stream, _addr) = futures::ready!(p.lis.poll_accept(cx))?;
Poll::Ready(Some(Ok(stream.into())))
}
}
/// Asynchronous stream that yields incoming connections from a
/// TcpListener.
///
@ -270,6 +245,7 @@ use std::io::Result as IoResult;
use std::net::SocketAddr;
use std::time::Duration;
/// Create and return a new Tokio multithreaded runtime.
pub fn create_runtime() -> IoResult<async_executors::TokioTp> {
let mut builder = async_executors::TokioTpBuilder::new();
builder.tokio_builder().enable_all();
@ -285,13 +261,13 @@ impl SleepProvider for async_executors::TokioTp {
#[async_trait]
impl TcpListener for net::TcpListener {
type Stream = net::TcpStream;
type TcpStream = net::TcpStream;
type Incoming = net::IncomingTcpStreams;
async fn accept(&self) -> IoResult<(Self::Stream, SocketAddr)> {
async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
net::TcpListener::accept(self).await
}
fn incoming(self) -> Self::Incoming {
self.incoming2()
self.incoming_streams()
}
}

View File

@ -7,9 +7,25 @@
//! Right now this crate supports async_std and tokio; tokio is the
//! default. You can control this with the `async-std` or `tokio`
//! features on this crate.
//!
//! This crate exposes a [`Runtime`] type that represents the features
//! available from an asynchronous runtime. This includes the
//! standardized features (spawning tasks), and ones for which no
//! standardized API currently exist (sleeping and networking and
//! TLS).
//!
//! The [`Runtime`] trait is implemented using the [`async_executors`]
//! crate; if that crate later expands to provide similar
//! functionality, this crate will contract. Implementations are
//! currently provided for `async_executors::TokioTp` (if this crate
//! was builtwith the `tokio` feature) and `async_executors::AsyncStd`
//! (if this crate was built with the `async-std` feature).
//!
//! Note that this crate is explicitly limited to the features that
//! Arti requires.
//#![deny(missing_docs)]
//#![deny(clippy::missing_docs_in_private_items)]
#![deny(missing_docs)]
#![deny(clippy::missing_docs_in_private_items)]
use std::io::Result as IoResult;
@ -17,12 +33,17 @@ pub(crate) mod impls;
mod timer;
mod traits;
#[cfg(not(any(feature = "async-std", feature = "tokio")))]
compile_error!("Sorry: At least one of the tor-rtcompat/async-std and tor-rtcompat/tokio features must be specified.");
pub use traits::{
CertifiedConn, Runtime, SleepProvider, SpawnBlocking, TcpListener, TcpProvider, TlsProvider,
};
pub use timer::{SleepProviderExt, Timeout, TimeoutError};
/// Traits used to describe TLS connections and objects that can
/// create them.
pub mod tls {
pub use crate::traits::{CertifiedConn, TlsConnector};
}
@ -32,16 +53,30 @@ pub use impls::async_std::create_runtime as create_async_std_runtime;
#[cfg(feature = "tokio")]
pub use impls::tokio::create_runtime as create_tokio_runtime;
/// The default runtime type that we return from [`create_runtime()`] or
/// [`test_with_runtime()`]
#[cfg(feature = "tokio")]
pub type DefaultRuntime = async_executors::TokioTp;
type DefaultRuntime = async_executors::TokioTp;
/// The default runtime type that we return from [`create_runtime()`] or
/// [`test_with_runtime()`]
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
pub type DefaultRuntime = async_executors::AsyncStd;
type DefaultRuntime = async_executors::AsyncStd;
/// Return a new instance of the default [`Runtime`].
///
/// Generally you should call this function only once, and then use
/// [`Clone::clone()`] to create additional references to that
/// runtime.
///
/// If you need more fine-grained control over a runtime, you can
/// create it using an appropriate builder type from
/// [`async_executors`].
pub fn create_runtime() -> IoResult<impl Runtime> {
create_default_runtime()
}
/// Helper: create and return a default runtime.
#[allow(clippy::unnecessary_wraps)]
fn create_default_runtime() -> IoResult<DefaultRuntime> {
#[cfg(feature = "tokio")]
@ -54,10 +89,30 @@ fn create_default_runtime() -> IoResult<DefaultRuntime> {
}
#[cfg(not(any(feature = "async-std", feature = "tokio")))]
{
// This isn't reachable, since the crate won't actually compile
// unless some runtime is enabled.
panic!("tor-rtcompat was built with no supported runtimes.")
}
}
/// Run a given asynchronous function, which takes a runtime as an argument,
/// using the default runtime.
///
/// This is intended for writing test cases that need a runtime.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// use tor_rtcompat::SleepProviderExt;
///
/// // Run a simple test using a timeout.
/// tor_rtcompat::test_with_runtime(|runtime| async move {
/// async fn one_plus_two() -> u32 { 1 + 2 }
/// let future = runtime.timeout(Duration::from_secs(5), one_plus_two());
/// assert_eq!(future.await, Ok(3));
/// });
/// ```
#[allow(clippy::clone_on_copy)]
pub fn test_with_runtime<P, F, O>(func: P) -> O
where

View File

@ -1,3 +1,5 @@
//! Definitions for [`SleepProviderExt`] and related types.
use crate::traits::SleepProvider;
use futures::{Future, FutureExt};
use pin_project::pin_project;
@ -7,7 +9,12 @@ use std::{
time::{Duration, SystemTime},
};
#[derive(Copy, Clone, Debug)]
/// An error value given when a function times out.
///
/// This value is generated when the timeout from
/// [`SleepProviderExt::timeout`] expires before the provided future
/// is ready.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct TimeoutError;
impl std::error::Error for TimeoutError {}
impl std::fmt::Display for TimeoutError {
@ -16,7 +23,18 @@ impl std::fmt::Display for TimeoutError {
}
}
/// An extension trait on [`SleepProvider`] for timeouts and clock delays.
pub trait SleepProviderExt: SleepProvider {
/// Wrap a [`Future`] with a timeout.
///
/// The output of the new future will be the returned value of
/// `future` if it completes within `duration`. Otherwise, it
/// will be `Err(TimeoutError)`.
///
/// # Limitations
///
/// This uses [`SleepProvider::sleep`] for its timer, and is
/// subject to the same limitations.
fn timeout<F: Future>(&self, duration: Duration, future: F) -> Timeout<F, Self::SleepFuture> {
let sleep_future = self.sleep(duration);
@ -28,6 +46,22 @@ pub trait SleepProviderExt: SleepProvider {
/// Pause until the wall-clock is at `when` or later, trying to
/// recover from clock jumps.
///
/// Unlike [`SleepProvider::sleep()`], the future returned by this function will
/// wake up periodically to check the current time, and see if
/// it is at or past the target.
///
/// # Limitations
///
/// The ability of this function to detect clock jumps is limited
/// to its granularity; it may finish a while after the declared
/// wallclock time if the system clock jumps forward.
///
/// This function does not detect backward clock jumps; arguably,
/// we should have another function to do that.
///
/// This uses [`SleepProvider::sleep`] for its timer, and is
/// subject to the same limitations.
fn sleep_until_wallclock(&self, when: SystemTime) -> SleepUntilWallclock<'_, Self> {
SleepUntilWallclock {
provider: self,
@ -39,10 +73,13 @@ pub trait SleepProviderExt: SleepProvider {
impl<T: SleepProvider> SleepProviderExt for T {}
/// A timeout returned by [`SleepProviderExt::timeout`].
#[pin_project]
pub struct Timeout<T, S> {
/// The future we want to execute.
#[pin]
future: T,
/// The future implementing the timeout.
#[pin]
sleep_future: S,
}
@ -67,10 +104,14 @@ where
}
}
/// A future implementing [`SleepProviderExt::sleep_until_wallclock`].
#[pin_project]
pub struct SleepUntilWallclock<'a, SP: SleepProvider + ?Sized> {
/// Reference to the provider that we use to make new SleepFutures.
provider: &'a SP,
/// The time that we are waiting for.
target: SystemTime,
/// The future representing our current delay.
sleep_future: Option<Pin<Box<SP::SleepFuture>>>,
}
@ -81,39 +122,65 @@ where
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let now = SystemTime::now();
if now >= self.target {
return Poll::Ready(());
}
let delay = calc_next_delay(now, self.target);
// Strategy: we implement sleep_until_wallclock by
// waiting in increments of up to MAX_SLEEP, checking the
// wall clock before and after each increment. This makes
// us wake up a bit more frequently, but enables us to detect it
// if the system clock jumps forward.
let target = self.target;
let this = self.project();
// DOCDOC: why do we store but not poll sleep_future?
this.sleep_future.take();
let mut sleep_future = Box::pin(this.provider.sleep(delay));
match sleep_future.poll_unpin(cx) {
Poll::Pending => {
*this.sleep_future = Some(sleep_future);
Poll::Pending
loop {
let now = SystemTime::now();
if now >= target {
return Poll::Ready(());
}
let (last_delay, delay) = calc_next_delay(now, target);
// Note that we store this future to keep it from being
// cancelled, even though we don't ever poll it more than
// once.
//
// TODO: I'm not sure that it's actually necessary to keep
// this future around.
this.sleep_future.take();
let mut sleep_future = Box::pin(this.provider.sleep(delay));
match sleep_future.poll_unpin(cx) {
Poll::Pending => {
*this.sleep_future = Some(sleep_future);
return Poll::Pending;
}
Poll::Ready(()) => {
if last_delay {
return Poll::Ready(());
}
}
}
Poll::Ready(()) => Poll::Ready(()),
}
}
}
/// Return the amount of time we should wait next, when running
/// sleep_until_wallclock().
/// sleep_until_wallclock(). Also return a boolean indicating whether we
/// expect this to be the final delay.
///
/// (This is a separate function for testing.)
fn calc_next_delay(now: SystemTime, when: SystemTime) -> Duration {
/// We never sleep more than this much, in case our system clock jumps
fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
/// We never sleep more than this much, in case our system clock jumps.
///
/// Note that there's a tradeoff here: Making this duration
/// shorter helps our accuracy, but makes us wake up more
/// frequently and consume more CPU.
const MAX_SLEEP: Duration = Duration::from_secs(600);
let remainder = when
.duration_since(now)
.unwrap_or_else(|_| Duration::from_secs(0));
std::cmp::min(MAX_SLEEP, remainder)
if remainder > MAX_SLEEP {
(false, MAX_SLEEP)
} else {
(true, remainder)
}
}
#[cfg(test)]
@ -121,7 +188,9 @@ mod test {
use super::*;
#[test]
fn sleep_delay() {
use calc_next_delay as calc;
fn calc(now: SystemTime, when: SystemTime) -> Duration {
calc_next_delay(now, when).1
}
let minute = Duration::from_secs(60);
let second = Duration::from_secs(1);
let start = SystemTime::now();

View File

@ -1,3 +1,4 @@
//! Declarations for traits that we need our runtimes to implement.
use async_trait::async_trait;
use futures::stream;
use futures::{AsyncRead, AsyncWrite, Future};
@ -10,7 +11,22 @@ pub use futures::task::Spawn;
/// A runtime that we can use to run Tor as a client.
///
/// DOCDOC
/// This trait comprises several other traits that we require all of our
/// runtimes to provide:
///
/// * [`futures::task::Spawn`] to launch new background tasks.
/// * [`SleepProvider`] to pause a task for a given amount of time.
/// * [`TcpProvider`] to launch and accept TCP connections.
/// * [`TlsProvider`] to launch TLS connections.
/// * [`SpawnBlocking`] to block on a future and run it to completion
/// (This may become optional in the future, if/when we add WASM
/// support).
///
/// We require that every `Runtime` has an efficient [`Clone`] implementation
/// that gives a new opaque reference to the same underlying runtime.
///
/// Additionally, every `Runtime` is [`Send`] and [`Sync`], though these
/// requirements may be somewhat relaxed in the future.
pub trait Runtime:
Sync + Send + Spawn + SpawnBlocking + Clone + SleepProvider + TcpProvider + TlsProvider + 'static
{
@ -29,38 +45,76 @@ impl<T> Runtime for T where
{
}
/// Trait for a runtime that can wait until a timer has expired.
///
/// Every `SleepProvider` also implements [`crate::SleepProviderExt`];
/// see that trait for other useful functions.
pub trait SleepProvider {
/// A future returned by [`SleepProvider::sleep()`]
type SleepFuture: Future<Output = ()> + Send + 'static;
/// Return a future that will be ready after `duration` has
/// elapsed.
fn sleep(&self, duration: Duration) -> Self::SleepFuture;
}
/// Trait for a runtime that can block on a future.
pub trait SpawnBlocking {
fn block_on<F: Future>(&self, f: F) -> F::Output;
/// Run `future` until it is ready, and return its output.
fn block_on<F: Future>(&self, future: F) -> F::Output;
}
// TODO: Use of asynctrait is not ideal, since we have to box with every
/// Trait for a runtime that can create and accept TCP connections.
///
/// (In Arti we use the [`AsyncRead`] and [`AsyncWrite`] traits from
/// [`futures::io`] as more standard, even though the ones from Tokio
/// can be a bit more efficient. Let's hope that they converge in the
/// future.)
// TODO: Use of async_trait is not ideal, since we have to box with every
// call. Still, async_io basically makes that necessary :/
#[async_trait]
pub trait TcpProvider {
/// The type for the TCP connections returned by [`Self::connect()`].
type TcpStream: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static;
type TcpListener: TcpListener<Stream = Self::TcpStream> + Send + Sync + Unpin + 'static;
/// The type for the TCP listeners returned by [`Self::listen()`].
type TcpListener: TcpListener<TcpStream = Self::TcpStream> + Send + Sync + Unpin + 'static;
/// Launch a TCP connection to a given socket address.
///
/// Note that unlike [`std::net:TcpStream::connect`], we do not accept
/// any types other than a single [`SocketAddr`]. We do this because,
/// as a Tor implementation, we most be absolutely sure not to perform
/// unnecessary DNS lookups.
async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::TcpStream>;
/// Open a TCP listener on a given socket address.
async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::TcpListener>;
}
// TODO: Use of asynctrait is not ideal here either.
/// Trait for a local socket that accepts incoming TCP streams.
///
/// These objects are returned by instances of [`TcpProvider`]. To use
/// one, either call `accept` to accept a single connection, or
/// use `incoming` to wrap this object as a [`stream::Stream`].
// TODO: Use of async_trait is not ideal here either.
#[async_trait]
pub trait TcpListener {
type Stream: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static;
type Incoming: stream::Stream<Item = IoResult<(Self::Stream, SocketAddr)>> + Unpin;
async fn accept(&self) -> IoResult<(Self::Stream, SocketAddr)>;
/// The type of TCP connections returned by [`Self::accept()`].
type TcpStream: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static;
/// The type of [`stream::Stream`] returned by [`Self::incoming()`].
type Incoming: stream::Stream<Item = IoResult<(Self::TcpStream, SocketAddr)>> + Unpin;
/// Wait for an incoming stream; return it along with its address.
async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)>;
/// Wrap this listener into a new [`stream::Stream`] that yields
/// TCP streams and addresses.
fn incoming(self) -> Self::Incoming;
}
/// An object with a peer certificate.
/// An object with a peer certificate: typically a TLS connection.
pub trait CertifiedConn {
/// Try to return the (der-encoded) peer certificate for this
/// Try to return the (DER-encoded) peer certificate for this
/// connection, if any.
fn peer_certificate(&self) -> IoResult<Option<Vec<u8>>>;
}
@ -68,14 +122,19 @@ pub trait CertifiedConn {
/// An object that knows how to make a TLS-over-TCP connection we
/// can use in Tor.
///
/// DOCDOC Not for general use.
/// (Note that because of Tor's peculiarities, this is not a
/// general-purpose TLS type. Unlike typical users, Tor does not want
/// its TLS library to validate TLS or check for hostnames.)
#[async_trait]
pub trait TlsConnector {
/// The type of connection returned by this connector
type Conn: AsyncRead + AsyncWrite + CertifiedConn + Unpin + Send + 'static;
/// Launch a TLS-over-TCP connection to a given address.
/// TODO: document args
///
/// Declare `sni_hostname` as the desired hostname, but don't
/// actually validate the certificate or check that it matches the
/// intended address.
async fn connect_unvalidated(
&self,
addr: &SocketAddr,
@ -83,9 +142,18 @@ pub trait TlsConnector {
) -> IoResult<Self::Conn>;
}
/// Trait for a runtime that knows how to create TLS connections.
///
/// This is separate from [`TlsConnector`] because eventually we may
/// eventually want to support multiple `TlsConnector` implementations
/// that use a single [`Runtime`].
pub trait TlsProvider {
/// The Connector object that this provider can return.
type Connector: TlsConnector<Conn = Self::TlsStream> + Send + Sync + Unpin;
/// The type of the stream returned by that connector.
type TlsStream: AsyncRead + AsyncWrite + CertifiedConn + Unpin + Send + 'static;
/// Return a TLS connector for use with this runtime.
fn tls_connector(&self) -> Self::Connector;
}