Refactor frontend API

* Refactor how API is instantiated and executed
* Add logging support
* Better sanity env checks
* Documentation
* Fix minor errors and usability concerns
* Implement changes to all frontend engines based on API changes
This commit is contained in:
ex0dus-0x 2019-07-22 18:28:29 -04:00
parent a407b844fc
commit 9b78a5a393
No known key found for this signature in database
GPG Key ID: DABAD5DB9BDD540E
4 changed files with 385 additions and 227 deletions

View File

@ -17,12 +17,15 @@ import os
import sys
import argparse
from .frontend import DeepStateFrontend
from .frontend import DeepStateFrontend, FrontendError
class AFL(DeepStateFrontend):
""" Defines default AFL fuzzer frontend """
FUZZER = "afl-fuzz"
COMPILER = "afl-clang++"
@classmethod
def parse_args(cls):
parser = argparse.ArgumentParser(description="Use AFL as a back-end for DeepState.")
@ -46,7 +49,7 @@ class AFL(DeepStateFrontend):
def compile(self):
args = self._args
args = self._ARGS
lib_path = "/usr/local/lib/"
if not os.path.isfile(lib_path + "libdeepstate_AFL.a"):
@ -57,65 +60,96 @@ class AFL(DeepStateFrontend):
super().compile(compiler_args)
def pre_exec(self):
    """
    Perform AFL-specific sanity checks before the fuzzer is executed.

    Runs the base-class checks, handles the compile-only mode (compile the harness,
    then exit with status 0), and validates the seed directory when seeds are required.

    :raises FrontendError: if seeds are required but no seed path was given, if the
        seed path does not exist (an empty directory is created first), or if the
        seed directory contains no entries.
    """
    super().pre_exec()
    args = self._ARGS

    # compile-only invocation: build the harness and stop, nothing is fuzzed
    if args.compile_test:
        self.compile()
        sys.exit(0)

    # require input seeds if we aren't in dumb mode, or we are using crash mode.
    # The crash-exploration switch is read as `crash_explore` where the command line
    # is built, so the same attribute name is used here.
    if args.dumb_mode and not args.crash_explore:
        return

    if not args.input_seeds:
        raise FrontendError("Must provide -i/--input_seeds option for AFL.")

    seeds = args.input_seeds

    # check if seeds dir exists; create it so the user only has to drop seeds in
    if not os.path.exists(seeds):
        os.mkdir(seeds)
        raise FrontendError("Seed path doesn't exist. Creating empty seed directory and exiting.")

    # check if seeds dir is empty
    if not os.listdir(seeds):
        raise FrontendError(f"No seeds present in directory {seeds}.")
@property
def cmd(self):
    """
    Build the afl-fuzz invocation as an ordered flag -> value mapping.

    A value of `None` marks a flag that takes no argument. Everything inserted
    after the `--` key belongs to the target binary rather than to AFL.

    :return: dict with cli flags as keys and their arguments as values
    """
    opts = self._ARGS

    command = {
        "-i": opts.input_seeds,
        "-o": opts.output_test_dir,
        "-t": str(opts.timeout),
        "-m": str(opts.mem_limit)
    }

    # check if we are using one of AFL's many "modes"
    mode_switches = (
        (opts.dirty_mode, "-d"),
        (opts.dumb_mode, "-n"),
        (opts.qemu_mode, "-Q"),
        (opts.crash_explore, "-C"),
    )
    for enabled, flag in mode_switches:
        if enabled:
            command[flag] = None

    # other misc arguments
    for flag, value in (("-x", opts.dictionary), ("-f", opts.file)):
        if value:
            command[flag] = value

    command['--'] = opts.binary

    # if not specified, set DeepState flags to help AFL coverage
    # NOTE(review): when opts.args is non-empty nothing from it is added here — confirm intended
    if len(opts.args) == 0:
        command.update({
            "--input_test_file": "@@",
            "--abort_on_fail": None,
            "--no_fork": None,
        })

    if opts.which_test:
        command["--input_which_test"] = opts.which_test

    return command
@property
def stats(self):
    """Fuzzer statistics. Not implemented yet: the body is empty, so this evaluates to None."""
    pass
