Merge pull request #222 from trailofbits/frontend-refactor

Refactor frontend API with support for seed synchronization
This commit is contained in:
Alan
2019-07-30 19:59:19 -04:00
committed by GitHub
4 changed files with 714 additions and 255 deletions

View File

@@ -15,107 +15,214 @@
import os
import sys
import logging
import argparse
from .frontend import DeepStateFrontend
from .frontend import DeepStateFrontend, FrontendError
L = logging.getLogger("deepstate.frontend.afl")
L.setLevel(os.environ.get("DEEPSTATE_LOG", "INFO").upper())
class AFL(DeepStateFrontend):
""" Defines default AFL fuzzer frontend """
FUZZER = "afl-fuzz"
COMPILER = "afl-clang++"
@classmethod
def parse_args(cls):
parser = argparse.ArgumentParser(description="Use AFL as a back-end for DeepState.")
# Compilation/instrumentation support
compile_group = parser.add_argument_group("compilation and instrumentation arguments")
compile_group.add_argument("--compile_test", type=str, help="Path to DeepState test harness for compilation.")
compile_group.add_argument("--compiler_args", default=[], nargs='+', help="Compiler flags (excluding -o) to pass to compiler.")
compile_group.add_argument("--compiler_args", type=str, help="Linker flags (space seperated) to include for external libraries.")
compile_group.add_argument("--out_test_name", type=str, default="out", help="Set name of generated instrumented binary.")
# Execution options
parser.add_argument("--dictionary", type=str, help="Optional fuzzer dictionary for AFL.")
parser.add_argument("--mem_limit", type=int, default=50, help="Child process memory limit in MB (default is 50).")
parser.add_argument("--file", type=str, help="Input file read by fuzzed program, if any.")
parser.add_argument("--dirty_mode", action='store_true', help="Fuzz without deterministic steps.")
parser.add_argument("--dumb_mode", action='store_true', help="Fuzz without instrumentation.")
parser.add_argument("--qemu_mode", action='store_true', help="Fuzz with QEMU mode.")
parser.add_argument("--crash_explore", action='store_true', help="Fuzz with crash exploration.")
# AFL execution modes
parser.add_argument("--dirty_mode", action="store_true", help="Fuzz without deterministic steps.")
parser.add_argument("--dumb_mode", action="store_true", help="Fuzz without instrumentation.")
parser.add_argument("--qemu_mode", action="store_true", help="Fuzz with QEMU mode.")
parser.add_argument("--crash_explore", action="store_true", help="Fuzz with crash exploration.")
# Misc. post-processing
parser.add_argument("--post_stats", action="store_true", help="Output post-fuzzing stats.")
cls.parser = parser
return super(AFL, cls).parse_args()
def compile(self):
args = self._args
args = self._ARGS
lib_path = "/usr/local/lib/"
if not os.path.isfile(lib_path + "libdeepstate_AFL.a"):
lib_path = "/usr/local/lib/libdeepstate_AFL.a"
L.debug(f"Static library path: {lib_path}")
if not os.path.isfile(lib_path):
raise RuntimeError("no AFL-instrumented DeepState static library found in {}".format(lib_path))
compiler_args = [args.compile_test, "-std=c++11"] + args.compiler_args + \
["-ldeepstate_AFL", "-o", args.out_test_name + ".afl"]
flags = ["-ldeepstate_AFL"]
if args.compiler_args:
flags += [arg for arg in args.compiler_args.split(" ")]
compiler_args = ["-std=c++11", args.compile_test] + flags + \
["-o", args.out_test_name + ".afl"]
super().compile(compiler_args)
def pre_exec(self):
    """
    Perform argparse and environment-related sanity checks.

    Checks that the kernel core dump pattern contains `core`, runs the base
    frontend checks, and then validates the seed directory whenever seeds
    are required (not in dumb mode, or when crash exploration is enabled).

    :raises FrontendError: if the core pattern is not set, no seed directory
        was supplied, the seed directory did not exist (an empty one is
        created before raising), or the seed directory is empty.
    """

    # check if core dump pattern is set as `core`
    with open("/proc/sys/kernel/core_pattern") as f:
        if "core" not in f.read():
            raise FrontendError("No core dump pattern set. Execute 'echo core | sudo tee /proc/sys/kernel/core_pattern'")

    super().pre_exec()

    args = self._ARGS

    # require input seeds if we aren't in dumb mode, or we are using crash
    # exploration. The parser registers this option as `--crash_explore`;
    # there is no `crash_mode` attribute on the parsed namespace.
    if not args.dumb_mode or args.crash_explore:
        if not args.input_seeds:
            raise FrontendError("Must provide -i/--input_seeds option for AFL.")

        seeds = args.input_seeds

        # check if seeds dir exists
        if not os.path.exists(seeds):
            os.mkdir(seeds)
            raise FrontendError("Seed path doesn't exist. Creating empty seed directory and exiting.")

        # check if seeds dir is empty
        if not os.listdir(seeds):
            raise FrontendError(f"No seeds present in directory {seeds}.")
