Port to work with tokio or async-std.

This is fairly ugly and I think I'll need to mess around with the
feature configuration a while until we get something that's pleasant
to develop with.  This still seems like a good idea, though, since
we _will_ need to be executor-agnostic in the end, or we'll have no
way to handle wasm or embedded environments.

Later down the road, we'll probably want to use futures::Executor or
futures::Spawn more than having global entry points in
tor_rtcompat.  That would probably make our feature story simpler.

Tokio is the default now, since tokio seems to be more heavily used
for performance-critical stuff.

This patch breaks tests; the next one will fix them, albeit
questionably.
This commit is contained in:
Nick Mathewson 2021-03-01 16:14:43 -05:00
parent b33bc268d5
commit 8c31418500
15 changed files with 177 additions and 36 deletions

View File

@ -7,12 +7,13 @@ license = "MIT OR Apache-2.0"
publish = false
[features]
default = [ "nativetls" ]
default = [ "tokio" ]
nativetls = [ "async-native-tls", "native-tls" ]
async-std = [ "async-native-tls" ]
tokio = [ "tokio-util", "tokio-native-tls" ]
[dependencies]
tor-rtcompat = { path="../tor-rtcompat", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false }
tor-proto = { path="../tor-proto", version= "*" }
tor-linkspec = { path="../tor-linkspec", version= "*" }
tor-llcrypto = { path="../tor-llcrypto", version= "*" }
@ -23,9 +24,10 @@ event-listener = "2.5.1"
futures = "0.3.13"
log = "0.4.14"
thiserror = "1.0.24"
tokio-util = { version = "0.6.3", features = [ "compat" ], optional = true }
tokio-native-tls = { version = "0.3.0", optional = true }
async-native-tls = { version = "0.3.3", optional = true }
native-tls = { version = "0.2.7", optional = true }
native-tls = { version = "0.2.7" }
[dev-dependencies]
futures-await-test = "0.3.0"

View File

@ -1,6 +1,5 @@
//! Types for launching TLS connections to relays
#[cfg(feature = "native-tls")]
pub mod nativetls;
use crate::Result;

View File