# TODO
def ensemble(self):
    """
    Refresh the fuzzer stats and sync or fetch seeds depending on whether they changed.

    Reads `self.stats`, calls `self._update_stats()`, reads `self.stats` again, and
    compares the two `"last_update"` entries: if they differ `self.sync_seeds()` is
    called, otherwise `self.get_seeds()`.

    NOTE(review): `_update_stats` and `get_seeds` are not defined in the visible code,
    and the `stats` property is still a stub — confirm before relying on this method.
    """
    # get original stats
    orig_stats = self.stats

    # update stored stats at current point of execution
    self._update_stats()

    # re-read after the refresh; comparing against a bare `stats` name would be a NameError
    stats = self.stats

    if stats["last_update"] != orig_stats["last_update"]:
        self.sync_seeds()
    else:
        self.get_seeds()
def main():
fuzzer = AFL("afl-fuzz", compiler="afl-clang-fast++")
fuzzer = AFL()
args = fuzzer.parse_args()
if args.fuzzer_help:
fuzzer.print_help()
sys.exit(0)
if args.compile_test:
print("COMPILING DEEPSTATE HARNESS FOR FUZZING...")
fuzzer.compile()
sys.exit(0)
if not args.seeds or not args.output_test_dir:
print("Error: --seeds and/or --output_test_dir required for fuzzing.")
sys.exit(1)
if not os.path.exists(args.seeds):
print("CREATING INPUT SEED DIRECTORY...")
os.mkdir(args.seeds)
if len([name for name in os.listdir(args.seeds)]) == 0:
print("Error: no seeds present in directory", args.seeds)
sys.exit(1)
cmd_dict = {
"-i": args.seeds,
"-o": args.output_test_dir,
"-t": str(args.timeout),
"-m": str(args.mem_limit)
}
# check if we are using one of AFL's many "modes"
if args.dirty_mode:
cmd_dict['-d'] = None
if args.dumb_mode:
cmd_dict['-n'] = None
if args.qemu_mode:
cmd_dict['-Q'] = None
if args.crash_explore:
cmd_dict['-C'] = None
# other misc arguments
if args.dictionary:
cmd_dict['-x'] = args.dictionary
if args.file:
cmd_dict['-f'] = args.file
cmd_dict['--'] = args.binary
# if not specified, set DeepState flags to help AFL coverage
if len(args.args) == 0:
args.args = ["--input_test_file", "@@", "--abort_on_fail", "--no_fork"]
fuzzer.cli_command(cmd_dict, cli_other=args.args)
print("EXECUTING FUZZER...")
fuzzer.execute_fuzzer()
fuzzer.run()
return 0

View File

@ -17,10 +17,12 @@ import os
import sys
import argparse
from .frontend import DeepStateFrontend
from .frontend import DeepStateFrontend, FrontendError
class Angora(DeepStateFrontend):
FUZZER = "angora_fuzzer"
COMPILER = "bin/angora-clang++"
@classmethod
def parse_args(cls):
@ -32,7 +34,7 @@ class Angora(DeepStateFrontend):
compile_group.add_argument("--compiler_args", default=[], nargs='+', help="Compiler flags (excluding -o) to pass to compiler.")
compile_group.add_argument("--out_test_name", type=str, default="test", help="Set name for generated *.taint and *.fast binaries.")
parser.add_argument("taint_binary", type=str, help="Path to binary compiled with taint tracking.")
parser.add_argument("taint_binary", nargs="?", type=str, help="Path to binary compiled with taint tracking.")
parser.add_argument("--mode", type=str, default="llvm", help="Specifies binary instrumentation framework used (either llvm or pin).")
parser.add_argument("--no_afl", action='store_true', help="Disables AFL mutation strategies being used.")
parser.add_argument("--no_exploration", action='store_true', help="Disables context-sensitive input bytes mutation.")
@ -42,7 +44,7 @@ class Angora(DeepStateFrontend):
def compile(self):
args = self._args
args = self._ARGS
no_taints = args.ignored_taints
env = os.environ.copy()
@ -76,61 +78,71 @@ class Angora(DeepStateFrontend):
return 0
def pre_exec(self):
    """
    Perform Angora-specific sanity checks before the fuzzer is executed.

    After the base-class checks this handles the compile-only mode (compile, then
    exit with status 0), requires a taint binary and a seed path, and validates the
    seed directory.

    :raises FrontendError: if the taint binary or seed path is missing, the seed
        path does not exist (an empty directory is created first), or it is empty.
    """
    super().pre_exec()
    opts = self._ARGS

    if opts.compile_test:
        print("COMPILING DEEPSTATE HARNESS FOR FUZZING...")
        self.compile()
        sys.exit(0)

    # since base method checks for args.binary by default
    if not opts.taint_binary:
        self.parser.print_help()
        raise FrontendError("Must provide taint binary for Angora.")

    if not opts.input_seeds:
        raise FrontendError("Must provide -i/--input_seeds option for Angora.")

    seed_dir = os.path.abspath(opts.input_seeds)

    seed_dir_present = os.path.exists(seed_dir)
    if not seed_dir_present:
        # leave an empty directory behind so the user only has to add seeds
        os.mkdir(seed_dir)
        raise FrontendError("Seed path doesn't exist. Creating empty seed directory and exiting.")

    entries = os.listdir(seed_dir)
    if not entries:
        raise FrontendError(f"No seeds present in directory {seed_dir}")
