rpc: revise the relationship between Mgr and Connection
This adds a Weak reference from Connection to Mgr, makes DispatchTable mutable, and makes a few other changes as discussed between me and Diziet the other week. I bet we are not done tweaking this, but I hope it's a setp forwards.
This commit is contained in:
parent
a156e60780
commit
789953d800
|
@ -5,7 +5,7 @@ mod auth;
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
pin::Pin,
|
||||
sync::{Arc, Mutex},
|
||||
sync::{Arc, Mutex, RwLock, Weak},
|
||||
};
|
||||
|
||||
use asynchronous_codec::JsonCodecError;
|
||||
|
@ -25,6 +25,7 @@ use crate::{
|
|||
globalid::{GlobalId, MacKey},
|
||||
msgs::{BoxedResponse, FlexibleRequest, Request, RequestId, ResponseBody},
|
||||
objmap::{GenIdx, ObjMap},
|
||||
RpcMgr,
|
||||
};
|
||||
|
||||
use tor_rpcbase as rpc;
|
||||
|
@ -38,7 +39,7 @@ pub struct Connection {
|
|||
|
||||
/// Lookup table to find the implementations for methods
|
||||
/// based on RPC object and method types.
|
||||
dispatch_table: Arc<rpc::DispatchTable>,
|
||||
dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
|
||||
|
||||
/// A unique identifier for this connection.
|
||||
///
|
||||
|
@ -53,6 +54,10 @@ pub struct Connection {
|
|||
/// A `MacKey` used to create `GlobalIds` for the objects whose identifiers
|
||||
/// need to exist outside this connection.
|
||||
global_id_mac_key: MacKey,
|
||||
|
||||
/// A reference to the manager associated with this session.
|
||||
#[allow(unused)] // TODO RPC
|
||||
mgr: Weak<RpcMgr>,
|
||||
}
|
||||
rpc::decl_object! {Connection}
|
||||
|
||||
|
@ -115,9 +120,10 @@ impl Connection {
|
|||
/// Create a new connection.
|
||||
pub(crate) fn new(
|
||||
connection_id: ConnectionId,
|
||||
dispatch_table: Arc<rpc::DispatchTable>,
|
||||
dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
|
||||
global_id_mac_key: MacKey,
|
||||
client: Arc<dyn rpc::Object>,
|
||||
mgr: Weak<RpcMgr>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Mutex::new(Inner {
|
||||
|
@ -128,6 +134,7 @@ impl Connection {
|
|||
dispatch_table,
|
||||
connection_id,
|
||||
global_id_mac_key,
|
||||
mgr,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -406,9 +413,14 @@ impl Connection {
|
|||
let context: Box<dyn rpc::Context> = Box::new(RequestContext {
|
||||
conn: Arc::clone(self),
|
||||
});
|
||||
self.dispatch_table
|
||||
.invoke(obj, method, context, tx_updates)?
|
||||
.await
|
||||
let invoke_future = self
|
||||
.dispatch_table
|
||||
.read()
|
||||
.expect("lock poisoned")
|
||||
.invoke(obj, method, context, tx_updates)?;
|
||||
|
||||
// Note that we drop the read lock before we await this future!
|
||||
invoke_future.await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Top-level `RpcMgr` to launch sessions.
|
||||
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
|
||||
use arti_client::TorClient;
|
||||
use rand::Rng;
|
||||
|
@ -21,23 +21,24 @@ use crate::{
|
|||
pub struct RpcMgr {
|
||||
/// A key that we use to ensure that identifiers are unforgeable.
|
||||
///
|
||||
/// When giving out a global identifier.
|
||||
mac_key: MacKey,
|
||||
/// When giving out a global (non-session-bound) identifier, we use this key
|
||||
/// to authenticate the identifier when it's given back to us.
|
||||
///
|
||||
/// We make copies of this key when constructing a session.
|
||||
global_id_mac_key: MacKey,
|
||||
|
||||
/// Our reference to the dispatch table used to look up the functions that
|
||||
/// implement each object on each.
|
||||
///
|
||||
/// We keep this in an `Arc` so we can share it with sessions.
|
||||
dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
|
||||
|
||||
/// Lock-protected view of the manager's state.
|
||||
//
|
||||
// TODO RPC: We should probably move everything into Inner, and move an Arc
|
||||
// around the Mutex. Conceivably we should change the Mutex to an RwLock.
|
||||
inner: Mutex<Inner>,
|
||||
}
|
||||
|
||||
/// The [`RpcMgr`]'s state. This is kept inside a lock for interior mutability.
|
||||
struct Inner {
|
||||
/// Our reference to the dispatch table used to look up the functions that
|
||||
/// implement each object on each.
|
||||
///
|
||||
/// TODO RPC: This isn't mutable yet, but we probably want it to be.
|
||||
dispatch_table: Arc<rpc::DispatchTable>,
|
||||
/// A map from [`ConnectionId`] to weak [`Connection`] references.
|
||||
///
|
||||
/// We use this map to give connections a manager-global identifier that can
|
||||
|
@ -58,14 +59,14 @@ impl RpcMgr {
|
|||
/// should take nothing. Also perhaps instead of a Client, it should take
|
||||
/// an `Arc<dyn Object>` that becomes the session.
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> Self {
|
||||
RpcMgr {
|
||||
mac_key: MacKey::new(&mut rand::thread_rng()),
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(RpcMgr {
|
||||
global_id_mac_key: MacKey::new(&mut rand::thread_rng()),
|
||||
dispatch_table: Arc::new(RwLock::new(rpc::DispatchTable::from_inventory())),
|
||||
inner: Mutex::new(Inner {
|
||||
dispatch_table: Arc::new(rpc::DispatchTable::from_inventory()),
|
||||
connections: WeakValueHashMap::new(),
|
||||
}),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Start a new session based on this RpcMgr, with a given TorClient.
|
||||
|
@ -74,16 +75,17 @@ impl RpcMgr {
|
|||
/// TODO RPC: If `client` is not a `TorClient<PreferredRuntime>`, it won't
|
||||
/// be possible to invoke any of its methods. See #837.
|
||||
#[allow(clippy::missing_panics_doc)]
|
||||
pub fn new_session<R: Runtime>(&self, client: TorClient<R>) -> Arc<Connection> {
|
||||
pub fn new_session<R: Runtime>(self: &Arc<Self>, client: TorClient<R>) -> Arc<Connection> {
|
||||
let connection_id = ConnectionId::from(rand::thread_rng().gen::<[u8; 16]>());
|
||||
let client_obj = Arc::new(client);
|
||||
|
||||
let mut inner = self.inner.lock().expect("poisoned lock");
|
||||
let connection = Arc::new(Connection::new(
|
||||
connection_id,
|
||||
inner.dispatch_table.clone(),
|
||||
self.mac_key.clone(),
|
||||
self.dispatch_table.clone(),
|
||||
self.global_id_mac_key.clone(),
|
||||
client_obj,
|
||||
Arc::downgrade(self),
|
||||
));
|
||||
let old = inner.connections.insert(connection_id, connection.clone());
|
||||
assert!(
|
||||
|
@ -106,7 +108,7 @@ impl RpcMgr {
|
|||
&self,
|
||||
id: &rpc::ObjectId,
|
||||
) -> Result<Arc<dyn rpc::Object>, rpc::LookupError> {
|
||||
let global_id = GlobalId::try_decode(&self.mac_key, id)?;
|
||||
let global_id = GlobalId::try_decode(&self.global_id_mac_key, id)?;
|
||||
self.lookup_by_global_id(&global_id)
|
||||
.ok_or_else(|| rpc::LookupError::NoObject(id.clone()))
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ pub(crate) fn launch_rpc_listener<R: Runtime>(
|
|||
// TODO RPC: Maybe the UnixListener functionality belongs in tor-rtcompat?
|
||||
// But I certainly don't want to make breaking changes there if we can help it.
|
||||
let listener = UnixListener::bind(path)?;
|
||||
let rpc_mgr = Arc::new(RpcMgr::new());
|
||||
let rpc_mgr = RpcMgr::new();
|
||||
let rt_clone = runtime.clone();
|
||||
let rpc_mgr_clone = rpc_mgr.clone();
|
||||
|
||||
|
|
Loading…
Reference in New Issue