Merge branch 'rtcompat-refactor' into 'main'

Refactor the tor-rtcompat API.

Closes #301 and #300

See merge request tpo/core/arti!263
This commit is contained in:
eta 2022-01-27 15:53:35 +00:00
commit 2f39dbd587
39 changed files with 564 additions and 388 deletions

View File

@ -28,11 +28,12 @@ rust-latest:
- target/x86_64-unknown-linux-gnu/debug/arti
expire_in: 1 hours
rust-latest-async-std:
rust-latest-async-std-rustls:
stage: build
image: rust:latest
script:
- cd crates/arti-client && cargo check --no-default-features --features=async-std
- rustup component add clippy
- cd crates/arti-client && cargo clippy --no-default-features --features=async-std,rustls
tags:
- amd64

1
Cargo.lock generated
View File

@ -67,6 +67,7 @@ dependencies = [
"arti-client",
"arti-config",
"async-ctrlc",
"cfg-if",
"clap",
"config",
"futures",

View File

@ -21,7 +21,7 @@ serde_json = "1.0.50"
tracing = "0.1.18"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tokio = { version = "1.4", features = ["full"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features = ["tokio"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features = ["tokio", "native-tls"] }
arti-config = { path="../arti-config", version = "0.0.3"}
arti-client = { package="arti-client", path = "../arti-client", version = "0.0.3"}
tokio-socks = "0.5"

View File

@ -371,7 +371,7 @@ fn main() -> Result<()> {
concurrent: parallel,
upload_payload,
download_payload,
runtime: tor_rtcompat::tokio::create_runtime()?,
runtime: tor_rtcompat::tokio::TokioNativeTlsRuntime::create()?,
results: Default::default(),
};

View File

@ -11,9 +11,11 @@ categories = [ "network-programming", "cryptography" ]
repository="https://gitlab.torproject.org/tpo/core/arti.git/"
[features]
default = [ "tokio" ]
default = [ "tokio", "native-tls" ]
async-std = [ "tor-rtcompat/async-std" ]
tokio = [ "tor-rtcompat/tokio", "tor-proto/tokio" ]
native-tls = [ "tor-rtcompat/native-tls" ]
rustls = [ "tor-rtcompat/rustls" ]
static = [ "tor-rtcompat/static", "tor-dirmgr/static" ]
# Enable experimental APIs that are not yet officially supported.
@ -42,7 +44,7 @@ serde = { version = "1.0.103", features = ["derive"] }
thiserror = "1"
[dev-dependencies]
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio", "native-tls" ] }
tokio-crate = { package = "tokio", version = "1.4", features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros" ] }
hyper = { version = "0.14", features = ["http1", "client", "runtime"] }
pin-project = "1"

View File

@ -13,7 +13,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_crate as tokio;
use tor_rtcompat::tokio::TokioRuntimeHandle;
use tor_rtcompat::tokio::TokioNativeTlsRuntime;
use tor_rtcompat::Runtime;
/// A `hyper` connector to proxy HTTP connections via the Tor network, using Arti.
@ -138,7 +138,7 @@ async fn main() -> Result<()> {
// on Linux platforms)
let config = TorClientConfig::default();
// Arti needs an async runtime handle to spawn async tasks.
let rt: TokioRuntimeHandle = tokio_crate::runtime::Handle::current().into();
let rt: TokioNativeTlsRuntime = tokio_crate::runtime::Handle::current().into();
// We now let the Arti client start and bootstrap a connection to the network.
// (This takes a while to gather the necessary consensus state, etc.)

View File

@ -1,6 +1,7 @@
use anyhow::Result;
use arti_client::{TorClient, TorClientConfig};
use tokio_crate as tokio;
use tor_rtcompat::tokio::TokioNativeTlsRuntime;
use futures::io::{AsyncReadExt, AsyncWriteExt};
@ -16,7 +17,7 @@ async fn main() -> Result<()> {
let config = TorClientConfig::default();
// Arti needs an async runtime handle to spawn async tasks.
// (See "Multiple runtime support" below.)
let rt = tor_rtcompat::tokio::current_runtime()?;
let rt = TokioNativeTlsRuntime::current()?;
eprintln!("connecting to Tor...");

View File

@ -24,9 +24,9 @@ use std::time::Duration;
use crate::{status, Error, Result};
#[cfg(feature = "async-std")]
use tor_rtcompat::async_std::AsyncStdRuntime;
use tor_rtcompat::async_std::PreferredRuntime as PreferredAsyncStdRuntime;
#[cfg(feature = "tokio")]
use tor_rtcompat::tokio::TokioRuntimeHandle;
use tor_rtcompat::tokio::PreferredRuntime as PreferredTokioRuntime;
use tracing::{debug, error, info, warn};
/// An active client session on the Tor network.
@ -246,7 +246,7 @@ impl StreamPrefs {
}
#[cfg(feature = "tokio")]
impl TorClient<TokioRuntimeHandle> {
impl TorClient<PreferredTokioRuntime> {
/// Bootstrap a connection to the Tor network, using the current Tokio runtime.
///
/// Returns a client once there is enough directory material to
@ -259,14 +259,14 @@ impl TorClient<TokioRuntimeHandle> {
/// Panics if called outside of the context of a Tokio runtime.
pub async fn bootstrap_with_tokio(
config: TorClientConfig,
) -> Result<TorClient<TokioRuntimeHandle>> {
let rt = tor_rtcompat::tokio::current_runtime().expect("called outside of Tokio runtime");
) -> Result<TorClient<PreferredTokioRuntime>> {
let rt = PreferredTokioRuntime::current().expect("called outside of Tokio runtime");
Self::bootstrap(rt, config).await
}
}
#[cfg(feature = "async-std")]
impl TorClient<AsyncStdRuntime> {
impl TorClient<PreferredAsyncStdRuntime> {
/// Bootstrap a connection to the Tor network, using the current async-std runtime.
///
/// Returns a client once there is enough directory material to
@ -275,10 +275,9 @@ impl TorClient<AsyncStdRuntime> {
/// This is a convenience wrapper around [`TorClient::bootstrap`].
pub async fn bootstrap_with_async_std(
config: TorClientConfig,
) -> Result<TorClient<AsyncStdRuntime>> {
) -> Result<TorClient<PreferredAsyncStdRuntime>> {
// FIXME(eta): not actually possible for this to fail
let rt =
tor_rtcompat::async_std::current_runtime().expect("failed to get async-std runtime");
let rt = PreferredAsyncStdRuntime::current().expect("failed to get async-std runtime");
Self::bootstrap(rt, config).await
}
}

View File

@ -60,7 +60,7 @@
//! let config = TorClientConfig::default();
//! // Arti needs a handle to an async runtime in order to spawn tasks and use the
//! // network. (See "Multiple runtime support" below.)
//! let rt = tor_rtcompat::tokio::current_runtime()?;
//! let rt = tor_rtcompat::tokio::TokioNativeTlsRuntime::current()?;
//!
//! // Start the Arti client, and let it bootstrap a connection to the Tor network.
//! // (This takes a while to gather the necessary directory information.
@ -118,14 +118,16 @@
//! will expect a type that implements [`tor_rtcompat::Runtime`], which can be obtained:
//!
//! - for Tokio:
//! - by calling [`tor_rtcompat::tokio::current_runtime`], if a Tokio reactor is already running
//! - by calling [`tor_rtcompat::tokio::create_runtime`], to start a new reactor if one is not
//! - by calling [`tor_rtcompat::tokio::PreferredRuntime::current()`], if a Tokio reactor is already running
//! - by calling [`tor_rtcompat::tokio::PreferredRuntime::create()`], to start a new reactor if one is not
//! already running
//! - by manually creating a [`TokioRuntimeHandle`](tor_rtcompat::tokio::TokioRuntimeHandle) from
//! an existing Tokio runtime handle
//! - as above, but explicitly specifying [`TokioNativeTlsRuntime`](tor_rtcompat::tokio::TokioNativeTlsRuntime)
//! or [`TokioRustlsRuntime`](tor_rtcompat::tokio::TokioRustlsRuntime) in place of `PreferredRuntime`.
//! - for async-std:
//! - by calling [`tor_rtcompat::async_std::current_runtime`], which will create a runtime or
//! - by calling [`tor_rtcompat::async_std::PreferredRuntime::current()`], which will create a runtime or
//! retrieve the existing one, if one has already been started
//! - as above, but explicitly specifying [`AsyncStdNativeTlsRuntime`](tor_rtcompat::async_std::AsyncStdNativeTlsRuntime)
//! or [`AsyncStdRustlsRuntime`](tor_rtcompat::async_std::AsyncStdRustlsRuntime) in place of `PreferredRuntime`.
//!
//!
//! # Feature flags

View File

@ -11,14 +11,16 @@ categories = [ "command-line-utilities", "cryptography" ]
repository="https://gitlab.torproject.org/tpo/core/arti.git/"
[features]
default = [ "tokio" ]
default = [ "tokio", "native-tls" ]
async-std = [ "arti-client/async-std", "tor-rtcompat/async-std", "async-ctrlc", "once_cell" ]
tokio = [ "tokio-crate", "arti-client/tokio", "tor-rtcompat/tokio" ]
native-tls = [ "arti-client/native-tls", "tor-rtcompat/native-tls" ]
rustls = [ "arti-client/rustls", "tor-rtcompat/rustls" ]
static = [ "arti-client/static" ]
journald = [ "tracing-journald" ]
[dependencies]
arti-client = { package="arti-client", path = "../arti-client", version = "0.0.3"}
arti-client = { package="arti-client", path = "../arti-client", version = "0.0.3", default-features=false}
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", default-features=false }
tor-socksproto = { path="../tor-socksproto", version = "0.0.3"}
arti-config = { path="../arti-config", version = "0.0.3"}
@ -26,6 +28,7 @@ arti-config = { path="../arti-config", version = "0.0.3"}
anyhow = "1.0.5"
async-ctrlc = { version = "1.2.0", optional = true }
config = { version = "0.11.0", default-features = false }
cfg-if = "1.0.0"
futures = "0.3"
tracing = "0.1.18"
once_cell = { version = "1", optional = true }

View File

@ -96,7 +96,7 @@ use std::sync::Arc;
use arti_client::{TorClient, TorClientConfig};
use arti_config::ArtiConfig;
use tor_rtcompat::{Runtime, SpawnBlocking};
use tor_rtcompat::{BlockOn, Runtime};
use anyhow::Result;
use clap::{App, AppSettings, Arg, SubCommand};
@ -229,10 +229,19 @@ fn main() -> Result<()> {
process::use_max_file_limit();
#[cfg(feature = "tokio")]
let runtime = tor_rtcompat::tokio::create_runtime()?;
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
let runtime = tor_rtcompat::async_std::create_runtime()?;
cfg_if::cfg_if! {
if #[cfg(all(feature="tokio", feature="native-tls"))] {
use tor_rtcompat::tokio::TokioNativeTlsRuntime as ChosenRuntime;
} else if #[cfg(all(feature="tokio", feature="rustls"))] {
use tor_rtcompat::tokio::TokioRustlsRuntime as ChosenRuntime;
} else if #[cfg(all(feature="async-std", feature="native-tls"))] {
use tor_rtcompat::tokio::TokioRustlsRuntime as ChosenRuntime;
} else if #[cfg(all(feature="async-std", feature="rustls"))] {
use tor_rtcompat::tokio::TokioRustlsRuntime as ChosenRuntime;
}
}
let runtime = ChosenRuntime::create()?;
let rt_copy = runtime.clone();
rt_copy.block_on(run(runtime, socks_port, client_config))?;

View File

@ -30,4 +30,4 @@ float_eq = "0.7"
futures-await-test = "0.3.0"
hex-literal = "0.3"
tor-rtmock = { path="../tor-rtmock", version = "0.0.3"}
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio", "native-tls"] }

View File

@ -50,4 +50,4 @@ tor-guardmgr = { path="../tor-guardmgr", version = "0.0.3", features=["testing"]
tor-llcrypto = { path="../tor-llcrypto", version = "0.0.3"}
tor-netdir = { path="../tor-netdir", version = "0.0.3", features=["testing"] }
tor-persist = { path="../tor-persist", version = "0.0.3", features=["testing"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio", "native-tls" ] }

View File

@ -440,7 +440,7 @@ mod test {
/// Helper type used to help type inference.
pub(crate) type OptDummyGuardMgr<'a> =
Option<&'a tor_guardmgr::GuardMgr<tor_rtcompat::tokio::TokioRuntime>>;
Option<&'a tor_guardmgr::GuardMgr<tor_rtcompat::tokio::TokioNativeTlsRuntime>>;
#[test]
fn get_params() {

View File

@ -38,5 +38,5 @@ thiserror = "1"
[dev-dependencies]
futures-await-test = "0.3.0"
tor-rtmock = { path="../tor-rtmock", version = "0.0.3"}
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio", "native-tls" ] }

View File

@ -56,7 +56,5 @@ humantime-serde = "1"
futures-await-test = "0.3.0"
hex-literal = "0.3"
tempfile = "3"
tor-rtcompat = { path = "../tor-rtcompat", version = "0.0.3", features = [
"tokio",
] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.0.3", features = [ "tokio", "native-tls" ] }
float_eq = "0.7"

View File

@ -42,5 +42,5 @@ tracing = "0.1.18"
tor-netdir = { path="../tor-netdir", version = "0.0.3", features=["testing"]}
tor-netdoc = { path="../tor-netdoc", version = "0.0.3"}
tor-persist = { path="../tor-persist", version = "0.0.3", features=["testing"]}
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio"]}
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio", "native-tls"]}
tor-rtmock = { path="../tor-rtmock", version = "0.0.3"}

View File

@ -49,8 +49,6 @@ tokio-util = { version = "0.6", features = ["compat"], optional = true }
coarsetime = { version = "0.1.20", optional = true }
[dev-dependencies]
tor-rtcompat = { path = "../tor-rtcompat", version = "0.0.3", features = [
"tokio",
] }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.0.3", features = [ "tokio", "native-tls" ] }
hex-literal = "0.3"
hex = "0.4"

View File

@ -81,9 +81,6 @@
//! I bet that there are deadlocks somewhere in this code. I fixed
//! all the ones I could find or think of, but it would be great to
//! find a good way to eliminate every lock that we have.
//!
//! This crate doesn't work with rusttls because of a limitation in the
//! webpki crate.
#![deny(missing_docs)]
#![warn(noop_method_call)]

View File

@ -15,7 +15,10 @@ repository="https://gitlab.torproject.org/tpo/core/arti.git/"
default = [ ]
async-std = [ "async-std-crate", "async-io", "async_executors/async_std" ]
tokio = [ "tokio-crate", "tokio-util", "async_executors/tokio_tp" ]
static = [ "native-tls/vendored" ]
# TODO: This feature makes us link native-tls statically even if we
# don't want to use native-tls in the first place. That's not so clever!
static = [ "native-tls-crate/vendored" ]
native-tls = [ "native-tls-crate", "async-native-tls" ]
rustls = [ "rustls-crate", "async-rustls", "x509-signature" ]
[dependencies]
@ -24,15 +27,20 @@ async_executors = { version = "0.4", default_features = false }
async-trait = "0.1.2"
futures = "0.3"
pin-project = "1"
native-tls = "0.2"
native-tls-crate = { package = "native-tls", version = "0.2", optional = true }
rustls-crate = { package = "rustls", version = "0.19", optional = true, features = [ "dangerous_configuration" ] }
async-std-crate = { package = "async-std", version = "1.7.0", optional = true }
async-io = { version = "1.4.1", optional = true }
async-native-tls = { version = "0.4.0" }
async-native-tls = { version = "0.4.0", optional = true }
tokio-crate = { package = "tokio", version = "1.4", optional = true, features = ["rt", "rt-multi-thread", "io-util", "net", "time" ] }
tokio-util = { version = "0.6", features = ["compat"], optional = true }
async-rustls = { version = "0.2.0", optional = true }
x509-signature = { version = "0.5.0", optional = true }
[dev-dependencies]
# Used for testing our TLS implementation.
native-tls-crate = { package = "native-tls", version = "0.2" }

View File

@ -0,0 +1,26 @@
#!/usr/bin/python
#
# Run a provided command over all possible subsets of important
# tor-rtcompat features.
import sys, subprocess
if sys.argv[1:] == []:
print("You need to name a program to run with different features")
sys.exit(1)
FEATURES = [ "tokio", "async-std", "native-tls", "rustls" ]
COMBINATIONS = [ [] ]
# Generate all combinations of features.
for feature in FEATURES:
new_combinations = [ c + [ feature ] for c in COMBINATIONS ]
COMBINATIONS.extend(new_combinations)
for c in COMBINATIONS:
arg = "--features={}".format(",".join(c))
commandline = sys.argv[1:] + [arg]
print(" ".join(commandline))
subprocess.check_call(commandline)

View File

@ -1,27 +1,46 @@
//! Entry points for use with async_std runtimes.
pub use crate::impls::async_std::create_runtime as create_runtime_impl;
use crate::{compound::CompoundRuntime, SpawnBlocking};
use crate::{compound::CompoundRuntime, BlockOn};
use std::io::Result as IoResult;
#[cfg(feature = "native-tls")]
use crate::impls::native_tls::NativeTlsProvider;
#[cfg(feature = "rustls")]
use crate::impls::rustls::RustlsProvider;
use async_std_crate::net::TcpStream;
use async_executors::AsyncStd;
/// An alias for the async_std runtime that we prefer to use, based on whatever TLS
/// implementation has been enabled.
///
/// If only one of `native_tls` and `rustls` bas been enabled within the
/// `tor-rtcompat` crate, that will be the TLS backend that this uses.
///
/// Currently, `native_tls` is preferred over `rustls` when both are available,
/// because of its maturity within Arti. However, this might change in the
/// future.
#[cfg(all(feature = "native-tls"))]
pub use AsyncStdNativeTlsRuntime as PreferredRuntime;
#[cfg(all(feature = "rustls", not(feature = "native-tls")))]
pub use AsyncStdRustlsRuntime as PreferredRuntime;
/// A [`Runtime`](crate::Runtime) powered by `async_std` and `native_tls`.
#[derive(Clone)]
pub struct AsyncStdRuntime {
#[cfg(all(feature = "native-tls"))]
pub struct AsyncStdNativeTlsRuntime {
/// The actual runtime object.
inner: NativeTlsInner,
}
/// Implementation type for AsyncStdRuntime.
#[cfg(all(feature = "native-tls"))]
type NativeTlsInner = CompoundRuntime<AsyncStd, AsyncStd, AsyncStd, NativeTlsProvider<TcpStream>>;
#[cfg(all(feature = "native-tls"))]
crate::opaque::implement_opaque_runtime! {
AsyncStdRuntime { inner : NativeTlsInner }
AsyncStdNativeTlsRuntime { inner : NativeTlsInner }
}
#[cfg(feature = "rustls")]
@ -41,40 +60,80 @@ crate::opaque::implement_opaque_runtime! {
AsyncStdRustlsRuntime { inner: RustlsInner }
}
/// Return a new async-std-based [`Runtime`](crate::Runtime).
///
/// Generally you should call this function only once, and then use
/// [`Clone::clone()`] to create additional references to that
/// runtime.
pub fn create_runtime() -> std::io::Result<AsyncStdRuntime> {
let rt = create_runtime_impl();
Ok(AsyncStdRuntime {
inner: CompoundRuntime::new(rt, rt, rt, NativeTlsProvider::default()),
})
#[cfg(all(feature = "native-tls"))]
impl AsyncStdNativeTlsRuntime {
/// Return a new [`AsyncStdNativeTlsRuntime`]
///
/// Generally you should call this function only once, and then use
/// [`Clone::clone()`] to create additional references to that
/// runtime.
pub fn create() -> IoResult<Self> {
let rt = create_runtime_impl();
Ok(AsyncStdNativeTlsRuntime {
inner: CompoundRuntime::new(rt, rt, rt, NativeTlsProvider::default()),
})
}
/// Return an [`AsyncStdNativeTlsRuntime`] for the currently running
/// `async_std` executor.
///
/// Note that since async_std executors are global, there is no distinction
/// between this method and [`AsyncStdNativeTlsRuntime::create()`]: it is
/// provided only for API consistency with the Tokio runtimes.
pub fn current() -> IoResult<Self> {
Self::create()
}
/// Helper to run a single test function in a freshly created runtime.
///
/// # Panics
///
/// Panics if we can't create this runtime.
pub fn run_test<P, F, O>(func: P) -> O
where
P: FnOnce(Self) -> F,
F: futures::Future<Output = O>,
{
let runtime = Self::create().expect("Failed to create runtime");
runtime.clone().block_on(func(runtime))
}
}
/// Return a new [`Runtime`](crate::Runtime) based on `async_std` and `rustls`.
#[cfg(feature = "rustls")]
pub fn create_rustls_runtime() -> std::io::Result<AsyncStdRustlsRuntime> {
let rt = create_runtime_impl();
Ok(AsyncStdRustlsRuntime {
inner: CompoundRuntime::new(rt, rt, rt, RustlsProvider::default()),
})
}
impl AsyncStdRustlsRuntime {
/// Return a new [`AsyncStdRustlsRuntime`]
///
/// Generally you should call this function only once, and then use
/// [`Clone::clone()`] to create additional references to that
/// runtime.
pub fn create() -> IoResult<Self> {
let rt = create_runtime_impl();
Ok(AsyncStdRustlsRuntime {
inner: CompoundRuntime::new(rt, rt, rt, RustlsProvider::default()),
})
}
/// Try to return an instance of the currently running async_std
/// [`Runtime`](crate::Runtime).
pub fn current_runtime() -> std::io::Result<AsyncStdRuntime> {
// In async_std, the runtime is a global singleton.
create_runtime()
}
/// Return an [`AsyncStdRustlsRuntime`] for the currently running
/// `async_std` executor.
///
/// Note that since async_std executors are global, there is no distinction
/// between this method and [`AsyncStdNativeTlsRuntime::create()`]: it is
/// provided only for API consistency with the Tokio runtimes.
pub fn current() -> IoResult<Self> {
Self::create()
}
/// Run a test function using a freshly created async_std runtime.
pub fn test_with_runtime<P, F, O>(func: P) -> O
where
P: FnOnce(AsyncStdRuntime) -> F,
F: futures::Future<Output = O>,
{
let runtime = current_runtime().expect("Couldn't get global async_std runtime?");
runtime.clone().block_on(func(runtime))
/// Helper to run a single test function in a freshly created runtime.
///
/// # Panics
///
/// Panics if we can't create this runtime.
pub fn run_test<P, F, O>(func: P) -> O
where
P: FnOnce(Self) -> F,
F: futures::Future<Output = O>,
{
let runtime = Self::create().expect("Failed to create runtime");
runtime.clone().block_on(func(runtime))
}
}

View File

@ -10,7 +10,7 @@ use std::io::Result as IoResult;
/// A runtime made of several parts, each of which implements one trait-group.
///
/// The `SpawnR` component should implements [`Spawn`] and [`SpawnBlocking`];
/// The `SpawnR` component should implements [`Spawn`] and [`BlockOn`];
/// the `SleepR` component should implement [`SleepProvider`]; the `TcpR`
/// component should implement [`TcpProvider`]; and the `TlsR` component should
/// implement [`TlsProvider`].
@ -18,7 +18,6 @@ use std::io::Result as IoResult;
/// You can use this structure to create new runtimes in two ways: either by
/// overriding a single part of an existing runtime, or by building an entirely
/// new runtime from pieces.
#[derive(Clone)]
pub struct CompoundRuntime<SpawnR, SleepR, TcpR, TlsR> {
/// The actual collection of Runtime objects.
///
@ -27,9 +26,19 @@ pub struct CompoundRuntime<SpawnR, SleepR, TcpR, TlsR> {
inner: Arc<Inner<SpawnR, SleepR, TcpR, TlsR>>,
}
// We have to provide this ourselves, since derive(Clone) wrongly infers a
// `where S: Clone` bound (from the generic argument).
impl<SpawnR, SleepR, TcpR, TlsR> Clone for CompoundRuntime<SpawnR, SleepR, TcpR, TlsR> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
/// A collection of objects implementing that traits that make up a [`Runtime`]
struct Inner<SpawnR, SleepR, TcpR, TlsR> {
/// A `Spawn` and `SpawnBlocking` implementation.
/// A `Spawn` and `BlockOn` implementation.
spawn: SpawnR,
/// A `SleepProvider` implementation.
sleep: SleepR,
@ -63,9 +72,9 @@ where
}
}
impl<SpawnR, SleepR, TcpR, TlsR> SpawnBlocking for CompoundRuntime<SpawnR, SleepR, TcpR, TlsR>
impl<SpawnR, SleepR, TcpR, TlsR> BlockOn for CompoundRuntime<SpawnR, SleepR, TcpR, TlsR>
where
SpawnR: SpawnBlocking,
SpawnR: BlockOn,
{
#[inline]
fn block_on<F: futures::Future>(&self, future: F) -> F::Output {

View File

@ -11,4 +11,5 @@ pub(crate) mod tokio;
#[cfg(all(feature = "rustls"))]
pub(crate) mod rustls;
#[cfg(all(feature = "native-tls"))]
pub(crate) mod native_tls;

View File

@ -131,7 +131,7 @@ impl SleepProvider for async_executors::AsyncStd {
}
}
impl SpawnBlocking for async_executors::AsyncStd {
impl BlockOn for async_executors::AsyncStd {
fn block_on<F: Future>(&self, f: F) -> F::Output {
async_executors::AsyncStd::block_on(f)
}

View File

@ -4,6 +4,7 @@ use crate::traits::{CertifiedConn, TlsConnector, TlsProvider};
use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite};
use native_tls_crate as native_tls;
use std::{
convert::TryInto,
io::{Error as IoError, Result as IoResult},
@ -103,13 +104,3 @@ impl<S> Default for NativeTlsProvider<S> {
Self::new()
}
}
// We have to provide this ourselves, since derive(Clone) wrongly infers a
// `where S: Clone` bound (from the generic argument).
impl<S> Clone for NativeTlsProvider<S> {
fn clone(&self) -> Self {
Self {
_phantom: std::marker::PhantomData,
}
}
}

View File

@ -100,28 +100,6 @@ impl<S> Default for RustlsProvider<S> {
}
}
// We have to provide this ourselves, since derive(Clone) wrongly infers a
// `where S: Clone` bound (from the generic argument).
impl<S> Clone for RustlsProvider<S> {
fn clone(&self) -> Self {
Self {
config: Arc::clone(&self.config),
_phantom: std::marker::PhantomData,
}
}
}
// We have to provide this ourselves, since derive(Clone) wrongly infers a
// `where S: Clone` bound (from the generic argument).
impl<S> Clone for RustlsConnector<S> {
fn clone(&self) -> Self {
Self {
connector: self.connector.clone(),
_phantom: std::marker::PhantomData,
}
}
}
/// A [`rustls::ServerCertVerifier`] based on the [`x509_signature`] crate.
///
/// This verifier is necessary since Tor relays doesn't participate in the web

View File

@ -124,40 +124,34 @@ use futures::Future;
use std::io::Result as IoResult;
use std::time::Duration;
/// Helper: Declare that a given tokio runtime object implements the
/// prerequisites for Runtime.
// TODO: Maybe we can do this more simply with a simpler trait?
macro_rules! implement_traits_for {
($runtime:ty) => {
impl SleepProvider for $runtime {
type SleepFuture = tokio_crate::time::Sleep;
fn sleep(&self, duration: Duration) -> Self::SleepFuture {
tokio_crate::time::sleep(duration)
}
}
impl SleepProvider for TokioRuntimeHandle {
type SleepFuture = tokio_crate::time::Sleep;
fn sleep(&self, duration: Duration) -> Self::SleepFuture {
tokio_crate::time::sleep(duration)
}
}
#[async_trait]
impl crate::traits::TcpProvider for $runtime {
type TcpStream = net::TcpStream;
type TcpListener = net::TcpListener;
#[async_trait]
impl crate::traits::TcpProvider for TokioRuntimeHandle {
type TcpStream = net::TcpStream;
type TcpListener = net::TcpListener;
async fn connect(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpStream> {
let s = net::TokioTcpStream::connect(addr).await?;
Ok(s.into())
}
async fn listen(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpListener> {
let lis = net::TokioTcpListener::bind(*addr).await?;
Ok(net::TcpListener { lis })
}
}
};
async fn connect(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpStream> {
let s = net::TokioTcpStream::connect(addr).await?;
Ok(s.into())
}
async fn listen(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpListener> {
let lis = net::TokioTcpListener::bind(*addr).await?;
Ok(net::TcpListener { lis })
}
}
/// Create and return a new Tokio multithreaded runtime.
pub(crate) fn create_runtime() -> IoResult<async_executors::TokioTp> {
pub(crate) fn create_runtime() -> IoResult<TokioRuntimeHandle> {
let mut builder = async_executors::TokioTpBuilder::new();
builder.tokio_builder().enable_all();
builder.build()
let owned = builder.build()?;
Ok(owned.into())
}
/// Wrapper around a Handle to a tokio runtime.
@ -174,6 +168,12 @@ pub(crate) fn create_runtime() -> IoResult<async_executors::TokioTp> {
/// that when creating this object.
#[derive(Clone, Debug)]
pub struct TokioRuntimeHandle {
/// If present, the tokio executor that we've created (and which we own).
///
/// We never access this directly; only through `handle`. We keep it here
/// so that our Runtime types can be agnostic about whether they own the
/// executor.
owned: Option<async_executors::TokioTp>,
/// The underlying Handle.
handle: tokio_crate::runtime::Handle,
}
@ -189,21 +189,33 @@ impl TokioRuntimeHandle {
pub(crate) fn new(handle: tokio_crate::runtime::Handle) -> Self {
handle.into()
}
/// Return true if this handle owns the executor that it points to.
pub fn is_owned(&self) -> bool {
self.owned.is_some()
}
}
impl From<tokio_crate::runtime::Handle> for TokioRuntimeHandle {
fn from(handle: tokio_crate::runtime::Handle) -> Self {
Self { handle }
Self {
owned: None,
handle,
}
}
}
impl SpawnBlocking for async_executors::TokioTp {
fn block_on<F: Future>(&self, f: F) -> F::Output {
async_executors::TokioTp::block_on(self, f)
impl From<async_executors::TokioTp> for TokioRuntimeHandle {
fn from(owner: async_executors::TokioTp) -> TokioRuntimeHandle {
let handle = owner.block_on(async { tokio_crate::runtime::Handle::current() });
Self {
owned: Some(owner),
handle,
}
}
}
impl SpawnBlocking for TokioRuntimeHandle {
impl BlockOn for TokioRuntimeHandle {
fn block_on<F: Future>(&self, f: F) -> F::Output {
self.handle.block_on(f)
}
@ -219,6 +231,3 @@ impl futures::task::Spawn for TokioRuntimeHandle {
Ok(())
}
}
implement_traits_for! {async_executors::TokioTp}
implement_traits_for! {TokioRuntimeHandle}

View File

@ -40,7 +40,7 @@
//! The `tor-rtcompat` crate provides several traits that
//! encapsulate different runtime capabilities.
//!
//! * A runtime is a [`SpawnBlocking`] if it can block on a future.
//! * A runtime is a [`BlockOn`] if it can block on a future.
//! * A runtime is a [`SleepProvider`] if it can make timer futures that
//! become Ready after a given interval of time.
//! * A runtime is a [`TcpProvider`] if it can make and receive TCP
@ -60,11 +60,13 @@
//! * If you want to construct a default runtime that you won't be
//! using for anything besides Arti, you can use [`create_runtime()`].
//!
//! * If you want to explicitly construct a runtime with a specific
//! backend, you can do so with [`async_std::create_runtime`] or
//! [`tokio::create_runtime`]. Or if you have already constructed a
//! * If you want to use a runtime with an explicitly chosen backend,
//! name its type directly as [`async_std::AsyncStdNativeTlsRuntime`],
//! [`async_std::AsyncStdRustlsRuntime`], [`tokio::TokioNativeTlsRuntime`],
//! or [`tokio::TokioRustlsRuntime`]. To construct one of these runtimes,
//! call its `create()` method. Or if you have already constructed a
//! tokio runtime that you want to use, you can wrap it as a
//! [`Runtime`] explicitly with [`tokio::TokioRuntimeHandle`].
//! [`Runtime`] explicitly with `current()`.
//!
//! # Cargo features
//!
@ -92,7 +94,7 @@
//! We could simplify this code significantly by removing most of the
//! traits it exposes, and instead just exposing a single
//! implementation. For example, instead of exposing a
//! [`SpawnBlocking`] trait to represent blocking until a task is
//! [`BlockOn`] trait to represent blocking until a task is
//! done, we could just provide a single global `block_on` function.
//!
//! That simplification would come at a cost, however. First of all,
@ -140,6 +142,10 @@
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),
any(feature = "async-std", feature = "tokio")
))]
pub(crate) mod impls;
pub mod task;
@ -148,11 +154,15 @@ mod opaque;
mod timer;
mod traits;
#[cfg(all(test, any(feature = "tokio", feature = "async-std")))]
#[cfg(all(
test,
any(feature = "native-tls", feature = "rustls"),
any(feature = "async-std", feature = "tokio")
))]
mod test;
pub use traits::{
CertifiedConn, Runtime, SleepProvider, SpawnBlocking, TcpListener, TcpProvider, TlsProvider,
BlockOn, CertifiedConn, Runtime, SleepProvider, TcpListener, TcpProvider, TlsProvider,
};
pub use timer::{SleepProviderExt, Timeout, TimeoutError};
@ -163,14 +173,36 @@ pub mod tls {
pub use crate::traits::{CertifiedConn, TlsConnector};
}
#[cfg(feature = "tokio")]
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
pub mod tokio;
#[cfg(feature = "async-std")]
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "async-std"))]
pub mod async_std;
pub use compound::CompoundRuntime;
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),
feature = "async-std",
not(feature = "tokio")
))]
use async_std as preferred_backend_mod;
#[cfg(all(any(feature = "native-tls", feature = "rustls"), feature = "tokio"))]
use tokio as preferred_backend_mod;
/// The runtime that we prefer to use, out of all the runtimes compiled into the
/// tor-rtcompat crate.
///
/// If `tokio` and `async-std` are both available, we prefer `tokio` for its
/// performance.
/// If `native_tls` and `rustls` are both available, we prefer `native_tls` since
/// it has been used in Arti for longer.
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),
any(feature = "async-std", feature = "tokio")
))]
pub use preferred_backend_mod::PreferredRuntime;
/// Try to return an instance of the currently running [`Runtime`].
///
/// # Limitations
@ -188,47 +220,50 @@ pub use compound::CompoundRuntime;
///
/// Once you have a runtime returned by this function, you should
/// just create more handles to it via [`Clone`].
#[cfg(any(feature = "async-std", feature = "tokio"))]
///
/// This function returns a type-erased `impl Runtime` rather than a specific
/// runtime implementation, so that you can be sure that your code doesn't
/// depend on any runtime-specific features. If that's not what you want, you
/// can call [`PreferredRuntime::current`], or the `create` function on some
/// specific runtime in the `tokio` or `async_std` modules.
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),
any(feature = "async-std", feature = "tokio")
))]
pub fn current_user_runtime() -> std::io::Result<impl Runtime> {
#[cfg(feature = "tokio")]
{
crate::tokio::current_runtime()
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
{
crate::async_std::current_runtime()
}
PreferredRuntime::current()
}
/// Return a new instance of the default [`Runtime`].
///
/// Generally you should call this function at most once, and then use
/// [`Clone::clone()`] to create additional references to that
/// runtime.
/// [`Clone::clone()`] to create additional references to that runtime.
///
/// Tokio users may want to avoid this function and instead make a
/// runtime using [`current_user_runtime()`] or
/// [`tokio::current_runtime()`]: this function always _builds_ a
/// runtime, and if you already have a runtime, that isn't what you
/// want with Tokio.
/// Tokio users may want to avoid this function and instead make a runtime using
/// [`current_user_runtime()`] or [`tokio::PreferredRuntime::current()`]: this
/// function always _builds_ a runtime, and if you already have a runtime, that
/// isn't what you want with Tokio.
///
/// If you need more fine-grained control over a runtime, you can
/// create it using an appropriate builder type or function.
#[cfg(any(feature = "async-std", feature = "tokio"))]
/// If you need more fine-grained control over a runtime, you can create it
/// using an appropriate builder type or function.
///
/// This function returns a type-erased `impl Runtime` rather than a specific
/// runtime implementation, so that you can be sure that your code doesn't
/// depend on any runtime-specific features. If that's not what you want, you
/// can call [`PreferredRuntime::create`], or the `create` function on some
/// specific runtime in the `tokio` or `async_std` modules.
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),
any(feature = "async-std", feature = "tokio")
))]
pub fn create_runtime() -> std::io::Result<impl Runtime> {
#[cfg(feature = "tokio")]
{
crate::tokio::create_runtime()
}
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
{
crate::async_std::create_runtime()
}
PreferredRuntime::create()
}
/// Helpers for test_with_all_runtimes
pub mod testing__ {
/// A trait for an object that might represent a test failure.
/// A trait for an object that might represent a test failure, or which
/// might just be `()`.
pub trait TestOutcome {
/// Abort if the test has failed.
fn check_ok(&self);
@ -236,42 +271,88 @@ pub mod testing__ {
impl TestOutcome for () {
fn check_ok(&self) {}
}
impl<T, E> TestOutcome for Result<T, E> {
impl<E: std::fmt::Debug> TestOutcome for Result<(), E> {
fn check_ok(&self) {
assert!(self.is_ok());
self.as_ref().expect("Test failure");
}
}
}
/// Helper: define a macro that expands a token tree iff a pair of features are
/// both present.
macro_rules! declare_conditional_macro {
( $(#[$meta:meta])* macro $name:ident = ($f1:expr, $f2:expr) ) => {
$( #[$meta] )*
#[cfg(all(feature=$f1, feature=$f2))]
#[macro_export]
macro_rules! $name {
($tt:tt) => {
$tt
};
}
$( #[$meta] )*
#[cfg(not(all(feature=$f1, feature=$f2)))]
#[macro_export]
macro_rules! $name {
($tt:tt) => {};
}
// Needed so that we can access this macro at this path, both within the
// crate and without.
pub use $name;
};
}
/// Defines macros that will expand when certain runtimes are available.
pub mod cond {
declare_conditional_macro! {
/// Expand a token tree if the TokioNativeTlsRuntime is available.
macro if_tokio_native_tls_present = ("tokio", "native-tls")
}
declare_conditional_macro! {
/// Expand a token tree if the TokioRustlsRuntime is available.
macro if_tokio_rustls_present = ("tokio", "rustls")
}
declare_conditional_macro! {
/// Expand a token tree if the TokioNativeTlsRuntime is available.
macro if_async_std_native_tls_present = ("async-std", "native-tls")
}
declare_conditional_macro! {
/// Expand a token tree if the TokioNativeTlsRuntime is available.
macro if_async_std_rustls_present = ("async-std", "rustls")
}
}
/// Run a test closure, passing as argument every supported runtime.
///
/// (This is a macro so that it can repeat the closure as two separate
/// (This is a macro so that it can repeat the closure as multiple separate
/// expressions, so it can take on two different types, if needed.)
#[macro_export]
#[cfg(all(feature = "tokio", feature = "async-std"))]
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),
any(feature = "tokio", feature = "async-std"),
))]
macro_rules! test_with_all_runtimes {
( $fn:expr ) => {{
use $crate::cond::*;
use $crate::testing__::TestOutcome;
$crate::tokio::test_with_runtime($fn).check_ok();
$crate::async_std::test_with_runtime($fn)
}};
}
// We have to do this outcome-checking business rather than just using
// the ? operator or calling expect() because some of the closures that
// we use this macro with return (), and some return Result.
/// Run a test closure, passing as argument every supported runtime.
#[macro_export]
#[cfg(all(feature = "tokio", not(feature = "async-std")))]
macro_rules! test_with_all_runtimes {
( $fn:expr ) => {{
$crate::tokio::test_with_runtime($fn)
}};
}
/// Run a test closure, passing as argument every supported runtime.
#[macro_export]
#[cfg(all(not(feature = "tokio"), feature = "async-std"))]
macro_rules! test_with_all_runtimes {
( $fn:expr ) => {{
$crate::async_std::test_with_runtime($fn)
if_tokio_native_tls_present! {{
$crate::tokio::TokioNativeTlsRuntime::run_test($fn).check_ok();
}}
if_tokio_rustls_present! {{
$crate::tokio::TokioRustlsRuntime::run_test($fn).check_ok();
}}
if_async_std_native_tls_present! {{
$crate::async_std::AsyncStdNativeTlsRuntime::run_test($fn).check_ok();
}}
if_async_std_rustls_present! {{
$crate::async_std::AsyncStdRustlsRuntime::run_test($fn).check_ok();
}}
}};
}
@ -279,20 +360,12 @@ macro_rules! test_with_all_runtimes {
///
/// (Always prefers tokio if present.)
#[macro_export]
#[cfg(feature = "tokio")]
#[cfg(all(
any(feature = "native-tls", feature = "rustls"),
any(feature = "tokio", feature = "async-std"),
))]
macro_rules! test_with_one_runtime {
( $fn:expr ) => {{
$crate::tokio::test_with_runtime($fn)
}};
}
/// Run a test closure, passing as argument one supported runtime.
///
/// (Always prefers tokio if present.)
#[macro_export]
#[cfg(all(not(feature = "tokio"), feature = "async-std"))]
macro_rules! test_with_one_runtime {
( $fn:expr ) => {{
$crate::async_std::test_with_runtime($fn)
$crate::PreferredRuntime::run_test($fn)
}};
}

View File

@ -16,7 +16,7 @@ macro_rules! implement_opaque_runtime {
}
}
impl $crate::traits::SpawnBlocking for $t {
impl $crate::traits::BlockOn for $t {
#[inline]
fn block_on<F: futures::Future>(&self, future: F) -> F::Output {
self.$member.block_on(future)

View File

@ -38,7 +38,11 @@ impl Future for YieldFuture {
}
}
#[cfg(all(test, any(feature = "tokio", feature = "async-std")))]
#[cfg(all(
test,
any(feature = "native-tls", feature = "rustls"),
any(feature = "tokio", feature = "async-std")
))]
mod test {
use super::yield_now;
use crate::test_with_all_runtimes;
@ -46,7 +50,7 @@ mod test {
use std::sync::atomic::{AtomicBool, Ordering};
#[test]
fn test_yield() -> std::io::Result<()> {
fn test_yield() {
test_with_all_runtimes!(|_| async {
let b = AtomicBool::new(false);
use Ordering::SeqCst;
@ -77,7 +81,6 @@ mod test {
}
);
std::io::Result::Ok(())
})?;
Ok(())
});
}
}

