Implement optimistic stream

This commit is contained in:
Yuan Lyu 2021-11-08 22:42:59 -05:00
parent e97804b3d4
commit 7f799c956b
3 changed files with 63 additions and 24 deletions

View File

@ -54,6 +54,8 @@ pub struct ConnectPrefs {
ip_ver_pref: IpVersionPreference,
/// Id of the isolation group the connection should be part of
isolation_group: Option<IsolationToken>,
/// Whether to use optimistic data stream
optimistic_stream: bool,
}
impl ConnectPrefs {
@ -104,6 +106,11 @@ impl ConnectPrefs {
self.ip_ver_pref
}
/// Get the optimistic_stream flag
fn optimistic_stream(&self) -> bool {
self.optimistic_stream
}
/// Return a TargetPort to describe what kind of exit policy our
/// target circuit needs to support.
fn wrap_target_port(&self, port: u16) -> TargetPort {
@ -234,7 +241,12 @@ impl<R: Runtime> TorClient<R> {
// TODO: make this configurable.
let stream_timeout = Duration::new(10, 0);
let stream_future = circ.begin_stream(&addr, port, Some(flags.begin_flags()));
let stream_future = circ.begin_stream(
&addr,
port,
Some(flags.begin_flags()),
flags.optimistic_stream(),
);
let stream = self
.runtime
.timeout(stream_timeout, stream_future)

View File

@ -519,22 +519,17 @@ impl ClientCirc {
/// Start a DataStream (anonymized connection) to the given
/// address and port, using a BEGIN cell.
async fn begin_data_stream(self: Arc<Self>, msg: RelayMsg) -> Result<DataStream> {
let stream = self.begin_stream_impl(msg).await?;
// TODO: waiting for a response here precludes optimistic data.
let response = stream.recv().await?;
match response {
RelayMsg::Connected(_) => Ok(DataStream::new(stream)),
RelayMsg::End(cell) => Err(Error::EndReceived(cell.reason())),
_ => {
self.protocol_error().await;
Err(Error::StreamProto(format!(
"Received {} while waiting for connection",
response.cmd()
)))
}
async fn begin_data_stream(
self: Arc<Self>,
msg: RelayMsg,
optimistic: bool,
) -> Result<DataStream> {
let raw_s = self.begin_stream_impl(msg).await?;
let mut stream = DataStream::new(raw_s);
if !optimistic {
stream.wait_for_connection().await?;
}
Ok(stream)
}
/// Start a stream to the given address and port, using a BEGIN
@ -546,17 +541,18 @@ impl ClientCirc {
self: Arc<Self>,
target: &str,
port: u16,
flags: Option<IpVersionPreference>,
begin_flags: Option<IpVersionPreference>,
optimistic: bool,
) -> Result<DataStream> {
let flags = flags.unwrap_or_default();
let beginmsg = Begin::new(target, port, flags)?;
self.begin_data_stream(beginmsg.into()).await
let begin_flags = begin_flags.unwrap_or_default();
let beginmsg = Begin::new(target, port, begin_flags)?;
self.begin_data_stream(beginmsg.into(), optimistic).await
}
/// Start a new stream to the last relay in the circuit, using
/// a BEGIN_DIR cell.
pub async fn begin_dir_stream(self: Arc<Self>) -> Result<DataStream> {
self.begin_data_stream(RelayMsg::BeginDir).await
self.begin_data_stream(RelayMsg::BeginDir, false).await
}
/// Perform a DNS lookup, using a RESOLVE cell with the last relay
@ -1716,7 +1712,7 @@ mod test {
let begin_and_send_fut = async move {
// Take our circuit and make a stream on it.
let mut stream = circ_clone
.begin_stream("www.example.com", 443, None)
.begin_stream("www.example.com", 443, None, false)
.await
.unwrap();
let junk = [0_u8; 1024];

View File

@ -113,7 +113,8 @@ pub struct DataReader {
impl DataStream {
/// Wrap a RawCellStream as a DataStream.
///
/// Call only after a CONNECTED cell has been received.
/// For non-optimistic stream, function `wait_for_connection`
/// must be called after to make sure CONNECTED is received.
pub(crate) fn new(s: RawCellStream) -> Self {
let s = Arc::new(s);
let r = DataReader {
@ -121,6 +122,7 @@ impl DataStream {
s: Arc::clone(&s),
pending: Vec::new(),
offset: 0,
connected: false,
})),
};
let w = DataWriter {
@ -137,6 +139,27 @@ impl DataStream {
pub fn split(self) -> (DataReader, DataWriter) {
(self.r, self.w)
}
/// Wait till a CONNECTED cell is received.
pub async fn wait_for_connection(&mut self) -> Result<()> {
// We must put state back before returning
let state = self.r.state.take().expect("Missing state in DataReader");
if let DataReaderState::Ready(imp) = state {
let (imp, result) = imp.read_cell().await;
if result.is_ok() {
// imp is marked as connected by reading the first CONNECTED cell
self.r.state = Some(DataReaderState::Ready(imp));
Ok(())
} else {
result
}
} else {
Err(Error::StreamProto(
"Expected ready state of a new stream.".to_owned(),
))
}
}
}
impl AsyncRead for DataStream {
@ -408,6 +431,10 @@ struct DataReaderImpl {
/// Index into pending to show what we've already read.
offset: usize,
/// This flag indicates that a CONNECTED cell has been received,
/// when set to true.
connected: bool,
}
impl AsyncRead for DataReader {
@ -506,7 +533,11 @@ impl DataReaderImpl {
let cell = self.s.recv().await;
let result = match cell {
Ok(RelayMsg::Data(d)) => {
Ok(RelayMsg::Connected(_)) if !self.connected => {
self.connected = true;
Ok(())
}
Ok(RelayMsg::Data(d)) if self.connected => {
self.add_data(d.into());
Ok(())
}