@property
def cmd(self):
    """
    Build the Angora invocation as an ordered flag -> value mapping.

    A value of `None` marks a flag that takes no argument. Everything inserted
    after the `--` key belongs to the target binary rather than to Angora.

    :return: dict with cli flags as keys and their arguments as values
    """
    args = self._ARGS

    cmd_dict = {
        "--time_limit": str(args.timeout),
        "--mode": args.mode,
        # absolute, like the seed directory validated in pre_exec and the binary paths below
        "--input": os.path.abspath(args.input_seeds),
        "--output": args.output_test_dir,
        "--jobs": str(args.jobs),
        "--track": os.path.abspath(args.taint_binary),
    }

    if args.no_afl:
        cmd_dict["--disable_afl_mutation"] = None

    if args.no_exploration:
        cmd_dict["--disable_exploitation"] = None

    cmd_dict["--"] = os.path.abspath(args.binary)

    # if not specified, set DeepState flags to help Angora coverage
    if len(args.args) == 0:
        cmd_dict["--input_test_file"] = "@@"
        cmd_dict["--abort_on_fail"] = None
        cmd_dict["--no_fork"] = None

    if args.which_test:
        cmd_dict["--input_which_test"] = args.which_test

    return cmd_dict
def main():
fuzzer = Angora("angora_fuzzer", compiler="bin/angora-clang++", envvar="ANGORA")
fuzzer = Angora(envvar="ANGORA")
args = fuzzer.parse_args()
if args.compile_test:
print("COMPILING DEEPSTATE HARNESS FOR FUZZING...")
fuzzer.compile()
sys.exit(0)
# we do not require for the sake of the compilation arg group
if not args.seeds or not args.output_test_dir:
print("Error: --seeds and/or --output_test_dir required for fuzzing.")
sys.exit(1)
seeds = os.path.abspath(args.seeds)
if args.fuzzer_help:
fuzzer.print_help()
sys.exit(0)
if not os.path.exists(seeds):
print("CREATING INPUT SEED DIRECTORY...")
os.mkdir(seeds)
if len([name for name in os.listdir(seeds)]) == 0:
print("Error: no seeds present in directory", args.seeds)
sys.exit(1)
cmd_dict = {
"--time_limit": str(args.timeout),
"--mode": args.mode,
"--input": seeds,
"--output": args.output_test_dir,
"--jobs": str(args.jobs),
"--track": os.path.abspath(args.taint_binary),
}
if args.no_afl:
cmd_dict['--disable_afl_mutation'] = None
if args.no_exploration:
cmd_dict['--disable_exploitation'] = None
cmd_dict['--'] = os.path.abspath(args.binary)
# default args if none provided
if len(args.args) == 0:
cli_other = ["--input_test_file", "@@"]
else:
cli_other = args.args
fuzzer.cli_command(cmd_dict, cli_other=cli_other)
print("EXECUTING FUZZER...")
fuzzer.execute_fuzzer()
fuzzer.run()
return 0

