# Copyright (c) 2013, Felipe Andres Manzano # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice,this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the copyright holder nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. import unittest import sys import shutil import tempfile import os import hashlib import subprocess import time #logging.basicConfig(filename = "test.log", # format = "%(asctime)s: %(name)s:%(levelname)s: %(message)s", # level = logging.DEBUG) class IntegrationTest(unittest.TestCase): def setUp(self): # Create a temporary directory self.test_dir = tempfile.mkdtemp() def tearDown(self): # Remove the directory after the test shutil.rmtree(self.test_dir) def _runWithTimeout(self, procargs, timeout=1200): po = subprocess.Popen(procargs) secs_used = 0 while po.poll() is None and secs_used < timeout: time.sleep(1) sys.stderr.write("~") secs_used += 1 self.assertTrue(secs_used < timeout) sys.stderr.write("\n") @unittest.skip('TODO(mark); skipping so we can move on with our lives and merge x86_new. ask felipe to fix later.') def testArguments(self): dirname = os.path.dirname(__file__) filename = os.path.abspath(os.path.join(dirname, 'binaries/arguments_linux_amd64')) self.assertTrue(filename.startswith(os.getcwd())) filename = filename[len(os.getcwd())+1:] SE = os.path.join(dirname, '../main.py') data = file(filename,'rb').read() self.assertEqual(len(data), 767152) self.assertEqual(hashlib.md5(data).hexdigest() , '00fb23e47831a1054ca4a74656035472') workspace = '%s/workspace'%self.test_dir self._runWithTimeout(['python', SE, '--log', '%s/output.log'%self.test_dir, '--workspace', workspace, '--proc', '4', filename, '+++++++++']) data = file('%s/visited.txt'%workspace,'r').read() data = '\n'.join(sorted(set(data.split('\n')))) self.assertEqual(hashlib.md5(data).hexdigest() , '757e3cb387a163987d9265f15970f595') @unittest.skip('TODO(mark); skipping so we can move on with our lives and merge x86_new. ask felipe to fix later.') def testArgumentsAssertions(self): dirname = os.path.dirname(__file__) filename = os.path.abspath(os.path.join(dirname, 'binaries/arguments_linux_amd64')) self.assertTrue(filename.startswith(os.getcwd())) filename = filename[len(os.getcwd())+1:] SE = os.path.join(dirname, '../main.py') data = file(filename,'rb').read() self.assertEqual(len(data), 767152) self.assertEqual(hashlib.md5(data).hexdigest() , '00fb23e47831a1054ca4a74656035472') workspace = '%s/workspace'%self.test_dir assertions = '%s/assertions.txt'%self.test_dir file(assertions,'w').write('0x0000000000401003 ZF == 1') self._runWithTimeout(['python', SE, '--log', '%s/output.log'%self.test_dir, '--workspace', workspace, '--proc', '4', '--assertions', assertions, filename, '+++++++++']) data = file('%s/visited.txt'%workspace,'r').read() data = '\n'.join(sorted(set(data.split('\n')))) self.assertEqual(hashlib.md5(data).hexdigest() , 'c52d7d471ba5c94fcf59936086821a6b') @unittest.skip('TODO(mark); skipping so we can move on with our lives and merge x86_new. ask felipe to fix later.') def testDecree(self): dirname = os.path.dirname(__file__) filename = os.path.abspath(os.path.join(dirname, 'binaries/cadet_decree_x86')) self.assertTrue(filename.startswith(os.getcwd())) filename = filename[len(os.getcwd())+1:] SE = os.path.join(dirname, '../main.py') data = file(filename,'rb').read() self.assertEqual(len(data), 1828) self.assertEqual(hashlib.md5(data).hexdigest() , '8955a29d51c1edd39b0e53794ebcf464') workspace = '%s/workspace'%self.test_dir self._runWithTimeout(['python', SE, '--log', '%s/output.log'%self.test_dir, '--workspace', workspace, '--timeout', '20', '--proc', '4', '--policy', 'uncovered', filename]) data = file('%s/visited.txt'%workspace,'r').read() visited = len(set(data.split('\n'))) self.assertTrue(visited > 100 ) if __name__ == '__main__': unittest.main()