Adds some error checking around finding important symbols and initializing Manticore/Angr. Adds the ability to save concretized bytes to an output directory. For the time being, I'm going with the approach of the user specifying a output dir, then within that I create directories for each file basename in the tests, and subdirectories for each test name, and in there I put binary test files.

This commit is contained in:
Peter Goodman
2017-11-02 00:54:18 -04:00
parent bc208dbd4d
commit c4f74e2389
4 changed files with 149 additions and 48 deletions

View File

@@ -12,13 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import collections
import logging
import md5
import os
import struct
# Represents a TestInfo data structure (the information we know about a test.)
TestInfo = collections.namedtuple(
'TestInfo', 'ea name file_name line_number')
class TestInfo(object):
"""Represents a `DeepState_TestInfo` data structure from the program, as
well as associated meta-data about the test."""
def __init__(self, ea, name, file_name, line_number):
self.ea = ea
self.name = name
self.file_name = file_name
self.line_number = line_number
LOG_LEVEL_DEBUG = 0
@@ -88,6 +97,30 @@ class DeepState(object):
def add_constraint(self, expr):
raise NotImplementedError("Must be implemented by engine.")
_ARGS = None
@classmethod
def parse_args(cls):
"""Parses command-line arguments needed by DeepState."""
if cls._ARGS:
return cls._ARGS
parser = argparse.ArgumentParser(
description="Symbolically execute unit tests with Angr")
parser.add_argument(
"--num_workers", default=1, type=int,
help="Number of workers to spawn for testing and test generation.")
parser.add_argument(
"--output_test_dir", type=str, required=False)
parser.add_argument(
"binary", type=str, help="Path to the test binary to run.")
cls._ARGS = parser.parse_args()
return cls._ARGS
@property
def context(self):
"""Gives convenient property-based access to a dictionary holding state-
@@ -118,7 +151,7 @@ class DeepState(object):
else:
return chars, next_ea
def read_test_info(self, ea):
def _read_test_info(self, ea):
"""Read in a `DeepState_TestInfo` info structure from memory."""
prev_test_ea, ea = self.read_uintptr_t(ea)
test_func_ea, ea = self.read_uintptr_t(ea)
@@ -137,12 +170,24 @@ class DeepState(object):
info = TestInfo(test_func_ea, test_name, file_name, file_line_num)
return info, prev_test_ea
def _split_path(self, path):
"""Split a path into all of its components."""
parts = []
while path:
root, ext = os.path.split(path)
if not ext:
break
path = root
parts.insert(0, ext)
return parts
def find_test_cases(self):
"""Find the test case descriptors."""
tests = []
info_ea, _ = self.read_uintptr_t(self.context['apis']['LastTestInfo'])
while info_ea:
test, info_ea = self.read_test_info(info_ea)
test, info_ea = self._read_test_info(info_ea)
if test:
tests.append(test)
tests.sort(key=lambda t: (t.file_name, t.line_number))
@@ -174,6 +219,8 @@ class DeepState(object):
info.name, info.file_name, info.line_number))
apis = self.context['apis']
# Create the symbols that feed API functions like `DeepState_Int`.
symbols = []
for i, ea in enumerate(xrange(apis['InputBegin'], apis['InputEnd'])):
symbol = self.create_symbol('DEEP_INPUT_{}'.format(i), 8)
@@ -182,6 +229,23 @@ class DeepState(object):
self.context['symbols'] = symbols
# Create the output directory for this test case.
args = self.parse_args()
if args.output_test_dir is not None:
test_dir = os.path.join(args.output_test_dir,
os.path.basename(info.file_name),
info.name)
try:
os.makedirs(test_dir)
except:
pass
if not os.path.isdir(test_dir):
LOGGER.critical("Cannot create test output directory: {}".format(
test_dir))
self.context['test_dir'] = test_dir
def log_message(self, level, message):
"""Add `message` to the `level`-specific log as a `Stream` object for
deferred logging (at the end of the state)."""
@@ -238,22 +302,56 @@ class DeepState(object):
return "".join(message)
def _save_test(self, info, input_bytes):
"""Save the concretized bytes to a file."""
if not len(input_bytes) or 'test_dir' not in self.context:
return
if self.context['abandoned']:
return
test_dir = self.context['test_dir']
test_name = md5.new(input_bytes).hexdigest()
if self.context['failed']:
test_name += ".fail"
else:
test_name += ".pass"
test_file = os.path.join(test_dir, test_name)
LOGGER.info("Saving input to {}".format(test_file))
try:
with open(test_file, "wb") as f:
f.write(input_bytes)
except:
LOGGER.critical("Error saving input to {}".format(test_file))
def report(self):
"""Report on the pass/fail status of a test case, and dump its log."""
info = self.context['info']
apis = self.context['apis']
input_length, _ = self.read_uint32_t(apis['InputIndex'])
# Concretize the used symbols.
symbols = self.context['symbols']
input_bytes = []
input_bytes = bytearray()
for i in xrange(input_length):
b = self.concretize(symbols[i], constrain=True)
input_bytes.append("{:02x}".format(b))
input_bytes.append(b)
# Print out each log entry.
for level, stream in self.context['log']:
logger = LOG_LEVEL_TO_LOGGER[level]
logger(self._stream_to_message(stream))
LOGGER.info("Input: {}".format(" ".join(input_bytes)))
# Print out the first few input bytes to be helpful.
lots_of_bytes = len(input_bytes) > 20 and " ..." or ""
bytes_to_show = min(20, len(input_bytes))
LOGGER.info("Input: {}{}".format(
" ".join("{:02x}".format(b) for b in input_bytes[:bytes_to_show]),
lots_of_bytes))
self._save_test(info, input_bytes)
def pass_test(self):
"""Notify the symbolic executor that this test has passed and stop

