From 1deccfd0053f5e4649dce697de7cd662a4cb47ec Mon Sep 17 00:00:00 2001 From: Michael Rash Date: Thu, 24 Apr 2014 22:11:04 -0400 Subject: [PATCH] [test suite] python fuzzer - break out fuzzing sections into dedicated functions --- test/spa_fuzzing.py | 198 ++++++++++++++++++++++++++++++-------------- 1 file changed, 134 insertions(+), 64 deletions(-) diff --git a/test/spa_fuzzing.py b/test/spa_fuzzing.py index 3ac56f78..ba395fca 100755 --- a/test/spa_fuzzing.py +++ b/test/spa_fuzzing.py @@ -19,14 +19,14 @@ import base64 import argparse -def main(): +### a few constants +spa_success = 1 +spa_failure = 0 +spa_sha256 = 3 +do_digest = 1 +no_digest = 0 - ### a few constants - spa_success = 1 - spa_failure = 0 - spa_sha256 = 3 - do_digest = 1 - no_digest = 0 +def main(): args = parse_cmdline() @@ -50,78 +50,148 @@ def main(): payload_num += 1 - print "# start tests with payload: " + spa_payload + print "# start tests with payload: ", spa_payload + "\n" \ + "# base64 encoded original payload:", base64.b64encode(spa_payload) + + ### fuzz individual payload fields + pkt_id = field_fuzzing(args, spa_payload, payload_num, pkt_id) ### valid payload tests - all digest types - print "# payload " + str(payload_num) + " valid payload + valid digest types..." - for digest_type in range(0, 6): - print str(pkt_id), str(spa_success), str(do_digest), \ - str(digest_type), base64.b64encode(spa_payload) - pkt_id += 1 + pkt_id = valid_payloads(args, spa_payload, payload_num, pkt_id) ### invalid digest types - print "# payload " + str(payload_num) + " invalid digest types..." - for digest_type in [-1, 6, 7]: - print str(pkt_id), str(spa_success), str(do_digest), \ - str(digest_type), base64.b64encode(spa_payload) - pkt_id += 1 + pkt_id = invalid_digest_types(args, spa_payload, payload_num, pkt_id) ### truncated lengths - print "# payload " + str(payload_num) + " truncated lengths..." - for l in range(1, len(spa_payload)): - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(spa_payload[:l]) - pkt_id += 1 - for l in range(1, len(spa_payload)): - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(spa_payload[l:]) - pkt_id += 1 + pkt_id = truncated_lengths(args, spa_payload, payload_num, pkt_id) - ### blocks of spliced chars out of the original SPA payload - print "# payload " + str(payload_num) + " splice blocks of chars..." - for bl in range(1, 20): - for l in range(0, len(spa_payload)): - new_payload = spa_payload[:l] + spa_payload[l+bl:] - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(new_payload) - pkt_id += 1 + ### remove chunks of chars out of the original SPA payload + pkt_id = rm_chunks(args, spa_payload, payload_num, pkt_id) ### SPA payloads that are too long - print "# payload " + str(payload_num) + " payloads too long..." - for l in [1, 10, 50, 127, 128, 129, 200, 399, \ - 400, 401, 500, 800, 1000, 1023, 1024, 1025, \ - 1200, 1499, 1500, 1501, 2000]: - for non_ascii in range(0, 5) + range(127, 130) + range(252, 255): - new_payload = spa_payload - for p in range(0, l): - new_payload += chr(non_ascii) - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(new_payload) - pkt_id += 1 + pkt_id = append_data_to_end(args, spa_payload, payload_num, pkt_id) ### additional embedded ':' chars - print "# payload " + str(payload_num) + " additional embedded : chars..." - for pos in range(0, len(spa_payload)): - if spa_payload[pos] == ':': - continue + pkt_id = embedded_separators(args, spa_payload, payload_num, pkt_id) + + ### non-ascii char tests + pkt_id = embedded_non_ascii(args, spa_payload, payload_num, pkt_id) + + return + +def field_fuzzing(args, spa_payload, payload_num, pkt_id): + repl_start = 0 + repl_end = 0 + field_num = 0 + for idx in range(0, len(spa_payload)): + if spa_payload[idx] == ':' or idx == len(spa_payload)-1: + field_num += 1 + repl_start = repl_end + repl_end = idx + + ### now generate fuzzing data for this field + for c in range(1, 255): + for l in [1, 2, 3, 4, 5, 6, 10, 14, 15, 16, 17, 24, 31, 32, 33, \ + 63, 64, 127, 128, 129, 250]: + + fuzzing_field = '' + + for n in range(0, l): + fuzzing_field += chr(c) + + if idx == len(spa_payload)-1: + new_payload = spa_payload[:repl_start] + ":" + base64.b64encode(fuzzing_field) + else: + if field_num == 1: + new_payload = spa_payload[:repl_start] + fuzzing_field + spa_payload[repl_end:] + elif field_num == 2 or field_num >= 6: ### user or access request + new_payload = spa_payload[:repl_start] + ":" + base64.b64encode(fuzzing_field) + spa_payload[repl_end:] + else: + ### time stamp, version, and SPA type fields aren't base64 encoded + new_payload = spa_payload[:repl_start] + ":" + fuzzing_field + spa_payload[repl_end:] + + print str(pkt_id), str(spa_failure), str(do_digest), \ + str(spa_sha256), base64.b64encode(new_payload) + pkt_id += 1 + + return pkt_id + +def valid_payloads(args, spa_payload, payload_num, pkt_id): + print "# payload " + str(payload_num) + " valid payload + valid digest types..." + for digest_type in range(0, 6): + print str(pkt_id), str(spa_success), str(do_digest), \ + str(digest_type), base64.b64encode(spa_payload) + pkt_id += 1 + return pkt_id + +def invalid_digest_types(args, spa_payload, payload_num, pkt_id): + print "# payload " + str(payload_num) + " invalid digest types..." + for digest_type in [-1, 6, 7]: + print str(pkt_id), str(spa_success), str(do_digest), \ + str(digest_type), base64.b64encode(spa_payload) + pkt_id += 1 + return pkt_id + +def truncated_lengths(args, spa_payload, payload_num, pkt_id): + print "# payload " + str(payload_num) + " truncated lengths..." + for l in range(1, len(spa_payload)): + print str(pkt_id), str(spa_failure), str(do_digest), \ + str(spa_sha256), base64.b64encode(spa_payload[:l]) + pkt_id += 1 + for l in range(1, len(spa_payload)): + print str(pkt_id), str(spa_failure), str(do_digest), \ + str(spa_sha256), base64.b64encode(spa_payload[l:]) + pkt_id += 1 + + return pkt_id + +def rm_chunks(args, spa_payload, payload_num, pkt_id): + print "# payload " + str(payload_num) + " splice blocks of chars..." + for bl in range(1, 20): + for l in range(0, len(spa_payload)): + new_payload = spa_payload[:l] + spa_payload[l+bl:] + print str(pkt_id), str(spa_failure), str(do_digest), \ + str(spa_sha256), base64.b64encode(new_payload) + pkt_id += 1 + return pkt_id + +def append_data_to_end(args, spa_payload, payload_num, pkt_id): + print "# payload " + str(payload_num) + " payloads too long..." + for l in [1, 10, 50, 127, 128, 129, 200, 399, \ + 400, 401, 500, 800, 1000, 1023, 1024, 1025, \ + 1200, 1499, 1500, 1501, 2000]: + for non_ascii in range(0, 5) + range(127, 130) + range(252, 256): + new_payload = spa_payload + for p in range(0, l): + new_payload += chr(non_ascii) + print str(pkt_id), str(spa_failure), str(do_digest), \ + str(spa_sha256), base64.b64encode(new_payload) + pkt_id += 1 + return pkt_id + +def embedded_separators(args, spa_payload, payload_num, pkt_id): + print "# payload " + str(payload_num) + " additional embedded : chars..." + for pos in range(0, len(spa_payload)): + if spa_payload[pos] == ':': + continue + new_payload = list(spa_payload) + new_payload[pos] = ':' + print str(pkt_id), str(spa_failure), str(do_digest), \ + str(spa_sha256), base64.b64encode(''.join(new_payload)) + pkt_id += 1 + return pkt_id + +def embedded_non_ascii(args, spa_payload, payload_num, pkt_id): + print "# payload " + str(payload_num) + " non-ascii char tests..." + for pos in range(0, len(spa_payload)): + for non_ascii in range(0, 31) + range(127, 256): new_payload = list(spa_payload) - new_payload[pos] = ':' + new_payload[pos] = chr(non_ascii) + ### write out the fuzzing line print str(pkt_id), str(spa_failure), str(do_digest), \ str(spa_sha256), base64.b64encode(''.join(new_payload)) pkt_id += 1 - - ### non-ascii char tests - print "# payload " + str(payload_num) + " non-ascii char tests..." - for pos in range(0, len(spa_payload)): - for non_ascii in range(0, 31) + range(127, 255): - new_payload = list(spa_payload) - new_payload[pos] = chr(non_ascii) - ### write out the fuzzing line - print str(pkt_id), str(spa_failure), str(do_digest), \ - str(spa_sha256), base64.b64encode(''.join(new_payload)) - pkt_id += 1 - - return + return pkt_id def print_hdr(): print "# "