View File

@ -19,21 +19,72 @@ import shutil
import subprocess
import sys
from .frontend import DeepStateFrontend
from .frontend import DeepStateFrontend, FrontendError
class Eclipser(DeepStateFrontend):
"""
Eclipser front-end implemented with a base DeepStateFrontend object
in order to interface the executable DLL for greybox concolic testing
in order to interface the executable DLL for greybox concolic testing.
"""
FUZZER = "Eclipser.dll"
def print_help(self):
    """
    Show the help of Eclipser's `fuzz` subcommand.

    Overrides the default help call: the fuzzer path is passed to `dotnet`
    together with `fuzz --help` instead of being executed directly.
    """
    help_command = ["dotnet", self.fuzzer, "fuzz", "--help"]
    subprocess.call(help_command)
def cli_command(self, cmd_dict, compiler="dotnet", cli_other=None):
super().cli_command(cmd_dict, compiler=compiler, cli_other=cli_other)
def post_processing(self, out):
def pre_exec(self):
    """
    Run the base sanity checks and make sure the output directory is usable.

    The output directory is created when it does not exist yet.

    :raises FrontendError: if the output path exists but is not a directory.
    """
    super().pre_exec()

    args = self._ARGS
    out = args.output_test_dir

    if not os.path.exists(out):
        print("Creating output directory.")
        os.mkdir(out)

    # an existing regular file passes the check above; report it here, before launching the fuzzer
    if not os.path.isdir(out):
        raise FrontendError(f"{out} is not a directory!")
@property
def cmd(self):
    """
    Build the Eclipser `fuzz` invocation as an ordered flag -> value mapping.

    The DeepState arguments for the target are joined into the single `--initarg`
    string. Reading this property does not modify the parsed arguments, so it
    gives the same result every time it is read.

    :return: dict with cli flags as keys and their arguments as values
    """
    args = self._ARGS

    # initialize DeepState flags if none
    if len(args.args) == 0:
        deepargs = ["--input_test_file", "eclipser.input",
                    "--no_fork", "--abort_on_fail"]
    else:
        # copy: `+=` below extends the list in place, and extending args.args itself
        # would append the test selector again on every read of this property
        deepargs = list(args.args)

    if args.which_test is not None:
        deepargs += ["--input_which_test", args.which_test]

    cmd_dict = {
        "fuzz": None,
        "-p": args.binary,
        "-t": str(args.timeout),
        "-o": args.output_test_dir + "/run",
        "--src": "file",
        "--fixfilepath": "eclipser.input",
        "--initarg": " ".join(deepargs),
        "--maxfilelen": str(args.max_input_size),
    }

    if args.input_seeds is not None:
        cmd_dict["-i"] = args.input_seeds

    return cmd_dict
def post_exec(self):
"""
Decode and minimize testcases after fuzzing.
"""
out = self._ARGS.output_test_dir
subprocess.call(["dotnet", self.fuzzer, "decode", "-i", out + "/run/testcase", "-o", out + "/decoded"])
subprocess.call(["dotnet", self.fuzzer, "decode", "-i", out + "/run/crash", "-o", out + "/decoded"])
for f in glob.glob(out + "/decoded/decoded_files/*"):
@ -43,47 +94,9 @@ class Eclipser(DeepStateFrontend):
def main():
fuzzer = Eclipser("build/Eclipser.dll", envvar="ECLIPSER_HOME")
args = fuzzer.parse_args()
out = args.output_test_dir
if args.fuzzer_help:
fuzzer.print_help()
sys.exit(0)
if not os.path.exists(out):
print("CREATING OUTPUT DIRECTORY...")
os.mkdir(out)
if not os.path.isdir(out):
print("Error:", out, "is not a directory!")
sys.exit(1)
deepargs = "--input_test_file eclipser.input --abort_on_fail --no_fork"
if args.which_test is not None:
deepargs += " --input_which_test " + args.which_test
cmd_dict = {
"fuzz": None,
"-p": args.binary,
"-t": str(args.timeout),
"-o": out + "/run",
"--src": "file",
"--fixfilepath": "eclipser.input",
"--initarg": deepargs,
"--maxfilelen": str(args.max_input_size),
}
if args.seeds is not None:
cmd_dict["-i"] = args.seeds
fuzzer.cli_command(cmd_dict, cli_other=args.args)
print("EXECUTING FUZZER...")
fuzzer.execute_fuzzer()
print("DECODING THE TESTS...")
fuzzer.post_processing(out)
fuzzer = Eclipser(envvar="ECLIPSER_HOME")
fuzzer.parse_args()
fuzzer.run(compiler="dotnet")
return 0