View File

@ -6,6 +6,7 @@ use crate::traits::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::stream::StreamExt;
use native_tls_crate as native_tls;
use std::io::Result as IoResult;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::time::{Duration, Instant, SystemTime};
@ -225,7 +226,7 @@ macro_rules! runtime_tests {
$(
#[test]
fn $id() -> IoResult<()> {
super::$id(&crate::tokio::create_runtime()?)
super::$id(&crate::tokio::PreferredRuntime::create()?)
}
)*
}
@ -235,7 +236,7 @@ macro_rules! runtime_tests {
$(
#[test]
fn $id() -> IoResult<()> {
super::$id(&crate::async_std::create_runtime()?)
super::$id(&crate::async_std::PreferredRuntime::create()?)
}
)*
}
@ -244,44 +245,44 @@ macro_rules! runtime_tests {
macro_rules! tls_runtime_tests {
{ $($id:ident),* $(,)? } => {
#[cfg(feature="tokio")]
#[cfg(all(feature="tokio", feature = "native-tls"))]
mod tokio_native_tls_tests {
use std::io::Result as IoResult;
$(
#[test]
fn $id() -> IoResult<()> {
super::$id(&crate::tokio::create_runtime()?)
super::$id(&crate::tokio::TokioNativeTlsRuntime::create()?)
}
)*
}
#[cfg(feature="async-std")]
#[cfg(all(feature="async-std", feature = "native-tls"))]
mod async_std_native_tls_tests {
use std::io::Result as IoResult;
$(
#[test]
fn $id() -> IoResult<()> {
super::$id(&crate::async_std::create_runtime()?)
super::$id(&crate::async_std::AsyncStdNativeTlsRuntime::create()?)
}
)*
}
#[cfg(all(feature="tokio", feature="rustls"))]
mod tokio_rusttls_tests {
mod tokio_rustls_tests {
use std::io::Result as IoResult;
$(
#[test]
fn $id() -> IoResult<()> {
super::$id(&crate::tokio::create_rustls_runtime()?)
super::$id(&crate::tokio::TokioRustlsRuntime::create()?)
}
)*
}
#[cfg(all(feature="async-std", feature="rustls"))]
mod async_std_rusttls_tests {
mod async_std_rustls_tests {
use std::io::Result as IoResult;
$(
#[test]
fn $id() -> IoResult<()> {
super::$id(&crate::async_std::create_rustls_runtime()?)
super::$id(&crate::async_std::AsyncStdRustlsRuntime::create()?)
}
)*
}

View File

@ -1,16 +1,30 @@
//! Entry points for use with Tokio runtimes.
use crate::impls::native_tls::NativeTlsProvider;
use crate::impls::tokio::TokioRuntimeHandle as Handle;
use async_executors::TokioTp;
use crate::{CompoundRuntime, Runtime, SpawnBlocking};
use crate::{BlockOn, CompoundRuntime};
use std::io::{Error as IoError, ErrorKind, Result as IoResult};
#[cfg(feature = "native-tls")]
use crate::impls::native_tls::NativeTlsProvider;
#[cfg(feature = "rustls")]
use crate::impls::rustls::RustlsProvider;
use crate::impls::tokio::net::TcpStream;
/// A [`Runtime`] built around a Handle to a tokio runtime, and `native_tls`.
/// An alias for the Tokio runtime that we prefer to use, based on whatever TLS
/// implementation has been enabled.
///
/// If only one of `native_tls` and `rustls` bas been enabled within the
/// `tor-rtcompat` crate, that will be the TLS backend that this uses.
///
/// Currently, `native_tls` is preferred over `rustls` when both are available,
/// because of its maturity within Arti. However, this might change in the
/// future.
#[cfg(feature = "native-tls")]
pub use TokioNativeTlsRuntime as PreferredRuntime;
#[cfg(all(feature = "rustls", not(feature = "native-tls")))]
pub use TokioRustlsRuntime as PreferredRuntime;
/// A [`Runtime`](crate::Runtime) built around a Handle to a tokio runtime, and `native_tls`.
///
/// # Limitations
///
@ -18,18 +32,20 @@ use crate::impls::tokio::net::TcpStream;
/// implementations for Tokio's time, net, and io facilities, but we have
/// no good way to check that when creating this object.
#[derive(Clone)]
pub struct TokioRuntimeHandle {
#[cfg(feature = "native-tls")]
pub struct TokioNativeTlsRuntime {
/// The actual [`CompoundRuntime`] that implements this.
inner: HandleInner,
}
/// Implementation type for a TokioRuntimeHandle.
#[cfg(feature = "native-tls")]
type HandleInner = CompoundRuntime<Handle, Handle, Handle, NativeTlsProvider<TcpStream>>;
/// A [`Runtime`] built around a Handle to a tokio runtime, and `rustls`.
/// A [`Runtime`](crate::Runtime) built around a Handle to a tokio runtime, and `rustls`.
#[derive(Clone)]
#[cfg(feature = "rustls")]
pub struct TokioRustlsRuntimeHandle {
pub struct TokioRustlsRuntime {
/// The actual [`CompoundRuntime`] that implements this.
inner: RustlsHandleInner,
}
@ -38,143 +54,132 @@ pub struct TokioRustlsRuntimeHandle {
#[cfg(feature = "rustls")]
type RustlsHandleInner = CompoundRuntime<Handle, Handle, Handle, RustlsProvider<TcpStream>>;
/// A [`Runtime`] built around an owned `TokioTp` executor, and `native_tls`.
#[derive(Clone)]
pub struct TokioRuntime {
/// The actual [`CompoundRuntime`] that implements this.
inner: TokioRuntimeInner,
}
/// A [`Runtime`] built around an owned `TokioTp` executor, and `rustls`.
#[derive(Clone)]
#[cfg(feature = "rustls")]
pub struct TokioRustlsRuntime {
/// The actual [`CompoundRuntime`] that implements this.
inner: TokioRustlsRuntimeInner,
}
/// Implementation type for TokioRuntime.
type TokioRuntimeInner = CompoundRuntime<TokioTp, TokioTp, TokioTp, NativeTlsProvider<TcpStream>>;
/// Implementation type for TokioRustlsRuntime.
#[cfg(feature = "rustls")]
type TokioRustlsRuntimeInner =
CompoundRuntime<TokioTp, TokioTp, TokioTp, RustlsProvider<TcpStream>>;
#[cfg(feature = "native-tls")]
crate::opaque::implement_opaque_runtime! {
TokioRuntimeHandle { inner : HandleInner }
}
crate::opaque::implement_opaque_runtime! {
TokioRuntime { inner : TokioRuntimeInner }
TokioNativeTlsRuntime { inner : HandleInner }
}
#[cfg(feature = "rustls")]
crate::opaque::implement_opaque_runtime! {
TokioRustlsRuntimeHandle { inner : RustlsHandleInner }
TokioRustlsRuntime { inner : RustlsHandleInner }
}
#[cfg(feature = "rustls")]
crate::opaque::implement_opaque_runtime! {
TokioRustlsRuntime { inner : TokioRustlsRuntimeInner }
}
impl From<tokio_crate::runtime::Handle> for TokioRuntimeHandle {
#[cfg(feature = "native-tls")]
impl From<tokio_crate::runtime::Handle> for TokioNativeTlsRuntime {
fn from(h: tokio_crate::runtime::Handle) -> Self {
let h = Handle::new(h);
TokioRuntimeHandle {
TokioNativeTlsRuntime {
inner: CompoundRuntime::new(h.clone(), h.clone(), h, NativeTlsProvider::default()),
}
}
}
/// Create and return a new Tokio multithreaded runtime.
fn create_tokio_runtime() -> IoResult<TokioRuntime> {
crate::impls::tokio::create_runtime().map(|r| TokioRuntime {
inner: CompoundRuntime::new(r.clone(), r.clone(), r, NativeTlsProvider::default()),
})
}
/// Create and return a new Tokio multithreaded runtime configured to use `rustls`.
#[cfg(feature = "rustls")]
fn create_tokio_rustls_runtime() -> IoResult<TokioRustlsRuntime> {
crate::impls::tokio::create_runtime().map(|r| TokioRustlsRuntime {
inner: CompoundRuntime::new(r.clone(), r.clone(), r, RustlsProvider::default()),
})
impl From<tokio_crate::runtime::Handle> for TokioRustlsRuntime {
fn from(h: tokio_crate::runtime::Handle) -> Self {
let h = Handle::new(h);
TokioRustlsRuntime {
inner: CompoundRuntime::new(h.clone(), h.clone(), h, RustlsProvider::default()),
}
}
}
/// Create a new Tokio-based [`Runtime`].
///
/// Generally you should call this function only once, and then use
/// [`Clone::clone()`] to create additional references to that
/// runtime.
///
/// Tokio users may want to avoid this function and instead make a
/// runtime using [`current_runtime()`]: this function always _builds_ a
/// runtime, and if you already have a runtime, that isn't what you
/// want with Tokio.
pub fn create_runtime() -> std::io::Result<impl Runtime> {
create_tokio_runtime()
#[cfg(feature = "native-tls")]
impl TokioNativeTlsRuntime {
/// Create a new [`TokioNativeTlsRuntime`].
///
/// The return value will own the underlying Tokio runtime object, which
/// will be dropped when the last copy of this handle is freed.
///
/// If you want to use a currently running runtime instead, call
/// [`TokioNativeTlsRuntime::current()`].
pub fn create() -> IoResult<Self> {
crate::impls::tokio::create_runtime().map(|r| TokioNativeTlsRuntime {
inner: CompoundRuntime::new(r.clone(), r.clone(), r, NativeTlsProvider::default()),
})
}
/// Return a [`TokioNativeTlsRuntime`] wrapping the currently running
/// Tokio runtime.
///
/// # Usage note
///
/// We should never call this from inside other Arti crates, or from library
/// crates that want to support multiple runtimes! This function is for
/// Arti _users_ who want to wrap some existing Tokio runtime as a
/// [`Runtime`](crate::Runtime). It is not for library crates that want to work with
/// multiple runtimes.
///
/// Once you have a runtime returned by this function, you should just
/// create more handles to it via [`Clone`].
pub fn current() -> IoResult<Self> {
Ok(current_handle()?.into())
}
/// Helper to run a single test function in a freshly created runtime.
///
/// # Panics
///
/// Panics if we can't create this runtime.
pub fn run_test<P, F, O>(func: P) -> O
where
P: FnOnce(Self) -> F,
F: futures::Future<Output = O>,
{
let runtime = Self::create().expect("Failed to create runtime");
runtime.clone().block_on(func(runtime))
}
}
/// Create a new Tokio-based [`Runtime`] with `rustls`.
///
/// Generally you should call this function only once, and then use
/// [`Clone::clone()`] to create additional references to that
/// runtime.
///
/// Tokio users may want to avoid this function and instead make a
/// runtime using [`current_runtime()`]: this function always _builds_ a
/// runtime, and if you already have a runtime, that isn't what you
/// want with Tokio.
#[cfg(feature = "rustls")]
pub fn create_rustls_runtime() -> std::io::Result<impl Runtime> {
create_tokio_rustls_runtime()
impl TokioRustlsRuntime {
/// Create a new [`TokioRustlsRuntime`].
///
/// The return value will own the underlying Tokio runtime object, which
/// will be dropped when the last copy of this handle is freed.
///
/// If you want to use a currently running runtime instead, call
/// [`TokioRustlsRuntime::current()`].
pub fn create() -> IoResult<Self> {
crate::impls::tokio::create_runtime().map(|r| TokioRustlsRuntime {
inner: CompoundRuntime::new(r.clone(), r.clone(), r, RustlsProvider::default()),
})
}
/// Return a [`TokioRustlsRuntime`] wrapping the currently running
/// Tokio runtime.
///
/// # Usage note
///
/// We should never call this from inside other Arti crates, or from library
/// crates that want to support multiple runtimes! This function is for
/// Arti _users_ who want to wrap some existing Tokio runtime as a
/// [`Runtime`](crate::Runtime). It is not for library crates that want to work with
/// multiple runtimes.
///
/// Once you have a runtime returned by this function, you should just
/// create more handles to it via [`Clone`].
pub fn current() -> IoResult<Self> {
Ok(current_handle()?.into())
}
/// Helper to run a single test function in a freshly created runtime.
///
/// # Panics
///
/// Panics if we can't create this runtime.
pub fn run_test<P, F, O>(func: P) -> O
where
P: FnOnce(Self) -> F,
F: futures::Future<Output = O>,
{
let runtime = Self::create().expect("Failed to create runtime");
runtime.clone().block_on(func(runtime))
}
}
/// Try to return an instance of the currently running tokio [`Runtime`].
///
/// # Usage note
///
/// We should never call this from inside other Arti crates, or from
/// library crates that want to support multiple runtimes! This
/// function is for Arti _users_ who want to wrap some existing Tokio
/// runtime as a [`Runtime`]. It is not for library
/// crates that want to work with multiple runtimes.
///
/// Once you have a runtime returned by this function, you should
/// just create more handles to it via [`Clone`].
pub fn current_runtime() -> std::io::Result<TokioRuntimeHandle> {
let handle = tokio_crate::runtime::Handle::try_current()
.map_err(|e| IoError::new(ErrorKind::Other, e))?;
let h = Handle::new(handle);
Ok(TokioRuntimeHandle {
inner: CompoundRuntime::new(h.clone(), h.clone(), h, NativeTlsProvider::default()),
})
}
/// Return an instance of the currently running tokio [`Runtime`], wrapped to
/// use `rustls`.
#[cfg(feature = "rustls")]
pub fn current_runtime_rustls() -> std::io::Result<TokioRustlsRuntimeHandle> {
let handle = tokio_crate::runtime::Handle::try_current()
.map_err(|e| IoError::new(ErrorKind::Other, e))?;
let h = Handle::new(handle);
Ok(TokioRustlsRuntimeHandle {
inner: CompoundRuntime::new(h.clone(), h.clone(), h, RustlsProvider::default()),
})
}
/// Run a test function using a freshly created tokio runtime.
///
/// # Panics
///
/// Panics if we can't create a tokio runtime.
pub fn test_with_runtime<P, F, O>(func: P) -> O
where
P: FnOnce(TokioRuntime) -> F,
F: futures::Future<Output = O>,
{
let runtime = create_tokio_runtime().expect("Failed to create a tokio runtime");
runtime.clone().block_on(func(runtime))
/// As `Handle::try_current()`, but return an IoError on failure.
#[cfg(any(feature = "native-tls", feature = "rustls"))]
fn current_handle() -> std::io::Result<tokio_crate::runtime::Handle> {
tokio_crate::runtime::Handle::try_current().map_err(|e| IoError::new(ErrorKind::Other, e))
}

View File

@ -16,7 +16,7 @@ use std::time::{Duration, Instant, SystemTime};
/// * [`SleepProvider`] to pause a task for a given amount of time.
/// * [`TcpProvider`] to launch and accept TCP connections.
/// * [`TlsProvider`] to launch TLS connections.
/// * [`SpawnBlocking`] to block on a future and run it to completion
/// * [`BlockOn`] to block on a future and run it to completion
/// (This may become optional in the future, if/when we add WASM
/// support).
///
@ -29,7 +29,7 @@ pub trait Runtime:
Sync
+ Send
+ Spawn
+ SpawnBlocking
+ BlockOn
+ Clone
+ SleepProvider
+ TcpProvider
@ -42,7 +42,7 @@ impl<T> Runtime for T where
T: Sync
+ Send
+ Spawn
+ SpawnBlocking
+ BlockOn
+ Clone
+ SleepProvider
+ TcpProvider
@ -105,7 +105,7 @@ pub trait SleepProvider {
}
/// Trait for a runtime that can block on a future.
pub trait SpawnBlocking {
pub trait BlockOn {
/// Run `future` until it is ready, and return its output.
fn block_on<F: Future>(&self, future: F) -> F::Output;
}

View File

@ -22,4 +22,4 @@ tor-rtcompat = { version = "0.0.3", path = "../tor-rtcompat" }
[dev-dependencies]
futures-await-test = "0.3.0"
rand = "0.8"
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio"] }
tor-rtcompat = { path="../tor-rtcompat", version = "0.0.3", features=["tokio", "native-tls" ] }

View File

@ -502,7 +502,7 @@ mod test {
}
#[test]
fn end_to_end() -> IoResult<()> {
fn end_to_end() {
test_with_all_runtimes!(|_rt| async {
let (client1, client2) = client_pair();
let lis = client2.listen(&"0.0.0.0:99".parse().unwrap()).await?;
@ -532,7 +532,7 @@ mod test {
r1?;
r2?;
IoResult::Ok(())
})
});
}
#[test]
@ -573,7 +573,7 @@ mod test {
}
#[test]
fn listener_stream() -> IoResult<()> {
fn listener_stream() {
test_with_all_runtimes!(|_rt| async {
let (client1, client2) = client_pair();
@ -602,16 +602,18 @@ mod test {
r1?;
r2?;
IoResult::Ok(())
})
});
}
#[test]
fn tls_basics() -> IoResult<()> {
fn tls_basics() {
let (client1, client2) = client_pair();
let cert = b"I am certified for something I assure you.";
let lis = client2.listen_tls(&"0.0.0.0:0".parse().unwrap(), cert[..].into())?;
let address = lis.local_addr()?;
let lis = client2
.listen_tls(&"0.0.0.0:0".parse().unwrap(), cert[..].into())
.unwrap();
let address = lis.local_addr().unwrap();
test_with_all_runtimes!(|_rt| async {
let (r1, r2): (IoResult<()>, IoResult<()>) = futures::join!(
@ -642,6 +644,6 @@ mod test {
r1?;
r2?;
IoResult::Ok(())
})
});
}
}

View File

@ -4,7 +4,7 @@
// we should make it so that more code is more shared.
use crate::net::MockNetProvider;
use tor_rtcompat::{Runtime, SleepProvider, SpawnBlocking, TcpProvider, TlsProvider};
use tor_rtcompat::{BlockOn, Runtime, SleepProvider, TcpProvider, TlsProvider};
use crate::io::LocalStream;
use async_trait::async_trait;
@ -48,7 +48,7 @@ impl<R: Runtime> Spawn for MockNetRuntime<R> {
}
}
impl<R: Runtime> SpawnBlocking for MockNetRuntime<R> {
impl<R: Runtime> BlockOn for MockNetRuntime<R> {
fn block_on<F: Future>(&self, future: F) -> F::Output {
self.runtime.block_on(future)
}

View File

@ -1,7 +1,7 @@
//! Declare MockSleepRuntime.
use crate::time::MockSleepProvider;
use tor_rtcompat::{Runtime, SleepProvider, SpawnBlocking, TcpProvider, TlsProvider};
use tor_rtcompat::{BlockOn, Runtime, SleepProvider, TcpProvider, TlsProvider};
use async_trait::async_trait;
use futures::task::{FutureObj, Spawn, SpawnError};
@ -86,7 +86,7 @@ impl<R: Runtime> Spawn for MockSleepRuntime<R> {
}
}
impl<R: Runtime> SpawnBlocking for MockSleepRuntime<R> {
impl<R: Runtime> BlockOn for MockSleepRuntime<R> {
fn block_on<F: Future>(&self, future: F) -> F::Output {
self.runtime.block_on(future)
}

View File

@ -455,7 +455,7 @@ mod test {
}
#[test]
fn time_moves_on() -> std::io::Result<()> {
fn time_moves_on() {
test_with_all_runtimes!(|_| async {
use futures::channel::oneshot;
use std::sync::atomic::AtomicBool;
@ -514,6 +514,6 @@ mod test {
}
);
std::io::Result::Ok(())
})
});
}
}