Rename LogId -> UniqId.

This commit is contained in:
Nick Mathewson 2020-12-01 13:44:25 -05:00
parent 3fe97baa2a
commit ea0cb1d2cc
7 changed files with 125 additions and 106 deletions

View File

@ -56,11 +56,11 @@
mod circmap;
mod codec;
mod handshake;
mod logid;
mod reactor;
mod unique_id;
pub(crate) use crate::channel::logid::LogId;
use crate::channel::reactor::{CtrlMsg, CtrlResult};
pub(crate) use crate::channel::unique_id::UniqId;
use crate::circuit;
use crate::circuit::celltypes::CreateResponse;
use crate::{Error, Result};
@ -93,7 +93,7 @@ type CellFrame<T> = futures_codec::Framed<T, crate::channel::codec::ChannelCodec
/// A channel is a direct connection to a Tor relay, implemented using TLS.
pub struct Channel {
/// Logging identifier for this stream. (Used for logging only.)
logid: LogId,
unique_id: UniqId,
/// Validated Ed25519 identity for this peer.
ed25519_id: Ed25519Identity,
/// Validated RSA identity for this peer.
@ -128,7 +128,7 @@ struct ChannelImpl {
sendclosed: Option<oneshot::Sender<CtrlMsg>>,
/// Context for allocating unique circuit log identifiers.
circ_logid_ctx: logid::CircLogIdContext,
circ_unique_id_ctx: unique_id::CircUniqIdContext,
}
/// Structure for building and launching a Tor channel.
@ -187,7 +187,7 @@ impl Channel {
link_protocol: u16,
tls_sink: Box<dyn Sink<ChanCell, Error = tor_cell::Error> + Send + Unpin + 'static>,
tls_stream: T,
logid: LogId,
unique_id: UniqId,
ed25519_id: Ed25519Identity,
rsa_id: RSAIdentity,
) -> (Arc<Self>, reactor::Reactor<T>)
@ -206,11 +206,11 @@ impl Channel {
circmap: Arc::downgrade(&circmap),
sendctrl,
sendclosed: Some(sendclosed),
circ_logid_ctx: logid::CircLogIdContext::new(),
circ_unique_id_ctx: unique_id::CircUniqIdContext::new(),
};
let inner = Mutex::new(inner);
let channel = Channel {
logid,
unique_id,
ed25519_id,
rsa_id,
closed: AtomicBool::new(false),
@ -224,7 +224,7 @@ impl Channel {
recvctrl,
recvclosed,
tls_stream,
logid,
unique_id,
);
(channel, reactor)
@ -287,7 +287,7 @@ impl Channel {
Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log.
_ => trace!(
"{}: Sending {} for {}",
self.logid,
self.unique_id,
cell.msg().cmd(),
cell.circid()
),
@ -320,7 +320,7 @@ impl Channel {
let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
let (send_circ_destroy, recv_circ_destroy) = oneshot::channel();
let (circ_logid, id) = {
let (circ_unique_id, id) = {
let mut inner = self.inner.lock().await;
inner
.sendctrl
@ -328,15 +328,15 @@ impl Channel {
.await
.map_err(|_| Error::InternalError("Can't queue circuit closer".into()))?;
if let Some(circmap) = inner.circmap.upgrade() {
let my_logid = self.logid;
let circ_logid = inner.circ_logid_ctx.next(my_logid);
let my_unique_id = self.unique_id;
let circ_unique_id = inner.circ_unique_id_ctx.next(my_unique_id);
let mut cmap = circmap.lock().await;
(circ_logid, cmap.add_ent(rng, createdsender, sender)?)
(circ_unique_id, cmap.add_ent(rng, createdsender, sender)?)
} else {
return Err(Error::ChannelClosed);
}
};
trace!("{}: Allocated CircId {}", circ_logid, id);
trace!("{}: Allocated CircId {}", circ_unique_id, id);
let destroy_handle = CircDestroyHandle::new(id, send_circ_destroy);
@ -346,7 +346,7 @@ impl Channel {
createdreceiver,
Some(destroy_handle),
receiver,
circ_logid,
circ_unique_id,
))
}
@ -447,17 +447,17 @@ pub(crate) mod test {
let circmap = circmap::CircMap::new(circmap::CircIdRange::High);
let circmap = Arc::new(Mutex::new(circmap));
let logid = LogId::new();
let unique_id = UniqId::new();
let inner = ChannelImpl {
link_protocol: 4,
tls: Box::new(cell_send),
circmap: Arc::downgrade(&circmap),
sendctrl: ctrl_send,
sendclosed: None,
circ_logid_ctx: logid::CircLogIdContext::new(),
circ_unique_id_ctx: unique_id::CircUniqIdContext::new(),
};
let channel = Channel {
logid,
unique_id,
ed25519_id: [0u8; 32].into(),
rsa_id: [0u8; 20].into(),
closed: AtomicBool::new(false),

View File

@ -6,7 +6,7 @@ use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use crate::channel::codec::ChannelCodec;
use crate::channel::LogId;
use crate::channel::UniqId;
use crate::{Error, Result};
use tor_cell::chancell::{msg, ChanCmd};
@ -40,7 +40,7 @@ pub struct OutboundClientHandshake<T: AsyncRead + AsyncWrite + Send + Unpin + 's
target_addr: Option<SocketAddr>,
/// Logging identifier for this stream. (Used for logging only.)
logid: LogId,
unique_id: UniqId,
}
/// A client channel on which versions have been negotiated and the
@ -59,7 +59,7 @@ pub struct UnverifiedChannel<T: AsyncRead + AsyncWrite + Send + Unpin + 'static>
#[allow(dead_code)] // Relays will need this.
netinfo_cell: msg::Netinfo,
/// Logging identifier for this stream. (Used for logging only.)
logid: LogId,
unique_id: UniqId,
}
/// A client channel on which versions have been negotiated,
@ -77,7 +77,7 @@ pub struct VerifiedChannel<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> {
/// Declared target for this stream, if any.
target_addr: Option<SocketAddr>,
/// Logging identifier for this stream. (Used for logging only.)
logid: LogId,
unique_id: UniqId,
/// Validated Ed25519 identity for this peer.
ed25519_id: Ed25519Identity,
/// Validated RSA identity for this peer.
@ -90,7 +90,7 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> OutboundClientHandshake
Self {
tls,
target_addr,
logid: LogId::new(),
unique_id: UniqId::new(),
}
}
@ -98,10 +98,10 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> OutboundClientHandshake
/// the relay's handshake information.
pub async fn connect(mut self) -> Result<UnverifiedChannel<T>> {
match self.target_addr {
Some(addr) => debug!("{}: starting Tor handshake with {}", self.logid, addr),
None => debug!("{}: starting Tor handshake", self.logid),
Some(addr) => debug!("{}: starting Tor handshake with {}", self.unique_id, addr),
None => debug!("{}: starting Tor handshake", self.unique_id),
}
trace!("{}: sending versions", self.logid);
trace!("{}: sending versions", self.unique_id);
// Send versions cell
{
let my_versions = msg::Versions::new(LINK_PROTOCOLS);
@ -110,7 +110,7 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> OutboundClientHandshake
}
// Get versions cell.
trace!("{}: waiting for versions", self.logid);
trace!("{}: waiting for versions", self.unique_id);
let their_versions: msg::Versions = {
// TODO: this could be turned into another function, I suppose.
let mut hdr = [0u8; 5];
@ -124,13 +124,13 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> OutboundClientHandshake
let mut reader = Reader::from_slice(&msg);
reader.extract()?
};
trace!("{}: received {:?}", self.logid, their_versions);
trace!("{}: received {:?}", self.unique_id, their_versions);
// Determine which link protocol we negotiated.
let link_protocol = their_versions
.best_shared_link_protocol(LINK_PROTOCOLS)
.ok_or_else(|| Error::ChanProto("No shared link protocols".into()))?;
trace!("{}: negotiated version {}", self.logid, link_protocol);
trace!("{}: negotiated version {}", self.unique_id, link_protocol);
// Now we can switch to using a "Framed". We can ignore the
// AsyncRead/AsyncWrite aspects of the tls, and just treat it
@ -144,11 +144,11 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> OutboundClientHandshake
let mut seen_authchallenge = false;
// Loop: reject duplicate and unexpected cells
trace!("{}: waiting for rest of handshake.", self.logid);
trace!("{}: waiting for rest of handshake.", self.unique_id);
while let Some(m) = tls.next().await {
use msg::ChanMsg::*;
let (_, m) = m?.into_circid_and_msg();
trace!("{}: received a {} cell.", self.logid, m.cmd());
trace!("{}: received a {} cell.", self.unique_id, m.cmd());
match m {
// Are these technically allowed?
Padding(_) | VPadding(_) => (),
@ -187,14 +187,14 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> OutboundClientHandshake
(Some(_), None) => Err(Error::ChanProto("Missing netinfo or closed stream".into())),
(None, _) => Err(Error::ChanProto("Missing certs cell".into())),
(Some(certs_cell), Some(netinfo_cell)) => {
trace!("{}: receieved handshake, ready to verify.", self.logid);
trace!("{}: receieved handshake, ready to verify.", self.unique_id);
Ok(UnverifiedChannel {
link_protocol,
tls,
certs_cell,
netinfo_cell,
target_addr: self.target_addr,
logid: self.logid,
unique_id: self.unique_id,
})
}
}
@ -337,7 +337,7 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> UnverifiedChannel<T> {
trace!(
"{}: Validated identity as {} [{}]",
self.logid,
self.unique_id,
ed25519_id,
rsa_id
);
@ -361,7 +361,7 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> UnverifiedChannel<T> {
Ok(VerifiedChannel {
link_protocol: self.link_protocol,
tls: self.tls,
logid: self.logid,
unique_id: self.unique_id,
target_addr: self.target_addr,
ed25519_id,
rsa_id,
@ -382,13 +382,13 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> VerifiedChannel<T> {
Arc<super::Channel>,
super::reactor::Reactor<stream::SplitStream<CellFrame<T>>>,
)> {
trace!("{}: Sending netinfo cell.", self.logid);
trace!("{}: Sending netinfo cell.", self.unique_id);
let netinfo = msg::Netinfo::for_client(self.target_addr.as_ref().map(SocketAddr::ip));
self.tls.send(netinfo.into()).await?;
debug!(
"{}: Completed handshake with {} [{}]",
self.logid, self.ed25519_id, self.rsa_id
self.unique_id, self.ed25519_id, self.rsa_id
);
let (tls_sink, tls_stream) = self.tls.split();
@ -397,7 +397,7 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> VerifiedChannel<T> {
self.link_protocol,
Box::new(tls_sink),
tls_stream,
self.logid,
self.unique_id,
self.ed25519_id,
self.rsa_id,
))
@ -572,7 +572,7 @@ pub(super) mod test {
certs_cell: certs,
netinfo_cell,
target_addr: None,
logid: LogId::new(),
unique_id: UniqId::new(),
}
}
@ -817,7 +817,7 @@ pub(super) mod test {
let ver = VerifiedChannel {
link_protocol: 4,
tls: futures_codec::Framed::new(MsgBuf::new(&b""[..]), ChannelCodec::new(4)),
logid: LogId::new(),
unique_id: UniqId::new(),
target_addr: Some(peer_addr),
ed25519_id,
rsa_id,

View File

@ -7,7 +7,7 @@
//! or in the error handling behavior.
use super::circmap::{CircEnt, CircMap};
use super::LogId;
use super::UniqId;
use crate::circuit::halfcirc::HalfCirc;
use crate::util::err::ReactorError;
use crate::{Error, Result};
@ -80,7 +80,7 @@ where
channel: Weak<super::Channel>,
/// Logging identifier for this channel
logid: LogId,
unique_id: UniqId,
}
impl<T> Reactor<T>
@ -98,7 +98,7 @@ where
control: mpsc::Receiver<CtrlResult>,
closeflag: oneshot::Receiver<CtrlMsg>,
input: T,
logid: LogId,
unique_id: UniqId,
) -> Self {
let mut oneshots = stream::SelectAll::new();
oneshots.push(stream::once(closeflag));
@ -108,7 +108,7 @@ where
input: input.fuse(),
channel: Arc::downgrade(&channel),
circs: circmap,
logid,
unique_id,
}
}
@ -125,7 +125,7 @@ where
} else {
return Err(Error::ChannelClosed);
}
debug!("{}: Running reactor", self.logid);
debug!("{}: Running reactor", self.unique_id);
let result: Result<()> = loop {
match self.run_once().await {
Ok(()) => (),
@ -133,7 +133,7 @@ where
Err(ReactorError::Err(e)) => break Err(e),
}
};
debug!("{}: Reactor stopped: {:?}", self.logid, result);
debug!("{}: Reactor stopped: {:?}", self.unique_id, result);
if let Some(chan) = self.channel.upgrade() {
chan.closed.store(true, Ordering::SeqCst);
}
@ -173,7 +173,7 @@ where
/// Handle a CtrlMsg other than Shutdown.
async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
trace!("{}: reactor received {:?}", self.logid, msg);
trace!("{}: reactor received {:?}", self.unique_id, msg);
match msg {
CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
CtrlMsg::Register(ch) => self.register(ch),
@ -196,7 +196,7 @@ where
match msg {
Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log.
_ => trace!("{}: received {} for {}", self.logid, msg.cmd(), circid),
_ => trace!("{}: received {} for {}", self.unique_id, msg.cmd(), circid),
}
match msg {
@ -286,7 +286,7 @@ where
Some(CircEnt::Opening(oneshot, _)) => {
trace!(
"{}: Passing destroy to pending circuit {}",
self.logid,
self.unique_id,
circid
);
oneshot
@ -301,7 +301,11 @@ where
}
// It's an open circuit: tell it that it got a DESTROY cell.
Some(CircEnt::Open(mut sink)) => {
trace!("{}: Passing destroy to open circuit {}", self.logid, circid);
trace!(
"{}: Passing destroy to open circuit {}",
self.unique_id,
circid
);
sink.send(msg.try_into()?)
.await
// XXXX I think that this one actually means the other side
@ -314,7 +318,11 @@ where
Some(CircEnt::DestroySent(_)) => Ok(()),
// Got a DESTROY cell for a circuit we don't have.
None => {
trace!("{}: Destroy for nonexistent circuit {}", self.logid, circid);
trace!(
"{}: Destroy for nonexistent circuit {}",
self.unique_id,
circid
);
Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
}
}
@ -323,7 +331,11 @@ where
/// Called when a circuit goes away: sends a DESTROY cell and removes
/// the circuit.
async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
trace!("{}: Circuit {} is gone; sending DESTROY", self.logid, id);
trace!(
"{}: Circuit {} is gone; sending DESTROY",
self.unique_id,
id
);
{
let mut map = self.circs.lock().await;
// Remove the circuit's entry from the map: nothing more
@ -362,7 +374,7 @@ pub(crate) mod test {
let link_protocol = 4;
let (send1, recv1) = mpsc::channel(32);
let (send2, recv2) = mpsc::channel(32);
let logid = LogId::new();
let unique_id = UniqId::new();
let ed_id = [0x1; 32].into();
let rsa_id = [0x2; 20].into();
let send1 = send1.sink_map_err(|_| tor_cell::Error::ChanProto("dummy message".into()));
@ -370,7 +382,7 @@ pub(crate) mod test {
link_protocol,
Box::new(send1),
recv2,
logid,
unique_id,
ed_id,
rsa_id,
);

View File

@ -13,20 +13,20 @@ static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
/// for a very long time; if you do, we detect that and exit with an
/// assertion failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LogId(usize);
pub struct UniqId(usize);
impl LogId {
/// Construct a new LogId.
impl UniqId {
/// Construct a new UniqId.
pub fn new() -> Self {
// Relaxed ordering is fine; we don't care about how this
// is instantiated with respoect to other channels.
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
assert!(id != std::usize::MAX, "Exhausted the channel ID namespace");
LogId(id)
UniqId(id)
}
}
impl Display for LogId {
impl Display for UniqId {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "Chan {}", self.0)
}
@ -37,25 +37,25 @@ impl Display for LogId {
/// We don't use circuit IDs here, because they tend are huge and
/// random and can be reused more readily.
#[derive(Debug)]
pub(crate) struct CircLogIdContext {
pub(crate) struct CircUniqIdContext {
/// Next value to be handed out for this channel's circuits.
next_circ_id: usize,
}
impl CircLogIdContext {
/// Create a new CircLogIdContext
impl CircUniqIdContext {
/// Create a new CircUniqIdContext
pub(super) fn new() -> Self {
CircLogIdContext { next_circ_id: 0 }
CircUniqIdContext { next_circ_id: 0 }
}
/// Construct a new, unique-ish circuit LogId
pub(super) fn next(&mut self, logid: LogId) -> crate::circuit::LogId {
let circ_logid = self.next_circ_id;
/// Construct a new, unique-ish circuit UniqId
pub(super) fn next(&mut self, unique_id: UniqId) -> crate::circuit::UniqId {
let circ_unique_id = self.next_circ_id;
self.next_circ_id += 1;
assert!(
self.next_circ_id != 0,
"Exhaused the unique circuit ID namespace on a channel"
);
crate::circuit::LogId::new(logid.0, circ_logid)
crate::circuit::UniqId::new(unique_id.0, circ_unique_id)
}
}
@ -63,8 +63,8 @@ impl CircLogIdContext {
mod test {
use super::*;
#[test]
fn chan_logid() {
let ids: Vec<LogId> = (0..10).map(|_| LogId::new()).collect();
fn chan_unique_id() {
let ids: Vec<UniqId> = (0..10).map(|_| UniqId::new()).collect();
// Make sure we got distinct numbers
let mut all_nums: Vec<_> = ids.iter().map(|x| x.0).collect();
@ -77,8 +77,8 @@ mod test {
#[test]
fn chan_circid() {
let chan_id99 = LogId(99);
let mut ctx = CircLogIdContext::new();
let chan_id99 = UniqId(99);
let mut ctx = CircUniqIdContext::new();
let _id0 = ctx.next(chan_id99);
let _id1 = ctx.next(chan_id99);

View File

@ -41,15 +41,15 @@
pub(crate) mod celltypes;
pub(crate) mod halfcirc;
mod halfstream;
mod logid;
pub(crate) mod reactor;
pub(crate) mod sendme;
mod streammap;
mod unique_id;
use crate::channel::{Channel, CircDestroyHandle};
use crate::circuit::celltypes::*;
pub(crate) use crate::circuit::logid::LogId;
use crate::circuit::reactor::{CtrlMsg, CtrlResult};
pub(crate) use crate::circuit::unique_id::UniqId;
use crate::crypto::cell::{
ClientLayer, CryptInit, HopNum, InboundClientLayer, OutboundClientCrypt, OutboundClientLayer,
RelayCellBody,
@ -129,7 +129,7 @@ struct ClientCircImpl {
/// is a RELAY cell with a stream ID value of 0.
sendmeta: Option<oneshot::Sender<MetaResult>>,
/// An identifier for this circuit, for logging purposes.
logid: LogId,
unique_id: UniqId,
}
/// A handle to a circuit as held by a stream. Used to send cells.
@ -217,7 +217,7 @@ impl ClientCirc {
}
circ.sendmeta = Some(sender);
trace!("{}: Registered a meta-cell handler", circ.logid);
trace!("{}: Registered a meta-cell handler", circ.unique_id);
Ok(receiver)
}
@ -265,13 +265,13 @@ impl ClientCirc {
let receiver = self.register_meta_handler().await?;
// Now send the EXTEND2 cell to the the last hop...
let (logid, hop) = {
let (unique_id, hop) = {
let mut c = self.c.lock().await;
let n_hops = c.crypto_out.n_layers();
let hop = ((n_hops - 1) as u8).into();
debug!(
"{}: Extending circuit to hop {} with {:?}",
c.logid,
c.unique_id,
n_hops + 1,
linkspecs
);
@ -283,12 +283,12 @@ impl ClientCirc {
)
.await?;
(c.logid, hop)
(c.unique_id, hop)
// note that we're dropping the lock here, since we're going
// to wait for a response.
};
trace!("{}: waiting for EXTENDED2 cell", logid);
trace!("{}: waiting for EXTENDED2 cell", unique_id);
// ... and now we wait for a response.
let (from_hop, msg) = receiver.await.map_err(|_| {
Error::CircDestroy("Circuit closed while waiting for extended cell".into())
@ -320,13 +320,16 @@ impl ClientCirc {
};
let server_handshake = msg.into_body();
trace!("{}: Received EXTENDED2 cell; completing handshake.", logid);
trace!(
"{}: Received EXTENDED2 cell; completing handshake.",
unique_id
);
// Now perform the second part of the handshake, and see if it
// succeeded.
let keygen = H::client2(state, server_handshake)?;
let layer = L::construct(keygen)?;
debug!("{}: Handshake complete; circuit extended.", logid);
debug!("{}: Handshake complete; circuit extended.", unique_id);
// If we get here, it succeeded. Add a new hop to the circuit.
let (layer_fwd, layer_back) = layer.split();
@ -570,7 +573,7 @@ impl ClientCircImpl {
return Err(Error::CircuitClosed);
}
trace!("{}: Received meta-cell {:?}", self.logid, msg);
trace!("{}: Received meta-cell {:?}", self.unique_id, msg);
// For all other command types, we'll only get them in response
// to another command, which should have registered a responder.
@ -691,7 +694,7 @@ impl PendingClientCirc {
createdreceiver: oneshot::Receiver<CreateResponse>,
circ_closed: Option<CircDestroyHandle>,
input: mpsc::Receiver<ClientCircChanMsg>,
logid: LogId,
unique_id: UniqId,
) -> (PendingClientCirc, reactor::Reactor) {
let crypto_out = OutboundClientCrypt::new();
let (sendclosed, recvclosed) = oneshot::channel::<CtrlMsg>();
@ -708,7 +711,7 @@ impl PendingClientCirc {
control: sendctrl,
sendshutdown: Some(sendclosed),
sendmeta: None,
logid,
unique_id,
};
let circuit = ClientCirc {
closed: AtomicBool::new(false),
@ -719,7 +722,7 @@ impl PendingClientCirc {
recvcreated: createdreceiver,
circ: Arc::clone(&circuit),
};
let reactor = reactor::Reactor::new(circuit, recvctrl, recvclosed, input, logid);
let reactor = reactor::Reactor::new(circuit, recvctrl, recvclosed, input, unique_id);
(pending, reactor)
}
@ -759,11 +762,15 @@ impl PendingClientCirc {
let PendingClientCirc { circ, recvcreated } = self;
let (state, msg) = H::client1(rng, &key)?;
let create_cell = wrap.to_chanmsg(msg);
let logid = {
let unique_id = {
let mut c = circ.c.lock().await;
debug!("{}: Extending to hop 1 with {}", c.logid, create_cell.cmd());
debug!(
"{}: Extending to hop 1 with {}",
c.unique_id,
create_cell.cmd()
);
c.send_msg(create_cell).await?;
c.logid
c.unique_id
};
let reply = recvcreated
@ -775,7 +782,7 @@ impl PendingClientCirc {
let layer = L::construct(keygen)?;
debug!("{}: Handshake complete; circuit created.", logid);
debug!("{}: Handshake complete; circuit created.", unique_id);
let (layer_fwd, layer_back) = layer.split();
circ.add_hop(
@ -1007,7 +1014,7 @@ mod test {
let circid = 128.into();
let (created_send, created_recv) = oneshot::channel();
let (_circmsg_send, circmsg_recv) = mpsc::channel(64);
let logid = LogId::new(23, 17);
let unique_id = UniqId::new(23, 17);
let (pending, reactor) = PendingClientCirc::new(
circid,
@ -1015,7 +1022,7 @@ mod test {
created_recv,
None, // circ_closed.
circmsg_recv,
logid,
unique_id,
);
// one to reply as a relay, and one to be the reactor.
@ -1117,7 +1124,7 @@ mod test {
let circid = 128.into();
let (_created_send, created_recv) = oneshot::channel();
let (circmsg_send, circmsg_recv) = mpsc::channel(64);
let logid = LogId::new(23, 17);
let unique_id = UniqId::new(23, 17);
let (pending, mut reactor) = PendingClientCirc::new(
circid,
@ -1125,7 +1132,7 @@ mod test {
created_recv,
None, // circ_closed.
circmsg_recv,
logid,
unique_id,
);
let PendingClientCirc {

View File

@ -8,7 +8,7 @@
use super::streammap::{ShouldSendEnd, StreamEnt};
use crate::circuit::celltypes::ClientCircChanMsg;
use crate::circuit::logid::LogId;
use crate::circuit::unique_id::UniqId;
use crate::circuit::{sendme, streammap};
use crate::crypto::cell::{HopNum, InboundClientCrypt, InboundClientLayer};
use crate::util::err::ReactorError;
@ -138,7 +138,7 @@ pub struct Reactor {
/// List of hops state objects used by the reactor
hops: Vec<InboundHop>,
/// An identifier for logging about this reactor's circuit.
logid: LogId,
unique_id: UniqId,
}
impl Reactor {
@ -148,7 +148,7 @@ impl Reactor {
control: mpsc::Receiver<CtrlResult>,
closeflag: oneshot::Receiver<CtrlMsg>,
input: mpsc::Receiver<ClientCircChanMsg>,
logid: LogId,
unique_id: UniqId,
) -> Self {
let mut oneshots = stream::SelectAll::new();
oneshots.push(stream::once(closeflag));
@ -159,7 +159,7 @@ impl Reactor {
circuit: Arc::downgrade(&circuit),
crypto_in: InboundClientCrypt::new(),
hops: Vec::new(),
logid,
unique_id,
}
}
@ -176,7 +176,7 @@ impl Reactor {
} else {
return Err(Error::CircuitClosed);
}
debug!("{}: Running circuit reactor", self.logid);
debug!("{}: Running circuit reactor", self.unique_id);
let result: Result<()> = loop {
match self.run_once().await {
Ok(()) => (),
@ -184,7 +184,7 @@ impl Reactor {
Err(ReactorError::Err(e)) => break Err(e),
}
};
debug!("{}: Circuit reactor stopped: {:?}", self.logid, result);
debug!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
if let Some(circ) = self.circuit.upgrade() {
// TODO: should this call terminate?
circ.closed.store(true, Ordering::SeqCst);
@ -230,7 +230,7 @@ impl Reactor {
/// Handle a CtrlMsg other than Shutdown.
async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
trace!("{}: reactor received {:?}", self.logid, msg);
trace!("{}: reactor received {:?}", self.unique_id, msg);
match msg {
CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
CtrlMsg::CloseStream(hop, id, recvwindow) => {
@ -275,7 +275,7 @@ impl Reactor {
let should_send_end = hop.map.terminate(id, window)?;
trace!(
"{}: Ending stream {}; should_send_end={:?}",
self.logid,
self.unique_id,
id,
should_send_end
);

View File

@ -8,21 +8,21 @@ use std::fmt::{Display, Formatter};
/// random number, and can be reused over time. This is less likely
/// to repeat.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct LogId {
pub struct UniqId {
/// Channel that this circuit is on.
chan: usize,
/// ID for the circuit on the channel
circ: usize,
}
impl LogId {
/// Construct a new circuit LogId from its parts
impl UniqId {
/// Construct a new circuit UniqId from its parts
pub(crate) fn new(chan: usize, circ: usize) -> Self {
LogId { chan, circ }
UniqId { chan, circ }
}
}
impl Display for LogId {
impl Display for UniqId {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "Circ {}.{}", self.chan, self.circ)
}