From 8c314185002d520f0869ea3bd0fc35736b103f98 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Mon, 1 Mar 2021 16:14:43 -0500 Subject: [PATCH] Port to work with tokio or async-std. This is fairly ugly and I think I'll need to mess around with the feature configuration a while until we get something that's pleasant to develop with. This still seems like a good idea, though, since we _will_ need to be executor-agnostic in the end, or we'll have no way to handle wasm or embedded environments. Later down the road, we'll probably want to use futures::Executor or futures::Spawn more than having global entry points in tor_rtcompat. That would probably make our feature story simpler. Tokio is the default now, since tokio seems to be more heavily used for performance-critical stuff. This patch breaks tests; the next one will fix them, albeit questionably. --- tor-chanmgr/Cargo.toml | 12 +++--- tor-chanmgr/src/transport.rs | 1 - tor-chanmgr/src/transport/nativetls.rs | 57 ++++++++++++++++++++------ tor-circmgr/Cargo.toml | 4 +- tor-client/Cargo.toml | 11 +++-- tor-client/src/proxy.rs | 13 +++++- tor-dirclient/Cargo.toml | 4 +- tor-dirclient/src/lib.rs | 3 +- tor-dirmgr/Cargo.toml | 4 +- tor-proto/Cargo.toml | 2 +- tor-rtcompat/Cargo.toml | 7 +++- tor-rtcompat/src/impls.rs | 3 ++ tor-rtcompat/src/impls/async_std.rs | 27 +++++++++++- tor-rtcompat/src/impls/tokio.rs | 45 ++++++++++++++++++++ tor-rtcompat/src/lib.rs | 20 ++++++++- 15 files changed, 177 insertions(+), 36 deletions(-) create mode 100644 tor-rtcompat/src/impls/tokio.rs diff --git a/tor-chanmgr/Cargo.toml b/tor-chanmgr/Cargo.toml index 880f9bc8c..5c21afd8c 100644 --- a/tor-chanmgr/Cargo.toml +++ b/tor-chanmgr/Cargo.toml @@ -7,12 +7,13 @@ license = "MIT OR Apache-2.0" publish = false [features] -default = [ "nativetls" ] +default = [ "tokio" ] -nativetls = [ "async-native-tls", "native-tls" ] +async-std = [ "async-native-tls" ] +tokio = [ "tokio-util", "tokio-native-tls" ] [dependencies] -tor-rtcompat = { path="../tor-rtcompat", version= "*" } +tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false } tor-proto = { path="../tor-proto", version= "*" } tor-linkspec = { path="../tor-linkspec", version= "*" } tor-llcrypto = { path="../tor-llcrypto", version= "*" } @@ -23,9 +24,10 @@ event-listener = "2.5.1" futures = "0.3.13" log = "0.4.14" thiserror = "1.0.24" - +tokio-util = { version = "0.6.3", features = [ "compat" ], optional = true } +tokio-native-tls = { version = "0.3.0", optional = true } async-native-tls = { version = "0.3.3", optional = true } -native-tls = { version = "0.2.7", optional = true } +native-tls = { version = "0.2.7" } [dev-dependencies] futures-await-test = "0.3.0" diff --git a/tor-chanmgr/src/transport.rs b/tor-chanmgr/src/transport.rs index a73294739..0b8ebd925 100644 --- a/tor-chanmgr/src/transport.rs +++ b/tor-chanmgr/src/transport.rs @@ -1,6 +1,5 @@ //! Types for launching TLS connections to relays -#[cfg(feature = "native-tls")] pub mod nativetls; use crate::Result; diff --git a/tor-chanmgr/src/transport/nativetls.rs b/tor-chanmgr/src/transport/nativetls.rs index 08ff8ce6f..2780db558 100644 --- a/tor-chanmgr/src/transport/nativetls.rs +++ b/tor-chanmgr/src/transport/nativetls.rs @@ -1,7 +1,8 @@ -//! Build TLS connections using the async_native_tls crate. -//! -//! Requires that this crate was built with the `nativetls` feature, -//! which is currently on-by-default. +//! Build TLS connections using the async_native_tls or tokio_native_tls crate. + +// XXXX-A2 This should get refactored significantly. Probably we should have +// a boxed-connection-factory type that we can use instead. Once we have a +// pluggable designn, we'll really need something like that. use super::{CertifiedConn, Transport}; use crate::{Error, Result}; @@ -10,14 +11,23 @@ use tor_rtcompat::net::TcpStream; use anyhow::Context; use async_trait::async_trait; +#[allow(unused)] use futures::io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "async-std")] +use async_native_tls::{TlsConnector, TlsStream}; + +#[cfg(feature = "tokio")] +use tokio_native_tls::{TlsConnector, TlsStream}; +#[cfg(feature = "tokio")] +use tokio_util::compat::Compat; + use log::info; /// A Transport that uses async_native_tls. pub struct NativeTlsTransport { /// connector object used to build TLS connections - connector: async_native_tls::TlsConnector, + connector: TlsConnector, } impl NativeTlsTransport { @@ -26,17 +36,29 @@ impl NativeTlsTransport { pub fn new() -> Self { // These function names are scary, but they just mean that // we're skipping web pki, and using our own PKI functions. - let connector = async_native_tls::TlsConnector::new() + let mut builder = native_tls::TlsConnector::builder(); + builder .danger_accept_invalid_certs(true) .danger_accept_invalid_hostnames(true); + #[cfg(feature = "async-std")] + let connector = builder.into(); + + #[cfg(feature = "tokio")] + let connector = builder.build().unwrap().into(); // XXXX-A2 can panic + NativeTlsTransport { connector } } } +#[cfg(feature = "async-std")] +type TlsConnection = TlsStream; +#[cfg(feature = "tokio")] +type TlsConnection = Compat>; + #[async_trait] impl Transport for NativeTlsTransport { - type Connection = async_native_tls::TlsStream; + type Connection = TlsConnection; async fn connect( &self, @@ -56,17 +78,26 @@ impl Transport for NativeTlsTransport { .context("Can't make a TCP stream to target relay.")?; info!("Negotiating TLS with {}", addr); + // TODO: add a random hostname here if it will be used for SNI? - Ok((*addr, self.connector.connect("ignored", stream).await?)) + let connection = self.connector.connect("ignored", stream).await?; + #[cfg(feature = "tokio")] + let connection = { + use tokio_util::compat::TokioAsyncReadCompatExt; + connection.compat() + }; + Ok((*addr, connection)) } } -impl CertifiedConn for async_native_tls::TlsStream -where - S: AsyncRead + AsyncWrite + Unpin, -{ +impl CertifiedConn for TlsConnection { fn peer_cert(&self) -> Result>> { - match self.peer_certificate() { + #[cfg(feature = "async-std")] + let cert = self.peer_certificate(); + #[cfg(feature = "tokio")] + let cert = self.get_ref().get_ref().peer_certificate(); + + match cert { Ok(Some(cert)) => Ok(Some(cert.to_der()?)), Ok(None) => Ok(None), Err(e) => Err(e.into()), diff --git a/tor-circmgr/Cargo.toml b/tor-circmgr/Cargo.toml index 44212142f..521e8de1f 100644 --- a/tor-circmgr/Cargo.toml +++ b/tor-circmgr/Cargo.toml @@ -7,14 +7,14 @@ license = "MIT OR Apache-2.0" publish = false [dependencies] -tor-chanmgr = { path="../tor-chanmgr", version= "*" } +tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false } tor-netdir = { path="../tor-netdir", version= "*" } tor-netdoc = { path="../tor-netdoc", version= "*" } tor-proto = { path="../tor-proto", version= "*" } tor-retry = { path="../tor-retry", version= "*" } tor-linkspec = { path="../tor-linkspec", version= "*" } tor-llcrypto = { path="../tor-llcrypto", version= "*" } -tor-rtcompat = { path="../tor-rtcompat", version= "*" } +tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false } anyhow = "1.0.38" async-trait = "0.1.42" diff --git a/tor-client/Cargo.toml b/tor-client/Cargo.toml index 63718cf22..24905d1ec 100644 --- a/tor-client/Cargo.toml +++ b/tor-client/Cargo.toml @@ -6,25 +6,28 @@ edition = "2018" license = "MIT OR Apache-2.0" publish = false +[features] +default = [ "tokio" ] +async-std = [ "tor-rtcompat/async-std", "tor-chanmgr/async-std" ] +tokio = [ "tor-rtcompat/tokio", "tor-chanmgr/tokio" ] + [dependencies] tor-circmgr = { path="../tor-circmgr", version= "*" } -tor-chanmgr = { path="../tor-chanmgr", version= "*" } +tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false } tor-dirmgr = { path="../tor-dirmgr", version="*" } tor-netdir = { path="../tor-netdir", version= "*" } tor-netdoc = { path="../tor-netdoc", version= "*" } tor-proto = { path="../tor-proto", version= "*" } -tor-rtcompat = { path="../tor-rtcompat", version= "*" } +tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false } tor-socksproto = { path="../tor-socksproto", version= "*" } tor-dirclient = { path="../tor-dirclient", version = "*" } tor-linkspec = { path="../tor-linkspec", version = "*" } tor-config = { path="../tor-config", version = "*" } anyhow = "1.0.38" -async-native-tls = "0.3.3" config = "0.10.1" futures = "0.3.13" log = "0.4.14" -native-tls = "0.2.7" rand = "0.7.3" serde = { version = "1.0.123", features = ["derive"] } simple-logging = "2.0.2" diff --git a/tor-client/src/proxy.rs b/tor-client/src/proxy.rs index 6ac073510..a92ae3867 100644 --- a/tor-client/src/proxy.rs +++ b/tor-client/src/proxy.rs @@ -1,10 +1,13 @@ //! Implement a simple SOCKS proxy that relays connections over Tor. +#[allow(unused)] use futures::io::{AsyncReadExt, AsyncWriteExt}; use futures::stream::StreamExt; use log::{error, info, warn}; use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::Arc; +#[allow(unused)] +use tor_rtcompat::traits::*; use crate::{client::ConnectPrefs, TorClient}; use tor_proto::circuit::IPVersionPreference; @@ -38,7 +41,7 @@ async fn handle_socks_conn( ) -> Result<()> { let mut handshake = tor_socksproto::SocksHandshake::new(); - let (mut r, mut w) = stream.split(); + let (mut r, mut w) = tor_rtcompat::net::split_io(stream); let mut inbuf = [0_u8; 1024]; let mut n_read = 0; let request = loop { @@ -161,7 +164,13 @@ pub async fn run_socks_proxy(client: Arc, socks_port: u16) -> Result< error!("Couldn't open any listeners."); return Ok(()); // XXXX should return an error. } - let mut incoming = futures::stream::select_all(listeners.iter().map(TcpListener::incoming)); + #[cfg(feature = "async-std")] + let streams_iter = listeners.iter(); + #[cfg(feature = "tokio")] + let streams_iter = listeners.into_iter(); + + let mut incoming = + futures::stream::select_all(streams_iter.map(tor_rtcompat::net::listener_to_stream)); while let Some(stream) = incoming.next().await { let stream = stream.context("Failed to receive incoming stream on SOCKS port")?; diff --git a/tor-dirclient/Cargo.toml b/tor-dirclient/Cargo.toml index b64963690..9be8f6dbe 100644 --- a/tor-dirclient/Cargo.toml +++ b/tor-dirclient/Cargo.toml @@ -8,12 +8,12 @@ publish = false [dependencies] tor-circmgr = { path="../tor-circmgr", version= "*" } -tor-chanmgr = { path="../tor-chanmgr", version= "*" } +tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false } tor-llcrypto = { path="../tor-llcrypto", version= "*" } tor-proto = { path="../tor-proto", version= "*" } tor-netdoc = { path="../tor-netdoc", version= "*" } tor-netdir = { path="../tor-netdir", version= "*" } -tor-rtcompat = { path="../tor-rtcompat", version= "*" } +tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false } anyhow = "1.0.38" #async-trait = "0.1.41" diff --git a/tor-dirclient/src/lib.rs b/tor-dirclient/src/lib.rs index 2b7c931f9..5473f9f98 100644 --- a/tor-dirclient/src/lib.rs +++ b/tor-dirclient/src/lib.rs @@ -234,7 +234,8 @@ async fn read_and_decompress( // XXX should be an option and is too long. let read_timeout = Duration::from_secs(10); - let mut timer = tor_rtcompat::timer::Timer::after(read_timeout).fuse(); + let timer = tor_rtcompat::timer::sleep(read_timeout).fuse(); + futures::pin_mut!(timer); loop { let status = futures::select! { diff --git a/tor-dirmgr/Cargo.toml b/tor-dirmgr/Cargo.toml index d5155c341..10faf26ed 100644 --- a/tor-dirmgr/Cargo.toml +++ b/tor-dirmgr/Cargo.toml @@ -12,7 +12,7 @@ legacy-store = [] mmap = [ "memmap" ] [dependencies] -tor-chanmgr = { path="../tor-chanmgr", version= "*" } +tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false } tor-checkable = { path="../tor-checkable", version= "*" } tor-circmgr = { path="../tor-circmgr", version= "*" } tor-consdiff = { path="../tor-consdiff", version= "*" } @@ -21,7 +21,7 @@ tor-netdir = { path="../tor-netdir", version= "*" } tor-netdoc = { path="../tor-netdoc", version= "*" } tor-llcrypto = { path="../tor-llcrypto", version= "*" } tor-retry = { path="../tor-retry", version= "*" } -tor-rtcompat = { path="../tor-rtcompat", version= "*" } +tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false } anyhow = "1.0.38" async-rwlock = "1.3.0" diff --git a/tor-proto/Cargo.toml b/tor-proto/Cargo.toml index cca632cb1..89c093982 100644 --- a/tor-proto/Cargo.toml +++ b/tor-proto/Cargo.toml @@ -7,7 +7,7 @@ license = "MIT OR Apache-2.0" publish = false [dependencies] -tor-rtcompat = { path="../tor-rtcompat", version= "*" } +tor-rtcompat = { path="../tor-rtcompat", default-features=false } tor-llcrypto = { path="../tor-llcrypto" } tor-bytes = { path="../tor-bytes" } tor-cert = { path="../tor-cert" } diff --git a/tor-rtcompat/Cargo.toml b/tor-rtcompat/Cargo.toml index f444f9d17..346bed689 100644 --- a/tor-rtcompat/Cargo.toml +++ b/tor-rtcompat/Cargo.toml @@ -8,11 +8,16 @@ publish = false [features] -default = [ "async-std" ] +default = [ "tokio" ] async-std = [ "async-std-crate", "async-io" ] +tokio = [ "tokio-crate", "tokio-stream" ] [dependencies] +futures = "0.3.13" + async-std-crate = { package = "async-std", version = "1.7.0", optional = true } async-io = { version = "1.3.1", optional = true } +tokio-crate = { package = "tokio", version = "1.2.0", optional = true, features = ["full"] } +tokio-stream = { version = "0.1.3", optional = true, features = ["net"]} \ No newline at end of file diff --git a/tor-rtcompat/src/impls.rs b/tor-rtcompat/src/impls.rs index 7ee674bc8..0ea333933 100644 --- a/tor-rtcompat/src/impls.rs +++ b/tor-rtcompat/src/impls.rs @@ -4,3 +4,6 @@ #[cfg(all(feature = "async-std"))] pub(crate) mod async_std; + +#[cfg(all(feature = "tokio"))] +pub(crate) mod tokio; diff --git a/tor-rtcompat/src/impls/async_std.rs b/tor-rtcompat/src/impls/async_std.rs index a18da96be..70a78fb89 100644 --- a/tor-rtcompat/src/impls/async_std.rs +++ b/tor-rtcompat/src/impls/async_std.rs @@ -8,6 +8,22 @@ /// Types used for networking (async_std implementation) pub mod net { pub use async_std_crate::net::{TcpListener, TcpStream}; + + /// Split a read/write stream into its read and write halves. + pub fn split_io(stream: T) -> (futures::io::ReadHalf, futures::io::WriteHalf) + where + T: futures::io::AsyncRead + futures::io::AsyncWrite, + { + use futures::io::AsyncReadExt; + stream.split() + } + + /// Return a stream that yields incoming sockets from `lis` + pub fn listener_to_stream<'a>( + lis: &'a TcpListener, + ) -> impl futures::stream::Stream> + 'a { + lis.incoming() + } } /// Functions for launching and managing tasks (async_std implementation) @@ -17,6 +33,15 @@ pub mod task { /// Functions and types for manipulating timers (async_std implementation) pub mod timer { - pub use async_io::Timer; + use std::time::Duration; + + /// Return a future that will be ready after `duration` has passed. + pub fn sleep(duration: Duration) -> async_io::Timer { + async_io::Timer::after(duration) + } + pub use async_std_crate::future::{timeout, TimeoutError}; } + +/// Traits specific to async_std +pub mod traits {} diff --git a/tor-rtcompat/src/impls/tokio.rs b/tor-rtcompat/src/impls/tokio.rs new file mode 100644 index 000000000..cb498b951 --- /dev/null +++ b/tor-rtcompat/src/impls/tokio.rs @@ -0,0 +1,45 @@ +//! Re-exports of the tokio runtime for use with arti. +//! +//! This crate helps define a slim API around our async runtime so that we +//! can easily swap it out. + +/// Types used for networking (tokio implementation) +pub mod net { + pub use tokio_crate::io::split as split_io; + pub use tokio_crate::net::{TcpListener, TcpStream}; + + /// Return a stream that yields incoming sockets from `lis` + pub fn listener_to_stream( + lis: TcpListener, + ) -> impl futures::stream::Stream> { + tokio_stream::wrappers::TcpListenerStream::new(lis) + } +} + +/// Functions for launching and managing tasks (tokio implementation) +pub mod task { + use std::future::Future; + use tokio_crate::runtime::Runtime; + + /// Create a runtime and run `future` to completion + pub fn block_on(future: F) -> F::Output { + let rt = Runtime::new().unwrap(); // XXXX Not good: This could panic. + rt.block_on(future) + } + + pub use tokio_crate::spawn; + pub use tokio_crate::time::sleep; +} + +/// Functions and types for manipulating timers (tokio implementation) +pub mod timer { + pub use tokio_crate::time::{error::Elapsed as TimeoutError, sleep, timeout}; +} + +/// Traits specific to the runtime. +pub mod traits { + pub use tokio_crate::io::{ + AsyncRead as TokioAsyncRead, AsyncReadExt as TokioAsyncReadExt, + AsyncWrite as TokioAsyncWrite, AsyncWriteExt as TokioAsyncWriteExt, + }; +} diff --git a/tor-rtcompat/src/lib.rs b/tor-rtcompat/src/lib.rs index 6588450d1..ebd69ddb7 100644 --- a/tor-rtcompat/src/lib.rs +++ b/tor-rtcompat/src/lib.rs @@ -15,9 +15,22 @@ pub(crate) mod impls; -#[cfg(all(feature = "async-std"))] +// TODO: This is not an ideal situation, and it's arguably an abuse of +// the features feature. But I can't currently find a reasonable way +// to have the code use the right version of things like "sleep" or +// "spawn" otherwise. +#[cfg(all(feature = "async-std", feature = "tokio"))] +compile_error!("Sorry: At most one of the async-std and tokio features can be used at a time."); + +#[cfg(not(any(feature = "async-std", feature = "tokio")))] +compile_error!("Sorry: Exactly one one of the tor-rtcompat/async-std and tor-rtcompat/tokio features must be specified."); + +#[cfg(feature = "async-std")] use impls::async_std as imp; +#[cfg(all(feature = "tokio", not(feature = "async-std")))] +use impls::tokio as imp; + /// Types used for networking (async_std implementation) pub mod net { pub use crate::imp::net::*; @@ -81,3 +94,8 @@ pub mod timer { } } } + +/// Traits specific to the runtime in use. +pub mod traits { + pub use crate::imp::traits::*; +}