@property
def cmd(self):
    """
    Build the mapping of afl-fuzz command line flags to their values.

    Flags that take no value are mapped to None. Insertion order is the
    order in which the flags are emitted on the command line.

    :return: dict of flag -> value (or None for value-less switches)
    """
    opts = self._ARGS

    command = {}
    command["-o"] = opts.output_test_dir
    command["-t"] = str(opts.timeout)
    command["-m"] = str(opts.mem_limit)

    # since this is optional for AFL's dumb fuzzing
    if opts.input_seeds:
        command["-i"] = opts.input_seeds

    # check if we are using one of AFL's many "modes"
    mode_switches = (
        (opts.dirty_mode, "-d"),
        (opts.dumb_mode, "-n"),
        (opts.qemu_mode, "-Q"),
        (opts.crash_explore, "-C"),
    )
    for enabled, switch in mode_switches:
        if enabled:
            command[switch] = None

    # other misc arguments
    for value, switch in ((opts.dictionary, "-x"), (opts.file, "-f")):
        if value:
            command[switch] = value

    command['--'] = opts.binary

    # if not specified, set DeepState flags to help AFL coverage
    if len(opts.args) == 0:
        command.update({
            "--input_test_file": "@@",
            "--abort_on_fail": None,
            "--no_fork": None,
        })

    if opts.which_test:
        command["--input_which_test"] = opts.which_test

    return command
@property
def stats(self):
    """
    Retrieves and parses the stats file produced by AFL.

    Reads `<output_test_dir>/fuzzer_stats` and returns a dict holding the
    known stat names. A stat that does not appear in the file keeps the
    value None.

    Each line is split on its first colon and the left-hand side is
    compared against the known stat names exactly. This keeps a value that
    merely contains a stat name (for example inside `command_line`) from
    overwriting an unrelated entry, and does not depend on the column at
    which the value starts.

    :return: dict mapping stat name -> value string (or None if absent)
    """
    stat_file = self._ARGS.output_test_dir + "/fuzzer_stats"

    known_stats = (
        "last_update",
        "start_time",
        "fuzzer_pid",
        "cycles_done",
        "execs_done",
        "execs_per_sec",
        "paths_total",
        "paths_favored",
        "paths_found",
        "paths_imported",
        "max_depth",
        "cur_path",
        "pending_favs",
        "pending_total",
        "variable_paths",
        "stability",
        "bitmap_cvg",
        "unique_crashes",
        "unique_hangs",
        "last_path",
        "last_crash",
        "last_hang",
        "execs_since_crash",
        "exec_timeout",
        "afl_banner",
        "afl_version",
        "command_line",
    )
    stats = dict.fromkeys(known_stats)

    with open(stat_file, "r") as sf:
        for line in sf:
            name, sep, value = line.partition(":")
            name = name.strip()
            # skip lines without a separator and stats we do not track
            if sep and name in stats:
                # drop padding, the trailing percent sign and line endings
                stats[name] = value.strip(": %\r\n")

    return stats
def _sync_seeds(self, mode, src, dest, excludes=["orig", ".state"]):
    """
    Delegate seed syncing to the base frontend, with a default exclude
    list of `orig` and `.state`.

    :param mode: sync mode forwarded to the base implementation
    :param src: path to source queue
    :param dest: path to destination queue
    :param excludes: list of path patterns forwarded as rsync excludes
    """
    base = super()
    base._sync_seeds(mode, src, dest, excludes=excludes)
def post_exec(self):
    """
    AFL post_exec hook: when `--post_stats` was given, print every entry
    of the parsed fuzzer stats, one per line, with the stat name
    upper-cased and underscores replaced by spaces.

    (TODO) perform crash triaging with seeds from both sync_dir and
    local queue.
    """
    if not self._ARGS.post_stats:
        return

    print("\nAFL RUN STATS:\n")
    for name, value in self.stats.items():
        label = name.replace("_", " ").upper()
        print(f"{label}:\t\t\t{value}")
def main():
fuzzer = AFL("afl-fuzz", compiler="afl-clang-fast++")
args = fuzzer.parse_args()
if args.fuzzer_help:
fuzzer.print_help()
sys.exit(0)
if args.compile_test:
print("COMPILING DEEPSTATE HARNESS FOR FUZZING...")
fuzzer.compile()
sys.exit(0)
if not args.seeds or not args.output_test_dir:
print("Error: --seeds and/or --output_test_dir required for fuzzing.")
sys.exit(1)
if not os.path.exists(args.seeds):
print("CREATING INPUT SEED DIRECTORY...")
os.mkdir(args.seeds)
if len([name for name in os.listdir(args.seeds)]) == 0:
print("Error: no seeds present in directory", args.seeds)
sys.exit(1)
cmd_dict = {
"-i": args.seeds,
"-o": args.output_test_dir,
"-t": str(args.timeout),
"-m": str(args.mem_limit)
}
# check if we are using one of AFL's many "modes"
if args.dirty_mode:
cmd_dict['-d'] = None
if args.dumb_mode:
cmd_dict['-n'] = None
if args.qemu_mode:
cmd_dict['-Q'] = None
if args.crash_explore:
cmd_dict['-C'] = None
# other misc arguments
if args.dictionary:
cmd_dict['-x'] = args.dictionary
if args.file:
cmd_dict['-f'] = args.file
cmd_dict['--'] = args.binary
# if not specified, set DeepState flags to help AFL coverage
if len(args.args) == 0:
args.args = ["--input_test_file", "@@", "--abort_on_fail", "--no_fork"]
fuzzer.cli_command(cmd_dict, cli_other=args.args)
print("EXECUTING FUZZER...")
fuzzer.execute_fuzzer()
fuzzer = AFL()
fuzzer.parse_args()
fuzzer.run()
return 0

