Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
"""Example usage:

In [1]: !cat ayy.s
        .text
        .global _start
        .type _start, @function
_start:
        movq $10, %rdx
        pushw $10
        pushw $28513
        pushw $28012
        pushw $8313
        pushw $31073
        movq %rsp, %rsi
        movq $1, %rdi
        movq $1, %rbx
        movq $1, %rax
        syscall
        addq $10, %rsp
        movq $0, %rax
        retq

In [2]: !as -o ayy.o ayy.s

In [3]: with open('ayy.o', 'rb') as f:
   ...:     f.seek(64)
   ...:     payload = f.read()
   ...:

In [4]: from they_should_have_written_cpython_in_rust_lol import execute_binary_payload

In [5]: execute_binary_payload(payload)
ayy lmao
"""
- import dis
- import mmap
- import sys
- import types
# byte offsets for structures
#
# The values are selected by build flavour; every offset is 16 bytes larger
# under debug builds than under normal builds.
if hasattr(sys, 'gettotalrefcount'):
    # under PyDEBUG builds
    # added to ``id(frame)`` to get the address of the frame's locals
    _f_localsplus_offset = 392
    # added to ``id(tuple)`` to get the address of the first element
    _ob_item_offset = 40
    # start of the 8 byte field read from the mmap object's memory
    _sizeof_pyobject = 32
    # added to ``id(memoryview)`` to load its managed buffer object
    _mbuf_offset = 40
    # added to ``id(managed_buffer)``; slot overwritten by ``access_memory``
    _master_offset = 48
    # added to ``id(memoryview)``; slot overwritten by ``access_memory``
    _view_offset = 72
    # start of the 8 byte field overwritten in a class's memory
    _tp_repr_offset = 104
else:
    # under normal builds
    # added to ``id(frame)`` to get the address of the frame's locals
    _f_localsplus_offset = 376
    # added to ``id(tuple)`` to get the address of the first element
    _ob_item_offset = 24
    # start of the 8 byte field read from the mmap object's memory
    _sizeof_pyobject = 16
    # added to ``id(memoryview)`` to load its managed buffer object
    _mbuf_offset = 24
    # added to ``id(managed_buffer)``; slot overwritten by ``access_memory``
    _master_offset = 32
    # added to ``id(memoryview)``; slot overwritten by ``access_memory``
    _view_offset = 56
    # start of the 8 byte field overwritten in a class's memory
    _tp_repr_offset = 88

