rpc: Use Pin<Box<Stream/Sink>> for run_loop

This commit is contained in:
Nick Mathewson 2023-04-11 10:02:37 -04:00
parent 3ad5d2f6ea
commit a02a200ccd
2 changed files with 24 additions and 16 deletions

View File

@ -29,15 +29,17 @@ pub async fn accept_connections<P: AsRef<Path>>(path: P) -> Result<()> {
let (stream, _addr) = listener.accept().await?;
let session = Arc::new(crate::session::Session::new());
let (input, output) = stream.into_split();
let input = asynchronous_codec::FramedRead::new(
input.compat(),
asynchronous_codec::JsonCodec::<(), Request>::new(),
)
.fuse();
let output = asynchronous_codec::FramedWrite::new(
let input = Box::pin(
asynchronous_codec::FramedRead::new(
input.compat(),
asynchronous_codec::JsonCodec::<(), Request>::new(),
)
.fuse(),
);
let output = Box::pin(asynchronous_codec::FramedWrite::new(
output.compat_write(),
crate::streams::JsonLinesEncoder::<BoxedResponse>::default(),
);
));
tokio::spawn(async {
let result = session.run_loop(input, output).await;

View File

@ -60,6 +60,17 @@ const UPDATE_CHAN_SIZE: usize = 128;
/// Channel type used to send updates to the main session loop.
type UpdateSender = mpsc::Sender<BoxedResponse>;
/// A type-erased [`FusedStream`] yielding [`Request`]s.
//
// (We name this type and [`BoxedResponseSink`] below so as to keep the signature for run_loop
// nice and simple.)
pub(crate) type BoxedRequestStream =
Pin<Box<dyn FusedStream<Item = Result<Request, asynchronous_codec::JsonCodecError>> + Send>>;
/// A type-erased [`Sink`] accepting [`BoxedResponse`]s.
pub(crate) type BoxedResponseSink =
Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
impl Session {
/// Create a new session.
pub(crate) fn new() -> Self {
@ -108,16 +119,11 @@ impl Session {
/// Run in a loop, handling requests from `request_stream` and writing
/// responses onto `response_stream`.
pub(crate) async fn run_loop<IN, OUT>(
pub(crate) async fn run_loop(
self: Arc<Self>,
mut request_stream: IN,
mut response_sink: OUT,
) -> Result<(), SessionError>
where
IN: FusedStream<Item = Result<Request, asynchronous_codec::JsonCodecError>> + Unpin,
OUT: Sink<BoxedResponse> + Unpin,
OUT::Error: std::error::Error + Send + Sync + 'static,
{
mut request_stream: BoxedRequestStream,
mut response_sink: BoxedResponseSink,
) -> Result<(), SessionError> {
// This function will multiplex on three streams:
// * `request_stream` -- a stream of incoming requests from the client.
// * `finished_requests` -- a stream of responses from requests that