From 1f03e118ef561a93de8725ff1119e21d7c1846b3 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Tue, 28 Feb 2023 14:51:32 +0000 Subject: [PATCH] hsclient state: Restructure using a scope to drop the mutex guard Explicit drops don't work. Instead, introduce a scope. We need two scopes, actually: one where we do the initial table wrangling, and one for the retries after relock. So we must put the meat in a closure so we can reuse it. And we must return the flow control as an enum. Bah, etc. Avoid reformatting this for the moment. This makes the delta legible... --- Cargo.lock | 1 + crates/tor-hsclient/Cargo.toml | 1 + crates/tor-hsclient/src/state.rs | 64 ++++++++++++++++++++++---------- 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a8600870..5512eb2c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3969,6 +3969,7 @@ version = "0.1.1" dependencies = [ "async-trait", "educe", + "either", "futures", "postage", "rand_core 0.6.4", diff --git a/crates/tor-hsclient/Cargo.toml b/crates/tor-hsclient/Cargo.toml index 7da0e0f36..44595419e 100644 --- a/crates/tor-hsclient/Cargo.toml +++ b/crates/tor-hsclient/Cargo.toml @@ -19,6 +19,7 @@ default = [] [dependencies] async-trait = "0.1.2" educe = "0.4.6" +either = "1" futures = "0.3.14" postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] } rand_core = "0.6.2" diff --git a/crates/tor-hsclient/src/state.rs b/crates/tor-hsclient/src/state.rs index 57cf7e8e8..132d58d6c 100644 --- a/crates/tor-hsclient/src/state.rs +++ b/crates/tor-hsclient/src/state.rs @@ -5,12 +5,13 @@ use std::collections::HashMap; use std::fmt::Debug; use std::mem; use std::panic::AssertUnwindSafe; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::Instant; use futures::FutureExt as _; use async_trait::async_trait; +use either::Either::*; use educe::Educe; use postage::stream::Stream as _; use slotmap::dense::DenseSlotMap; @@ -141,7 +142,6 @@ impl ServiceState { impl Services { /// Connect to a hidden service // We *do* drop guard. There is *one* await point, just after drop(guard). - #[allow(clippy::await_holding_lock)] pub(crate) async fn get_or_launch_connection( connector: &HsClientConnector, hs_id: HsId, @@ -150,6 +150,11 @@ impl Services { ) -> Result { let blank_state = || ServiceState::blank(&connector.runtime); + let mut attempts = 0..MAX_ATTEMPTS; + let table_index; + let mut obtain; + let mut got; + { let mut guard = connector .services .lock() @@ -161,7 +166,7 @@ impl Services { let records = services.index.entry(hs_id).or_default(); - let table_index = match records + table_index = match records .iter_mut() .enumerate() .find_map(|(v_index, record)| { @@ -190,7 +195,9 @@ impl Services { } }; - for _attempt in 0..MAX_ATTEMPTS { + obtain = |mut guard: MutexGuard<'_, Services>| { + for _attempt in &mut attempts { + let state = guard .table .get_mut(table_index) @@ -222,7 +229,7 @@ impl Services { continue; } *last_used = now; - return Ok(circuit.clone()); + return Ok::<_, HsClientConnError>(Right(circuit.clone())); } ServiceState::Working { barrier_recv, @@ -237,23 +244,11 @@ impl Services { *state = blank_state(); continue; } - let mut barrier_recv = barrier_recv.clone(); + let barrier_recv = barrier_recv.clone(); let error = error.clone(); - drop(guard); + // Wait for the task to complete (at which point it drops the barrier) - barrier_recv.recv().await; - let error = error - .lock() - .map_err(|_| internal!("Working error poisoned"))?; - if let Some(error) = &*error { - return Err(error.clone()); - } - drop(error); - guard = connector - .services - .lock() - .map_err(|_| internal!("HS connector poisoned (relock)"))?; - continue; + return Ok(Left((error, barrier_recv))); } ServiceState::Closed { .. } => { let (barrier_send, barrier_recv) = postage::barrier::channel(); @@ -359,6 +354,35 @@ impl Services { } Err(internal!("HS connector state management malfunction (exceeded MAX_ATTEMPTS").into()) + }; + + let guard = guard; + got = obtain(guard); + } + loop { + { + let (error, mut barrier_recv) = match got? { + Right(ret) => return Ok(ret), + Left(continuation) => continuation, + }; + + barrier_recv.recv().await; + + let error = error + .lock() + .map_err(|_| internal!("Working error poisoned"))?; + if let Some(error) = &*error { + return Err(error.clone()); + } + } + + let guard = connector + .services + .lock() + .map_err(|_| internal!("HS connector poisoned (relock)"))?; + + got = obtain(guard); + } } }