View File

@@ -15,12 +15,21 @@
import os
import sys
import pipes
import logging
import argparse
import subprocess
from .frontend import DeepStateFrontend, FrontendError
L = logging.getLogger("deepstate.frontend.angora")
L.setLevel(os.environ.get("DEEPSTATE_LOG", "INFO").upper())
from .frontend import DeepStateFrontend
class Angora(DeepStateFrontend):
FUZZER = "angora_fuzzer"
COMPILER = "bin/angora-clang++"
@classmethod
def parse_args(cls):
@@ -28,12 +37,12 @@ class Angora(DeepStateFrontend):
compile_group = parser.add_argument_group("compilation and instrumentation arguments")
compile_group.add_argument("--compile_test", type=str, help="Path to DeepState test harness for compilation.")
compile_group.add_argument("--ignored_taints", type=str, help="Path to ignored function calls for taint analysis.")
compile_group.add_argument("--compiler_args", default=[], nargs='+', help="Compiler flags (excluding -o) to pass to compiler.")
compile_group.add_argument("--ignore_calls", type=str, help="Path to static/shared libraries (colon seperated) for functions to blackbox for taint analysis.")
compile_group.add_argument("--compiler_args", type=str, help="Linker flags (space seperated) to include for external libraries.")
compile_group.add_argument("--out_test_name", type=str, default="test", help="Set name for generated *.taint and *.fast binaries.")
parser.add_argument("taint_binary", type=str, help="Path to binary compiled with taint tracking.")
parser.add_argument("--mode", type=str, default="llvm", help="Specifies binary instrumentation framework used (either llvm or pin).")
parser.add_argument("taint_binary", nargs="?", type=str, help="Path to binary compiled with taint tracking.")
parser.add_argument("--mode", type=str, default="llvm", choices=["llvm", "pin"], help="Specifies binary instrumentation framework used (either llvm or pin).")
parser.add_argument("--no_afl", action='store_true', help="Disables AFL mutation strategies being used.")
parser.add_argument("--no_exploration", action='store_true', help="Disables context-sensitive input bytes mutation.")
@@ -42,95 +51,143 @@ class Angora(DeepStateFrontend):
def compile(self):
args = self._args
no_taints = args.ignored_taints
args = self._ARGS
env = os.environ.copy()
# check if static libraries exist
lib_path = "/usr/local/lib/"
L.debug(f"Static library path: {lib_path}")
if not os.path.isfile(lib_path + "libdeepstate_fast.a"):
raise RuntimeError("no Angora branch-instrumented DeepState static library found in {}".format(lib_path))
if not os.path.isfile(lib_path + "libdeepstate_taint.a"):
raise RuntimeError("no Angora taint-tracked DeepState static library found in {}".format(lib_path))
# generate ignored functions output for taint tracking
# set envvar to file with ignored lib functions for taint tracking
if no_taints:
if os.path.isfile(no_taints):
env["ANGORA_TAINT_RULE_LIST"] = os.path.abspath(no_taints)
if args.ignore_calls:
# generate instrumented binary
fast_args = [args.compile_test] + args.compiler_args + \
["-ldeepstate_fast", "-o", args.out_test_name + ".fast"]
libpath = [path for path in args.ignore_calls.split(":")]
L.debug(f"Ignoring library objects: {libpath}")
out_file = "abilist.txt"
# TODO(alan): more robust library check
ignore_bufs = []
for path in libpath:
if not os.path.isfile(path):
raise FrontendError(f"Library `{path}` to blackbox was not a valid library path.")
# instantiate command to call, but store output to buffer
cmd = [os.getenv("ANGORA") + "/tools/gen_library_abilist.sh", path, "discard"]
L.debug(f"Compilation command: {cmd}")
out = subprocess.check_output(cmd)
ignore_bufs += [out]
# write all to final out_file
with open(out_file, "wb") as f:
for buf in ignore_bufs:
f.write(buf)
# set envvar for fuzzer compilers
env["ANGORA_TAINT_RULE_LIST"] = os.path.abspath(out_file)
# make a binary with light instrumentation
fast_flags = ["-ldeepstate_fast"]
if args.compiler_args:
fast_flags += [arg for arg in args.compiler_args.split(" ")]
fast_args = ["-std=c++11", args.compile_test] + fast_flags + \
["-o", args.out_test_name + ".fast"]
L.info("Compiling {args.binary} for Angora with light instrumentation")
super().compile(compiler_args=fast_args, env=env)
# make a binary with taint tracking information
taint_flags = ["-ldeepstate_taint"]
if args.compiler_args:
taint_flags += [arg for arg in args.compiler_args.split(' ')]
if args.mode == "pin":
env["USE_PIN"] = "1"
else:
env["USE_TRACK"] = "1"
taint_args = [args.compile_test] + args.compiler_args + \
["-ldeepstate_taint", "-o", args.out_test_name + ".taint"]
taint_args = ["-std=c++11", args.compile_test] + taint_flags + \
["-o", args.out_test_name + ".taint"]
L.info("Compiling {args.binary} for Angora with taint tracking")
super().compile(compiler_args=taint_args, env=env)
return 0
def pre_exec(self):
    """
    Run the base frontend checks, then validate the Angora-specific
    requirements: a taint binary, a non-empty seed directory, and an
    output directory that does not exist yet.

    :raises FrontendError: when any of those requirements is not met. A
        missing seed directory is created empty before the error is raised.
    """
    super().pre_exec()

    opts = self._ARGS

    # since base method checks for args.binary by default
    if not opts.taint_binary:
        self.parser.print_help()
        raise FrontendError("Must provide taint binary for Angora.")

    if not opts.input_seeds:
        raise FrontendError("Must provide -i/--input_seeds option for Angora.")

    seeds = os.path.abspath(opts.input_seeds)
    L.debug(f"Seed path: {seeds}")

    seed_dir_missing = not os.path.exists(seeds)
    if seed_dir_missing:
        os.mkdir(seeds)
        raise FrontendError("Seed path doesn't exist. Creating empty seed directory and exiting.")

    if not os.listdir(seeds):
        raise FrontendError(f"No seeds present in directory {seeds}")

    if os.path.exists(opts.output_test_dir):
        raise FrontendError(f"Remove previous `{opts.output_test_dir}` output directory before running Angora.")