# the maximum allowed offset from the address of the frame's locals
# to the target address measured in pointer counts.
_write_range = 2 ** 22
def _make_write_bytecode(range_):
    """Create bytecode that emits one branch per candidate index, like:

        if this_branch():
            _n = value

    where ``this_branch`` is a callable that checks if we should assign to
    the given index. ``value`` is constant 0. ``this_branch`` is not stored
    in the locals, instead we just store it on the stack.

    Branches are emitted in the order ``0, -0, 1, -1, ..., range_ - 1,
    -(range_ - 1)``, so only indices strictly inside ``(-range_, range_)``
    are covered and index ``0`` gets two branches. Whatever drives
    ``this_branch`` must answer in exactly this order.

    To do negative values, we overflow oparg with many EXTENDED_ARG calls.

    The result is memoized in ``_make_write_bytecode._cache`` and in a file
    named ``_make_write_bytecode.<range_>`` in the current working directory.
    An existing cache file is returned as-is without any validation.

    Parameters
    ----------
    range_ : int
        The number of ``n`` values to emit a ``n`` / ``-n`` branch pair for.

    Returns
    -------
    code : bytes
        The raw bytecode.
    """
    try:
        return _make_write_bytecode._cache[range_]
    except KeyError:
        pass

    cached_file_name = f'_make_write_bytecode.{range_}'
    try:
        with open(cached_file_name, 'rb') as f:
            _make_write_bytecode._cache[range_] = ret = f.read()
        return ret
    except FileNotFoundError:
        pass

    # prologue: yield constant 0 once so the caller can send in
    # ``this_branch``
    code = bytearray((
        dis.opmap['LOAD_CONST'], 0,
        dis.opmap['YIELD_VALUE'], 0,
    ))

    def check(n):
        # the index as an unsigned 32 bit big-endian oparg; the last byte is
        # the instruction's own argument, the rest go in EXTENDED_ARG
        # prefixes
        raw = (n % 2 ** 32).to_bytes(4, 'big')
        index = raw[-1]
        # Only *leading* zero bytes may be dropped. Each EXTENDED_ARG shifts
        # the accumulated argument left by 8 bits, so removing a zero byte
        # that follows a non-zero one would encode a different index.
        extension = raw[:-1].lstrip(b'\x00')

        code.extend((
            dis.opmap['DUP_TOP'], 0,
            dis.opmap['CALL_FUNCTION'], 0,
            dis.opmap['EXTENDED_ARG'], 0,
            dis.opmap['EXTENDED_ARG'], 0,
            dis.opmap['EXTENDED_ARG'], 0,
            dis.opmap['POP_JUMP_IF_FALSE'], 0,
            dis.opmap['LOAD_CONST'], 0,
        ))
        # position of the POP_JUMP_IF_FALSE argument byte; the arguments of
        # its three EXTENDED_ARG prefixes sit at ix - 6, ix - 4 and ix - 2
        ix = len(code) - 3

        for ext in extension:
            code.extend((dis.opmap['EXTENDED_ARG'], ext))
        code.extend((
            dis.opmap['STORE_FAST'], index,
        ))

        # patch the jump so a false ``this_branch()`` skips just past the
        # store that was emitted above
        jump_index = len(code)
        *jump_extension, jump_index = jump_index.to_bytes(4, 'big')
        code[ix - 6] = jump_extension[0]
        code[ix - 4] = jump_extension[1]
        code[ix - 2] = jump_extension[2]
        code[ix] = jump_index

    for n in range(range_):
        check(n)
        check(-n)

    # epilogue: return constant 0
    code.extend((
        dis.opmap['LOAD_CONST'], 0,
        dis.opmap['RETURN_VALUE'], 0
    ))

    ret = _make_write_bytecode._cache[range_] = bytes(code)
    with open(cached_file_name, 'wb') as f:
        f.write(ret)
    return ret


# in-memory memoization of generated bytecode, keyed by ``range_``
_make_write_bytecode._cache = {}
def _make_write_codeobject(range_, value):
    """Build the code object for the write helper.

    Parameters
    ----------
    range_ : int
        Forwarded to ``_make_write_bytecode`` to produce the bytecode.
    value : any
        Stored as the only constant, so every ``LOAD_CONST 0`` in the
        bytecode loads it.

    Returns
    -------
    code : types.CodeType
        The assembled code object.

    Notes
    -----
    The arguments are passed to ``types.CodeType`` positionally. The labels
    below assume the 13 argument form of that constructor -- verify them
    against the interpreter this is run on.
    """
    bytecode = _make_write_bytecode(range_)
    positional_args = (
        0,  # argcount
        0,  # kwonlyargcount
        0,  # nlocals
        2,  # stacksize
        99,  # flags
        bytecode,  # codestring
        (value,),  # make co_consts[0] ``value`` to replace None
        (),  # names
        (),  # varnames
        '<string>',  # filename
        '<write-target-helper>',  # name
        1,  # firstlineno
        b'',  # lnotab
    )
    return types.CodeType(*positional_args)
