Previously, the reactor would use an `UnboundedSender` to send things to
the `RawCellStream`, in order that the reactor wouldn't block if you
failed to read from the latter. This is bad, though, since it means
people can just run us out of memory by sending lots of things.
To fix this, we make the new `StreamReader` type (which does the reading
parts from `RawCellStream`) keep track of the stream's receive window
and issue SENDMEs once *it* has consumed enough data to require it, thus
meaning that we shouldn't get sent enough data to fill the channel
between reactor and `StreamReader` (and, if we do, that's someone trying
to flood us, and we abort the circuit).
As hinted to above, the `RawCellStream` was removed and its reading
functionalities replaced by `StreamReader`; its writing functionalities
are handled by `StreamTarget` anyway, so we just give out one of those
for the write end. This now means we don't need any mutexes!
note: this commit introduces a known issue, arti#230
Rather like e8e9699c3c ("Get rid of
tor-proto's ChannelImpl, and use the reactor more instead"), this
admittedly rather large commit refactors the way circuits in `tor-proto`
work, centralising all of the logic in one large nonblocking reactor
which other things send messages into and out of, instead of having a
bunch of `-Impl` types that are protected by mutexes.
Congestion control becomes a lot simpler with this refactor, since the
reactor can manage both stream- and circuit-level congestion control
unilaterally without having to share this information with consumers,
meaning we can get rid of some locks.
The way streams work also changes, in order to facilitate better
handling of backpressure / fairness between streams: each stream now has
a set of channels to send and receive messages over, instead of sending
relay cells directly onto the channel (now, the reactor pulls messages
off each stream in each map, and tries to avoid doing so if it won't be
able to forward them yet).
Additionally, a lot of "close this circuit / stream" messages aren't
required any more, since that state is simply indicated by one end of a
channel going away. This should make cleanup a lot less brittle.
Getting all of this to work involved writing a fair deal of intricate
nonblocking code in Reactor::run_once that tries very hard to be mindful
of making backpressure work correctly (and congestion control); the old
code could get away with having tasks .await on things, but the new
reactor can't really do this (as it'd lock the reactor up), so has to do
everything in a nonblocking manner.
@nickm pointed out that refactoring tor_proto::channel's Reactor to do
sending as well meant that it could only send or receive, but not both,
simultaneously, which was bad!
To fix this, rewrite Reactor::run_once to use a handcrafted future (with
futures::future::poll_fn) that can handle the logic required to push
items onto the sink asynchronously (i.e. checking that it can be written
to before trying to do that, and then flushing it).
This also means we don't use select_biased! any more, and just handroll
that logic ourselves; as a small bonus, we can now process all 3 kinds
of message in one run_once() call, instead of having to do only one of
them.
Instead of awkwardly sharing the internals of a `tor-proto` `Channel`
between the reactor task and any other tasks, move most of the internals
into the reactor and have other tasks communicate with the reactor via
message-passing to allocate circuits and send cells.
This makes a lot of things simple, and has convenient properties like
not needing to wrap the `Channel` in an `Arc` (though some places in the
code still do this for now).
A lot of test code required tweaking in order to deal with the refactor;
in fact, fixing the tests probably took longer than writing the mainline
code (!). Importantly, we now use `tokio`'s `tokio::test` annotation
instead of `async_test`, so that we can run things in the background
(which is required to have reactors running for the circuit tests).
This is an instance of #205, and also kind of #217.
We need this for the circuit timeout estimator (#57). It needs to
know "how recently have we got some incoming traffic", so that it
can tell whether a circuit has truly timed out, or whether the
entire network is down.
I'm implementing this with coarsetime, since we need to update these
in response to every single incoming cell, and we need the timestamp
operation to be _fast_.
(This reinstates an earlier commit, f30b2280, which I reverted
because we didn't need it at the time.)
Closes#179.
Basically the same thing as 371437d338
("Refactor tor_proto::channel::Reactor to use an UnboundedSender"), but
for tor_proto::circuit's Reactor instead.
(part of arti#217)
There wasn't any good reason for tor-proto's channel reactor to use a
shedload of oneshot channels instead of just an mpsc UnboundedSender,
and the whole `CtrlResult` thing made even less sense.
Straighten this code out by replacing all of that machinery with a
simple UnboundedSender, instead.
(part of arti#218)
Most of the structs in `arti-client` have example code now, to give a
clearer idea of how they're used.
Annoyingly, a lot of the types exposed in `arti-client` are actually
re-exports, which makes documentation a bit harder: example code that
references other parts of `arti-client` can't actually be run as a
doctest, since the crate it's in is a dependency of `arti-client`.
We might be able to fix this in future by doing the documentation in
`arti-client` itself, but rustdoc seems to have some weird behaviours
there that need to be investigated first (for example, it seems to merge
the re-export and original documentation, and also put the re-export
documentation on the `impl` block for some reason).
For now, though, this commit just writes the docs from the point of view
of an `arti-client` consumer, removing notes specific to the crate in
which they're defined. It's not ideal, but at least the end user
experience is decent.
This will let callers use the tokio traits on these types too, if
they call `split()` on the DataStream.
(Tokio also has a `tokio::io::split()` method, but it requires a
lock whereas `DataStream::split()` doesn't.)
futures::io::AsyncRead (and Write) isn't the same thing as tokio::io::AsyncRead,
which is a somewhat annoying misfeature of the Rust async ecosystem (!).
To mitigate this somewhat for people trying to use the `DataStream` struct with
tokio, implement the tokio versions of the above traits using `tokio-util`'s
compat layer, if a crate feature (`tokio`) is enabled.