@property
def cmd(self):
    """
    Build the mapping of angora_fuzzer command line flags to their values.

    Value-less switches are mapped to None; insertion order is the order
    in which the flags are emitted. The taint binary and the target binary
    are passed as absolute paths.

    :return: dict of flag -> value (or None for value-less switches)
    """
    opts = self._ARGS

    command = {}
    command["--time_limit"] = str(opts.timeout)
    command["--mode"] = opts.mode
    command["--input"] = opts.input_seeds
    command["--output"] = opts.output_test_dir
    command["--jobs"] = str(opts.jobs)
    command["--track"] = os.path.abspath(opts.taint_binary)

    for enabled, switch in (
        (opts.no_afl, "--disable_afl_mutation"),
        (opts.no_exploration, "--disable_exploitation"),
    ):
        if enabled:
            command[switch] = None

    command["--"] = os.path.abspath(opts.binary)

    # if not specified, set DeepState flags to help Angora coverage
    if len(opts.args) == 0:
        command.update({
            "--input_test_file": "@@",
            "--abort_on_fail": None,
            "--no_fork": None,
        })

    if opts.which_test:
        command["--input_which_test"] = opts.which_test

    return command
def main():
fuzzer = Angora("angora_fuzzer", compiler="bin/angora-clang++", envvar="ANGORA")
fuzzer = Angora(envvar="ANGORA")
args = fuzzer.parse_args()
if args.compile_test:
print("COMPILING DEEPSTATE HARNESS FOR FUZZING...")
fuzzer.compile()
sys.exit(0)
# we do not require for the sake of the compilation arg group
if not args.seeds or not args.output_test_dir:
print("Error: --seeds and/or --output_test_dir required for fuzzing.")
sys.exit(1)
seeds = os.path.abspath(args.seeds)
if args.fuzzer_help:
fuzzer.print_help()
sys.exit(0)
if not os.path.exists(seeds):
print("CREATING INPUT SEED DIRECTORY...")
os.mkdir(seeds)
if len([name for name in os.listdir(seeds)]) == 0:
print("Error: no seeds present in directory", args.seeds)
sys.exit(1)
cmd_dict = {
"--time_limit": str(args.timeout),
"--mode": args.mode,
"--input": seeds,
"--output": args.output_test_dir,
"--jobs": str(args.jobs),
"--track": os.path.abspath(args.taint_binary),
}
if args.no_afl:
cmd_dict['--disable_afl_mutation'] = None
if args.no_exploration:
cmd_dict['--disable_exploitation'] = None
cmd_dict['--'] = os.path.abspath(args.binary)
# default args if none provided
if len(args.args) == 0:
cli_other = ["--input_test_file", "@@"]
else:
cli_other = args.args
fuzzer.cli_command(cmd_dict, cli_other=cli_other)
print("EXECUTING FUZZER...")
fuzzer.execute_fuzzer()
fuzzer.run()
return 0

View File

@@ -13,29 +13,89 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import os
import shutil
import subprocess
import sys
import glob
import shutil
import logging
import subprocess
from .frontend import DeepStateFrontend, FrontendError
L = logging.getLogger("deepstate.frontend.eclipser")
L.setLevel(os.environ.get("DEEPSTATE_LOG", "INFO").upper())
from .frontend import DeepStateFrontend
class Eclipser(DeepStateFrontend):
"""
Eclipser front-end implemented with a base DeepStateFrontend object
in order to interface the executable DLL for greybox concolic testing
in order to interface the executable DLL for greybox concolic testing.
"""
FUZZER = "Eclipser.dll"
def print_help(self):
subprocess.call(["dotnet", self.fuzzer, "fuzz", "--help"])
def cli_command(self, cmd_dict, compiler="dotnet", cli_other=None):
super().cli_command(cmd_dict, compiler=compiler, cli_other=cli_other)
def post_processing(self, out):
subprocess.call(["dotnet", self.fuzzer, "decode", "-i", out + "/run/testcase", "-o", out + "/decoded"])
subprocess.call(["dotnet", self.fuzzer, "decode", "-i", out + "/run/crash", "-o", out + "/decoded"])
def pre_exec(self):
    """
    Run the base frontend checks, then make sure the output test
    directory exists, creating it when it is missing.
    """
    super().pre_exec()

    out_dir = self._ARGS.output_test_dir
    L.debug(f"Output test directory: {out_dir}")

    if os.path.exists(out_dir):
        return

    print("Creating output directory.")
    os.mkdir(out_dir)