def _write_target_address(target_address, value, *, verbose=False):
    """Store ``value`` at ``target_address``.

    Helper generators are created until one is found whose frame locals
    start fewer than ``_write_range`` pointers away from ``target_address``.
    That generator is then driven with a selector that answers ``True`` only
    for the branch whose index equals the pointer offset, so that a single
    ``STORE_FAST`` runs.

    Parameters
    ----------
    target_address : int
        The address to write to. The offset from the frame's locals is
        floor-divided by 8 to get the index to store to.
    value : any
        The object to store.
    verbose : bool, optional
        Print debugging information?

    Raises
    ------
    AssertionError
        If priming the helper generator does not yield ``value``.
    """
    # collection of frames to prevent deallocation
    frames = []
    while True:
        # spray the heap with generators looking for a location where the
        # frame's local variables are within ``range_ * 8`` bytes of the
        # target address
        code = _make_write_codeobject(_write_range, value)
        generator = types.FunctionType(code, {})()
        locals_address = id(generator.gi_frame) + _f_localsplus_offset
        offset_bytes = target_address - locals_address
        offset_index = offset_bytes // 8

        # The bounds are exclusive: the helper bytecode and the selector
        # below only cover ``n`` and ``-n`` for ``n`` in
        # ``range(_write_range)``, so ``+/-_write_range`` has no branch.
        if -_write_range < offset_index < _write_range:
            # we found a frame that is close enough to the target address;
            # clear the saved frames and continue
            frames.clear()
            if verbose:
                print(
                    f'found write frame within range:'
                    f' offset={offset_index};'
                    f' locals=0x{locals_address:x}'
                    f' target=0x{target_address:x}',
                )
            break

        if verbose:
            print(
                f'write frame too far away:'
                f' offset={offset_index};'
                f' locals=0x{locals_address:x}'
                f' target=0x{target_address:x}',
            )
        # this frame isn't close enough; save it in memory so we don't try
        # here again
        frames.append(generator.gi_frame)

    # Prime the generator; it must yield co_consts[0], which is ``value``.
    # The ``next`` call is kept out of an ``assert`` statement because
    # asserts are removed under ``python -O``, which would skip the priming.
    primed = next(generator)
    if primed is not value:
        raise AssertionError('yielded the wrong value')

    def branch_selector():
        # answers one question per emitted branch, in emission order:
        # 0, -0, 1, -1, ...
        for n in range(_write_range):
            if n == offset_index:
                yield True
                break
            yield False
            if -n == offset_index:
                yield True
                break
            yield False

    try:
        # send the selector into the generator; this gets bound as
        # ``this_branch``
        generator.send(branch_selector().__next__)
    except StopIteration:
        # the branch after ``branch_selector`` returns True will raise a
        # ``StopIteration``
        pass
# Upper bound, counted in pointers, on the distance between the start of a
# helper frame's locals and the address to load from.
_load_range = 1 << 22
def _make_load_bytecode(range_):
    """Create bytecode that emits one branch per candidate index, like:

        if this_branch():
            yield _n

    where ``this_branch`` is a callable that checks if we should load from
    the given index. ``this_branch`` is not stored in the locals, instead we
    just store it on the stack.

    Branches are emitted in the order ``0, -0, 1, -1, ..., range_ - 1,
    -(range_ - 1)``, so only indices strictly inside ``(-range_, range_)``
    are covered and index ``0`` gets two branches. Whatever drives
    ``this_branch`` must answer in exactly this order.

    To do negative values, we overflow oparg with many EXTENDED_ARG calls.

    The result is memoized in ``_make_load_bytecode._cache`` and in a file
    named ``_make_load_bytecode.<range_>`` in the current working directory.
    An existing cache file is returned as-is without any validation.

    Parameters
    ----------
    range_ : int
        The number of ``n`` values to emit a ``n`` / ``-n`` branch pair for.

    Returns
    -------
    code : bytes
        The raw bytecode.
    """
    try:
        return _make_load_bytecode._cache[range_]
    except KeyError:
        pass

    cached_file_name = f'_make_load_bytecode.{range_}'
    try:
        with open(cached_file_name, 'rb') as f:
            _make_load_bytecode._cache[range_] = ret = f.read()
        return ret
    except FileNotFoundError:
        pass

    # prologue: yield constant 0 once so the caller can send in
    # ``this_branch``
    code = bytearray((
        dis.opmap['LOAD_CONST'], 0,
        dis.opmap['YIELD_VALUE'], 0,
    ))

    def check(n):
        # the index as an unsigned 32 bit big-endian oparg; the last byte is
        # the instruction's own argument, the rest go in EXTENDED_ARG
        # prefixes
        raw = (n % 2 ** 32).to_bytes(4, 'big')
        index = raw[-1]
        # Only *leading* zero bytes may be dropped. Each EXTENDED_ARG shifts
        # the accumulated argument left by 8 bits, so removing a zero byte
        # that follows a non-zero one would encode a different index.
        extension = raw[:-1].lstrip(b'\x00')

        code.extend((
            dis.opmap['DUP_TOP'], 0,
            dis.opmap['CALL_FUNCTION'], 0,
            dis.opmap['EXTENDED_ARG'], 0,
            dis.opmap['EXTENDED_ARG'], 0,
            dis.opmap['EXTENDED_ARG'], 0,
            dis.opmap['POP_JUMP_IF_FALSE'], 0,
        ))
        # position of the POP_JUMP_IF_FALSE argument byte; the arguments of
        # its three EXTENDED_ARG prefixes sit at ix - 6, ix - 4 and ix - 2
        ix = len(code) - 1

        for ext in extension:
            code.extend((dis.opmap['EXTENDED_ARG'], ext))
        code.extend((
            dis.opmap['LOAD_FAST'], index,
            dis.opmap['YIELD_VALUE'], 0,
        ))

        # patch the jump so a false ``this_branch()`` skips just past the
        # load and yield that were emitted above
        jump_index = len(code)
        *jump_extension, jump_index = jump_index.to_bytes(4, 'big')
        code[ix - 6] = jump_extension[0]
        code[ix - 4] = jump_extension[1]
        code[ix - 2] = jump_extension[2]
        code[ix] = jump_index

    for n in range(range_):
        check(n)
        check(-n)

    # epilogue: return constant 0
    code.extend((
        dis.opmap['LOAD_CONST'], 0,
        dis.opmap['RETURN_VALUE'], 0
    ))

    ret = _make_load_bytecode._cache[range_] = bytes(code)
    with open(cached_file_name, 'wb') as f:
        f.write(ret)
    return ret


