hsclient state: Restructure using a scope to drop the mutex guard

Explicit drops don't work.  Instead, introduce a scope.

We need two scopes, actually: one where we do the initial table
wrangling, and one for the retries after relock.

So we must put the meat in a closure so we can reuse it.
And we must return the flow control as an enum.  Bah, etc.

Avoid reformatting this for the moment.  This makes the delta legible...
This commit is contained in:
Ian Jackson 2023-02-28 14:51:32 +00:00
parent d1863c3178
commit 1f03e118ef
3 changed files with 46 additions and 20 deletions

1
Cargo.lock generated
View File

@ -3969,6 +3969,7 @@ version = "0.1.1"
dependencies = [
"async-trait",
"educe",
"either",
"futures",
"postage",
"rand_core 0.6.4",

View File

@ -19,6 +19,7 @@ default = []
[dependencies]
async-trait = "0.1.2"
educe = "0.4.6"
either = "1"
futures = "0.3.14"
postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] }
rand_core = "0.6.2"

View File

@ -5,12 +5,13 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::mem;
use std::panic::AssertUnwindSafe;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Instant;
use futures::FutureExt as _;
use async_trait::async_trait;
use either::Either::*;
use educe::Educe;
use postage::stream::Stream as _;
use slotmap::dense::DenseSlotMap;
@ -141,7 +142,6 @@ impl<D: MockableConnectorData> ServiceState<D> {
impl<D: MockableConnectorData> Services<D> {
/// Connect to a hidden service
// We *do* drop guard. There is *one* await point, just after drop(guard).
#[allow(clippy::await_holding_lock)]
pub(crate) async fn get_or_launch_connection(
connector: &HsClientConnector<impl Runtime, D>,
hs_id: HsId,
@ -150,6 +150,11 @@ impl<D: MockableConnectorData> Services<D> {
) -> Result<D::ClientCirc, HsClientConnError> {
let blank_state = || ServiceState::blank(&connector.runtime);
let mut attempts = 0..MAX_ATTEMPTS;
let table_index;
let mut obtain;
let mut got;
{
let mut guard = connector
.services
.lock()
@ -161,7 +166,7 @@ impl<D: MockableConnectorData> Services<D> {
let records = services.index.entry(hs_id).or_default();
let table_index = match records
table_index = match records
.iter_mut()
.enumerate()
.find_map(|(v_index, record)| {
@ -190,7 +195,9 @@ impl<D: MockableConnectorData> Services<D> {
}
};
for _attempt in 0..MAX_ATTEMPTS {
obtain = |mut guard: MutexGuard<'_, Services<D>>| {
for _attempt in &mut attempts {
let state = guard
.table
.get_mut(table_index)
@ -222,7 +229,7 @@ impl<D: MockableConnectorData> Services<D> {
continue;
}
*last_used = now;
return Ok(circuit.clone());
return Ok::<_, HsClientConnError>(Right(circuit.clone()));
}
ServiceState::Working {
barrier_recv,
@ -237,23 +244,11 @@ impl<D: MockableConnectorData> Services<D> {
*state = blank_state();
continue;
}
let mut barrier_recv = barrier_recv.clone();
let barrier_recv = barrier_recv.clone();
let error = error.clone();
drop(guard);
// Wait for the task to complete (at which point it drops the barrier)
barrier_recv.recv().await;
let error = error
.lock()
.map_err(|_| internal!("Working error poisoned"))?;
if let Some(error) = &*error {
return Err(error.clone());
}
drop(error);
guard = connector
.services
.lock()
.map_err(|_| internal!("HS connector poisoned (relock)"))?;
continue;
return Ok(Left((error, barrier_recv)));
}
ServiceState::Closed { .. } => {
let (barrier_send, barrier_recv) = postage::barrier::channel();
@ -359,6 +354,35 @@ impl<D: MockableConnectorData> Services<D> {
}
Err(internal!("HS connector state management malfunction (exceeded MAX_ATTEMPTS").into())
};
let guard = guard;
got = obtain(guard);
}
loop {
{
let (error, mut barrier_recv) = match got? {
Right(ret) => return Ok(ret),
Left(continuation) => continuation,
};
barrier_recv.recv().await;
let error = error
.lock()
.map_err(|_| internal!("Working error poisoned"))?;
if let Some(error) = &*error {
return Err(error.clone());
}
}
let guard = connector
.services
.lock()
.map_err(|_| internal!("HS connector poisoned (relock)"))?;
got = obtain(guard);
}
}
}