rpc: Give RpcMgr a registry of connections.

We're going to use this to implement arti#863, which requires that
some RPC objects be globally nameable.
This commit is contained in:
Nick Mathewson 2023-05-25 10:09:22 -04:00
parent 0b2511dd2b
commit 545984b095
4 changed files with 70 additions and 5 deletions

2
Cargo.lock generated
View File

@ -240,6 +240,7 @@ dependencies = [
"asynchronous-codec", "asynchronous-codec",
"base64ct", "base64ct",
"bytes", "bytes",
"derive_more",
"erased-serde", "erased-serde",
"futures", "futures",
"futures-await-test", "futures-await-test",
@ -257,6 +258,7 @@ dependencies = [
"tor-rtcompat", "tor-rtcompat",
"tracing", "tracing",
"typetag", "typetag",
"weak-table",
] ]
[[package]] [[package]]

View File

@ -20,6 +20,7 @@ arti-client = { path = "../arti-client", version = "0.9.1", features = ["rpc"] }
asynchronous-codec = { version = "0.6.0", features = ["json"] } asynchronous-codec = { version = "0.6.0", features = ["json"] }
base64ct = "1.5.1" base64ct = "1.5.1"
bytes = "1" bytes = "1"
derive_more = "0.99.3"
erased-serde = "0.3.25" erased-serde = "0.3.25"
futures = "0.3.14" futures = "0.3.14"
generational-arena = "0.2.9" generational-arena = "0.2.9"
@ -35,6 +36,7 @@ tor-rpcbase = { path = "../tor-rpcbase", version = "0.1.1" }
tor-rtcompat = { path = "../tor-rtcompat", version = "0.9.1" } tor-rtcompat = { path = "../tor-rtcompat", version = "0.9.1" }
tracing = "0.1.36" tracing = "0.1.36"
typetag = "0.2.7" typetag = "0.2.7"
weak-table = "0.3.0"
[dev-dependencies] [dev-dependencies]
futures-await-test = "0.3.0" futures-await-test = "0.3.0"

View File

@ -38,6 +38,16 @@ pub struct Connection {
/// Lookup table to find the implementations for methods /// Lookup table to find the implementations for methods
/// based on RPC object and method types. /// based on RPC object and method types.
dispatch_table: Arc<rpc::DispatchTable>, dispatch_table: Arc<rpc::DispatchTable>,
/// A unique identifier for this connection.
///
/// This kind of ID is used to refer to the connection from _outside_ of the
/// context of an RPC connection: it can uniquely identify the connection
/// from e.g. a SOCKS session so that clients can attach streams to it.
///
/// TODO RPC: Remove this if it turns out to be unneeded.
#[allow(unused)]
connection_id: ConnectionId,
} }
impl rpc::Object for Connection {} impl rpc::Object for Connection {}
rpc::decl_object! {Connection} rpc::decl_object! {Connection}
@ -75,9 +85,14 @@ pub(crate) type BoxedRequestStream = Pin<
pub(crate) type BoxedResponseSink = pub(crate) type BoxedResponseSink =
Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>; Pin<Box<dyn Sink<BoxedResponse, Error = asynchronous_codec::JsonCodecError> + Send>>;
/// A random value used to identify an connection.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, derive_more::From, derive_more::Into)]
pub(crate) struct ConnectionId(u128);
impl Connection { impl Connection {
/// Create a new connection. /// Create a new connection.
pub(crate) fn new( pub(crate) fn new(
connection_id: ConnectionId,
dispatch_table: Arc<rpc::DispatchTable>, dispatch_table: Arc<rpc::DispatchTable>,
client: Arc<dyn rpc::Object>, client: Arc<dyn rpc::Object>,
) -> Self { ) -> Self {
@ -88,6 +103,7 @@ impl Connection {
client, client,
}), }),
dispatch_table, dispatch_table,
connection_id,
} }
} }

View File