# in-memory memoization of generated bytecode, keyed by ``range_``
_make_load_bytecode._cache = {}
def _make_load_codeobject(range_):
    """Build the code object for the load helper.

    Parameters
    ----------
    range_ : int
        Forwarded to ``_make_load_bytecode`` to produce the bytecode.

    Returns
    -------
    code : types.CodeType
        The assembled code object, whose only constant is ``None``.

    Notes
    -----
    The arguments are passed to ``types.CodeType`` positionally. The labels
    below assume the 13 argument form of that constructor -- verify them
    against the interpreter this is run on.
    """
    bytecode = _make_load_bytecode(range_)
    positional_args = (
        0,  # argcount
        0,  # kwonlyargcount
        0,  # nlocals
        2,  # stacksize
        99,  # flags
        bytecode,  # codestring
        (None,),  # constants
        (),  # names
        (),  # varnames
        '<string>',  # filename
        '<load-target-helper>',  # name
        1,  # firstlineno
        b'',  # lnotab
    )
    return types.CodeType(*positional_args)
def _load_target_address(target_address, *, verbose=False):
    """Load the object stored at ``target_address``.

    Helper generators are created until one is found whose frame locals
    start fewer than ``_load_range`` pointers away from ``target_address``.
    That generator is then driven with a selector that answers ``True`` only
    for the branch whose index equals the pointer offset, so that a single
    ``LOAD_FAST`` runs and its result is yielded back.

    Parameters
    ----------
    target_address : int
        The address to read from. The offset from the frame's locals is
        floor-divided by 8 to get the index to load from.
    verbose : bool, optional
        Print debugging information?

    Returns
    -------
    value : any
        Whatever the helper generator yields for the selected branch.

    Raises
    ------
    AssertionError
        If priming the helper generator does not yield ``None``.
    """
    # collection of frames to prevent deallocation
    frames = []
    while True:
        # spray the heap with generators looking for a location where the
        # frame's local variables are within ``range_ * 8`` bytes of the
        # target address
        code = _make_load_codeobject(_load_range)
        generator = types.FunctionType(code, {})()
        locals_address = id(generator.gi_frame) + _f_localsplus_offset
        offset_bytes = target_address - locals_address
        offset_index = offset_bytes // 8

        # The bounds are exclusive: the helper bytecode and the selector
        # below only cover ``n`` and ``-n`` for ``n`` in
        # ``range(_load_range)``, so ``+/-_load_range`` has no branch.
        if -_load_range < offset_index < _load_range:
            # we found a frame that is close enough to the target address;
            # clear the saved frames and continue
            frames.clear()
            if verbose:
                print(
                    f'found load frame within range:'
                    f' offset={offset_index};'
                    f' locals=0x{locals_address:x}'
                    f' target=0x{target_address:x}',
                )
            break

        if verbose:
            print(
                f'load frame too far away:'
                f' offset={offset_index};'
                f' locals=0x{locals_address:x}'
                f' target=0x{target_address:x}',
            )
        # this frame isn't close enough; save it in memory so we don't try
        # here again
        frames.append(generator.gi_frame)

    # Prime the generator; it must yield co_consts[0], which is ``None``.
    # The ``next`` call is kept out of an ``assert`` statement because
    # asserts are removed under ``python -O``, which would skip the priming.
    primed = next(generator)
    if primed is not None:
        raise AssertionError('yielded the wrong value')

    def branch_selector():
        # answers one question per emitted branch, in emission order:
        # 0, -0, 1, -1, ...; bounded by the same range the load bytecode
        # was generated with
        for n in range(_load_range):
            if n == offset_index:
                yield True
                break
            yield False
            if -n == offset_index:
                yield True
                break
            yield False

    # send the selector into the generator; this gets bound as
    # ``this_branch``
    return generator.send(branch_selector().__next__)