View File

@ -13,33 +13,59 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
logging.basicConfig()
import os
import sys
import time
import subprocess
import argparse
import functools
L = logging.getLogger("deepstate.frontend")
L.setLevel(logging.INFO)
class FrontendError(Exception):
    """Error raised by the DeepState fuzzer front-ends for failed checks and aborted runs."""
class DeepStateFrontend(object):
"""
Defines a base front-end object for using DeepState to interact with fuzzers. Base object designed
around `afl-fuzz` front-end as default.
Defines a base front-end object for using DeepState to interact with fuzzers.
"""
def __init__(self, name, compiler=None, envvar="PATH"):
def __init__(self, envvar="PATH"):
"""
initializes base object with fuzzer executable and path, and checks to see if fuzzer
executable exists in supplied environment variable (default is $PATH).
Initializes base object with fuzzer executable and path, and checks to see if fuzzer
executable exists in supplied environment variable (default is $PATH). Optionally also
sets path to compiler executable for compile-time instrumentation, for those fuzzers that support it.
optionally also sets path to compiler executable for compile-time instrumentation,
for those fuzzers that support it.
User must define FUZZER and COMPILER members in inherited fuzzer class.
:param envvar: name of envvar to discover executables. Default is $PATH.
"""
if not hasattr(self, "FUZZER"):
raise FrontendError("DeepStateFrontend.FUZZER not set")
fuzzer_name = self.FUZZER
if hasattr(self, "COMPILER"):
compiler = self.COMPILER
else:
compiler = None
if os.environ.get(envvar) is None:
raise RuntimeError(f"${envvar} does not contain any known paths.")
raise FrontendError(f"${envvar} does not contain any known paths.")
# collect paths from envvar, and check to see if fuzzer executable is present in paths
potential_paths = [var for var in os.environ.get(envvar).split(":")]
fuzzer_paths = [f"{path}/{name}" for path in potential_paths if os.path.isfile(path + '/' + name)]
fuzzer_paths = [f"{path}/{fuzzer_name}" for path in potential_paths if os.path.isfile(path + '/' + fuzzer_name)]
if len(fuzzer_paths) == 0:
raise RuntimeError(f"${envvar} does not contain supplied fuzzer executable.")
raise FrontendError(f"${envvar} does not contain supplied fuzzer executable.")
L.debug(fuzzer_paths)
# if supplied, check if compiler exists in potential_paths
if compiler is not None:
@ -50,91 +76,161 @@ class DeepStateFrontend(object):
if os.path.isfile(compiler):
self.compiler = compiler
else:
raise RuntimeError(f"{compiler} does not exist as absolute path or in ${envvar}")
raise FrontendError(f"{compiler} does not exist as absolute path or in ${envvar}")
# use first compiler executable if multiple exists
self.compiler = compiler_paths[0]
L.info(f"Initialized compiler: {self.compiler}")
# in case name supplied as `bin/fuzzer`, strip executable name
if '/' in name:
self.name = name.split('/')[-1]
if '/' in fuzzer_name:
self.name = fuzzer_name.split('/')[-1]
else:
self.name = name
self.name = fuzzer_name
# use first fuzzer executable path if multiple exists
self.fuzzer = fuzzer_paths[0]
L.info(f"Initialized fuzzer path: {self.fuzzer}")
self.start_time = int(time.time())
self._on = False
def print_help(self):
    """
    Calls fuzzer to print executable help menu.
    """
    help_command = [self.fuzzer, "--help"]
    subprocess.call(help_command)
