rgb-cln/tests/fuzz/run.py

#!/usr/bin/env python3
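"""Run each fuzz target over its seed corpus: replay the existing seeds,
generate new ones (-g), or merge in an external corpus (-m).

Example invocation (hypothetical paths): ./run.py corpora/ -g -j 4
"""
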
import argparse
import logging
import os
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed


def job(command):
    """Takes a list of str, and runs it as a subprocess."""
    command_line = " ".join(command)
    logging.debug(f"Running '{command_line}'\n")
    res = subprocess.run(command, check=True, stderr=subprocess.PIPE,
                         universal_newlines=True).stderr
    logging.debug(f"Command '{command_line}' output:\n'{res}'")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Run the fuzz targets a given amount of times, or generate"
                    " new seeds.",
    )
    parser.add_argument(
        "seed_dir",
        help="The parent directory for the seed corpora of each target.",
    )
    parser.add_argument(
        "-g",
        "--generate",
        action="store_true",
        help="Generate new seeds by fuzzing instead of only replaying the"
             " existing corpora.",
    )
    parser.add_argument(
        "-j",
        "--par",
        type=int,
        default=1,
        help="How many targets to run in parallel.",
    )
    parser.add_argument(
        "-n",
        "--runs",
        type=int,
        default=100000,
        help="How many times to run each target (if generating).",
    )
    parser.add_argument(
        "-m",
        "--merge_dir",
        default=None,
        help="The parent directory to merge each target corpus from.",
    )
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG)

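    # Fuzz targets are discovered by name: every file in this directory
    # matching "fuzz-*" is treated as a libFuzzer binary to run.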
    target_dir = os.path.abspath(os.path.dirname(__file__))
    targets = [os.path.join(target_dir, f) for f in os.listdir(target_dir)
               if re.compile(r"^fuzz-[\w-]*$").findall(f)]
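
    # Each target runs as its own subprocess via job(); the thread pool only
    # bounds how many of them execute concurrently.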
    with ThreadPoolExecutor(max_workers=args.par) as pool:
        jobs = []
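        # Unless we are generating, -runs=1 amounts to a quick regression
        # pass: libFuzzer replays every seed in the corpus directory before
        # it starts counting new runs.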
        runs = args.runs if args.generate else 1
        for target in targets:
            seed_dir = os.path.join(args.seed_dir, os.path.basename(target))
            os.makedirs(seed_dir, exist_ok=True)
            command = [
                target,
                f"-runs={runs}",
                seed_dir,
            ]
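            # Merge mode replaces the plain run: -merge=1 copies into
            # seed_dir only the files from input_target that add coverage,
            # deterministically (-shuffle=0) and preferring smaller inputs
            # (-prefer_small=1).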
            if args.merge_dir is not None:
                input_target = os.path.join(args.merge_dir,
                                            os.path.basename(target))
                if not os.path.exists(input_target):
                    continue
                command = [
                    target,
                    "-merge=1",
                    "-shuffle=0",
                    "-prefer_small=1",
                    # Also used by OSS-Fuzz:
                    # https://github.com/google/oss-fuzz/issues/1406#issuecomment-387790487
                    "-use_value_profile=1",
                    seed_dir,
                    input_target,
                ]
            jobs.append(pool.submit(job, command))
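
        # Surface failures: result() re-raises the CalledProcessError from
        # any target that exited non-zero (job() runs with check=True).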
        for completed in as_completed(jobs):
            completed.result()