From 7f799c956bde519efdbff3f814f19ac4e7879345 Mon Sep 17 00:00:00 2001 From: Yuan Lyu Date: Mon, 8 Nov 2021 22:42:59 -0500 Subject: [PATCH] Implement optimistic stream --- crates/arti-client/src/client.rs | 14 ++++++++++- crates/tor-proto/src/circuit.rs | 38 +++++++++++++---------------- crates/tor-proto/src/stream/data.rs | 35 ++++++++++++++++++++++++-- 3 files changed, 63 insertions(+), 24 deletions(-) diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs index 0d8ea8677..5a155682c 100644 --- a/crates/arti-client/src/client.rs +++ b/crates/arti-client/src/client.rs @@ -54,6 +54,8 @@ pub struct ConnectPrefs { ip_ver_pref: IpVersionPreference, /// Id of the isolation group the connection should be part of isolation_group: Option, + /// Whether to use optimistic data stream + optimistic_stream: bool, } impl ConnectPrefs { @@ -104,6 +106,11 @@ impl ConnectPrefs { self.ip_ver_pref } + /// Get the optimistic_stream flag + fn optimistic_stream(&self) -> bool { + self.optimistic_stream + } + /// Return a TargetPort to describe what kind of exit policy our /// target circuit needs to support. fn wrap_target_port(&self, port: u16) -> TargetPort { @@ -234,7 +241,12 @@ impl TorClient { // TODO: make this configurable. let stream_timeout = Duration::new(10, 0); - let stream_future = circ.begin_stream(&addr, port, Some(flags.begin_flags())); + let stream_future = circ.begin_stream( + &addr, + port, + Some(flags.begin_flags()), + flags.optimistic_stream(), + ); let stream = self .runtime .timeout(stream_timeout, stream_future) diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index b8e35c95e..09362a0e1 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -519,22 +519,17 @@ impl ClientCirc { /// Start a DataStream (anonymized connection) to the given /// address and port, using a BEGIN cell. - async fn begin_data_stream(self: Arc, msg: RelayMsg) -> Result { - let stream = self.begin_stream_impl(msg).await?; - // TODO: waiting for a response here precludes optimistic data. - - let response = stream.recv().await?; - match response { - RelayMsg::Connected(_) => Ok(DataStream::new(stream)), - RelayMsg::End(cell) => Err(Error::EndReceived(cell.reason())), - _ => { - self.protocol_error().await; - Err(Error::StreamProto(format!( - "Received {} while waiting for connection", - response.cmd() - ))) - } + async fn begin_data_stream( + self: Arc, + msg: RelayMsg, + optimistic: bool, + ) -> Result { + let raw_s = self.begin_stream_impl(msg).await?; + let mut stream = DataStream::new(raw_s); + if !optimistic { + stream.wait_for_connection().await?; } + Ok(stream) } /// Start a stream to the given address and port, using a BEGIN @@ -546,17 +541,18 @@ impl ClientCirc { self: Arc, target: &str, port: u16, - flags: Option, + begin_flags: Option, + optimistic: bool, ) -> Result { - let flags = flags.unwrap_or_default(); - let beginmsg = Begin::new(target, port, flags)?; - self.begin_data_stream(beginmsg.into()).await + let begin_flags = begin_flags.unwrap_or_default(); + let beginmsg = Begin::new(target, port, begin_flags)?; + self.begin_data_stream(beginmsg.into(), optimistic).await } /// Start a new stream to the last relay in the circuit, using /// a BEGIN_DIR cell. pub async fn begin_dir_stream(self: Arc) -> Result { - self.begin_data_stream(RelayMsg::BeginDir).await + self.begin_data_stream(RelayMsg::BeginDir, false).await } /// Perform a DNS lookup, using a RESOLVE cell with the last relay @@ -1716,7 +1712,7 @@ mod test { let begin_and_send_fut = async move { // Take our circuit and make a stream on it. let mut stream = circ_clone - .begin_stream("www.example.com", 443, None) + .begin_stream("www.example.com", 443, None, false) .await .unwrap(); let junk = [0_u8; 1024]; diff --git a/crates/tor-proto/src/stream/data.rs b/crates/tor-proto/src/stream/data.rs index b3c714c16..2da5a3aba 100644 --- a/crates/tor-proto/src/stream/data.rs +++ b/crates/tor-proto/src/stream/data.rs @@ -113,7 +113,8 @@ pub struct DataReader { impl DataStream { /// Wrap a RawCellStream as a DataStream. /// - /// Call only after a CONNECTED cell has been received. + /// For non-optimistic stream, function `wait_for_connection` + /// must be called after to make sure CONNECTED is received. pub(crate) fn new(s: RawCellStream) -> Self { let s = Arc::new(s); let r = DataReader { @@ -121,6 +122,7 @@ impl DataStream { s: Arc::clone(&s), pending: Vec::new(), offset: 0, + connected: false, })), }; let w = DataWriter { @@ -137,6 +139,27 @@ impl DataStream { pub fn split(self) -> (DataReader, DataWriter) { (self.r, self.w) } + + /// Wait till a CONNECTED cell is received. + pub async fn wait_for_connection(&mut self) -> Result<()> { + // We must put state back before returning + let state = self.r.state.take().expect("Missing state in DataReader"); + + if let DataReaderState::Ready(imp) = state { + let (imp, result) = imp.read_cell().await; + if result.is_ok() { + // imp is marked as connected by reading the first CONNECTED cell + self.r.state = Some(DataReaderState::Ready(imp)); + Ok(()) + } else { + result + } + } else { + Err(Error::StreamProto( + "Expected ready state of a new stream.".to_owned(), + )) + } + } } impl AsyncRead for DataStream { @@ -408,6 +431,10 @@ struct DataReaderImpl { /// Index into pending to show what we've already read. offset: usize, + + /// This flag indicates that a CONNECTED cell has been received, + /// when set to true. + connected: bool, } impl AsyncRead for DataReader { @@ -506,7 +533,11 @@ impl DataReaderImpl { let cell = self.s.recv().await; let result = match cell { - Ok(RelayMsg::Data(d)) => { + Ok(RelayMsg::Connected(_)) if !self.connected => { + self.connected = true; + Ok(()) + } + Ok(RelayMsg::Data(d)) if self.connected => { self.add_data(d.into()); Ok(()) }