@property
def cmd(self):
    """
    Build the mapping of Eclipser `fuzz` command line flags to their values.

    The DeepState arguments handed to the target through `--initarg` are
    either the defaults below or the user-supplied `--args`, optionally
    followed by `--input_which_test <name>`.

    The user-supplied list is copied before it is extended. This property
    may be read more than once, and extending `args.args` in place would
    append `--input_which_test` again on every read.

    :return: dict of flag -> value (or None for value-less entries)
    """
    args = self._ARGS

    # initialize DeepState flags if none
    if len(args.args) == 0:
        deepargs = ["--input_test_file", "eclipser.input",
                    "--no_fork", "--abort_on_fail"]
    else:
        # work on a copy so the parsed namespace is never mutated
        deepargs = list(args.args)

    if args.which_test is not None:
        deepargs += ["--input_which_test", args.which_test]

    cmd_dict = {
        "fuzz": None,
        "-p": args.binary,
        "-t": str(args.timeout),
        "-o": args.output_test_dir,
        "--src": "file",
        "--fixfilepath": "eclipser.input",
        "--initarg": " ".join(deepargs),
        "--maxfilelen": str(args.max_input_size),
    }

    if args.input_seeds is not None:
        cmd_dict["--initseedsdir"] = args.input_seeds

    return cmd_dict
def ensemble(self):
    """
    Run the base ensemble step, using `<output_test_dir>/testcase/` as
    the local seed queue.
    """
    testcase_queue = "{}/testcase/".format(self._ARGS.output_test_dir)
    super().ensemble(testcase_queue)
def post_exec(self):
    """
    Decode testcases and crashes after fuzzing.

    Invokes the fuzzer's `decode` command on the `testcase` and `crash`
    directories, copies every decoded file into the output directory, and
    removes the temporary `decoded` directory.
    """
    out = self._ARGS.output_test_dir
    decoded_dir = out + "/decoded"

    L.info("Performing post-processing decoding on testcases and crashes")
    for encoded_dir in (out + "/testcase", out + "/crash"):
        subprocess.call(["dotnet", self.fuzzer, "decode", "-i", encoded_dir, "-o", decoded_dir])

    for decoded_file in glob.glob(out + "/decoded/decoded_files/*"):
        shutil.copy(decoded_file, out)

    shutil.rmtree(decoded_dir)
@@ -43,47 +103,9 @@ class Eclipser(DeepStateFrontend):
def main():
fuzzer = Eclipser("build/Eclipser.dll", envvar="ECLIPSER_HOME")
args = fuzzer.parse_args()
out = args.output_test_dir
if args.fuzzer_help:
fuzzer.print_help()
sys.exit(0)
if not os.path.exists(out):
print("CREATING OUTPUT DIRECTORY...")
os.mkdir(out)
if not os.path.isdir(out):
print("Error:", out, "is not a directory!")
sys.exit(1)
deepargs = "--input_test_file eclipser.input --abort_on_fail --no_fork"
if args.which_test is not None:
deepargs += " --input_which_test " + args.which_test
cmd_dict = {
"fuzz": None,
"-p": args.binary,
"-t": str(args.timeout),
"-o": out + "/run",
"--src": "file",
"--fixfilepath": "eclipser.input",
"--initarg": deepargs,
"--maxfilelen": str(args.max_input_size),
}
if args.seeds is not None:
cmd_dict["-i"] = args.seeds
fuzzer.cli_command(cmd_dict, cli_other=args.args)
print("EXECUTING FUZZER...")
fuzzer.execute_fuzzer()
print("DECODING THE TESTS...")
fuzzer.post_processing(out)
fuzzer = Eclipser(envvar="ECLIPSER_HOME")
fuzzer.parse_args()
fuzzer.run(compiler="dotnet")
return 0

View File

