tor-proto: implement tokio Async{Read, Write} traits conditionally

futures::io::AsyncRead (and Write) isn't the same thing as tokio::io::AsyncRead,
which is a somewhat annoying misfeature of the Rust async ecosystem (!).

To mitigate this somewhat for people trying to use the `DataStream` struct with
tokio, implement the tokio versions of the above traits using `tokio-util`'s
compat layer, if a crate feature (`tokio`) is enabled.
This commit is contained in:
eta 2021-10-19 17:26:52 +01:00
parent b42a6712c9
commit ccd1d36e90
4 changed files with 40 additions and 1 deletions

2
Cargo.lock generated
View File

@ -2742,6 +2742,8 @@ dependencies = [
"rand_core 0.6.3",
"subtle",
"thiserror",
"tokio",
"tokio-util",
"tor-bytes",
"tor-cell",
"tor-cert",

View File

@ -13,7 +13,7 @@ repository="https://gitlab.torproject.org/tpo/core/arti.git/"
[features]
default = [ "tokio" ]
async-std = [ "tor-rtcompat/async-std" ]
tokio = [ "tor-rtcompat/tokio" ]
tokio = [ "tor-rtcompat/tokio", "tor-proto/tokio" ]
static = [ "tor-rtcompat/static", "tor-dirmgr/static" ]
experimental-api = []

View File

@ -14,6 +14,7 @@ repository="https://gitlab.torproject.org/tpo/core/arti.git/"
default = []
hs = []
ntor_v3 = []
tokio = [ "tokio-crate", "tokio-util" ]
[dependencies]
tor-llcrypto = { path="../tor-llcrypto", version="0.0.0" }
@ -43,6 +44,9 @@ thiserror = "1.0.24"
typenum = "1.13.0"
zeroize = "1.3.0"
tokio-crate = { package = "tokio", version = "1.7.0", optional = true }
tokio-util = { version = "0.6", features = ["compat"], optional = true }
[dev-dependencies]
futures-await-test = "0.3.0"
hex-literal = "0.3.1"

View File

@ -9,6 +9,13 @@ use futures::io::{AsyncRead, AsyncWrite};
use futures::task::{Context, Poll};
use futures::Future;
#[cfg(feature = "tokio")]
use tokio_crate::io::ReadBuf;
#[cfg(feature = "tokio")]
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
#[cfg(feature = "tokio")]
use tokio_util::compat::FuturesAsyncReadCompatExt;
use std::io::Result as IoResult;
use std::pin::Pin;
use std::sync::Arc;
@ -94,6 +101,17 @@ impl AsyncRead for DataStream {
}
}
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
}
}
impl AsyncWrite for DataStream {
fn poll_write(
mut self: Pin<&mut Self>,
@ -110,6 +128,21 @@ impl AsyncWrite for DataStream {
}
}
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
TokioAsyncWrite::poll_write(Pin::new(&mut self.compat()), cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat()), cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat()), cx)
}
}
/// An enumeration for the state of a DataWriter.
///
/// We have to use an enum here because, for as long as we're waiting