rpc: revise session initialization a lot.

Formerly, every time we wanted to launch a new connection, we had
to give the RpcMgr a TorClient.  The connection would hold that
TorClient until a session was authenticated, and then would wrap
it in a Session and put it in the object map.

Now, the RpcMgr holds a Box<dyn Fn()...> that knows how to
create Sessions.  When a connection is authenticated, it
asks the Mgr to make it a new session.  This makes it clearer
that the TorClient simply can't be given out until the
connection is authenticated.  Later, it will let us create
more types of Session objects under more complicated rules.
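
To make the shape of the change concrete, here is a minimal, self-contained sketch of the factory pattern this commit adopts. `Mgr` and `Session` are simplified stand-ins for the real `RpcMgr` and `RpcSession` (the real factory builds an `RpcSession` around an isolated `TorClient`); the signatures mirror the ones in the diff below.

use std::sync::Arc;

/// Hypothetical stand-in for the real `RpcSession`.
struct Session {
    label: String,
}

/// The manager owns a boxed factory closure instead of a `TorClient`,
/// so nothing client-related is reachable before the factory runs.
type SessionFactory = Box<dyn Fn() -> Arc<Session> + Send + Sync>;

struct Mgr {
    session_factory: SessionFactory,
}

impl Mgr {
    /// Build a manager from whatever closure the caller supplies.
    fn new<F>(make_session: F) -> Arc<Self>
    where
        F: Fn() -> Arc<Session> + Send + Sync + 'static,
    {
        Arc::new(Mgr {
            session_factory: Box::new(make_session),
        })
    }

    /// Called by a connection only after it has authenticated successfully.
    fn create_session(&self) -> Arc<Session> {
        (self.session_factory)()
    }
}

fn main() {
    // The caller decides what a "session" is; the manager just invokes the factory.
    let mgr = Mgr::new(|| Arc::new(Session { label: "isolated client".to_owned() }));
    let session = mgr.create_session();
    println!("created session for {}", session.label);
}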
Nick Mathewson 2023-06-15 11:40:36 -04:00
parent 6da1acadab
commit 2d28402fb7
5 changed files with 60 additions and 44 deletions


@@ -53,7 +53,6 @@ pub struct Connection {
global_id_mac_key: MacKey,
/// A reference to the manager associated with this session.
#[allow(unused)] // TODO RPC
mgr: Weak<RpcMgr>,
}
rpc::decl_object! {Connection}
@@ -70,10 +69,6 @@ struct Inner {
/// An object map used to look up most objects by ID, and keep track of
/// which objects are owned by this connection.
objects: ObjMap,
/// A `TorClient` object that we will give out if the connection is successfully
/// authenticated, _and not otherwise_.
client: Arc<dyn rpc::Object>,
}
/// How many updates can be pending, per connection, before they start to block?
@@ -119,14 +114,12 @@ impl Connection {
connection_id: ConnectionId,
dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
global_id_mac_key: MacKey,
client: Arc<dyn rpc::Object>,
mgr: Weak<RpcMgr>,
) -> Self {
Self {
inner: Mutex::new(Inner {
inflight: HashMap::new(),
objects: ObjMap::new(),
client,
}),
dispatch_table,
connection_id,
@@ -419,6 +412,14 @@ impl Connection {
// Note that we drop the read lock before we await this future!
invoke_future.await
}
/// Try to get a strong reference to the RpcMgr for this connection, and
/// return an error if we can't.
pub(crate) fn mgr(&self) -> Result<Arc<RpcMgr>, MgrDisappearedError> {
self.mgr
.upgrade()
.ok_or(MgrDisappearedError::RpcMgrDisappeared)
}
}
/// A failure that results in closing a [`Connection`].
@@ -433,6 +434,19 @@ pub enum ConnectionError {
ReadFailed,
}
/// A failure from trying to upgrade a `Weak<RpcMgr>`.
#[derive(Clone, Debug, thiserror::Error, serde::Serialize)]
pub(crate) enum MgrDisappearedError {
/// We tried to upgrade our reference to the RpcMgr, and failed.
#[error("RPC manager disappeared; Arti is shutting down?")]
RpcMgrDisappeared,
}
impl tor_error::HasKind for MgrDisappearedError {
fn kind(&self) -> tor_error::ErrorKind {
tor_error::ErrorKind::ArtiShuttingDown
}
}
/// A Context object that we pass to each method invocation.
///
/// It provides the `rpc::Context` interface, which is used to send incremental


@@ -167,14 +167,10 @@ async fn authenticate_connection(
AuthenticationScheme::InherentUnixPath => {}
}
// TODO RPC: I'm actually not totally sure about the semantics of creating a
// new session object here, since it will _look_ separate from other
// sessions, but in fact they will all share the same object map.
//
// Perhaps we need to think more about the semantics of authenticating more
// than once on the same connection.
let client = unauth.inner.lock().expect("lock poisoned").client.clone();
let session = crate::session::RpcSession::new(client);
let session = {
let mgr = unauth.mgr()?;
mgr.create_session()
};
let session = ctx.register_owned(session);
Ok(AuthenticateReply { session })
}


@@ -2,17 +2,23 @@
use std::sync::{Arc, Mutex, RwLock, Weak};
use arti_client::TorClient;
use rand::Rng;
use tor_rpcbase as rpc;
use tor_rtcompat::Runtime;
use weak_table::WeakValueHashMap;
use crate::{
connection::{Connection, ConnectionId},
globalid::{GlobalId, MacKey},
RpcSession,
};
/// A function we use to construct Session objects in response to authentication.
//
// TODO RPC: Perhaps this should return a Result?
// TODO RPC: Perhaps this should take an argument describing what kind of
// authentication there was?
type SessionFactory = Box<dyn Fn() -> Arc<RpcSession> + Send + Sync>;
/// Shared state, configuration, and data for all RPC sessions.
///
/// An RpcMgr knows how to listen for incoming RPC connections, and launch sessions based on them.
@@ -33,6 +39,10 @@ pub struct RpcMgr {
/// We keep this in an `Arc` so we can share it with sessions.
dispatch_table: Arc<RwLock<rpc::DispatchTable>>,
/// A function that we use to construct new Session objects when authentication
/// is successful.
session_factory: SessionFactory,
/// Lock-protected view of the manager's state.
inner: Mutex<Inner>,
}
@@ -55,14 +65,14 @@ struct Inner {
impl RpcMgr {
/// Create a new RpcMgr.
///
/// TODO RPC: Perhaps this should take a Client instead, and new_session
/// should take nothing. Also perhaps instead of a Client, it should take
/// an `Arc<dyn Object>` that becomes the session.
#[allow(clippy::new_without_default)]
pub fn new() -> Arc<Self> {
pub fn new<F>(make_session: F) -> Arc<Self>
where
F: Fn() -> Arc<RpcSession> + Send + Sync + 'static,
{
Arc::new(RpcMgr {
global_id_mac_key: MacKey::new(&mut rand::thread_rng()),
dispatch_table: Arc::new(RwLock::new(rpc::DispatchTable::from_inventory())),
session_factory: Box::new(make_session),
inner: Mutex::new(Inner {
connections: WeakValueHashMap::new(),
}),
@@ -70,21 +80,15 @@ impl RpcMgr {
}
/// Start a new session based on this RpcMgr, with a given TorClient.
///
///
/// TODO RPC: If `client` is not a `TorClient<PreferredRuntime>`, it won't
/// be possible to invoke any of its methods. See #837.
#[allow(clippy::missing_panics_doc)]
pub fn new_connection<R: Runtime>(self: &Arc<Self>, client: TorClient<R>) -> Arc<Connection> {
pub fn new_connection(self: &Arc<Self>) -> Arc<Connection> {
let connection_id = ConnectionId::from(rand::thread_rng().gen::<[u8; 16]>());
let client_obj = Arc::new(client);
let mut inner = self.inner.lock().expect("poisoned lock");
let connection = Arc::new(Connection::new(
connection_id,
self.dispatch_table.clone(),
self.global_id_mac_key.clone(),
client_obj,
Arc::downgrade(self),
));
let old = inner.connections.insert(connection_id, connection.clone());
@@ -119,4 +123,9 @@ impl RpcMgr {
let connection = inner.connections.get(&id.connection)?;
connection.lookup_by_idx(id.local_id)
}
/// Construct a new object to serve as the `session` for a connection.
pub(crate) fn create_session(&self) -> Arc<RpcSession> {
(self.session_factory)()
}
}


@@ -17,18 +17,14 @@ pub struct RpcSession {
rpc::decl_object! { @expose RpcSession }
impl RpcSession {
/// Create a new session (internal)
///
/// TODO RPC: remove.
pub fn new(client: Arc<dyn rpc::Object>) -> Arc<Self> {
Arc::new(Self { client })
}
/// Create a new session object containing a single client object.
///
/// TODO RPC: If `client` is not a `TorClient<PreferredRuntime>`, it won't
/// be possible to invoke any of its methods. See #837.
pub fn new_with_client<R: tor_rtcompat::Runtime>(
client: Arc<arti_client::TorClient<R>>,
) -> Arc<Self> {
Self::new(client)
Arc::new(Self { client })
}
}


@@ -1,7 +1,7 @@
//! Experimental RPC support.
use anyhow::Result;
use arti_rpcserver::RpcMgr;
use arti_rpcserver::{RpcMgr, RpcSession};
use futures::task::SpawnExt;
use std::{path::Path, sync::Arc};
@@ -33,9 +33,11 @@ pub(crate) fn launch_rpc_listener<R: Runtime>(
// TODO RPC: there should be an error return instead.
// TODO RPC: Maybe the UnixListener functionality belongs in tor-rtcompat?
// But I certainly don't want to make breaking changes there if we can help it.
// But I certainly don't want to make breaking changes there if we can help
// it.
let listener = UnixListener::bind(path)?;
let rpc_mgr = RpcMgr::new();
let rpc_mgr =
RpcMgr::new(move || RpcSession::new_with_client(Arc::new(client.isolated_client())));
let rt_clone = runtime.clone();
let rpc_mgr_clone = rpc_mgr.clone();
@@ -43,7 +45,7 @@
// succeeded or not. This is something we should fix when we refactor
// our service-launching code.
runtime.spawn(async {
let result = run_rpc_listener(rt_clone, listener, rpc_mgr_clone, client).await;
let result = run_rpc_listener(rt_clone, listener, rpc_mgr_clone).await;
if let Err(e) = result {
tracing::warn!("RPC manager quit with an error: {}", e);
}
@@ -56,19 +58,18 @@ async fn run_rpc_listener<R: Runtime>(
runtime: R,
listener: UnixListener,
rpc_mgr: Arc<RpcMgr>,
client: TorClient<R>,
) -> Result<()> {
loop {
let (stream, _addr) = listener.accept().await?;
// TODO RPC: Perhaps we should have rpcmgr hold the client reference?
let session = rpc_mgr.new_connection(client.isolated_client());
let connection = rpc_mgr.new_connection();
let (input, output) = stream.into_split();
#[cfg(feature = "tokio")]
let (input, output) = (input.compat(), output.compat_write());
runtime.spawn(async {
let result = session.run(input, output).await;
let result = connection.run(input, output).await;
if let Err(e) = result {
tracing::warn!("RPC session ended with an error: {}", e);
}