def tuple_setitem(t, ix, value, *, verbose=False):
    """Assign ``value`` into slot ``ix`` of the tuple ``t``.

    Parameters
    ----------
    t : tuple
        The tuple to assign into.
    ix : int
        The index to assign to (without bounds checking).
    value : any
        The value to store at index ``ix``.
    verbose : bool, optional
        Print debugging information?
    """
    # address of element 0, then step ``ix`` pointers (8 bytes each) along
    first_item_address = id(t) + _ob_item_offset
    slot_address = first_item_address + ix * 8
    _write_target_address(slot_address, value, verbose=verbose)
def access_memory(ob, size, *, verbose=False):
    """Expose the memory underlying ``ob`` as a mutable memory view.

    A ``memoryview`` over a fresh ``bytearray(size)`` is created, and then
    one slot in its managed buffer and one slot in the view itself are
    overwritten with ``ob``.

    Parameters
    ----------
    ob : object
        The object to access
    size : int
        The number of bytes to access.
    verbose : bool, optional
        Print debugging information?

    Returns
    -------
    data : memoryview
        A view of ``size`` bytes starting at the address of ``ob``.
    """
    backing_store = bytearray(size)
    view = memoryview(backing_store)

    # fetch the managed buffer object that the view points at
    mbuf_address = id(view) + _mbuf_offset
    managed_buffer = _load_target_address(mbuf_address, verbose=verbose)

    # overwrite the managed buffer's slot first, then the view's own slot
    slots_to_overwrite = (
        id(managed_buffer) + _master_offset,
        id(view) + _view_offset,
    )
    for slot_address in slots_to_overwrite:
        _write_target_address(slot_address, ob, verbose=verbose)

    return view
def execute_binary_payload(payload, *, verbose=False):
    """Run an arbitrary compiled payload.

    The payload is copied into an anonymous writable and executable mapping.
    The 8 bytes that follow the object header of the mapping object are then
    copied over the 8 bytes at ``_tp_repr_offset`` of a throwaway class, and
    ``repr`` is called on an instance of that class.

    Parameters
    ----------
    payload : bytes
        The compiled payload to execute.
    verbose : bool, optional
        Print debugging information?
    """
    protection = mmap.PROT_EXEC | mmap.PROT_WRITE
    executable = mmap.mmap(-1, len(payload), prot=protection)
    executable[:] = payload

    # the field right after the object header of the mapping object
    header_end = _sizeof_pyobject
    target_address = access_memory(executable, 64, verbose=verbose)[
        header_end:header_end + 8
    ]

    class C:
        pass

    class_memory = access_memory(C, 124, verbose=verbose)
    repr_start = _tp_repr_offset
    class_memory[repr_start:repr_start + 8] = target_address

    try:
        repr(C())
    except SystemError:
        # deliberately ignored
        pass
Add Comment
Please, Sign In to add comment