arti-bench: add support for multiple samples & averaging

We now take multiple samples (configurable; default 3) per type of
`arti-bench` benchmark run, and compute the mean and median of all
data collected, in order to be more resilient against random
outliers and variation.

This uses some `futures::stream::Stream` hacks, which might result in
more connections being made than required (and might impact the TTFB
metrics somewhat, at least for downloading).

Results now get collected into a `BenchmarkResults` struct per type of
benchmark, which will be in turn placed into a `BenchmarkSummary` in a
later commit; this will also add the ability to serialize the latter
struct out to disk, for future reference.

part of arti#292
This commit is contained in:
eta 2022-01-13 16:23:11 +00:00
parent 0138b8477d
commit 82beb52fca
3 changed files with 180 additions and 30 deletions

8
Cargo.lock generated
View File

@ -92,6 +92,8 @@ dependencies = [
"arti-client",
"arti-config",
"clap",
"float-ord",
"futures",
"rand 0.8.4",
"serde",
"serde_json",
@ -955,6 +957,12 @@ dependencies = [
"num-traits",
]
[[package]]
name = "float-ord"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce81f49ae8a0482e4c55ea62ebbd7e5a686af544c00b9d090bba3ff9be97b3d"
[[package]]
name = "fnv"
version = "1.0.7"

View File

@ -12,6 +12,8 @@ repository = "https://gitlab.torproject.org/tpo/core/arti.git/"
[dependencies]
clap = "2.33.0"
futures = "0.3"
float-ord = "0.3"
rand = "0.8"
anyhow = "1.0.5"
serde = { version = "1.0.103", features = ["derive"] }

View File

@ -40,9 +40,14 @@ use anyhow::{anyhow, Result};
use arti_client::{TorAddr, TorClient, TorClientConfig};
use arti_config::ArtiConfig;
use clap::{App, Arg};
use futures::stream::Stream;
use futures::StreamExt;
use rand::distributions::Standard;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::fmt::Formatter;
use std::future::Future;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream};
@ -98,7 +103,7 @@ pub struct ClientTiming {
}
/// A summary of benchmarking results, generated from `ClientTiming`.
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, Serialize)]
pub struct TimingSummary {
/// The time to first byte (TTFB) for the download benchmark.
download_ttfb_sec: f64,
@ -110,6 +115,19 @@ pub struct TimingSummary {
upload_rate_megabit: f64,
}
/// Render a `TimingSummary` as a one-line human-readable string:
/// throughput in Mbit/s plus time-to-first-byte in milliseconds,
/// upload figures first, then download.
impl fmt::Display for TimingSummary {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // The TTFB fields are stored in seconds; display them as milliseconds.
        let up_ttfb_ms = self.upload_ttfb_sec * 1000.0;
        let down_ttfb_ms = self.download_ttfb_sec * 1000.0;
        write!(
            f,
            "{:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)",
            self.upload_rate_megabit, up_ttfb_ms, self.download_rate_megabit, down_ttfb_ms
        )
    }
}
impl TimingSummary {
/// Generate a `TimingSummary` from the `ClientTiming` returned by a benchmark run.
pub fn generate(ct: &ClientTiming) -> Result<Self> {
@ -215,7 +233,7 @@ async fn client<S: AsyncRead + AsyncWrite + Unpin>(
tokio::io::copy(&mut send_data, &mut socket).await?;
socket.flush().await?;
info!("Sent {} bytes payload.", send_data.len());
info!("Sent {} bytes payload.", send.len());
let copied_ts = SystemTime::now();
// Check we actually got what we thought we would get.
@ -254,6 +272,16 @@ fn main() -> Result<()> {
"Path to the Arti configuration to use (usually, a Chutney-generated config).",
),
)
.arg(
Arg::with_name("num-samples")
.short("s")
.long("num-samples")
.takes_value(true)
.required(true)
.value_name("COUNT")
.default_value("3")
.help("How many samples to take per benchmark run.")
)
.arg(
Arg::with_name("download-bytes")
.short("d")
@ -302,6 +330,7 @@ fn main() -> Result<()> {
.value_of("download-bytes")
.unwrap()
.parse::<usize>()?;
let samples = matches.value_of("num-samples").unwrap().parse::<usize>()?;
info!("Generating test payloads, please wait...");
let upload_payload = random_payload(upload_bytes).into();
let download_payload = random_payload(download_bytes).into();
@ -317,11 +346,13 @@ fn main() -> Result<()> {
.try_for_each(|handle| handle.join().expect("failed to join thread"))
});
let benchmark = Benchmark {
let mut benchmark = Benchmark {
connect_addr,
samples,
upload_payload,
download_payload,
runtime: tor_rtcompat::tokio::create_runtime()?,
results: Default::default(),
};
benchmark.without_arti()?;
@ -330,6 +361,17 @@ fn main() -> Result<()> {
}
benchmark.with_arti(tcc)?;
info!("Benchmarking complete.");
for (ty, results) in benchmark.results.iter() {
info!(
"Information for benchmark type {:?} ({} samples taken):",
ty, benchmark.samples
);
info!("median: {}", results.results_median);
info!(" mean: {}", results.results_mean);
}
Ok(())
}
@ -341,66 +383,164 @@ where
{
runtime: R,
connect_addr: SocketAddr,
samples: usize,
upload_payload: Arc<[u8]>,
download_payload: Arc<[u8]>,
/// All benchmark results conducted, indexed by benchmark type.
results: HashMap<BenchmarkType, BenchmarkResults>,
}
/// The type of benchmark conducted.
///
/// Also serves as the key type for the per-type `results` maps on `Benchmark`
/// and `BenchmarkSummary`, hence the `Hash` / `Eq` derives.
#[derive(Clone, Copy, Serialize, Deserialize, Hash, Debug, PartialEq, Eq)]
enum BenchmarkType {
/// Use the benchmark server on its own, without using any proxy.
///
/// This is useful to get an idea of how well the benchmarking utility performs on its own.
RawLoopback,
/// Benchmark via a SOCKS5 proxy (usually that of a chutney node).
Socks,
/// Benchmark via Arti.
Arti,
}
/// A set of benchmark results for a given `BenchmarkType`, including information about averages.
#[derive(Clone, Serialize, Debug)]
struct BenchmarkResults {
/// The type of benchmark conducted.
ty: BenchmarkType,
/// The number of times the benchmark was run.
samples: usize,
/// The number of concurrent connections used during the run.
///
/// NOTE(review): currently always 1 — `Benchmark::run` passes a hard-coded
/// `1` to `BenchmarkResults::generate`.
connections: usize,
/// The mean average of all metrics throughout all benchmark runs.
results_mean: TimingSummary,
/// The "low-median" average of all metrics throughout all benchmark runs.
///
/// # Important note
///
/// This is only the median if `samples` is an odd number, else it is the
/// `samples / 2`th sample of each set of metrics in sorted order.
results_median: TimingSummary,
/// The raw benchmark results.
///
/// One `TimingSummary` per sample, in the order the runs were performed
/// (unsorted).
results_raw: Vec<TimingSummary>,
}
impl BenchmarkResults {
    /// Extract one metric from each raw `TimingSummary` and return the values
    /// in ascending order (`float_ord::sort` provides a total order over `f64`,
    /// so NaN values cannot panic the comparison).
    fn sorted_metric(raw: &[TimingSummary], metric: impl Fn(&TimingSummary) -> f64) -> Vec<f64> {
        let mut values = raw.iter().map(metric).collect::<Vec<_>>();
        float_ord::sort(&mut values);
        values
    }

    /// Generate summarized benchmark results from raw run data.
    ///
    /// `connections` is the number of concurrent connections used during the
    /// run, recorded verbatim in the output.
    ///
    /// # Panics
    ///
    /// Panics if `raw` is empty: there would be nothing to average, and the
    /// median lookup would index out of bounds.
    fn generate(ty: BenchmarkType, connections: usize, raw: Vec<TimingSummary>) -> Self {
        assert!(
            !raw.is_empty(),
            "cannot summarize an empty set of benchmark results"
        );
        let download_ttfb_secs = Self::sorted_metric(&raw, |s| s.download_ttfb_sec);
        let download_rate_megabits = Self::sorted_metric(&raw, |s| s.download_rate_megabit);
        let upload_ttfb_secs = Self::sorted_metric(&raw, |s| s.upload_ttfb_sec);
        let upload_rate_megabits = Self::sorted_metric(&raw, |s| s.upload_rate_megabit);
        let samples = raw.len();
        BenchmarkResults {
            ty,
            samples,
            connections,
            // Mean of each metric across all samples (sums taken over the
            // sorted vectors, matching the original summation order).
            results_mean: TimingSummary {
                download_ttfb_sec: download_ttfb_secs.iter().sum::<f64>() / samples as f64,
                download_rate_megabit: download_rate_megabits.iter().sum::<f64>() / samples as f64,
                upload_ttfb_sec: upload_ttfb_secs.iter().sum::<f64>() / samples as f64,
                upload_rate_megabit: upload_rate_megabits.iter().sum::<f64>() / samples as f64,
            },
            // "Low-median": index `samples / 2` of the sorted values — the true
            // median only when `samples` is odd (see the struct documentation).
            results_median: TimingSummary {
                download_ttfb_sec: download_ttfb_secs[samples / 2],
                download_rate_megabit: download_rate_megabits[samples / 2],
                upload_ttfb_sec: upload_ttfb_secs[samples / 2],
                upload_rate_megabit: upload_rate_megabits[samples / 2],
            },
            results_raw: raw,
        }
    }
}
/// A summary of all benchmarks conducted throughout the invocation of `arti-bench`.
///
/// Designed to be stored as an artifact and compared against other later runs.
#[derive(Clone, Serialize, Debug)]
struct BenchmarkSummary {
/// The version of `arti-bench` used to generate the benchmark results.
///
/// NOTE(review): the construction site isn't visible in this chunk —
/// presumably populated from `CARGO_PKG_VERSION`; confirm where this struct
/// is built.
crate_version: String,
/// All benchmark results conducted, indexed by benchmark type.
results: HashMap<BenchmarkType, BenchmarkResults>,
}
impl<R: Runtime> Benchmark<R> {
/// Run the benchmark
///
/// NOTE(review): this chunk is a diff rendering with the +/- markers
/// stripped, so pre-change (removed) lines appear directly above their
/// replacements; the pairs are flagged below. The code as rendered here is
/// not compilable as-is.
///
/// `stream` should be a try-future which returns `S: AsyncRead + AsyncWrite + Unpin`.
/// (Post-change: `stream_generator` is a `Stream` of such try-futures, one
/// consumed per sample.)
// (removed) pre-change signature:
fn run<F, S, E>(&self, name: &str, stream: F) -> Result<()>
// (added) replacement signature:
fn run<F, G, S, E>(&mut self, ty: BenchmarkType, stream_generator: F) -> Result<()>
where
// (removed) pre-change bound:
F: Future<Output = Result<S, E>>,
// (added) replacement bounds:
F: Stream<Item = G> + Unpin,
G: Future<Output = Result<S, E>>,
S: AsyncRead + AsyncWrite + Unpin,
E: std::error::Error + Send + Sync + 'static,
{
// (removed) pre-change single-connection body line:
let stream = self.runtime.block_on(stream)?;
// (added) replacement multi-sample body:
let mut results = vec![];
// NOTE(eta): This could make more streams than we need. We assume this is okay.
// `buffered(1)` keeps one connection future in flight at a time; `take`
// caps the stream at exactly `samples` items.
let mut stream_generator = stream_generator.buffered(1).take(self.samples);
for n in 0..self.samples {
let stream = self
.runtime
.block_on(stream_generator.next())
.ok_or_else(|| {
anyhow!("internal error: stream generator couldn't supply enough streams")
})??; // one ? for the error above, next ? for G's output
// (removed) pre-change per-run logging block:
let up = Arc::clone(&self.upload_payload);
let dp = Arc::clone(&self.download_payload);
info!("Benchmarking performance {}...", name);
let stats = self
.runtime
.block_on(async move { client(stream, up, dp).await })?;
let timing = TimingSummary::generate(&stats)?;
info!(
"{}: {:.2} Mbit/s up (ttfb {:.2}ms), {:.2} Mbit/s down (ttfb {:.2}ms)",
name,
timing.upload_rate_megabit,
timing.upload_ttfb_sec * 1000.0,
timing.download_rate_megabit,
timing.download_ttfb_sec * 1000.0
);
// (added) replacement: collect per-run timings instead of logging them.
let up = Arc::clone(&self.upload_payload);
let dp = Arc::clone(&self.download_payload);
info!("Benchmarking {:?}, run {}/{}...", ty, n + 1, self.samples);
let stats = self
.runtime
.block_on(async move { client(stream, up, dp).await })?;
let timing = TimingSummary::generate(&stats)?;
results.push(timing);
}
// Summarize all samples (mean + low-median) and store under this type.
let results = BenchmarkResults::generate(ty, 1, results);
self.results.insert(ty, results);
Ok(())
}
/// Benchmark without Arti
// (removed) pre-change signature:
fn without_arti(&self) -> Result<()> {
// (added) replacement signature:
fn without_arti(&mut self) -> Result<()> {
// Copy the address out so the `repeat_with` closure doesn't borrow `self`.
let ca = self.connect_addr;
self.run(
// (removed) pre-change arguments:
"without Arti",
tokio::net::TcpStream::connect(self.connect_addr),
// (added) replacement arguments:
BenchmarkType::RawLoopback,
futures::stream::repeat_with(|| tokio::net::TcpStream::connect(ca)),
)
}
/// Benchmark with a SOCKS5 proxy
// (removed) pre-change signature:
fn with_proxy(&self, addr: &str) -> Result<()> {
// (added) replacement signature:
fn with_proxy(&mut self, addr: &str) -> Result<()> {
// Copy the address out so the `repeat_with` closure doesn't borrow `self`.
let ca = self.connect_addr;
self.run(
// (removed) pre-change arguments:
"with SOCKS proxy",
Socks5Stream::connect(addr, self.connect_addr),
// (added) replacement arguments:
BenchmarkType::Socks,
futures::stream::repeat_with(|| Socks5Stream::connect(addr, ca)),
)
}
/// Benchmark with Arti
// (removed) pre-change signature:
fn with_arti(&self, tcc: TorClientConfig) -> Result<()> {
// (added) replacement signature:
fn with_arti(&mut self, tcc: TorClientConfig) -> Result<()> {
info!("Starting Arti...");
let tor_client = self
.runtime
.block_on(TorClient::bootstrap(self.runtime.clone(), tcc))?;
// (added) hoisted so the `repeat_with` closure can clone a ready-made addr:
let addr = TorAddr::dangerously_from(self.connect_addr)?;
self.run(
// (removed) pre-change arguments:
"with Arti",
tor_client.connect(TorAddr::dangerously_from(self.connect_addr)?),
// (added) replacement arguments:
BenchmarkType::Arti,
futures::stream::repeat_with(|| tor_client.connect(addr.clone())),
)
}
}