From 545984b095119ecc656afe69683e820a8d1a67de Mon Sep 17 00:00:00 2001 From: Nick Mathewson Date: Thu, 25 May 2023 10:09:22 -0400 Subject: [PATCH] rpc: Give RpcMgr a registry of connections. We're going to use this to implement arti#863, which requires that some RPC objects be globally nameable. --- Cargo.lock | 2 + crates/arti-rpcserver/Cargo.toml | 2 + crates/arti-rpcserver/src/connection.rs | 16 +++++++ crates/arti-rpcserver/src/mgr.rs | 55 ++++++++++++++++++++++--- 4 files changed, 70 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55fad483b..0cff20a23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,6 +240,7 @@ dependencies = [ "asynchronous-codec", "base64ct", "bytes", + "derive_more", "erased-serde", "futures", "futures-await-test", @@ -257,6 +258,7 @@ dependencies = [ "tor-rtcompat", "tracing", "typetag", + "weak-table", ] [[package]] diff --git a/crates/arti-rpcserver/Cargo.toml b/crates/arti-rpcserver/Cargo.toml index e199f2415..1a37a2a7a 100644 --- a/crates/arti-rpcserver/Cargo.toml +++ b/crates/arti-rpcserver/Cargo.toml @@ -20,6 +20,7 @@ arti-client = { path = "../arti-client", version = "0.9.1", features = ["rpc"] } asynchronous-codec = { version = "0.6.0", features = ["json"] } base64ct = "1.5.1" bytes = "1" +derive_more = "0.99.3" erased-serde = "0.3.25" futures = "0.3.14" generational-arena = "0.2.9" @@ -35,6 +36,7 @@ tor-rpcbase = { path = "../tor-rpcbase", version = "0.1.1" } tor-rtcompat = { path = "../tor-rtcompat", version = "0.9.1" } tracing = "0.1.36" typetag = "0.2.7" +weak-table = "0.3.0" [dev-dependencies] futures-await-test = "0.3.0" diff --git a/crates/arti-rpcserver/src/connection.rs b/crates/arti-rpcserver/src/connection.rs index d3330d584..17e24d498 100644 --- a/crates/arti-rpcserver/src/connection.rs +++ b/crates/arti-rpcserver/src/connection.rs @@ -38,6 +38,16 @@ pub struct Connection { /// Lookup table to find the implementations for methods /// based on RPC object and method types. dispatch_table: Arc, + + /// A unique identifier for this connection. + /// + /// This kind of ID is used to refer to the connection from _outside_ of the + /// context of an RPC connection: it can uniquely identify the connection + /// from e.g. a SOCKS session so that clients can attach streams to it. + /// + /// TODO RPC: Remove this if it turns out to be unneeded. + #[allow(unused)] + connection_id: ConnectionId, } impl rpc::Object for Connection {} rpc::decl_object! {Connection} @@ -75,9 +85,14 @@ pub(crate) type BoxedRequestStream = Pin< pub(crate) type BoxedResponseSink = Pin + Send>>; +/// A random value used to identify an connection. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, derive_more::From, derive_more::Into)] +pub(crate) struct ConnectionId(u128); + impl Connection { /// Create a new connection. pub(crate) fn new( + connection_id: ConnectionId, dispatch_table: Arc, client: Arc, ) -> Self { @@ -88,6 +103,7 @@ impl Connection { client, }), dispatch_table, + connection_id, } } diff --git a/crates/arti-rpcserver/src/mgr.rs b/crates/arti-rpcserver/src/mgr.rs index 86c76dd24..2792a2345 100644 --- a/crates/arti-rpcserver/src/mgr.rs +++ b/crates/arti-rpcserver/src/mgr.rs @@ -1,12 +1,14 @@ //! Top-level `RpcMgr` to launch sessions. -use std::sync::Arc; +use std::sync::{Arc, Mutex, Weak}; use arti_client::TorClient; +use rand::Rng; use tor_rpcbase as rpc; use tor_rtcompat::Runtime; +use weak_table::WeakValueHashMap; -use crate::connection::Connection; +use crate::connection::{Connection, ConnectionId}; /// Shared state, configuration, and data for all RPC sessions. /// @@ -14,9 +16,31 @@ use crate::connection::Connection; /// /// TODO RPC: Actually not all of the above functionality is implemented yet. But it should be. pub struct RpcMgr { + /// Lock-protected view of the manager's state. + // + // TODO RPC: We should probably move everything into Inner, and move an Arc + // around the Mutex. Conceivably we should change the Mutex to an RwLock. + inner: Mutex, +} + +/// The [`RpcMgr`]'s state. This is kept inside a lock for interior mutability. +struct Inner { /// Our reference to the dispatch table used to look up the functions that - /// implement each object on each + /// implement each object on each. + /// + /// TODO RPC: This isn't mutable yet, but we probably want it to be. dispatch_table: Arc, + /// A map from [`ConnectionId`] to weak [`Connection`] references. + /// + /// We use this map to give connections a manager-global identifier that can + /// be used to identify them from a SOCKS connection (or elsewhere outside + /// of the RPC system). + /// + /// We _could_ use a generational arena here, but there isn't any point: + /// since these identifiers are global, we need to keep them secure by + /// MACing anything derived from them, which in turn makes the overhead of a + /// HashMap negligible. + connections: WeakValueHashMap>, } impl RpcMgr { @@ -24,16 +48,37 @@ impl RpcMgr { #[allow(clippy::new_without_default)] pub fn new() -> Self { RpcMgr { - dispatch_table: Arc::new(rpc::DispatchTable::from_inventory()), + inner: Mutex::new(Inner { + dispatch_table: Arc::new(rpc::DispatchTable::from_inventory()), + connections: WeakValueHashMap::new(), + }), } } /// Start a new session based on this RpcMgr, with a given TorClient. /// + /// /// TODO RPC: If `client` is not a `TorClient`, it won't /// be possible to invoke any of its methods. See #837. + #[allow(clippy::missing_panics_doc)] pub fn new_session(&self, client: TorClient) -> Arc { + let connection_id = ConnectionId::from(rand::thread_rng().gen::()); let client_obj = Arc::new(client); - Arc::new(Connection::new(self.dispatch_table.clone(), client_obj)) + + let mut inner = self.inner.lock().expect("poisoned lock"); + let connection = Arc::new(Connection::new( + connection_id, + inner.dispatch_table.clone(), + client_obj, + )); + let old = inner.connections.insert(connection_id, connection.clone()); + assert!( + old.is_none(), + // Specifically, we shouldn't expect collisions until we have made on the + // order of 2^64 connections, and that shouldn't be possible on + // realistic systems. + "connection ID collision detected; this is phenomenally unlikely!", + ); + connection } }