def compile(self, compiler_args=None, env=None, custom_cmd=None):
    """
    Provides a simple interface that allows the user to compile a test harness
    with instrumentation using the specified compiler. Users should implement an
    inherited method that constructs the arguments necessary, and then pass it to the
    base object.

    :param compiler_args: list of arguments for compiler (excluding compiler executable)
    :param env: optional envvars to set during compilation; a fresh copy of the current
        environment is used when omitted. CC and CXX are set in this mapping.
    :param custom_cmd: optional complete command (list) to run instead of
        `[compiler] + compiler_args`
    :raises FrontendError: if no compiler is configured, or running the compiler raised
    """
    compiler = getattr(self, "compiler", None)
    if compiler is None:
        raise FrontendError("No compiler specified for compile-time instrumentation.")

    # take the snapshot per call: a default evaluated at definition time would be shared
    # between calls and keep the CC/CXX values written below
    if env is None:
        env = os.environ.copy()

    L.info(f"Compiling test harness `{self._ARGS.compile_test}` with {compiler}")

    env["CC"] = compiler
    env["CXX"] = compiler
    L.debug(f"CC={env['CC']} and CXX={env['CXX']}")

    if custom_cmd is not None:
        compile_cmd = custom_cmd
    else:
        compile_cmd = [compiler] + list(compiler_args or [])
    L.debug(f"Compilation command: {str(compile_cmd)}")

    try:
        ps = subprocess.Popen(compile_cmd, env=env)
        ps.communicate()
    except BaseException as e:
        # keep the broad catch (interrupts are reported too), but format the cause
        # into the message and chain it instead of passing it as a second argument
        raise FrontendError(f"{compiler} interrupted due to exception: {e}") from e
def pre_exec(self):
    """
    Sanity checks performed before the fuzzer is executed.

    The base method validates the default arguments: arguments must have been
    parsed, a help request prints the fuzzer help and exits with status 0, a
    missing target binary prints the same help and exits with status 1, and an
    output directory must be set. Front-ends add their own checks or
    initializations in an inherited method.

    :raises FrontendError: if no arguments were parsed or no output directory is set
    """
    opts = self._ARGS
    if opts is None:
        raise FrontendError("No arguments parsed yet. Call parse_args before pre_exec.")

    # both cases show the fuzzer help; only the exit status differs
    wants_help = opts.fuzzer_help
    if wants_help or opts.binary is None:
        self.print_help()
        sys.exit(0 if wants_help else 1)

    L.debug(f"Target binary: {opts.binary}")

    output_dir = opts.output_test_dir
    if not output_dir:
        raise FrontendError("No output test directory path specified.")

    L.debug(f"Output directory: {opts.output_test_dir}")
def cli_command(self, cmd_dict, compiler=None, cli_other=None):
@staticmethod
def _dict_to_cmd(cmd_dict):
    """
    Provides an interface for constructing proper command to be passed
    to cli executable.

    Every key is emitted followed by its value, in the dict's iteration order.
    `None` entries are dropped, so a flag mapped to `None` appears without an
    argument. An empty dict yields an empty list.

    :param cmd_dict: dict with keys as cli flags and values as arguments
    :return: flat list of cli arguments
    """
    # flatten the (flag, value) pairs in one pass; folding the pairs with reduce()
    # fails on an empty dict and re-copies the growing tuple at every step
    cmd_args = [arg for pair in cmd_dict.items() for arg in pair if arg is not None]

    L.debug(f"Fuzzer arguments: `{str(cmd_args)}`")
    return cmd_args
