Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- template_test_harness.py
- Template which loads the context of a process into a Unicorn Engine
- instance, loads custom (mutated) inputs, and executes the
- desired code. Designed to be used in conjunction with one of the
- Unicorn Context Dumper scripts.
- Author:
- Nathan Voss <njvoss299@gmail.com>
- """
- import argparse
- import struct
- import binascii
- from unicorn import *
- from unicorn.x86_const import * # TODO: Set correct architecture here as necessary
- import unicorn_loader
# Stand-in heap used to prevent OS/kernel issues; it stays None until
# main() replaces it with a real heap object.
unicorn_heap = None

# Emulation window: execution begins at START_ADDRESS and runs until
# END_ADDRESS.
START_ADDRESS = 0x4004DA  # TODO: Set start address here
END_ADDRESS = 0x400511  # TODO: Set end address here
def hook_mem_read(uc, type, addr, *args):
    """Report the address of a memory access and decline to handle it.

    main() registers this as the callback for UC_HOOK_MEM_READ_UNMAPPED.

    Args:
        uc: Engine instance that invoked the hook (unused).
        type: Access kind supplied by the engine (unused).
        addr: Address of the access; printed in hexadecimal.
        *args: Remaining callback arguments, ignored.

    Returns:
        Always False.
        NOTE(review): presumably this tells Unicorn the fault was not
        handled so emulation stops - confirm against the Unicorn docs.
    """
    address_text = hex(addr)
    print(address_text)
    return False
def u32(data):
    """Decode exactly four bytes as a big-endian unsigned 32-bit integer.

    Args:
        data: A 4-byte buffer.

    Returns:
        The decoded integer.

    Raises:
        struct.error: If ``data`` is not exactly four bytes long.
    """
    (value,) = struct.unpack(">I", data)
    return value
def p32(num):
    """Encode an integer as four big-endian unsigned bytes.

    Args:
        num: Integer in the range 0..0xFFFFFFFF.

    Returns:
        The 4-byte packed representation.

    Raises:
        struct.error: If ``num`` does not fit in an unsigned 32-bit field.
    """
    packed = struct.pack(">I", num)
    return packed
# Implement target-specific hooks in here.
# Stub out, skip past, and re-implement necessary functionality as appropriate.
def unicorn_hook_instruction(uc, address, size, user_data):
    """Code hook placeholder; currently does nothing.

    main() registers this function for UC_HOOK_CODE.

    Args:
        uc: Engine instance that invoked the hook.
        address: Address value supplied by the engine.
        size: Size value supplied by the engine.
        user_data: Opaque value passed through from hook registration.

    Returns:
        None.
    """
    # TODO: Setup hooks and handle anything you need to here
    #   - For example, hook malloc/free/etc. and handle it internally
    return None
- #------------------------
- #---- Main test function
def main():
    """Load a dumped process context, inject a mutated input, and emulate it.

    Command-line arguments:
        context_dir: Directory containing the process context.
        input_file: Path to the file containing the mutated input content.
        -d / --debug: Forwarded to the loader as ``enable_trace``.

    Flow:
        1. Build the engine from ``context_dir`` and install the code,
           unmapped-read, and interrupt hooks.
        2. Execute a single instruction at START_ADDRESS to start the
           forkserver.
        3. Read the input file, reject inputs longer than 0xFF bytes, copy
           the input into a freshly allocated buffer, and point RBP/RSP into
           a freshly allocated scratch stack.
        4. Emulate from START_ADDRESS to END_ADDRESS. On a UcError the
           registers are dumped and ``uc.force_crash`` is invoked.

    Returns:
        None. Returns early if the forkserver instruction fails or the
        input is too long.
    """
    global unicorn_heap

    parser = argparse.ArgumentParser()
    parser.add_argument('context_dir', type=str, help="Directory containing process context")
    parser.add_argument('input_file', type=str, help="Path to the file containing the mutated input content")
    parser.add_argument('-d', '--debug', default=False, action="store_true", help="Dump trace info")
    args = parser.parse_args()

    print("Loading context from {}".format(args.context_dir))
    uc = unicorn_loader.AflUnicornEngine(args.context_dir, enable_trace=args.debug, debug_print=False)

    # Instantiate the hook function to avoid emulation errors
    unicorn_heap = unicorn_loader.UnicornSimpleHeap(uc, debug_print=False)
    uc.hook_add(UC_HOOK_CODE, unicorn_hook_instruction)
    uc.hook_add(UC_HOOK_MEM_READ_UNMAPPED, hook_mem_read)

    # Execute 1 instruction just to startup the forkserver
    # NOTE: This instruction will be executed again later, so be sure that
    #       there are no negative consequences to the overall execution state.
    #       If there are, change the later call to emu_start to not re-execute
    #       the first instruction.
    print("Starting the forkserver by executing 1 instruction")
    try:
        uc.emu_start(START_ADDRESS, 0, 0, count=1)
    except UcError as e:
        print("ERROR: Failed to execute a single instruction (error: {})!".format(e))
        return

    def intr_hook(uc, intno, data):
        """Log the interrupt number together with RAX and the program counter."""
        # The harness targets x86-64 (it writes RBP/RSP below), so read the
        # x86 registers; the MIPS-style V0/PC names do not exist in
        # unicorn.x86_const.
        print('interrupt=%d, rax=%d, pc=0x%08x' % (
            intno, uc.reg_read(UC_X86_REG_RAX), uc.reg_read(UC_X86_REG_RIP)))

    # hook interrupts for syscall
    uc.hook_add(UC_HOOK_INTR, intr_hook)

    # Allocate a buffer and load a mutated input and put it into the right spot
    if args.input_file:
        print("Loading input content from {}".format(args.input_file))
        # Context manager guarantees the handle is closed even if read() fails.
        with open(args.input_file, 'rb') as input_file:
            input_content = input_file.read()

        # TODO: Apply constraints to mutated input here
        if len(input_content) > 0xFF:
            print("Test packet is too long (> {} bytes)".format(0xFF))
            return

        # Allocate a new buffer and put the input into it.
        buf_addr = unicorn_heap.malloc(len(input_content))
        stack_addr = unicorn_heap.malloc(1024)
        uc.mem_write(buf_addr, input_content)
        # The input is raw bytes, so format its repr rather than
        # concatenating it onto a text string.
        print("INPUT CONTENT: {!r}".format(input_content))
        print("INPUT CONTENT LENGTH: " + str(len(input_content)))
        print("Allocated mutated input buffer @ 0x{0:016x}".format(buf_addr))

        # TODO: Set the input into the state so it will be handled
        # Point both frame and stack pointers at the middle of the 1024-byte
        # scratch allocation so there is room on either side.
        uc.reg_write(UC_X86_REG_RBP, stack_addr + 512)
        uc.reg_write(UC_X86_REG_RSP, stack_addr + 512)

    # Run the test
    print("Executing from 0x{0:016x} to 0x{1:016x}".format(START_ADDRESS, END_ADDRESS))
    try:
        uc.emu_start(START_ADDRESS, END_ADDRESS, timeout=0, count=0)
    except UcError as e:
        # If something went wrong during emulation a signal is raised to force this
        # script to crash in a way that AFL can detect ('uc.force_crash()' should be
        # called for any condition that you want AFL to treat as a crash).
        print("Execution failed with error: {}".format(e))
        uc.dump_regs()
        uc.force_crash(e)

    print("Final register state:")
    uc.dump_regs()
    print("Done.")


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement