diff --git a/Cargo.lock b/Cargo.lock index 600762e35..ae04d3480 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -228,7 +228,6 @@ dependencies = [ name = "arti-rpcserver" version = "0.0.1" dependencies = [ - "anyhow", "arti-client", "asynchronous-codec", "bytes", diff --git a/crates/arti-rpcserver/Cargo.toml b/crates/arti-rpcserver/Cargo.toml index c765602e4..4fcaae1fb 100644 --- a/crates/arti-rpcserver/Cargo.toml +++ b/crates/arti-rpcserver/Cargo.toml @@ -19,7 +19,6 @@ experimental = ["tokio"] tokio = ["tokio-crate", "tokio-util"] [dependencies] -anyhow = "1" # XXXX REMOVE arti-client = { path = "../arti-client", version = "0.8.2" } asynchronous-codec = { version = "0.6.0", features = ["json"] } bytes = "1" diff --git a/crates/arti-rpcserver/src/session.rs b/crates/arti-rpcserver/src/session.rs index 427102f7c..43184e929 100644 --- a/crates/arti-rpcserver/src/session.rs +++ b/crates/arti-rpcserver/src/session.rs @@ -110,8 +110,7 @@ impl Session { self: Arc, mut request_stream: IN, mut response_sink: OUT, - ) -> anyhow::Result<()> - // XXXX Make a real error type and remove this dependency. + ) -> Result<(), SessionError> where IN: FusedStream> + Unpin, OUT: Sink + Unpin, @@ -137,7 +136,7 @@ impl Session { let r: BoxedResponse = r.expect("Somehow, future::pending() terminated."); debug_assert!(r.body.is_final()); self.remove_request(&r.id); - response_sink.send(r).await?; + response_sink.send(r).await.map_err(|_| SessionError::WriteFailed)?; } r = rx_update.next() => { @@ -145,7 +144,7 @@ impl Session { // inform the client. let update = r.expect("Somehow, tx_update got closed."); debug_assert!(! update.body.is_final()); - response_sink.send(update).await?; + response_sink.send(update).await.map_err(|_| SessionError::WriteFailed)?; } req = request_stream.next() => { @@ -164,7 +163,7 @@ impl Session { id: RequestId::Str("-----".into()), // TODO RPC real error type body: BoxedResponseBody::Error(Box::new(format!("Parse error: {}", e))) - }).await?; + }).await.map_err(|_| SessionError::WriteFailed)?; // TODO RPC: Perhaps we should keep going? (Only if this is an authenticated session!) break 'outer; @@ -230,6 +229,15 @@ impl Session { } } +/// A failure that results in closing a Session. +#[derive(Clone, Debug, thiserror::Error)] +#[non_exhaustive] +pub(crate) enum SessionError { + /// Unable to write to our connection. + #[error("Could not write to connection")] + WriteFailed, +} + /// A Context object that we pass to each command invocation. /// /// It provides the `rpc::Context` interface, which is used to send incremental