@ -1,12 +1,14 @@
//! Top-level `RpcMgr` to launch sessions. //! Top-level `RpcMgr` to launch sessions.
use std::sync::Arc; use std::sync::{Arc, Mutex, Weak};
use arti_client::TorClient; use arti_client::TorClient;
use rand::Rng;
use tor_rpcbase as rpc; use tor_rpcbase as rpc;
use tor_rtcompat::Runtime; use tor_rtcompat::Runtime;
use weak_table::WeakValueHashMap;
use crate::connection::Connection; use crate::connection::{Connection, ConnectionId};
/// Shared state, configuration, and data for all RPC sessions. /// Shared state, configuration, and data for all RPC sessions.
/// ///
@ -14,9 +16,31 @@ use crate::connection::Connection;
/// ///
/// TODO RPC: Actually not all of the above functionality is implemented yet. But it should be. /// TODO RPC: Actually not all of the above functionality is implemented yet. But it should be.
pub struct RpcMgr { pub struct RpcMgr {
/// Lock-protected view of the manager's state.
//
// TODO RPC: We should probably move everything into Inner, and move an Arc
// around the Mutex. Conceivably we should change the Mutex to an RwLock.
inner: Mutex<Inner>,
}
/// The [`RpcMgr`]'s state. This is kept inside a lock for interior mutability.
struct Inner {
/// Our reference to the dispatch table used to look up the functions that /// Our reference to the dispatch table used to look up the functions that
/// implement each object on each /// implement each object on each.
///
/// TODO RPC: This isn't mutable yet, but we probably want it to be.
dispatch_table: Arc<rpc::DispatchTable>, dispatch_table: Arc<rpc::DispatchTable>,
/// A map from [`ConnectionId`] to weak [`Connection`] references.
///
/// We use this map to give connections a manager-global identifier that can
/// be used to identify them from a SOCKS connection (or elsewhere outside
/// of the RPC system).
///
/// We _could_ use a generational arena here, but there isn't any point:
/// since these identifiers are global, we need to keep them secure by
/// MACing anything derived from them, which in turn makes the overhead of a
/// HashMap negligible.
connections: WeakValueHashMap<ConnectionId, Weak<Connection>>,
} }
impl RpcMgr { impl RpcMgr {
@ -24,16 +48,37 @@ impl RpcMgr {
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Self { pub fn new() -> Self {
RpcMgr { RpcMgr {
dispatch_table: Arc::new(rpc::DispatchTable::from_inventory()), inner: Mutex::new(Inner {
dispatch_table: Arc::new(rpc::DispatchTable::from_inventory()),
connections: WeakValueHashMap::new(),
}),
} }
} }
/// Start a new session based on this RpcMgr, with a given TorClient. /// Start a new session based on this RpcMgr, with a given TorClient.
/// ///
///
/// TODO RPC: If `client` is not a `TorClient<PreferredRuntime>`, it won't /// TODO RPC: If `client` is not a `TorClient<PreferredRuntime>`, it won't
/// be possible to invoke any of its methods. See #837. /// be possible to invoke any of its methods. See #837.
#[allow(clippy::missing_panics_doc)]
pub fn new_session<R: Runtime>(&self, client: TorClient<R>) -> Arc<Connection> { pub fn new_session<R: Runtime>(&self, client: TorClient<R>) -> Arc<Connection> {
let connection_id = ConnectionId::from(rand::thread_rng().gen::<u128>());
let client_obj = Arc::new(client); let client_obj = Arc::new(client);
Arc::new(Connection::new(self.dispatch_table.clone(), client_obj))
let mut inner = self.inner.lock().expect("poisoned lock");
let connection = Arc::new(Connection::new(
connection_id,
inner.dispatch_table.clone(),
client_obj,
));
let old = inner.connections.insert(connection_id, connection.clone());
assert!(
old.is_none(),
// Specifically, we shouldn't expect collisions until we have made on the
// order of 2^64 connections, and that shouldn't be possible on
// realistic systems.
"connection ID collision detected; this is phenomenally unlikely!",
);
connection
} }
} }