diff --git a/Cargo.lock b/Cargo.lock index 88e03ae96..500a0afb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -92,6 +92,8 @@ dependencies = [ "arti-client", "arti-config", "clap", + "float-ord", + "futures", "rand 0.8.4", "serde", "serde_json", @@ -955,6 +957,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "float-ord" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce81f49ae8a0482e4c55ea62ebbd7e5a686af544c00b9d090bba3ff9be97b3d" + [[package]] name = "fnv" version = "1.0.7" diff --git a/crates/arti-bench/Cargo.toml b/crates/arti-bench/Cargo.toml index a09a5ed4c..cfb3241e0 100644 --- a/crates/arti-bench/Cargo.toml +++ b/crates/arti-bench/Cargo.toml @@ -12,6 +12,8 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/" [dependencies] clap = "2.33.0" +futures = "0.3" +float-ord = "0.3" rand = "0.8" anyhow = "1.0.5" serde = { version = "1.0.103", features = ["derive"] } diff --git a/crates/arti-bench/src/main.rs b/crates/arti-bench/src/main.rs index 3dab036a0..83d421684 100644 --- a/crates/arti-bench/src/main.rs +++ b/crates/arti-bench/src/main.rs @@ -40,9 +40,14 @@ use anyhow::{anyhow, Result}; use arti_client::{TorAddr, TorClient, TorClientConfig}; use arti_config::ArtiConfig; use clap::{App, Arg}; +use futures::stream::Stream; +use futures::StreamExt; use rand::distributions::Standard; use rand::Rng; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt; +use std::fmt::Formatter; use std::future::Future; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream}; @@ -98,7 +103,7 @@ pub struct ClientTiming { } /// A summary of benchmarking results, generated from `ClientTiming`. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, Serialize)] pub struct TimingSummary { /// The time to first byte (TTFB) for the download benchmark. download_ttfb_sec: f64, @@ -110,6 +115,19 @@ pub struct TimingSummary { upload_rate_megabit: f64, } +impl fmt::Display for TimingSummary { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "{:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)", + self.upload_rate_megabit, + self.upload_ttfb_sec * 1000.0, + self.download_rate_megabit, + self.download_ttfb_sec * 1000.0 + ) + } +} + impl TimingSummary { /// Generate a `TimingSummary` from the `ClientTiming` returned by a benchmark run. pub fn generate(ct: &ClientTiming) -> Result { @@ -215,7 +233,7 @@ async fn client( tokio::io::copy(&mut send_data, &mut socket).await?; socket.flush().await?; - info!("Sent {} bytes payload.", send_data.len()); + info!("Sent {} bytes payload.", send.len()); let copied_ts = SystemTime::now(); // Check we actually got what we thought we would get. @@ -254,6 +272,16 @@ fn main() -> Result<()> { "Path to the Arti configuration to use (usually, a Chutney-generated config).", ), ) + .arg( + Arg::with_name("num-samples") + .short("s") + .long("num-samples") + .takes_value(true) + .required(true) + .value_name("COUNT") + .default_value("3") + .help("How many samples to take per benchmark run.") + ) .arg( Arg::with_name("download-bytes") .short("d") @@ -302,6 +330,7 @@ fn main() -> Result<()> { .value_of("download-bytes") .unwrap() .parse::()?; + let samples = matches.value_of("num-samples").unwrap().parse::()?; info!("Generating test payloads, please wait..."); let upload_payload = random_payload(upload_bytes).into(); let download_payload = random_payload(download_bytes).into(); @@ -317,11 +346,13 @@ fn main() -> Result<()> { .try_for_each(|handle| handle.join().expect("failed to join thread")) }); - let benchmark = Benchmark { + let mut benchmark = Benchmark { connect_addr, + samples, upload_payload, download_payload, runtime: tor_rtcompat::tokio::create_runtime()?, + results: Default::default(), }; benchmark.without_arti()?; @@ -330,6 +361,17 @@ fn main() -> Result<()> { } benchmark.with_arti(tcc)?; + info!("Benchmarking complete."); + + for (ty, results) in benchmark.results.iter() { + info!( + "Information for benchmark type {:?} ({} samples taken):", + ty, benchmark.samples + ); + info!("median: {}", results.results_median); + info!(" mean: {}", results.results_mean); + } + Ok(()) } @@ -341,66 +383,164 @@ where { runtime: R, connect_addr: SocketAddr, + samples: usize, upload_payload: Arc<[u8]>, download_payload: Arc<[u8]>, + /// All benchmark results conducted, indexed by benchmark type. + results: HashMap, +} + +/// The type of benchmark conducted. +#[derive(Clone, Copy, Serialize, Deserialize, Hash, Debug, PartialEq, Eq)] +enum BenchmarkType { + /// Use the benchmark server on its own, without using any proxy. + /// + /// This is useful to get an idea of how well the benchmarking utility performs on its own. + RawLoopback, + /// Benchmark via a SOCKS5 proxy (usually that of a chutney node). + Socks, + /// Benchmark via Arti. + Arti, +} + +/// A set of benchmark results for a given `BenchmarkType`, including information about averages. +#[derive(Clone, Serialize, Debug)] +struct BenchmarkResults { + /// The type of benchmark conducted. + ty: BenchmarkType, + /// The number of times the benchmark was run. + samples: usize, + /// The number of concurrent connections used during the run. + connections: usize, + /// The mean average of all metrics throughout all benchmark runs. + results_mean: TimingSummary, + /// The "low-median" average of all metrics throughout all benchmark runs. + /// + /// # Important note + /// + /// This is only the median if `samples` is an odd number, else it is the + /// `samples / 2`th sample of each set of metrics in sorted order. + results_median: TimingSummary, + /// The raw benchmark results. + results_raw: Vec, +} + +impl BenchmarkResults { + /// Generate summarized benchmark results from raw run data. + fn generate(ty: BenchmarkType, connections: usize, raw: Vec) -> Self { + let mut download_ttfb_secs = raw.iter().map(|s| s.download_ttfb_sec).collect::>(); + float_ord::sort(&mut download_ttfb_secs); + let mut download_rate_megabits = raw + .iter() + .map(|s| s.download_rate_megabit) + .collect::>(); + float_ord::sort(&mut download_rate_megabits); + let mut upload_ttfb_secs = raw.iter().map(|s| s.upload_ttfb_sec).collect::>(); + float_ord::sort(&mut upload_ttfb_secs); + let mut upload_rate_megabits = raw + .iter() + .map(|s| s.upload_rate_megabit) + .collect::>(); + float_ord::sort(&mut upload_rate_megabits); + let samples = raw.len(); + BenchmarkResults { + ty, + samples, + connections, + results_mean: TimingSummary { + download_ttfb_sec: download_ttfb_secs.iter().sum::() / samples as f64, + download_rate_megabit: download_rate_megabits.iter().sum::() / samples as f64, + upload_ttfb_sec: upload_ttfb_secs.iter().sum::() / samples as f64, + upload_rate_megabit: upload_rate_megabits.iter().sum::() / samples as f64, + }, + results_median: TimingSummary { + download_ttfb_sec: download_ttfb_secs[samples / 2], + download_rate_megabit: download_rate_megabits[samples / 2], + upload_ttfb_sec: upload_ttfb_secs[samples / 2], + upload_rate_megabit: upload_rate_megabits[samples / 2], + }, + results_raw: raw, + } + } +} + +/// A summary of all benchmarks conducted throughout the invocation of `arti-bench`. +/// +/// Designed to be stored as an artifact and compared against other later runs. +#[derive(Clone, Serialize, Debug)] +struct BenchmarkSummary { + /// The version of `arti-bench` used to generate the benchmark results. + crate_version: String, + /// All benchmark results conducted, indexed by benchmark type. + results: HashMap, } impl Benchmark { /// Run the benchmark /// /// `stream` should be a try-future which returns `S: AsyncRead + AsyncWrite + Unpin`. - fn run(&self, name: &str, stream: F) -> Result<()> + fn run(&mut self, ty: BenchmarkType, stream_generator: F) -> Result<()> where - F: Future>, + F: Stream + Unpin, + G: Future>, S: AsyncRead + AsyncWrite + Unpin, E: std::error::Error + Send + Sync + 'static, { - let stream = self.runtime.block_on(stream)?; + let mut results = vec![]; + // NOTE(eta): This could make more streams than we need. We assume this is okay. + let mut stream_generator = stream_generator.buffered(1).take(self.samples); + for n in 0..self.samples { + let stream = self + .runtime + .block_on(stream_generator.next()) + .ok_or_else(|| { + anyhow!("internal error: stream generator couldn't supply enough streams") + })??; // one ? for the error above, next ? for G's output - let up = Arc::clone(&self.upload_payload); - let dp = Arc::clone(&self.download_payload); - info!("Benchmarking performance {}...", name); - let stats = self - .runtime - .block_on(async move { client(stream, up, dp).await })?; - let timing = TimingSummary::generate(&stats)?; - info!( - "{}: {:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)", - name, - timing.upload_rate_megabit, - timing.upload_ttfb_sec * 1000.0, - timing.download_rate_megabit, - timing.download_ttfb_sec * 1000.0 - ); + let up = Arc::clone(&self.upload_payload); + let dp = Arc::clone(&self.download_payload); + info!("Benchmarking {:?}, run {}/{}...", ty, n + 1, self.samples); + let stats = self + .runtime + .block_on(async move { client(stream, up, dp).await })?; + let timing = TimingSummary::generate(&stats)?; + results.push(timing); + } + let results = BenchmarkResults::generate(ty, 1, results); + self.results.insert(ty, results); Ok(()) } /// Benchmark without Arti - fn without_arti(&self) -> Result<()> { + fn without_arti(&mut self) -> Result<()> { + let ca = self.connect_addr; self.run( - "without Arti", - tokio::net::TcpStream::connect(self.connect_addr), + BenchmarkType::RawLoopback, + futures::stream::repeat_with(|| tokio::net::TcpStream::connect(ca)), ) } /// Benchmark with sock5 proxy - fn with_proxy(&self, addr: &str) -> Result<()> { + fn with_proxy(&mut self, addr: &str) -> Result<()> { + let ca = self.connect_addr; self.run( - "with SOCKS proxy", - Socks5Stream::connect(addr, self.connect_addr), + BenchmarkType::Socks, + futures::stream::repeat_with(|| Socks5Stream::connect(addr, ca)), ) } /// Benchmark with Arti - fn with_arti(&self, tcc: TorClientConfig) -> Result<()> { + fn with_arti(&mut self, tcc: TorClientConfig) -> Result<()> { info!("Starting Arti..."); let tor_client = self .runtime .block_on(TorClient::bootstrap(self.runtime.clone(), tcc))?; + let addr = TorAddr::dangerously_from(self.connect_addr)?; + self.run( - "with Arti", - tor_client.connect(TorAddr::dangerously_from(self.connect_addr)?), + BenchmarkType::Arti, + futures::stream::repeat_with(|| tor_client.connect(addr.clone())), ) } }