Merge branch 'refactor/arti-bench' into 'main'

refactor `arti-bench`

See merge request tpo/core/arti!208
This commit is contained in:
eta 2022-01-13 14:16:01 +00:00
commit 0138b8477d
1 changed files with 160 additions and 118 deletions

View File

@ -37,37 +37,32 @@
#![allow(clippy::unwrap_used)]
use anyhow::{anyhow, Result};
use arti_client::{TorAddr, TorClient};
use arti_client::{TorAddr, TorClient, TorClientConfig};
use arti_config::ArtiConfig;
use clap::{App, Arg};
use rand::distributions::Standard;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr, TcpListener};
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream};
use std::ops::Deref;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::SystemTime;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_socks::tcp::Socks5Stream;
use tor_rtcompat::SpawnBlocking;
use tor_rtcompat::Runtime;
use tracing::info;
/// A vector of random data, used as a test payload for benchmarking.
struct RandomPayload {
/// The actual random data.
data: Vec<u8>,
}
impl RandomPayload {
/// Generates a payload with `size` bytes.
fn generate(size: usize) -> Self {
let mut vector = vec![0_u8; size];
let mut rng = rand::thread_rng();
rng.fill(&mut vector as &mut [u8]);
Self { data: vector }
}
/// Generate a random payload of bytes of the given size
fn random_payload(size: usize) -> Vec<u8> {
rand::thread_rng()
.sample_iter(Standard)
.take(size)
.collect()
}
/// Timing information from the benchmarking server.
@ -138,62 +133,74 @@ impl TimingSummary {
}
}
/// Runs the benchmarking TCP server, using the provided TCP listener and set of payloads.
fn serve_payload(listener: &TcpListener, send: &Arc<RandomPayload>, receive: &Arc<RandomPayload>) {
info!("Listening for clients...");
for stream in listener.incoming() {
let send = Arc::clone(send);
let receive = Arc::clone(receive);
std::thread::spawn(move || {
let mut stream = stream.unwrap();
let peer_addr = stream.peer_addr().unwrap();
// Do this potentially costly allocation before we do all the timing stuff.
let mut received = vec![0_u8; receive.data.len()];
/// Run the timing routine
fn run_timing(mut stream: TcpStream, send: &Arc<[u8]>, receive: &Arc<[u8]>) -> Result<()> {
let peer_addr = stream.peer_addr()?;
// Do this potentially costly allocation before we do all the timing stuff.
let mut received = vec![0_u8; receive.len()];
info!("Accepted connection from {}", peer_addr);
let accepted_ts = SystemTime::now();
let mut data = &send.data as &[u8];
let copied = std::io::copy(&mut data, &mut stream).unwrap();
stream.flush().unwrap();
let copied_ts = SystemTime::now();
assert_eq!(copied, send.data.len() as u64);
info!("Copied {} bytes payload to {}.", copied, peer_addr);
let read = stream.read(&mut received).unwrap();
if read == 0 {
panic!("unexpected EOF");
}
let first_byte_ts = SystemTime::now();
stream.read_exact(&mut received[read..]).unwrap();
let read_done_ts = SystemTime::now();
info!(
"Received {} bytes payload from {}.",
received.len(),
peer_addr
);
// Check we actually got what we thought we would get.
if received != receive.data {
panic!("Received data doesn't match expected; potential corruption?");
}
let st = ServerTiming {
accepted_ts,
copied_ts,
first_byte_ts,
read_done_ts,
};
serde_json::to_writer(&mut stream, &st).unwrap();
info!("Wrote timing payload to {}.", peer_addr);
});
info!("Accepted connection from {}", peer_addr);
let accepted_ts = SystemTime::now();
let mut data: &[u8] = send.deref();
let copied = std::io::copy(&mut data, &mut stream)?;
stream.flush()?;
let copied_ts = SystemTime::now();
assert_eq!(copied, send.len() as u64);
info!("Copied {} bytes payload to {}.", copied, peer_addr);
let read = stream.read(&mut received)?;
if read == 0 {
panic!("unexpected EOF");
}
let first_byte_ts = SystemTime::now();
stream.read_exact(&mut received[read..])?;
let read_done_ts = SystemTime::now();
info!(
"Received {} bytes payload from {}.",
received.len(),
peer_addr
);
// Check we actually got what we thought we would get.
if received != receive.deref() {
panic!("Received data doesn't match expected; potential corruption?");
}
let st = ServerTiming {
accepted_ts,
copied_ts,
first_byte_ts,
read_done_ts,
};
serde_json::to_writer(&mut stream, &st)?;
info!("Wrote timing payload to {}.", peer_addr);
Ok(())
}
/// Runs the benchmarking TCP server, using the provided TCP listener and set of payloads.
fn serve_payload(
listener: &TcpListener,
send: &Arc<[u8]>,
receive: &Arc<[u8]>,
) -> Vec<JoinHandle<Result<()>>> {
info!("Listening for clients...");
listener
.incoming()
.into_iter()
.map(|stream| {
let send = Arc::clone(send);
let receive = Arc::clone(receive);
std::thread::spawn(move || run_timing(stream?, &send, &receive))
})
.collect()
}
/// Runs the benchmarking client on the provided socket.
async fn client<S: AsyncRead + AsyncWrite + Unpin>(
mut socket: S,
send: Arc<RandomPayload>,
receive: Arc<RandomPayload>,
send: Arc<[u8]>,
receive: Arc<[u8]>,
) -> Result<ClientTiming> {
// Do this potentially costly allocation before we do all the timing stuff.
let mut received = vec![0_u8; receive.data.len()];
let mut received = vec![0_u8; receive.len()];
let started_ts = SystemTime::now();
let read = socket.read(&mut received).await?;
@ -204,7 +211,7 @@ async fn client<S: AsyncRead + AsyncWrite + Unpin>(
socket.read_exact(&mut received[read..]).await?;
let read_done_ts = SystemTime::now();
info!("Received {} bytes payload.", received.len());
let mut send_data = &send.data as &[u8];
let mut send_data = &send as &[u8];
tokio::io::copy(&mut send_data, &mut socket).await?;
socket.flush().await?;
@ -212,7 +219,7 @@ async fn client<S: AsyncRead + AsyncWrite + Unpin>(
let copied_ts = SystemTime::now();
// Check we actually got what we thought we would get.
if received != receive.data {
if received != receive.deref() {
panic!("Received data doesn't match expected; potential corruption?");
}
let mut json_buf = Vec::new();
@ -224,8 +231,8 @@ async fn client<S: AsyncRead + AsyncWrite + Unpin>(
read_done_ts,
copied_ts,
server,
download_size: receive.data.len(),
upload_size: send.data.len(),
download_size: receive.len(),
upload_size: send.len(),
})
}
@ -296,69 +303,104 @@ fn main() -> Result<()> {
.unwrap()
.parse::<usize>()?;
info!("Generating test payloads, please wait...");
let upload_payload = Arc::new(RandomPayload::generate(upload_bytes));
let download_payload = Arc::new(RandomPayload::generate(download_bytes));
let upload_payload = random_payload(upload_bytes).into();
let download_payload = random_payload(download_bytes).into();
info!(
"Generated payloads ({} upload, {} download)",
upload_bytes, download_bytes
);
let up = Arc::clone(&upload_payload);
let dp = Arc::clone(&download_payload);
std::thread::spawn(move || {
serve_payload(&listener, &dp, &up);
let _handle = std::thread::spawn(move || -> Result<()> {
serve_payload(&listener, &dp, &up)
.into_iter()
.try_for_each(|handle| handle.join().expect("failed to join thread"))
});
info!("Benchmarking performance without Arti...");
let runtime = tor_rtcompat::tokio::create_runtime()?;
let up = Arc::clone(&upload_payload);
let dp = Arc::clone(&download_payload);
let stats = runtime.block_on(async move {
let socket = tokio::net::TcpStream::connect(connect_addr).await.unwrap();
client(socket, up, dp).await
})?;
let timing = TimingSummary::generate(&stats)?;
info!(
"without Arti: {:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)",
timing.upload_rate_megabit,
timing.upload_ttfb_sec * 1000.0,
timing.download_rate_megabit,
timing.download_ttfb_sec * 1000.0
);
let benchmark = Benchmark {
connect_addr,
upload_payload,
download_payload,
runtime: tor_rtcompat::tokio::create_runtime()?,
};
benchmark.without_arti()?;
if let Some(addr) = matches.value_of("socks-proxy") {
let up = Arc::clone(&upload_payload);
let dp = Arc::clone(&download_payload);
let stats = runtime.block_on(async move {
let stream = Socks5Stream::connect(addr, connect_addr).await.unwrap();
client(stream, up, dp).await
})?;
benchmark.with_proxy(addr)?;
}
benchmark.with_arti(tcc)?;
Ok(())
}
/// A helper struct for running benchmarks
#[allow(clippy::missing_docs_in_private_items)]
struct Benchmark<R>
where
R: Runtime,
{
runtime: R,
connect_addr: SocketAddr,
upload_payload: Arc<[u8]>,
download_payload: Arc<[u8]>,
}
impl<R: Runtime> Benchmark<R> {
/// Run the benchmark
///
/// `stream` should be a try-future which returns `S: AsyncRead + AsyncWrite + Unpin`.
fn run<F, S, E>(&self, name: &str, stream: F) -> Result<()>
where
F: Future<Output = Result<S, E>>,
S: AsyncRead + AsyncWrite + Unpin,
E: std::error::Error + Send + Sync + 'static,
{
let stream = self.runtime.block_on(stream)?;
let up = Arc::clone(&self.upload_payload);
let dp = Arc::clone(&self.download_payload);
info!("Benchmarking performance {}...", name);
let stats = self
.runtime
.block_on(async move { client(stream, up, dp).await })?;
let timing = TimingSummary::generate(&stats)?;
info!(
"with SOCKS proxy: {:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)",
"{}: {:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)",
name,
timing.upload_rate_megabit,
timing.upload_ttfb_sec * 1000.0,
timing.download_rate_megabit,
timing.download_ttfb_sec * 1000.0
);
Ok(())
}
/// Benchmark without Arti
fn without_arti(&self) -> Result<()> {
self.run(
"without Arti",
tokio::net::TcpStream::connect(self.connect_addr),
)
}
/// Benchmark with sock5 proxy
fn with_proxy(&self, addr: &str) -> Result<()> {
self.run(
"with SOCKS proxy",
Socks5Stream::connect(addr, self.connect_addr),
)
}
/// Benchmark with Arti
fn with_arti(&self, tcc: TorClientConfig) -> Result<()> {
info!("Starting Arti...");
let tor_client = self
.runtime
.block_on(TorClient::bootstrap(self.runtime.clone(), tcc))?;
self.run(
"with Arti",
tor_client.connect(TorAddr::dangerously_from(self.connect_addr)?),
)
}
info!("Starting Arti...");
let rt = runtime.clone();
let tor_client = runtime.block_on(TorClient::bootstrap(rt, tcc))?;
info!("Benchmarking performance with Arti...");
let up = Arc::clone(&upload_payload);
let dp = Arc::clone(&download_payload);
let stats = runtime.block_on(async move {
let stream = tor_client
.connect(TorAddr::dangerously_from(connect_addr).unwrap())
.await
.unwrap();
client(stream, up, dp).await
})?;
let timing = TimingSummary::generate(&stats)?;
info!(
"with Arti: {:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)",
timing.upload_rate_megabit,
timing.upload_ttfb_sec * 1000.0,
timing.download_rate_megabit,
timing.download_ttfb_sec * 1000.0
);
Ok(())
}