rpc: Make an RpcMgr type to own the DispatchTable.

In the future, this will probably hold more data as well, like a
TorClient and some configuration info.

The TorClient will present an issue; I've made comments about that.

Closes #820
This commit is contained in:
Nick Mathewson 2023-04-19 07:59:17 -04:00
parent 287a619d99
commit 25398b5a3b
7 changed files with 59 additions and 15 deletions

1
Cargo.lock generated
View File

@ -234,7 +234,6 @@ dependencies = [
"erased-serde",
"futures",
"futures-await-test",
"once_cell",
"pin-project",
"serde",
"serde_json",

View File

@ -24,7 +24,6 @@ asynchronous-codec = { version = "0.6.0", features = ["json"] }
bytes = "1"
erased-serde = "0.3.25"
futures = "0.3.14"
once_cell = "1"
pin-project = "1"
serde = { version = "1.0.103", features = ["derive"] }
serde_json = "1.0.50"

View File

@ -39,9 +39,12 @@
mod cancel;
mod err;
mod mgr;
mod msgs;
mod session;
mod streams;
#[cfg(feature = "tokio")]
pub mod listen;
pub use mgr::RpcMgr;

View File

@ -3,12 +3,14 @@
//! TODO RPC: This doesn't belong here, I think. But we want it to be at a
//! lower level than the `arti` crate.
use arti_client::TorClient;
use futures::stream::StreamExt;
use std::io::Result;
use std::path::Path;
use std::sync::Arc;
use tokio::net::UnixListener;
use tokio_crate as tokio;
use tor_rtcompat::Runtime;
use crate::msgs::{BoxedResponse, FlexibleRequest};
@ -17,17 +19,21 @@ use crate::msgs::{BoxedResponse, FlexibleRequest};
///
/// TODO RPC: This API is temporary and should be replaced. It's just here for
/// testing. For now, it only works on unix, and only with tokio.
pub async fn accept_connections<P: AsRef<Path>>(path: P) -> Result<()> {
pub async fn accept_connections<P: AsRef<Path>, R: Runtime>(
path: P,
client: TorClient<R>,
) -> Result<()> {
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
// TODO RPC: there should be an error return instead.
// TODO RPC: Maybe the UnixListener functionality belongs in tor-rtcompat?
// But I certainly don't want to make breaking changes there if we can help it.
let listener = UnixListener::bind(path)?;
let mgr = crate::mgr::RpcMgr::new(client);
loop {
let (stream, _addr) = listener.accept().await?;
let session = Arc::new(crate::session::Session::new());
let session = Arc::new(mgr.new_session());
let (input, output) = stream.into_split();
let input = Box::pin(
asynchronous_codec::FramedRead::new(

View File

@ -0,0 +1,38 @@
//! Top-level `RpcMgr` to launch sessions.
use std::sync::Arc;
use arti_client::TorClient;
use tor_rpcbase as rpc;
use tor_rtcompat::Runtime;
use crate::session::Session;
/// Shared state, configuration, and data for all RPC sessions.
///
/// An RpcMgr knows how to listen for incoming RPC connections, and launch sessions based on them.
///
/// TODO RPC: Actually not all of the above functionality is implemented yet. But it should be.
pub struct RpcMgr {
// DOCDOC
// TODO: I think we're going to need a non-generic version of this, and a general pattern for declaring
// non-generic wrappers for some of our Runtime-parameterized things.
//
// `base_client: TorClient<R>,`
/// DOCDOC
dispatch_table: Arc<rpc::DispatchTable>,
}
impl RpcMgr {
/// Create a new RpcMgr.
pub fn new<R: Runtime>(_: TorClient<R>) -> Self {
RpcMgr {
dispatch_table: Arc::new(rpc::DispatchTable::from_inventory()),
}
}
/// Start a new session based on this RpcMgr.
pub(crate) fn new_session(&self) -> Session {
Session::new(self.dispatch_table.clone())
}
}

View File

@ -12,7 +12,6 @@ use futures::{
stream::{FusedStream, FuturesUnordered},
FutureExt, Sink, SinkExt as _, StreamExt,
};
use once_cell::sync::Lazy;
use pin_project::pin_project;
use rpc::dispatch::BoxedUpdateSink;
use serde_json::error::Category as JsonErrorCategory;
@ -32,6 +31,10 @@ use tor_rpcbase as rpc;
pub(crate) struct Session {
/// The mutable state of this session
inner: Mutex<Inner>,
/// Lookup table to find the implementations for methods
/// based on RPC object and method types.
dispatch_table: Arc<rpc::DispatchTable>,
}
impl rpc::Object for Session {}
@ -73,21 +76,15 @@ pub(crate) type BoxedRequestStream = Pin<
pub(crate) type BoxedResponseSink =
Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
/// A lazily constructed type-based dispatch table used for invoking functions
/// based on RPC object and method types.
//
// TODO RPC: This will be moved into an Arc that lives in some kind of
// SessionManager.
static DISPATCH_TABLE: Lazy<rpc::DispatchTable> = Lazy::new(rpc::DispatchTable::from_inventory);
impl Session {
/// Create a new session.
pub(crate) fn new() -> Self {
pub(crate) fn new(dispatch_table: Arc<rpc::DispatchTable>) -> Self {
Self {
inner: Mutex::new(Inner {
inflight: HashMap::new(),
authenticated: false,
}),
dispatch_table,
}
}
@ -300,7 +297,7 @@ impl Session {
let context: Box<dyn rpc::Context> = Box::new(RequestContext {
session: Arc::clone(self),
});
DISPATCH_TABLE
self.dispatch_table
.invoke(obj, method, context, tx_updates)?
.await
}

View File

@ -204,7 +204,9 @@ async fn run<R: Runtime>(
std::fs::remove_file(&pipe_path)?;
}
runtime.spawn(arti_rpcserver::listen::accept_connections(pipe_path).map(|_| ()))?;
runtime.spawn(
arti_rpcserver::listen::accept_connections(pipe_path, client.clone()).map(|_| ()),
)?;
}
let proxy = futures::future::select_all(proxy).map(|(finished, _index, _others)| finished);