From fb2c7cb85a76acb2a80ae467a635ddb52d80a363 Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Tue, 19 Oct 2021 15:49:49 -0400 Subject: [PATCH] Also implement tokio Async{Read,Write} on Data{Reader,Writer}. This will let callers use the tokio traits on these types too, if they call `split()` on the DataStream. (Tokio also has a `tokio::io::split()` method, but it requires a lock whereas `DataStream::split()` doesn't.) --- crates/tor-proto/src/stream/data.rs | 36 +++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/crates/tor-proto/src/stream/data.rs b/crates/tor-proto/src/stream/data.rs index aed5567e3..a96472dd7 100644 --- a/crates/tor-proto/src/stream/data.rs +++ b/crates/tor-proto/src/stream/data.rs @@ -14,7 +14,7 @@ use tokio_crate::io::ReadBuf; #[cfg(feature = "tokio")] use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite}; #[cfg(feature = "tokio")] -use tokio_util::compat::FuturesAsyncReadCompatExt; +use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use std::io::Result as IoResult; use std::pin::Pin; @@ -104,7 +104,7 @@ impl AsyncRead for DataStream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.r).poll_read(cx, buf) + AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf) } } @@ -125,13 +125,13 @@ impl AsyncWrite for DataStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.w).poll_write(cx, buf) + AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.w).poll_flush(cx) + AsyncWrite::poll_flush(Pin::new(&mut self.w), cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.w).poll_close(cx) + AsyncWrite::poll_close(Pin::new(&mut self.w), cx) } } @@ -288,6 +288,21 @@ impl AsyncWrite for DataWriter { } } +#[cfg(feature = "tokio")] +impl TokioAsyncWrite for DataWriter { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx) + } +} + impl DataWriterImpl { /// Try to flush the current buffer contents as a data cell. async fn flush_buf(mut self) -> (Self, Result<()>) { @@ -414,6 +429,17 @@ impl AsyncRead for DataReader { } } +#[cfg(feature = "tokio")] +impl TokioAsyncRead for DataReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf) + } +} + impl DataReaderImpl { /// Pull as many bytes as we can off of self.pending, and return that /// number of bytes.