Also implement tokio Async{Read,Write} on Data{Reader,Writer}.

This will let callers use the tokio traits on these types too, if
they call `split()` on the DataStream.

(Tokio also has a `tokio::io::split()` method, but it requires a
lock whereas `DataStream::split()` doesn't.)
This commit is contained in:
Nick Mathewson 2021-10-19 15:49:49 -04:00
parent a9a9f70eb9
commit fb2c7cb85a
1 changed files with 31 additions and 5 deletions

View File

@ -14,7 +14,7 @@ use tokio_crate::io::ReadBuf;
#[cfg(feature = "tokio")]
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
#[cfg(feature = "tokio")]
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
use std::io::Result as IoResult;
use std::pin::Pin;
@ -104,7 +104,7 @@ impl AsyncRead for DataStream {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<IoResult<usize>> {
Pin::new(&mut self.r).poll_read(cx, buf)
AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
}
}
@ -125,13 +125,13 @@ impl AsyncWrite for DataStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<IoResult<usize>> {
Pin::new(&mut self.w).poll_write(cx, buf)
AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
Pin::new(&mut self.w).poll_flush(cx)
AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
Pin::new(&mut self.w).poll_close(cx)
AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
}
}
@ -288,6 +288,21 @@ impl AsyncWrite for DataWriter {
}
}
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
TokioAsyncWrite::poll_write(Pin::new(&mut self.compat_write()), cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
TokioAsyncWrite::poll_flush(Pin::new(&mut self.compat_write()), cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
TokioAsyncWrite::poll_shutdown(Pin::new(&mut self.compat_write()), cx)
}
}
impl DataWriterImpl {
/// Try to flush the current buffer contents as a data cell.
async fn flush_buf(mut self) -> (Self, Result<()>) {
@ -414,6 +429,17 @@ impl AsyncRead for DataReader {
}
}
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
TokioAsyncRead::poll_read(Pin::new(&mut self.compat()), cx, buf)
}
}
impl DataReaderImpl {
/// Pull as many bytes as we can off of self.pending, and return that
/// number of bytes.