diff --git a/bin/mctest/__main__.py b/bin/mctest/__main__.py index 82b6073..d0e9600 100644 --- a/bin/mctest/__main__.py +++ b/bin/mctest/__main__.py @@ -14,37 +14,80 @@ # limitations under the License. +import argparse +import collections import logging import manticore +import multiprocessing import sys +from manticore.core.state import TerminateState from manticore.utils.helpers import issymbolic + L = logging.getLogger("mctest") L.setLevel(logging.INFO) -class EntryPointPlugin(manticore.core.plugin.Plugin): - """Interpose on system calls. When we come across McTest's special system - call that is invoked at the beginnning of McTest_Run, then we stop execution - there and take over.""" - def on_syscall_callback(self, state, index): - if 0x41414141 == index: - print 'here!!!!!' - state_id = self.manticore._executor._workspace.save_state(state) - self.manticore._executor.put(state_id) - raise manticore.TerminateState("Canceled", testcase=False) +def read_c_string(state, ea): + """Read a concrete NUL-terminated string from `ea`.""" + return state.cpu.read_string(ea) + def read_uintptr_t(state, ea): - """Read a uint64_t value from memory.""" - next_ea = ea + (state.cpu.address_bit_size // 8) - val = state.cpu.read_int(ea) + """Read a uintptr_t value from memory.""" + addr_size_bits = state.cpu.address_bit_size + next_ea = ea + (addr_size_bits // 8) + val = state.cpu.read_int(ea, size=addr_size_bits) if issymbolic(val): val = state.solve_one(val) return val, next_ea -def read_c_string(state, ea): - return state.cpu.read_string(ea) + +def read_uint32_t(state, ea): + """Read a uint32_t value from memory.""" + next_ea = ea + 4 + val = state.cpu.read_int(ea, size=32) + if issymbolic(val): + val = state.solve_one(val) + return val, next_ea + + +TestInfo = collections.namedtuple( + 'TestInfo', 'ea name file_name line_number') + + +def read_test_info(state, ea): + """Read in a `McTest_TestInfo` info structure from memory.""" + prev_test_ea, ea = read_uintptr_t(state, ea) + test_func_ea, ea = read_uintptr_t(state, ea) + test_name_ea, ea = read_uintptr_t(state, ea) + file_name_ea, ea = read_uintptr_t(state, ea) + file_line_num, _ = read_uint32_t(state, ea) + + if not test_func_ea or \ + not test_name_ea or \ + not file_name_ea or \ + not file_line_num: # `__LINE__` in C always starts at `1` ;-) + return None, prev_test_ea + + test_name = read_c_string(state, test_name_ea) + file_name = read_c_string(state, file_name_ea) + info = TestInfo(test_func_ea, test_name, file_name, file_line_num) + return info, prev_test_ea + + +def find_test_cases(state, info_ea): + """Find the test case descriptors.""" + tests = [] + info_ea, _ = read_uintptr_t(state, info_ea) + while info_ea: + test, info_ea = read_test_info(state, info_ea) + if test: + tests.append(test) + tests.sort(key=lambda t: (t.file_name, t.line_number)) + return tests + def read_api_table(state, ea): """Reads in the API table.""" @@ -58,16 +101,16 @@ def read_api_table(state, ea): apis[api_name] = api_ea return apis + def make_symbolic_input(state, input_begin_ea, input_end_ea): """Fill in the input data array with symbolic data.""" input_size = input_end_ea - input_begin_ea - data = state.new_symbolic_buffer(nbytes=input_size, name='MCTEST_INPUT') - state.cpu.write_bytes(input_begin_ea, data) - + state.cpu.write_bytes(input_begin_ea, data) return data -def IsSymbolicUInt(state, arg): + +def hook_IsSymbolicUInt(state, arg): """Implements McTest_IsSymblicUInt, which returns 1 if its input argument has more then one solutions, and zero otherwise.""" solutions = state.solve_n(arg, 2) @@ -81,28 +124,101 @@ def IsSymbolicUInt(state, arg): return 1 -def Assume(state, arg): - """Implements _McTest_CanAssume, which tries to inject a constraint.""" +def hook_Assume(state, arg): + """Implements _McTest_Assume, which tries to inject a constraint.""" constraint = arg != 0 - state.constrain(constraint) - if len(state.concretize(constraint, 'ONE')) == 0: - L.error("Failed to assert assumption {}".format(constraint)) - self.exit(2) + if issymbolic(constraint): + state.constrain(constraint) -def Pass(self): +def hook_Pass(state): """Implements McTest_Pass, which notifies us of a passing test.""" L.info("Passed test case") - self.exit(0) + if state.context['failed']: + raise TerminateState("Got to end of failing test case.") + else: + raise TerminateState("Passed test case") -def Fail(self): + +def hook_Fail(state): """Implements McTest_Fail, which notifies us of a passing test.""" L.error("Failed test case") - self.exit(1) + state.context['failed'] = 1 + raise TerminateState("Failed test case") +def hook_SoftFail(state): + """Implements McTest_Fail, which notifies us of a passing test.""" + L.error("Soft failure in test case, continuing") + state.context['failed'] = 1 + + +LEVEL_TO_LOGGER = { + 0: L.debug, + 1: L.info, + 2: L.warning, + 3: L.error, + 4: L.critical +} + + +def hook_Log(state): + """Implements McTest_Log, which lets Manticore intercept and handle the + printing of log messages from the simulated tests.""" + pass + +def hook(func): + return lambda state: state.invoke_model(func) + + +def run_test(state, apis, test): + """Run an individual test case.""" + state.cpu.PC = test.ea + m = manticore.Manticore(state, sys.argv[1:]) + + state = m.initial_state + state.context['failed'] = 0 + state.context['log_messages'] = [] + state.context['input'] = make_symbolic_input( + state, apis['InputBegin'], apis['InputEnd']) + + m.add_hook(apis['IsSymbolicUInt'], hook(hook_IsSymbolicUInt)) + m.add_hook(apis['Assume'], hook(hook_Assume)) + m.add_hook(apis['Pass'], hook(hook_Pass)) + m.add_hook(apis['Fail'], hook(hook_Fail)) + m.add_hook(apis['SoftFail'], hook(hook_SoftFail)) + m.run() + + +def run_tests(args, state, apis): + """Run all of the test cases.""" + pool = multiprocessing.Pool(processes=max(1, args.num_workers)) + results = [] + tests = find_test_cases(state, apis['LastTestInfo']) + for test in tests: + print "Found test", test.name + res = pool.apply_async(run_test, (state, apis, test)) + results.append(res) + + pool.close() + pool.join() + + exit(0) + def main(): + parser = argparse.ArgumentParser( + description="Symbolically execute unit tests with Manticore") + + parser.add_argument( + "--num_workers", default=1, type=int, + help="Number of workers to spawn for testing and test generation.") + + parser.add_argument( + "binary", type=str, help="Path to the test binary to run.") + + args = parser.parse_args() + m = manticore.Manticore(sys.argv[1], sys.argv[1:]) m.verbosity(1) @@ -110,24 +226,15 @@ def main(): m._binary_type = 'not elf' m._binary_obj = m._initial_state.platform.elf - mctest = m._get_symbol_address('McTest_Run') - run_state = m._initial_state + setup_ea = m._get_symbol_address('McTest_Setup') + setup_state = m._initial_state ea_of_api_table = m._get_symbol_address('McTest_API') - apis = read_api_table(run_state, ea_of_api_table) + apis = read_api_table(setup_state, ea_of_api_table) - # Introduce symbolic input that the tested code will use. - symbolic_input = make_symbolic_input(run_state, - apis['InputBegin'], apis['InputEnd']) + m.add_hook(setup_ea, lambda state: run_tests(args, state, apis)) + m.run() - m.add_hook(apis['IsSymbolicUInt'], lambda state: state.invoke_model(IsSymbolicUInt)) - m.add_hook(apis['Assume'], lambda state: state.invoke_model(Assume)) - m.add_hook(apis['Pass'], lambda state: state.invoke_model(Pass)) - m.add_hook(apis['Fail'], lambda state: state.invoke_model(Fail)) - - print "Finished establishing hooks" - - # To Do: create test states if "__main__" == __name__: exit(main()) diff --git a/bin/mctest/main_angr.py b/bin/mctest/main_angr.py index ec73fd4..e17879e 100644 --- a/bin/mctest/main_angr.py +++ b/bin/mctest/main_angr.py @@ -48,15 +48,15 @@ def read_c_string(state, ea): def read_uintptr_t(state, ea): - """Read a uint64_t value from memory.""" + """Read a uintptr_t value from memory.""" next_ea = ea + (state.arch.bits // 8) val = state.solver.eval(state.mem[ea].uintptr_t.resolved, cast_to=int) return val, next_ea def read_uint32_t(state, ea): - """Read a uint64_t value from memory.""" - next_ea = ea + (state.arch.bits // 8) + """Read a uint32_t value from memory.""" + next_ea = ea + 4 val = state.solver.eval(state.mem[ea].uint32_t.resolved, cast_to=int) return val, next_ea @@ -85,6 +85,18 @@ def read_test_info(state, ea): return info, prev_test_ea +def find_test_cases(state, info_ea): + """Find the test case descriptors.""" + tests = [] + info_ea, _ = read_uintptr_t(state, info_ea) + while info_ea: + test, info_ea = read_test_info(state, info_ea) + if test: + tests.append(test) + tests.sort(key=lambda t: (t.file_name, t.line_number)) + return tests + + def read_api_table(state, ea): """Reads in the API table.""" apis = {} @@ -98,17 +110,6 @@ def read_api_table(state, ea): return apis -def find_test_cases(state, info_ea): - """Find the test case descriptors.""" - tests = [] - while info_ea: - test, info_ea = read_test_info(state, info_ea) - if test: - tests.append(test) - tests.sort(key=lambda t: (t.file_name, t.line_number)) - return tests - - def make_symbolic_input(state, input_begin_ea, input_end_ea): """Fill in the input data array with symbolic data.""" input_size = input_end_ea - input_begin_ea @@ -133,7 +134,7 @@ class IsSymbolicUInt(angr.SimProcedure): class Assume(angr.SimProcedure): - """Implements _McTest_CanAssume, which tries to inject a constraint.""" + """Implements _McTest_Assume, which tries to inject a constraint.""" def run(self, arg): constraint = arg != 0 self.state.solver.add(constraint) @@ -165,8 +166,8 @@ class SoftFail(angr.SimProcedure): class Log(angr.SimProcedure): - """Implements McTest_Log, which lets Angr intercept and handle the printing - of log messages from the simulated tests.""" + """Implements McTest_Log, which lets Angr intercept and handle the + printing of log messages from the simulated tests.""" LEVEL_TO_LOGGER = { 0: L.debug, @@ -297,10 +298,6 @@ def main(): ea_of_api_table = project.kb.labels.lookup('McTest_API') apis = read_api_table(run_state, ea_of_api_table) - # Introduce symbolic input that the tested code will use. - symbolic_input = make_symbolic_input( - run_state, apis['InputBegin'], apis['InputEnd']) - # Hook various functions. hook_function(project, apis['IsSymbolicUInt'], IsSymbolicUInt) hook_function(project, apis['Assume'], Assume) @@ -318,6 +315,8 @@ def main(): # as failing. run_state.globals['failed'] = 0 run_state.globals['log_messages'] = [] + run_state.globals['input'] = make_symbolic_input( + run_state, apis['InputBegin'], apis['InputEnd']) pool = multiprocessing.Pool(processes=max(1, args.num_workers)) results = [] diff --git a/src/lib/McTest.c b/src/lib/McTest.c index b362731..ecca27d 100644 --- a/src/lib/McTest.c +++ b/src/lib/McTest.c @@ -205,15 +205,6 @@ const struct McTest_IndexEntry McTest_API[] = { /* Set up McTest. */ void McTest_Setup(void) { - /* Manticore entrypoint. Manticore doesn't (yet?) support symbol lookups, so - * we instead interpose on this fake system call, and discover the API table - * via the first argument to the system call. */ -#if defined(_MSC_VER) -# warning "TODO: Implement Windows interception support for Manticore." -#else - syscall(0x41414141, &McTest_API); -#endif - /* TODO(pag): Sort the test cases by file name and line number. */ }