Remove ManticoreControl object [#180] (#4)

* Remove ManticoreControl object
* Some changes were brought in from dev-symbolicate-api
* Add Manticore.terminate()
* Add State.abandon()
* Update sample scripts
* Remove ctl from README
* Fix tests
* Bring in changes from dev-symbolicate-api
* Lower-case wildcard
* string -> cstring
* abandon() docstring
* Rename "name" to "label"
* Remove obsolete comment
* Make NUL a possible value for the last byte of a cstring
* Fix AbandonState and add example binary&script
* name -> label in tests, manticore.py
* Ignore .DS_Store
* Update symbolicate_buffer docstring
This commit is contained in:
Yan 2017-02-14 14:54:52 -05:00 committed by GitHub
parent 406918a9fc
commit dde79a0bab
11 changed files with 180 additions and 60 deletions

3
.gitignore vendored
View File

@ -89,3 +89,6 @@ ENV/
# Rope project settings
.ropeproject
# macOS Finder files
.DS_Store

View File

@ -66,12 +66,12 @@ hook_pc = 0x400ca0
m = Manticore('./path/to/binary')
def hook(context, state, control):
def hook(context, state):
cpu = state.cpu
print 'eax', cpu.EAX
print cpu.read_int(cpu.SP)
control.exit() # tell Manticore to stop
m.terminate() # tell Manticore to stop
m.add_hook(hook_pc, hook)
m.start()

View File

@ -167,15 +167,81 @@ class State(object):
def add(self, constraint, check=False):
self.constraints.add(constraint)
def make_symbolic(self, data, name = 'INPUT', WILDCARD='+', string=False):
if WILDCARD in data:
def abandon(self):
'''Abandon the currently-active state
Note: This must be called from the Executor loop, or a user-provided
callback.'''
raise AbandonState
def new_symbolic_buffer(self, nbytes, **options):
'''Create and return a symbolic buffer of length |nbytes|. The buffer is
not written into State's memory; write it to the state's memory to
introduce it into the program state.
Args:
nbytes - Length of the new buffer
options - Options to set on the returned expression. Valid options:
name -- The name to assign to the buffer (str)
cstring -- Whether or not to enforce that the buffer is a cstring
(i.e. no \0 bytes, except for the last byte). (bool)
Returns:
Expression representing the buffer.
'''
name = options.get('name', 'buffer')
expr = self.constraints.new_array(name=name, index_max=nbytes)
self.input_symbols.append(expr)
if options.get('cstring', False):
for i in range(nbytes - 1):
self.constraints.add(expr[i] != 0)
return expr
def new_symbolic_value(self, nbits, **options):
'''Create and return a symbolic value that is |nbits| bits wide. Assign
the value to a register or write it into the address space to introduce
it into the program state.
Args:
nbits - The bitwidth of the value returned.
options - Options to set on the returned expression. Valid options:
label -- The label to assign to the value.
Returns:
Expression representing the value.
'''
assert nbits in (1, 4, 8, 16, 32, 64, 128, 256)
name = options.get('label', 'val')
expr = self.constraints.new_bitvec(nbits, name=name)
self.input_symbols.append(expr)
return expr
def symbolicate_buffer(self, data, label = 'INPUT', wildcard='+', string=False):
'''Mark parts of a buffer as symbolic (demarked by the wildcard byte)
Args:
data -- The string to symbolicate. If no wildcard bytes are provided,
this is the identity function on the first argument.
label -- The label to assign to the value
wildcard -- The byte that is considered a wildcard
string -- Ensure bytes returned can not be \0
Returns:
If data does not contain any wildcard bytes, data itself. Otherwise,
a list of values derived from data. Non-wildcard bytes are kept as
is, wildcard bytes are replaced by Expression objects.
'''
if wildcard in data:
size = len(data)
symb = self.constraints.new_array(name=name, index_max=size)
symb = self.constraints.new_array(name=label, index_max=size)
self.input_symbols.append(symb)
for j in xrange(size):
if data[j] != WILDCARD:
if data[j] != wildcard:
symb[j] = data[j]
data = [symb[i] for i in range(size)]
data = [symb[i] for i in range(size)]
if string:
for b in data:
@ -710,7 +776,7 @@ class Executor(object):
vals = random.sample(vals, 1)
logger.info("Too much storage used(%d). Inhibiting fork", total_used_storage)
childs = []
children = []
if len(vals) == 1:
constraint = symbolic == vals[0]
current_state.add(constraint, check=True) #We already know it's sat
@ -739,10 +805,10 @@ class Executor(object):
setstate(new_state, new_value)
#add the state to the list of pending states
self.putState(new_state)
childs.append(new_state.co)
children.append(new_state.co)
logger.debug("Forking state %d into states %r",parent, childs)
logger.debug("Forking state %d into states %r",parent, children)
current_state = None
return current_state
@ -868,7 +934,6 @@ class Executor(object):
except AbandonState, e:
current_state = None
break
except ForkState as e:
logger.debug("Forking state")
@ -926,20 +991,20 @@ class Executor(object):
logger.error('SymbolicMemoryException at PC: 0x%16x. Cause: %s', current_state.current.PC, e.cause)
logger.info('Constraint for crashing! %s', e.constraint)
childs = []
children = []
with current_state as new_state:
new_state.add(e.constraint==False)
if solver.check(new_state.constraints):
self.putState(new_state)
childs.append(new_state.co)
children.append(new_state.co)
with current_state as new_state:
childs.append(new_state.co)
children.append(new_state.co)
new_state.add(e.constraint)
self.newerror(new_state.current.PC)
self.generate_testcase(new_state, "Symbolic Memory Exception: " + str(e))
logger.info("Forking state %d into states %r",current_state.co, childs)
logger.info("Forking state %d into states %r",current_state.co, children)
new_state = current_state = None
except MemoryException as e:

View File

@ -22,9 +22,9 @@ if __name__ == '__main__':
# Ensure that we ignore all possible branches to libc
# This hook returns False if we should abandon exploration
# or True to continue
def fork_hook(new_pc):
def fork_hook(ctx, state):
_from, _to = lib.start, lib.start + lib.size
return not (_from <= new_pc < _to)
return not (_from <= state.cpu.PC < _to)
m.add_fork_hook(fork_hook)
# Start path exploration. start() returns when Manticore

View File

@ -11,7 +11,7 @@ if __name__ == '__main__':
path = sys.argv[1]
m = Manticore(path)
def myhook(ctx, state, ctl):
def myhook(ctx, state):
flag = ''
cpu = state.cpu
arraytop = cpu.R11
@ -22,7 +22,7 @@ if __name__ == '__main__':
concrete_input = state.solve_one(symbolic_input)
flag += chr(concrete_input & 0xff)
print 'flag is:', flag
ctl.exit()
m.terminate()
m.add_hook(0x109f0, myhook)

View File

@ -21,12 +21,12 @@ if __name__ == '__main__':
m = Manticore(None, path, args)
# Trigger an event when PC reaches a certain value
def reached_goal(state):
def reached_goal(ctx, state):
cpu = state.cpu
assert cpu.pc == 0x10858
assert cpu.PC == 0x10858
instruction = cpu.read(cpu.pc, 4)
instruction = cpu.read(cpu.PC, 4)
print "Execution goal reached."
print "Instruction bytes: {:08x}".format(cpu.pc)

View File

@ -0,0 +1,56 @@
#include <stdio.h>
/**
* Example code for the state abandon example in the Manticore script corpus.
*
* # Compile binary
* $ gcc -m32 -static -g state_explore.c -o state_explore
*
* # Pull out the address of the branch we want to ignore
* $ ADDRESS=0x$(objdump -S state_explore | grep -A 1 'value == 0x41' |
* tail -n 1 | sed 's|^\s*||g' | cut -f1 -d:)
*
* # Run the analysis
* $ python state_explore.py state_explore $ADDRESS
*/
int
main(int argc, char *argv[])
{
int value;
read(0, &value, sizeof value);
if ((value & 0xff) != 0) {
if (value >= 0x40) {
write(1, "1", 1);
if (value == 0x41) {
write(1, "a", 1);
} else if (value == 0x42) {
write(1, "b", 1);
} else if (value == 0x43) {
write(1, "c", 1);
} else if (value == 0x44) {
write(1, "d", 1);
} else {
write(1, "e", 1);
}
} else {
write(1, "2", 1);
}
} else if ((value & 0xff00) != 0) {
if (value > 0x1000) {
write(1, "3", 1);
} else {
write(1, "4", 1);
}
} else if ((value & 0xff0000) != 0) {
if (value > 0xf0000) {
write(1, "5", 1);
} else {
write(1, "6", 1);
}
}
write(1, "\n", 2);
return 0;
}

View File

@ -0,0 +1,22 @@
import sys
from manticore import Manticore
if __name__ == '__main__':
if len(sys.argv) < 3:
sys.stderr.write("Usage: %s [binary] [address]\n"%(sys.argv[0],))
sys.exit(2)
m = Manticore(sys.argv[1])
# Set to the address of the conditonal checking for the first complex branch
to_abandon = int(sys.argv[2], 0)
def explore(ctx, state):
print "Abandoning state at PC: ", hex(state.cpu.PC)
state.abandon()
print "Adding hook to: ", hex(to_abandon)
m.add_hook(to_abandon, explore)
m.start()

View File

@ -29,7 +29,7 @@ if __name__ == '__main__':
else:
target = (0x400a83, 'EBX')
def entered_func(state):
def entered_func(ctx, state):
'''For ARM, Make R4 symbolic at 0x1082c, as r4 is used in a branch right
after.
'''

View File

@ -36,7 +36,7 @@ def makeDecree(args):
#if args.data != '':
# logger.info('Starting with concrete input: {}'.format(args.data))
model.input.transmit(args.data)
model.input.transmit(initial_state.make_symbolic('+'*14, name='RECEIVE'))
model.input.transmit(initial_state.symbolicate_buffer('+'*14, label='RECEIVE'))
return initial_state
def makeLinux(program, arguments, environment, concrete_start = ''):
@ -50,14 +50,14 @@ def makeLinux(program, arguments, environment, concrete_start = ''):
logger.info('Starting with concrete input: {}'.format(concrete_start))
for i in xrange(len(arguments)):
arguments[i] = initial_state.make_symbolic(arguments[i], name='ARGV%d' % (i+1), string=True)
arguments[i] = initial_state.symbolicate_buffer(arguments[i], label='ARGV%d' % (i+1), string=True)
for i in xrange(len(environment)):
environment[i] = initial_state.make_symbolic(environment[i], name='ENV%d' % (i+1), string=True)
environment[i] = initial_state.symbolicate_buffer(environment[i], label='ENV%d' % (i+1), string=True)
model.input.transmit(concrete_start)
#set stdin input...
model.input.transmit(initial_state.make_symbolic('+'*256, name='STDIN'))
model.input.transmit(initial_state.symbolicate_buffer('+'*256, label='STDIN'))
return initial_state
@ -128,32 +128,6 @@ def issymbolic(value):
'''
return isinstance(value, Expression)
class ManticoreControl(object):
'''
Controls which state is chosen for next execution, and stops all
subprocess execution.
'''
def __init__(self, manticore_obj):
self._parent = manticore_obj
def exit(self):
''' Terminate all outstanding manticore processes '''
m = self._parent
m._executor.shutdown()
@property
def states(self):
''' Return a list of possible next-state choices. Each entry in the list
is a file name of the next-state file '''
raise NotImplementedError()
def abandon_state(self):
''' Mark the current state as irrelevant, so Manticore will not continue
exploring this state tree
'''
pass
class Manticore(object):
def __init__(self, binary_path, args = [], verbose = False):
@ -336,11 +310,7 @@ class Manticore(object):
for pc to invoke callback on every instruction.
'''
def _inner(state):
# Callbacks are defined to consume three arguments:
# manticore context
# program state
# manticore control obj
callback(self._context, state, ManticoreControl(self))
callback(self._context, state)
self._hooks.setdefault(pc, set()).add(_inner)
@ -596,6 +566,10 @@ class Manticore(object):
finally:
self._running = False
def terminate(self):
'Gracefully terminate the currently-executing Manticore run.'
self._executor.shutdown()
def _assertions_callback(self, state, pc):
if pc not in self._assertions:
return

View File

@ -65,14 +65,14 @@ class StateTest(unittest.TestCase):
constraints = ConstraintSet()
initial_state = State(constraints, FakeModel())
arr = initial_state.make_symbolic('+'*100, name='SYMBA')
arr = initial_state.symbolicate_buffer('+'*100, label='SYMBA')
initial_state.add(arr[0] > 0x41)
self.assertTrue(len(initial_state.constraints.declarations) == 1 )
with initial_state as new_state:
self.assertTrue(len(initial_state.constraints.declarations) == 1 )
self.assertTrue(len(new_state.constraints.declarations) == 1 )
arrb = new_state.make_symbolic('+'*100, name='SYMBB')
arrb = new_state.symbolicate_buffer('+'*100, label='SYMBB')
self.assertTrue(len(initial_state.constraints.declarations) == 1 )
self.assertTrue(len(new_state.constraints.declarations) == 1 )