@ -1,7 +1,8 @@
//! Build TLS connections using the async_native_tls crate.
//!
//! Requires that this crate was built with the `nativetls` feature,
//! which is currently on-by-default.
//! Build TLS connections using the async_native_tls or tokio_native_tls crate.
// XXXX-A2 This should get refactored significantly. Probably we should have
// a boxed-connection-factory type that we can use instead. Once we have a
// pluggable designn, we'll really need something like that.
use super::{CertifiedConn, Transport};
use crate::{Error, Result};
@ -10,14 +11,23 @@ use tor_rtcompat::net::TcpStream;
use anyhow::Context;
use async_trait::async_trait;
#[allow(unused)]
use futures::io::{AsyncRead, AsyncWrite};
#[cfg(feature = "async-std")]
use async_native_tls::{TlsConnector, TlsStream};
#[cfg(feature = "tokio")]
use tokio_native_tls::{TlsConnector, TlsStream};
#[cfg(feature = "tokio")]
use tokio_util::compat::Compat;
use log::info;
/// A Transport that uses async_native_tls.
pub struct NativeTlsTransport {
/// connector object used to build TLS connections
connector: async_native_tls::TlsConnector,
connector: TlsConnector,
}
impl NativeTlsTransport {
@ -26,17 +36,29 @@ impl NativeTlsTransport {
pub fn new() -> Self {
// These function names are scary, but they just mean that
// we're skipping web pki, and using our own PKI functions.
let connector = async_native_tls::TlsConnector::new()
let mut builder = native_tls::TlsConnector::builder();
builder
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true);
#[cfg(feature = "async-std")]
let connector = builder.into();
#[cfg(feature = "tokio")]
let connector = builder.build().unwrap().into(); // XXXX-A2 can panic
NativeTlsTransport { connector }
}
}
#[cfg(feature = "async-std")]
type TlsConnection = TlsStream<TcpStream>;
#[cfg(feature = "tokio")]
type TlsConnection = Compat<TlsStream<TcpStream>>;
#[async_trait]
impl Transport for NativeTlsTransport {
type Connection = async_native_tls::TlsStream<TcpStream>;
type Connection = TlsConnection;
async fn connect<T: ChanTarget + Sync + ?Sized>(
&self,
@ -56,17 +78,26 @@ impl Transport for NativeTlsTransport {
.context("Can't make a TCP stream to target relay.")?;
info!("Negotiating TLS with {}", addr);
// TODO: add a random hostname here if it will be used for SNI?
Ok((*addr, self.connector.connect("ignored", stream).await?))
let connection = self.connector.connect("ignored", stream).await?;
#[cfg(feature = "tokio")]
let connection = {
use tokio_util::compat::TokioAsyncReadCompatExt;
connection.compat()
};
Ok((*addr, connection))
}
}
impl<S> CertifiedConn for async_native_tls::TlsStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
impl CertifiedConn for TlsConnection {
fn peer_cert(&self) -> Result<Option<Vec<u8>>> {
match self.peer_certificate() {
#[cfg(feature = "async-std")]
let cert = self.peer_certificate();
#[cfg(feature = "tokio")]
let cert = self.get_ref().get_ref().peer_certificate();
match cert {
Ok(Some(cert)) => Ok(Some(cert.to_der()?)),
Ok(None) => Ok(None),
Err(e) => Err(e.into()),

View File

@ -7,14 +7,14 @@ license = "MIT OR Apache-2.0"
publish = false
[dependencies]
tor-chanmgr = { path="../tor-chanmgr", version= "*" }
tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false }
tor-netdir = { path="../tor-netdir", version= "*" }
tor-netdoc = { path="../tor-netdoc", version= "*" }
tor-proto = { path="../tor-proto", version= "*" }
tor-retry = { path="../tor-retry", version= "*" }
tor-linkspec = { path="../tor-linkspec", version= "*" }
tor-llcrypto = { path="../tor-llcrypto", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false }
anyhow = "1.0.38"
async-trait = "0.1.42"

View File

@ -6,25 +6,28 @@ edition = "2018"
license = "MIT OR Apache-2.0"
publish = false
[features]
default = [ "tokio" ]
async-std = [ "tor-rtcompat/async-std", "tor-chanmgr/async-std" ]
tokio = [ "tor-rtcompat/tokio", "tor-chanmgr/tokio" ]
[dependencies]
tor-circmgr = { path="../tor-circmgr", version= "*" }
tor-chanmgr = { path="../tor-chanmgr", version= "*" }
tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false }
tor-dirmgr = { path="../tor-dirmgr", version="*" }
tor-netdir = { path="../tor-netdir", version= "*" }
tor-netdoc = { path="../tor-netdoc", version= "*" }
tor-proto = { path="../tor-proto", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false }
tor-socksproto = { path="../tor-socksproto", version= "*" }
tor-dirclient = { path="../tor-dirclient", version = "*" }
tor-linkspec = { path="../tor-linkspec", version = "*" }
tor-config = { path="../tor-config", version = "*" }
anyhow = "1.0.38"
async-native-tls = "0.3.3"
config = "0.10.1"
futures = "0.3.13"
log = "0.4.14"
native-tls = "0.2.7"
rand = "0.7.3"
serde = { version = "1.0.123", features = ["derive"] }
simple-logging = "2.0.2"

View File

@ -1,10 +1,13 @@
//! Implement a simple SOCKS proxy that relays connections over Tor.
#[allow(unused)]
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::stream::StreamExt;
use log::{error, info, warn};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
#[allow(unused)]
use tor_rtcompat::traits::*;
use crate::{client::ConnectPrefs, TorClient};
use tor_proto::circuit::IPVersionPreference;
@ -38,7 +41,7 @@ async fn handle_socks_conn(
) -> Result<()> {
let mut handshake = tor_socksproto::SocksHandshake::new();
let (mut r, mut w) = stream.split();
let (mut r, mut w) = tor_rtcompat::net::split_io(stream);
let mut inbuf = [0_u8; 1024];
let mut n_read = 0;
let request = loop {
@ -161,7 +164,13 @@ pub async fn run_socks_proxy(client: Arc<TorClient>, socks_port: u16) -> Result<
error!("Couldn't open any listeners.");
return Ok(()); // XXXX should return an error.
}
let mut incoming = futures::stream::select_all(listeners.iter().map(TcpListener::incoming));
#[cfg(feature = "async-std")]
let streams_iter = listeners.iter();
#[cfg(feature = "tokio")]
let streams_iter = listeners.into_iter();
let mut incoming =
futures::stream::select_all(streams_iter.map(tor_rtcompat::net::listener_to_stream));
while let Some(stream) = incoming.next().await {
let stream = stream.context("Failed to receive incoming stream on SOCKS port")?;

View File

@ -8,12 +8,12 @@ publish = false
[dependencies]
tor-circmgr = { path="../tor-circmgr", version= "*" }
tor-chanmgr = { path="../tor-chanmgr", version= "*" }
tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false }
tor-llcrypto = { path="../tor-llcrypto", version= "*" }
tor-proto = { path="../tor-proto", version= "*" }
tor-netdoc = { path="../tor-netdoc", version= "*" }
tor-netdir = { path="../tor-netdir", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false }
anyhow = "1.0.38"
#async-trait = "0.1.41"

View File

@ -234,7 +234,8 @@ async fn read_and_decompress(
// XXX should be an option and is too long.
let read_timeout = Duration::from_secs(10);
let mut timer = tor_rtcompat::timer::Timer::after(read_timeout).fuse();
let timer = tor_rtcompat::timer::sleep(read_timeout).fuse();
futures::pin_mut!(timer);
loop {
let status = futures::select! {

View File

@ -12,7 +12,7 @@ legacy-store = []
mmap = [ "memmap" ]
[dependencies]
tor-chanmgr = { path="../tor-chanmgr", version= "*" }
tor-chanmgr = { path="../tor-chanmgr", version= "*", default-features=false }
tor-checkable = { path="../tor-checkable", version= "*" }
tor-circmgr = { path="../tor-circmgr", version= "*" }
tor-consdiff = { path="../tor-consdiff", version= "*" }
@ -21,7 +21,7 @@ tor-netdir = { path="../tor-netdir", version= "*" }
tor-netdoc = { path="../tor-netdoc", version= "*" }
tor-llcrypto = { path="../tor-llcrypto", version= "*" }
tor-retry = { path="../tor-retry", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", version= "*", default-features=false }
anyhow = "1.0.38"
async-rwlock = "1.3.0"

View File

@ -7,7 +7,7 @@ license = "MIT OR Apache-2.0"
publish = false
[dependencies]
tor-rtcompat = { path="../tor-rtcompat", version= "*" }
tor-rtcompat = { path="../tor-rtcompat", default-features=false }
tor-llcrypto = { path="../tor-llcrypto" }
tor-bytes = { path="../tor-bytes" }
tor-cert = { path="../tor-cert" }

View File

@ -8,11 +8,16 @@ publish = false
[features]
default = [ "async-std" ]
default = [ "tokio" ]
async-std = [ "async-std-crate", "async-io" ]
tokio = [ "tokio-crate", "tokio-stream" ]
[dependencies]
futures = "0.3.13"
async-std-crate = { package = "async-std", version = "1.7.0", optional = true }
async-io = { version = "1.3.1", optional = true }
tokio-crate = { package = "tokio", version = "1.2.0", optional = true, features = ["full"] }
tokio-stream = { version = "0.1.3", optional = true, features = ["net"]}

View File

@ -4,3 +4,6 @@
#[cfg(all(feature = "async-std"))]
pub(crate) mod async_std;
#[cfg(all(feature = "tokio"))]
pub(crate) mod tokio;

View File

@ -8,6 +8,22 @@
/// Types used for networking (async_std implementation)
pub mod net {
pub use async_std_crate::net::{TcpListener, TcpStream};
/// Split a read/write stream into its read and write halves.
pub fn split_io<T>(stream: T) -> (futures::io::ReadHalf<T>, futures::io::WriteHalf<T>)
where
T: futures::io::AsyncRead + futures::io::AsyncWrite,
{
use futures::io::AsyncReadExt;
stream.split()
}
/// Return a stream that yields incoming sockets from `lis`
pub fn listener_to_stream<'a>(
lis: &'a TcpListener,
) -> impl futures::stream::Stream<Item = Result<TcpStream, std::io::Error>> + 'a {
lis.incoming()
}
}
/// Functions for launching and managing tasks (async_std implementation)
@ -17,6 +33,15 @@ pub mod task {
/// Functions and types for manipulating timers (async_std implementation)
pub mod timer {
pub use async_io::Timer;
use std::time::Duration;
/// Return a future that will be ready after `duration` has passed.
pub fn sleep(duration: Duration) -> async_io::Timer {
async_io::Timer::after(duration)
}
pub use async_std_crate::future::{timeout, TimeoutError};
}
/// Traits specific to async_std
pub mod traits {}

View File

@ -0,0 +1,45 @@
//! Re-exports of the tokio runtime for use with arti.
//!
//! This crate helps define a slim API around our async runtime so that we
//! can easily swap it out.
/// Types used for networking (tokio implementation)
pub mod net {
pub use tokio_crate::io::split as split_io;
pub use tokio_crate::net::{TcpListener, TcpStream};
/// Return a stream that yields incoming sockets from `lis`
pub fn listener_to_stream(
lis: TcpListener,
) -> impl futures::stream::Stream<Item = Result<TcpStream, std::io::Error>> {
tokio_stream::wrappers::TcpListenerStream::new(lis)
}
}
/// Functions for launching and managing tasks (tokio implementation)
pub mod task {
use std::future::Future;
use tokio_crate::runtime::Runtime;
/// Create a runtime and run `future` to completion
pub fn block_on<F: Future>(future: F) -> F::Output {
let rt = Runtime::new().unwrap(); // XXXX Not good: This could panic.
rt.block_on(future)
}
pub use tokio_crate::spawn;
pub use tokio_crate::time::sleep;
}
/// Functions and types for manipulating timers (tokio implementation)
pub mod timer {
pub use tokio_crate::time::{error::Elapsed as TimeoutError, sleep, timeout};
}
/// Traits specific to the runtime.
pub mod traits {
pub use tokio_crate::io::{
AsyncRead as TokioAsyncRead, AsyncReadExt as TokioAsyncReadExt,
AsyncWrite as TokioAsyncWrite, AsyncWriteExt as TokioAsyncWriteExt,
};
}

View File

@ -15,9 +15,22 @@
pub(crate) mod impls;
#[cfg(all(feature = "async-std"))]
// TODO: This is not an ideal situation, and it's arguably an abuse of
// the features feature. But I can't currently find a reasonable way
// to have the code use the right version of things like "sleep" or
// "spawn" otherwise.
#[cfg(all(feature = "async-std", feature = "tokio"))]
compile_error!("Sorry: At most one of the async-std and tokio features can be used at a time.");
#[cfg(not(any(feature = "async-std", feature = "tokio")))]
compile_error!("Sorry: Exactly one one of the tor-rtcompat/async-std and tor-rtcompat/tokio features must be specified.");
#[cfg(feature = "async-std")]
use impls::async_std as imp;
#[cfg(all(feature = "tokio", not(feature = "async-std")))]
use impls::tokio as imp;
/// Types used for networking (async_std implementation)
pub mod net {
pub use crate::imp::net::*;
@ -81,3 +94,8 @@ pub mod timer {
}
}
}
/// Traits specific to the runtime in use.
pub mod traits {
pub use crate::imp::traits::*;
}