def run(self, compiler=None):
    """
    Spawns the fuzzer by taking the self.cmd property and initializing a command in a list
    format for subprocess.

    Calls `pre_exec` first, and `post_exec` afterwards when the front-end defines one.

    :param compiler: if necessary, a compiler that is invoked before fuzzer executable (ie `dotnet`)
    :raises FrontendError: if no `cmd` is defined or the fuzzer run raised
    """
    # call pre_exec for any checks/inits before execution
    L.info("Calling pre_exec before fuzzing")
    self.pre_exec()

    # initialize cmd from property or throw exception. The class-level check comes first
    # so that a `cmd` property is evaluated once (below), not also by hasattr()
    if isinstance(getattr(type(self), "cmd", None), property) or hasattr(self, "cmd"):
        command = [self.fuzzer] + DeepStateFrontend._dict_to_cmd(self.cmd)
    else:
        raise FrontendError("No DeepStateFrontend.cmd attribute defined.")

    # prepend compiler that invokes fuzzer
    if compiler:
        command.insert(0, compiler)

    L.info(f"Executing command `{str(command)}`")

    # TODO(alan): other stuff before calling cmd
    L.info(f"Fuzzer start time: {self.start_time}")

    self._on = True
    try:
        ps = subprocess.Popen(command)
        ps.communicate()
    except BaseException as e:
        raise FrontendError(f"{self.fuzzer} run interrupted due to exception {e}.") from e
    finally:
        # the fuzzer is no longer running, whether it finished or was interrupted
        self._on = False

    # NOTE(review): `_off` has no other visible reader; kept as set by the original code
    self._off = True
    self.end_time = int(time.time())
    L.info(f"Fuzzer end time: {self.end_time}")

    # do post-fuzz operations
    if callable(getattr(self, 'post_exec', None)):
        L.info("Calling post-exec for fuzzer post-processing")
        self.post_exec()
def post_processing(self):
"""
performs any post-fuzzing operations, like test extraction / parsing
"""
raise NotImplementedError("Must be implemented by front-end executor.")
# TODO
def sync_seeds(self, path):
    """Placeholder for seed synchronization; currently does nothing with `path`."""
    pass
_ARGS = None
@ -144,30 +240,33 @@ class DeepStateFrontend(object):
if cls._ARGS:
return cls._ARGS
# use existing argparser if defined in fuzzer object,
# or initialize new one, both with default arguments
if hasattr(cls, "parser"):
L.debug("Using previously initialized parser")
parser = cls.parser
else:
parser = argparse.ArgumentParser(
description="Use fuzzer as back-end for DeepState.")
parser.add_argument("binary", type=str, help="Path to the test binary to run.")
# Target binary (not required, as we enforce manual checks in pre_exec)
parser.add_argument("binary", nargs='?', type=str, help="Path to the test binary to run.")
parser.add_argument("--output_test_dir", type=str, default="out", help="Directory where tests will be saved.")
# Input/output workdirs
parser.add_argument("-i", "--input_seeds", type=str, help="Directory with seed inputs.")
parser.add_argument("-o", "--output_test_dir", type=str, default="out", help="Directory where tests will be saved.")
parser.add_argument("--timeout", type=int, default=3600, help="How long to fuzz.")
parser.add_argument("--jobs", type=int, default=1, help="How many worker processes to spawn.")
parser.add_argument("--seeds", type=str, help="Directory with seed inputs.")
parser.add_argument("--which_test", type=str, help="Which test to run (equivalent to --input_which_test).")
parser.add_argument("--max_input_size", type=int, default=8192, help="Maximum input size.")
# Fuzzer execution options
parser.add_argument("-t", "--timeout", type=int, default=3600, help="How long to fuzz.")
parser.add_argument("-j", "--jobs", type=int, default=1, help="How many worker processes to spawn.")
parser.add_argument("-s", "--max_input_size", type=int, default=8192, help="Maximum input size.")
# Miscellaneous options
parser.add_argument("--fuzzer_help", action='store_true', help="Show fuzzer command line options.")
parser.add_argument("--which_test", type=str, help="Which test to run (equivalent to --input_which_test).")
parser.add_argument("--args", default=[], nargs=argparse.REMAINDER, help="Overrides DeepState arguments to pass to test(s).")
parser.add_argument("--args", default=[], nargs=argparse.REMAINDER, help="Other arguments to pass to fuzzer cli.")
cls._args = parser.parse_args()
cls._ARGS = parser.parse_args()
cls.parser = parser
return cls._args
return cls._ARGS