rpc: Remove anyhow dependency

This commit is contained in:
Nick Mathewson 2023-04-05 13:31:17 -04:00
parent 4c020de38c
commit 07bb57a4c2
3 changed files with 13 additions and 7 deletions

1
Cargo.lock generated
View File

@ -228,7 +228,6 @@ dependencies = [
name = "arti-rpcserver" name = "arti-rpcserver"
version = "0.0.1" version = "0.0.1"
dependencies = [ dependencies = [
"anyhow",
"arti-client", "arti-client",
"asynchronous-codec", "asynchronous-codec",
"bytes", "bytes",

View File

@ -19,7 +19,6 @@ experimental = ["tokio"]
tokio = ["tokio-crate", "tokio-util"] tokio = ["tokio-crate", "tokio-util"]
[dependencies] [dependencies]
anyhow = "1" # XXXX REMOVE
arti-client = { path = "../arti-client", version = "0.8.2" } arti-client = { path = "../arti-client", version = "0.8.2" }
asynchronous-codec = { version = "0.6.0", features = ["json"] } asynchronous-codec = { version = "0.6.0", features = ["json"] }
bytes = "1" bytes = "1"

View File

@ -110,8 +110,7 @@ impl Session {
self: Arc<Self>, self: Arc<Self>,
mut request_stream: IN, mut request_stream: IN,
mut response_sink: OUT, mut response_sink: OUT,
) -> anyhow::Result<()> ) -> Result<(), SessionError>
// XXXX Make a real error type and remove this dependency.
where where
IN: FusedStream<Item = Result<Request, asynchronous_codec::JsonCodecError>> + Unpin, IN: FusedStream<Item = Result<Request, asynchronous_codec::JsonCodecError>> + Unpin,
OUT: Sink<BoxedResponse> + Unpin, OUT: Sink<BoxedResponse> + Unpin,
@ -137,7 +136,7 @@ impl Session {
let r: BoxedResponse = r.expect("Somehow, future::pending() terminated."); let r: BoxedResponse = r.expect("Somehow, future::pending() terminated.");
debug_assert!(r.body.is_final()); debug_assert!(r.body.is_final());
self.remove_request(&r.id); self.remove_request(&r.id);
response_sink.send(r).await?; response_sink.send(r).await.map_err(|_| SessionError::WriteFailed)?;
} }
r = rx_update.next() => { r = rx_update.next() => {
@ -145,7 +144,7 @@ impl Session {
// inform the client. // inform the client.
let update = r.expect("Somehow, tx_update got closed."); let update = r.expect("Somehow, tx_update got closed.");
debug_assert!(! update.body.is_final()); debug_assert!(! update.body.is_final());
response_sink.send(update).await?; response_sink.send(update).await.map_err(|_| SessionError::WriteFailed)?;
} }
req = request_stream.next() => { req = request_stream.next() => {
@ -164,7 +163,7 @@ impl Session {
id: RequestId::Str("-----".into()), id: RequestId::Str("-----".into()),
// TODO RPC real error type // TODO RPC real error type
body: BoxedResponseBody::Error(Box::new(format!("Parse error: {}", e))) body: BoxedResponseBody::Error(Box::new(format!("Parse error: {}", e)))
}).await?; }).await.map_err(|_| SessionError::WriteFailed)?;
// TODO RPC: Perhaps we should keep going? (Only if this is an authenticated session!) // TODO RPC: Perhaps we should keep going? (Only if this is an authenticated session!)
break 'outer; break 'outer;
@ -230,6 +229,15 @@ impl Session {
} }
} }
/// A failure that results in closing a Session.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub(crate) enum SessionError {
/// Unable to write to our connection.
#[error("Could not write to connection")]
WriteFailed,
}
/// A Context object that we pass to each command invocation. /// A Context object that we pass to each command invocation.
/// ///
/// It provides the `rpc::Context` interface, which is used to send incremental /// It provides the `rpc::Context` interface, which is used to send incremental