@@ -13,33 +13,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
logging.basicConfig()
import os
import time
import sys
import subprocess
import threading
import argparse
import functools
L = logging.getLogger("deepstate.frontend")
L.setLevel(os.environ.get("DEEPSTATE_LOG", "INFO").upper())
class FrontendError(Exception):
pass
class DeepStateFrontend(object):
"""
Defines a base front-end object for using DeepState to interact with fuzzers. Base object designed
around `afl-fuzz` front-end as default.
Defines a base front-end object for using DeepState to interact with fuzzers.
"""
def __init__(self, name, compiler=None, envvar="PATH"):
def __init__(self, envvar="PATH"):
"""
initializes base object with fuzzer executable and path, and checks to see if fuzzer
executable exists in supplied environment variable (default is $PATH).
Initializes base object with fuzzer executable and path, and checks to see if fuzzer
executable exists in supplied environment variable (default is $PATH). Optionally also
sets path to compiler executable for compile-time instrumentation, for those fuzzers that support it.
optionally also sets path to compiler executable for compile-time instrumentation,
for those fuzzers that support it.
User must define FUZZER and COMPILER members in inherited fuzzer class.
:param envvar: name of envvar to discover executables. Default is $PATH.
"""
if not hasattr(self, "FUZZER"):
raise FrontendError("DeepStateFrontend.FUZZER not set")
fuzzer_name = self.FUZZER
if hasattr(self, "COMPILER"):
compiler = self.COMPILER
else:
compiler = None
if os.environ.get(envvar) is None:
raise RuntimeError(f"${envvar} does not contain any known paths.")
raise FrontendError(f"${envvar} does not contain any known paths.")
# collect paths from envvar, and check to see if fuzzer executable is present in paths
potential_paths = [var for var in os.environ.get(envvar).split(":")]
fuzzer_paths = [f"{path}/{name}" for path in potential_paths if os.path.isfile(path + '/' + name)]
fuzzer_paths = [f"{path}/{fuzzer_name}" for path in potential_paths if os.path.isfile(path + '/' + fuzzer_name)]
if len(fuzzer_paths) == 0:
raise RuntimeError(f"${envvar} does not contain supplied fuzzer executable.")
raise FrontendError(f"${envvar} does not contain supplied fuzzer executable.")
L.debug(fuzzer_paths)
# if supplied, check if compiler exists in potential_paths
if compiler is not None:
@@ -50,124 +79,368 @@ class DeepStateFrontend(object):
if os.path.isfile(compiler):
self.compiler = compiler
else:
raise RuntimeError(f"{compiler} does not exist as absolute path or in ${envvar}")
raise FrontendError(f"{compiler} does not exist as absolute path or in ${envvar}")
# use first compiler executable if multiple exists
self.compiler = compiler_paths[0]
L.debug(f"Initialized compiler: {self.compiler}")
# in case name supplied as `bin/fuzzer`, strip executable name
if '/' in name:
self.name = name.split('/')[-1]
if '/' in fuzzer_name:
self.name = fuzzer_name.split('/')[-1]
else:
self.name = name
self.name = fuzzer_name
# use first fuzzer executable path if multiple exists
self.fuzzer = fuzzer_paths[0]
L.debug(f"Initialized fuzzer path: {self.fuzzer}")
self._start_time = int(time.time())
self._on = False
def print_help(self):
"""
calls fuzzer to print executable help menu
Calls fuzzer to print executable help menu.
"""
subprocess.call([self.fuzzer, "--help"])
def compile(self, compiler_args=None, custom_cmd=None, env=os.environ.copy()):
def compile(self, compiler_args, env=os.environ.copy()):
"""
provides a simple interface for calling a compiler to instrument a test harness for
mutation-based fuzzers
Provides a simple interface that allows the user to compile a test harness
with instrumentation using the specified compiler. Users should implement an
inherited method that constructs the arguments necessary, and then pass it to the
base object.
:param compiler_args: list of arguments for compiler (excluding compiler executable)
:param env: optional envvars to set during compilation
"""
if self.compiler is None:
raise RuntimeError(f"No compiler specified for compile-time instrumentation.")
raise FrontendError(f"No compiler specified for compile-time instrumentation.")
os.environ["CC"] = self.compiler
os.environ["CCX"] = self.compiler
# initialize compiler envvars
env["CC"] = self.compiler
env["CXX"] = self.compiler
L.debug(f"CC={env['CC']} and CXX={env['CXX']}")
# initialize command with prepended compiler
compile_cmd = [self.compiler] + compiler_args
L.debug(f"Compilation command: {str(compile_cmd)}")
L.info(f"Compiling test harness `{self._ARGS.compile_test}` with {self.compiler}")
try:
if custom_cmd is not None:
compile_cmd = custom_cmd
else:
compile_cmd = [self.compiler] + compiler_args
ps = subprocess.Popen(compile_cmd, env=env)
ps.communicate()
except BaseException as e:
raise RuntimeError(f"{self.compiler} interrupted due to exception:", e)
raise FrontendError(f"{self.compiler} interrupted due to exception:", e)
def cli_command(self, cmd_dict, compiler=None, cli_other=None):
def pre_exec(self):
"""
provides an interface for constructing proper command to be passed
to fuzzer cli executable.
Called before fuzzer execution in order to perform sanity checks. Base method contains
default argument checks. Users should implement inherited method for any other environment
checks or initializations before execution.
"""
args = self._ARGS
if args is None:
raise FrontendError("No arguments parsed yet. Call parse_args before pre_exec.")
if args.fuzzer_help:
self.print_help()
sys.exit(0)
# if compile_test is an existing argument, call compile for user
if hasattr(args, "compile_test"):
if args.compile_test:
self.compile()
sys.exit(0)
# manually check if binary positional argument was passed
if args.binary is None:
self.parser.print_help()
print("\nError: Target binary not specified.")
sys.exit(1)
L.debug(f"Target binary: {args.binary}")
# no sanity check, since some fuzzers require optional input seeds
if args.input_seeds:
L.debug(f"Input seeds directory: {args.input_seeds}")
L.debug(f"Output directory: {args.output_test_dir}")
# check if we in ensemble mode, and initialize directory
if args.enable_sync:
if not os.path.isdir(args.sync_dir):
L.info("Initializing sync directory for ensembling")
os.mkdir(args.sync_dir)
L.debug(f"Sync directory: {args.sync_dir}")
@staticmethod
def _dict_to_cmd(cmd_dict):
    """
    Helper that provides an interface for constructing proper command to be passed
    to fuzzer executable. This takes a dict that maps a str argument flag to a value,
    and transforms it into list.

    Each flag is emitted in the dict's iteration order, followed by its
    value unless that value is None (a value-less switch). An empty dict
    yields an empty list.

    :param cmd_dict: dict with keys as cli flags and values as arguments
    :return: flat list of command line arguments
    """

    # turn arg mapping into viable cli args
    cmd_args = []
    for flag, value in cmd_dict.items():
        if flag is not None:
            cmd_args.append(flag)
        if value is not None:
            cmd_args.append(value)

    L.debug(f"Fuzzer arguments: `{str(cmd_args)}`")
    return cmd_args