View File

@@ -14,7 +14,6 @@
# limitations under the License.
import angr
import argparse
import collections
import logging
import multiprocessing
@@ -247,8 +246,7 @@ def do_run_test(project, test, apis, run_state):
try:
test_manager.run()
except Exception as e:
L.error("Uncaught exception: {}\n{}".format(
sys.exc_info()[0], traceback.format_exc()))
L.error("Uncaught exception: {}\n{}".format(e, traceback.format_exc()))
for state in test_manager.deadended:
DeepAngr(state=state).report()
@@ -262,34 +260,37 @@ def run_test(project, test, apis, run_state):
try:
do_run_test(project, test, apis, run_state)
except Exception as e:
L.error("Uncaught exception: {}\n{}".format(
sys.exc_info()[0], traceback.format_exc()))
L.error("Uncaught exception: {}\n{}".format(e, traceback.format_exc()))
def main():
"""Run DeepState."""
parser = argparse.ArgumentParser(
description="Symbolically execute unit tests with Angr")
args = DeepAngr.parse_args()
parser.add_argument(
"--num_workers", default=1, type=int,
help="Number of workers to spawn for testing and test generation.")
try:
project = angr.Project(
args.binary,
use_sim_procedures=True,
translation_cache=True,
support_selfmodifying_code=False,
auto_load_libs=True,
exclude_sim_procedures_list=['printf', '__printf_chk',
'vprintf', '__vprintf_chk',
'fprintf', '__fprintf_chk',
'vfprintf', '__vfprintf_chk'])
except Exception as e:
L.critical("Cannot create Angr instance on binary {}: {}".format(
args.binary, e))
return 1
parser.add_argument(
"binary", type=str, help="Path to the test binary to run.")
args = parser.parse_args()
project = angr.Project(
args.binary,
use_sim_procedures=True,
translation_cache=True,
support_selfmodifying_code=False,
auto_load_libs=True,
exclude_sim_procedures_list=['printf', '__printf_chk',
'vprintf', '__vprintf_chk',
'fprintf', '__fprintf_chk',
'vfprintf', '__vfprintf_chk'])
try:
setup_ea = project.kb.labels.lookup('DeepState_Setup')
if not setup_ea:
raise Exception()
except:
L.critical("Cannot find symbol `DeepState_Setup` in binary `{}`".format(
args.binary))
return 1
entry_state = project.factory.entry_state(
add_options={angr.options.ZERO_FILL_UNCONSTRAINED_MEMORY,
@@ -301,7 +302,6 @@ def main():
concrete_manager = angr.SimulationManager(
project=project,
active_states=[entry_state])
setup_ea = project.kb.labels.lookup('DeepState_Setup')
concrete_manager.explore(find=setup_ea)
run_state = concrete_manager.found[0]

View File

@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import collections
import logging
import manticore
@@ -277,26 +276,30 @@ def run_tests(args, state, apis):
def main():
parser = argparse.ArgumentParser(
description="Symbolically execute unit tests with Manticore")
args = DeepManticore.parse_args()
parser.add_argument(
"--num_workers", default=1, type=int,
help="Number of workers to spawn for testing and test generation.")
try:
m = manticore.Manticore(args.binary)
except Exception as e:
L.critical("Cannot create Manticore instance on binary {}: {}".format(
args.binary, e))
return 1
parser.add_argument(
"binary", type=str, help="Path to the test binary to run.")
args = parser.parse_args()
m = manticore.Manticore(args.binary)
m.verbosity(1)
# Hack to get around current broken _get_symbol_address
m._binary_type = 'not elf'
m._binary_obj = m._initial_state.platform.elf
setup_ea = m._get_symbol_address('DeepState_Setup')
try:
setup_ea = m._get_symbol_address('DeepState_Setup')
if not setup_ea:
raise Exception()
except:
L.critical("Cannot find symbol `DeepState_Setup` in binary `{}`".format(
args.binary))
return 1
setup_state = m._initial_state
mc = DeepManticore(setup_state)

View File

@@ -75,7 +75,7 @@ void DeepState_SymbolizeData(void *begin, void *end) {
uintptr_t end_addr = (uintptr_t) end;
if (begin_addr > end_addr) {
abort();
DeepState_Abandon("Invalid data bounds for DeepState_SymbolizeData");
} else if (begin_addr == end_addr) {
return;
} else {