def run(self, compiler=None):
"""
Spawns the fuzzer by taking the self.cmd property and initializing a command in a list
format for subprocess.
:param compiler: if necessary, a compiler that is invoked before fuzzer executable (ie `dotnet`)
"""
args = self._ARGS
# call pre_exec for any checks/inits before execution
L.info("Calling pre_exec before fuzzing")
self.pre_exec()
# initialize cmd from property or throw exception
if hasattr(self, "cmd") or isinstance(getattr(type(self), "cmd", None), property):
command = [self.fuzzer] + DeepStateFrontend._dict_to_cmd(self.cmd)
else:
self.cmd = [self.fuzzer]
raise FrontendError("No DeepStateFrontend.cmd attribute defined.")
# create command to execute by fuzzer, append any other optional arguments
self.cmd += cmd_args
if cli_other is not None:
self.cmd += cli_other
# prepend compiler that invokes fuzzer
if compiler:
command.insert(0, compiler)
L.info(f"Executing command `{str(command)}` in {args.jobs} fuzzer(s)")
# exec fuzzer
L.info(f"Fuzzer start time: {self._start_time}")
self._on = True
# TODO(alan): output to standardized logger with uniform pretty-printing
def output_reader(proc):
for line in iter(proc.stdout.readline, b''):
print("{}".format(line.decode("utf-8")), end='')
def execute_fuzzer(self):
"""
takes constructed cli command and executes fuzzer with subprocess.call
"""
try:
r = subprocess.call(self.cmd)
print(f"{self.name} finished with exit code", r)
except BaseException as e:
raise RuntimeError(f"{self.fuzzer} run interrupted due to exception:", e)
# if we are syncing seeds, we background the AFL process but still process output
# to the foreground, while handling seed synchronization in a loop
if args.enable_sync:
self.proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
t = threading.Thread(target=output_reader, args=(self.proc,))
t.start()
# do not ensemble as fuzzer initializes
time.sleep(5)
self.sync_count = 0
L.info(f"Starting fuzzer with seed synchronization with PID `{self.proc.pid}`")
while self._is_alive():
L.info(f"Performing sync cycle {self.sync_count}")
time.sleep(args.sync_cycle)
self.ensemble()
self.sync_count += 1
def post_processing(self):
# if not syncing, start regular foreground child process with regular thread for consistency
else:
self.proc = subprocess.Popen(command)
t = threading.Thread()
t.start()
L.info(f"Starting fuzzer normally with PID `{self.proc.pid}`")
self.proc.communicate()
except OSError as e:
raise FrontendError(f"{self.fuzzer} run interrupted due to exception {e}.")
except KeyboardInterrupt:
self._kill()
t.join()
self.exec_time = round(time.time() - self._start_time, 2)
L.info(f"Fuzzer exec time: {self.exec_time}s")
# do post-fuzz operations
if hasattr(self, "post_exec") and callable(getattr(self, "post_exec")):
L.info("Calling post-exec for fuzzer post-processing")
self.post_exec()
def _is_alive(self):
"""
performs any post-fuzzing operations, like test extraction / parsing
Checks to see if fuzzer PID is running, but tossing SIGT (0) to see if we can
interact. Ideally used in an event loop during a running process.
"""
raise NotImplementedError("Must be implemented by front-end executor.")
if self._on:
return True
try:
os.kill(self.proc.pid, 0)
except (OSError, ProcessLookupError):
return False
return True
def _kill(self):
    """
    Terminate the running fuzzer process and wait for it to exit, then
    mark the frontend as no longer running. Can be used forcefully if a
    KeyboardInterrupt signal falls through and the process continues
    execution.

    :raises FrontendError: if no fuzzer process handle has been stored yet
    """
    if hasattr(self, "proc"):
        self.proc.terminate()
        self.proc.wait()
        self._on = False
        return

    raise FrontendError("Attempted to kill non-running PID.")
@property
def stats(self):
    """
    Placeholder for fuzzer statistics parsing. The base implementation
    always raises; a frontend subclass is expected to override it and may
    return custom feedback.

    :raises NotImplementedError: always, in the base class
    """
    reason = "Must implement in frontend subclass."
    raise NotImplementedError(reason)
def _sync_seeds(self, mode, src, dest, excludes=[]):
    """
    Helper that invokes rsync for convenient file syncing between two directories.

    The call blocks until rsync has finished, so consecutive sync steps do
    not overlap and no child process is left unreaped. A non-zero rsync
    exit status is logged as a warning rather than raised, so a failed
    sync cycle does not abort a running fuzzing session.

    TODO(alan): implement functionality for syncing across servers.
    TODO(alan): consider implementing "native" syncing alongside current "rsync mode".

    :param mode: str representing mode (either 'GET' or 'PUSH')
    :param src: path to source queue
    :param dest: path to destination queue
    :param excludes: list of string patterns for paths to ignore when rsync-ing
    :raises FrontendError: if `mode` is unknown or rsync cannot be launched
    """

    if mode not in ("GET", "PUSH"):
        raise FrontendError(f"Unknown mode for seed syncing: `{mode}`")

    rsync_cmd = ["rsync", "-racz", "--ignore-existing"]

    # subclass should invoke with list of pattern ignores
    rsync_cmd += [f"--exclude={e}" for e in excludes]

    # TODO: determine other necessary arguments
    # 'GET' copies from `dest` into `src`; 'PUSH' copies from `src` into `dest`
    if mode == "GET":
        rsync_cmd += [dest, src]
    else:
        rsync_cmd += [src, dest]

    L.debug(f"rsync command: {rsync_cmd}")
    try:
        ret = subprocess.call(rsync_cmd)
    except OSError as e:
        raise FrontendError(f"Unable to invoke rsync for seed synchronization: {e}.") from e

    if ret != 0:
        L.warning(f"rsync exited with non-zero status {ret} during seed synchronization")
@staticmethod
def _queue_len(queue_path):
return len([path for path in os.listdir(queue_path)])
def ensemble(self, local_queue=None, global_queue=None):
    """
    Base method for implementing ensemble fuzzing with seed synchronization. User should
    implement any additional logic for determining whether to sync/get seeds as if in event loop.

    :param local_queue: path to the fuzzer's own seed queue; defaults to
        `<output_test_dir>/queue/`
    :param global_queue: path to the shared seed queue; defaults to
        `<sync_dir>/`
    """
    opts = self._ARGS

    if global_queue is None:
        global_queue = opts.sync_dir + "/"
    global_len = DeepStateFrontend._queue_len(global_queue)
    L.debug(f"Global seed queue: {global_queue} with {global_len} files")

    if local_queue is None:
        local_queue = opts.output_test_dir + "/queue/"
    local_len = DeepStateFrontend._queue_len(local_queue)
    L.debug(f"Fuzzer local seed queue: {local_queue} with {local_len} files")

    # sanity check: if global queue is empty, populate from local queue
    global_is_empty = global_len == 0
    if global_is_empty and local_len > 0:
        L.info("Nothing in global queue, pushing seeds from local queue")
        self._sync_seeds("PUSH", local_queue, global_queue)
        return

    # get seeds from AFL to global queue, then push seeds from global queue
    # to local; rsync will deal with duplicates
    # TODO: rename sync seeds to arbitrary filenames in queue
    for direction in ("GET", "PUSH"):
        self._sync_seeds(direction, global_queue, local_queue)
_ARGS = None
@classmethod
def parse_args(cls):
"""
Default base argument parser for DeepState frontends. Comprises of default arguments all
frontends must implement to maintain consistency in executables. Users can inherit this
method to extend and add own arguments or override for outstanding deviations in fuzzer CLIs.
"""
if cls._ARGS:
return cls._ARGS
# use existing argparser if defined in fuzzer object,
# or initialize new one, both with default arguments
if hasattr(cls, "parser"):
L.debug("Using previously initialized parser")
parser = cls.parser
else:
parser = argparse.ArgumentParser(
description="Use fuzzer as back-end for DeepState.")
parser.add_argument("binary", type=str, help="Path to the test binary to run.")
# Target binary (not required, as we enforce manual checks in pre_exec)
parser.add_argument("binary", nargs="?", type=str, help="Path to the test binary to run.")
parser.add_argument("--output_test_dir", type=str, default="out", help="Directory where tests will be saved.")
# Input/output workdirs
parser.add_argument("-i", "--input_seeds", type=str, help="Directory with seed inputs.")
parser.add_argument("-o", "--output_test_dir", type=str, default=f"out", help="Directory where tests will be saved.")
parser.add_argument("--timeout", type=int, default=3600, help="How long to fuzz.")
# Fuzzer execution options
parser.add_argument("-t", "--timeout", type=int, default=3600, help="How long to fuzz.")
parser.add_argument("-s", "--max_input_size", type=int, default=8192, help="Maximum input size.")
parser.add_argument("-j", "--jobs", type=int, default=1, help="How many worker processes to spawn.")
parser.add_argument("--jobs", type=int, default=1, help="How many worker processes to spawn.")
parser.add_argument("--seeds", type=str, help="Directory with seed inputs.")
# Parallel / Ensemble Fuzzing
parser.add_argument("--enable_sync", action="store_true", help="Enable seed synchronization.")
parser.add_argument("--sync_dir", type=str, default="out_sync", help="Directory for seed synchronization.")
parser.add_argument("--sync_cycle", type=int, default=5, help="Time between sync cycle.")
parser.add_argument("--sync_crashes", action="store_true", help="Sync crashes between local and global queue.")
parser.add_argument("--sync_hangs", action="store_true", help="Sync hanging input between local and global queue.")
# Miscellaneous options
parser.add_argument("--fuzzer_help", action="store_true", help="Show fuzzer command line options.")
parser.add_argument("--which_test", type=str, help="Which test to run (equivalent to --input_which_test).")
parser.add_argument("--args", default=[], nargs=argparse.REMAINDER, help="Overrides DeepState arguments to pass to test(s).")
parser.add_argument("--max_input_size", type=int, default=8192, help="Maximum input size.")
parser.add_argument("--fuzzer_help", action='store_true', help="Show fuzzer command line options.")
parser.add_argument("--args", default=[], nargs=argparse.REMAINDER, help="Other arguments to pass to fuzzer cli.")
cls._args = parser.parse_args()
cls._ARGS = parser.parse_args()
cls.parser = parser
return cls._args