Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
"""
Simplified VM code which works for some cases.
You need to extend/rewrite the code to pass all cases.
"""
import builtins
import dis
import operator
import types
import typing as tp
class Block(object):
    """One entry of a frame's block stack.

    Attributes are set once in ``__init__``:

    * ``type``    -- kind of block as given by the caller (``Frame`` uses
      ``'loop'`` and checks for ``'except-handler'``).
    * ``handler`` -- value supplied by the opcode that opened the block
      (``Frame.setup_loop_op`` passes the instruction's ``argval``).
    * ``depth``   -- length of the data stack when the block was pushed.
    """

    def __init__(self, block_type: str, depth: int, handler: tp.Any):
        self.type = block_type
        self.handler = handler
        self.depth = depth

    @property
    def level(self) -> int:
        """Alias of ``depth``; ``Frame.unwind_block`` reads ``block.level``."""
        return self.depth
def bind_args(code: types.CodeType, defaults: tp.Tuple[tp.Any, ...], kwdefaults: tp.Dict[str, tp.Any],
              args: tp.Any, kwargs: tp.Any) -> tp.Dict[str, tp.Any]:
    """Bind call arguments to the parameter names of ``code``.

    :param code: code object of the function being called
    :param defaults: default values for the trailing positional parameters
    :param kwdefaults: default values for keyword-only parameters
    :param args: positional arguments of the call
    :param kwargs: keyword arguments of the call (mapping name -> value)
    :return: mapping from parameter name to bound value; the ``*args`` name
        maps to a tuple and the ``**kwargs`` name to a dict when the
        function declares them
    :raises TypeError: with message ``"args"`` for too many positional
        arguments, ``"kwargs"`` for an unexpected or duplicated keyword,
        ``"other"`` for a missing positional parameter and ``"kwonly"`` for
        a missing keyword-only parameter
    """
    co_varargs = 0x04      # code.co_flags bit: function declares *args
    co_varkeywords = 0x08  # code.co_flags bit: function declares **kwargs

    argcount: int = code.co_argcount
    kw_argcount: int = code.co_kwonlyargcount
    # Attribute only exists on interpreters with positional-only parameters.
    posonly_count: int = getattr(code, "co_posonlyargcount", 0)
    names: tp.Tuple[str, ...] = code.co_varnames
    defaults = tuple(defaults or ())
    kwdefaults = dict(kwdefaults or {})

    positional = names[:argcount]
    kw_only = names[argcount:argcount + kw_argcount]
    # co_varnames also lists body locals, so keyword matching must be limited
    # to real parameters that may be passed by name.
    by_keyword = set(names[posonly_count:argcount + kw_argcount])

    args_in = bool(code.co_flags & co_varargs)
    kwargs_in = bool(code.co_flags & co_varkeywords)
    # The *args / **kwargs names follow the keyword-only names in co_varnames.
    slot = argcount + kw_argcount
    args_name: tp.Optional[str] = None
    kwargs_name: tp.Optional[str] = None
    if args_in:
        args_name = names[slot]
        slot += 1
    if kwargs_in:
        kwargs_name = names[slot]

    parsed_args: tp.Dict[str, tp.Any] = {}

    # Positional arguments.
    if len(args) > argcount and not args_in:
        raise TypeError("args")
    for name, value in zip(positional, args):
        parsed_args[name] = value
    if args_name is not None:
        parsed_args[args_name] = tuple(args[argcount:])

    # Keyword arguments.
    extra_kwargs: tp.Dict[str, tp.Any] = {}
    for name, value in kwargs.items():
        if name in by_keyword:
            if name in parsed_args:
                # Already bound positionally (or repeated): multiple values.
                raise TypeError("kwargs")
            parsed_args[name] = value
        elif kwargs_in:
            extra_kwargs[name] = value
        else:
            raise TypeError("kwargs")
    if kwargs_name is not None:
        parsed_args[kwargs_name] = extra_kwargs

    # Defaults only fill parameters that the call left unbound.
    if defaults:
        for name, value in zip(positional[argcount - len(defaults):], defaults):
            parsed_args.setdefault(name, value)
    for name, value in kwdefaults.items():
        parsed_args.setdefault(name, value)

    # Everything still unbound is a missing argument.
    for name in positional:
        if name not in parsed_args:
            raise TypeError("other")
    for name in kw_only:
        if name not in parsed_args:
            raise TypeError("kwonly")
    return parsed_args
class Frame:
    """Execution state of one code object plus one handler per opcode.

    ``run`` disassembles the code with ``dis.get_instructions`` and dispatches
    each instruction to the method named ``<opname in lower case>_op``,
    passing the instruction's ``argval``.  Handlers request a jump by storing
    the target bytecode offset in ``jump_pos``.
    """

    def __init__(self,
                 frame_code: types.CodeType,
                 frame_builtins: tp.Dict[str, tp.Any],
                 frame_globals: tp.Dict[str, tp.Any],
                 frame_locals: tp.Dict[str, tp.Any]) -> None:
        self.code = frame_code
        self.builtins = frame_builtins
        self.globals = frame_globals
        self.locals = frame_locals
        self.data_stack: tp.Any = []
        self.return_value = None
        self.block_stack: tp.List[Block] = []
        self.last_exception = None
        # argval of the instruction following the current one (set by run).
        self.next_instr = None
        # Pending jump target (bytecode offset); None means "fall through".
        # None rather than 0 so that a jump to offset 0 is representable.
        self.jump_pos: tp.Optional[int] = None

    # Stack helpers

    def top(self) -> tp.Any:
        """Return the top of the data stack without removing it."""
        return self.data_stack[-1]

    def pop(self) -> tp.Any:
        """Remove and return the top of the data stack."""
        return self.data_stack.pop()

    def pop_block(self) -> tp.Any:
        """Remove and return the innermost block."""
        return self.block_stack.pop()

    def unwind_block(self, block: tp.Any) -> None:
        """Shrink the data stack back to the depth recorded in ``block``.

        For an ``'except-handler'`` block the three values above that depth
        are popped into ``last_exception`` as ``(type, value, traceback)``.
        """
        is_handler = block.type == 'except-handler'
        offset = 3 if is_handler else 0
        # Block stores the recorded stack size as ``depth``.
        while len(self.data_stack) > block.depth + offset:
            self.pop()
        if is_handler:
            tb, value, exctype = self.popn(3)
            self.last_exception = exctype, value, tb

    def push(self, *values: tp.Any) -> None:
        """Push ``values`` in order; the last one becomes the new top."""
        self.data_stack.extend(values)

    def push_block(self, block_type: str, handler: tp.Any = None) -> None:
        """Open a block that remembers the current data stack depth."""
        depth = len(self.data_stack)
        self.block_stack.append(Block(block_type, depth, handler))

    def popn(self, n: int) -> tp.Any:
        """Pop ``n`` values and return them as a list, deepest first."""
        if n > 0:
            returned = self.data_stack[-n:]
            self.data_stack[-n:] = []
            return returned
        return []

    # Main loop

    def run(self) -> tp.Any:
        """Execute the frame's code and return the value it returned."""
        instructions = list(dis.get_instructions(self.code))
        position = 0
        while position < len(instructions):
            instruction = instructions[position]
            if position + 1 < len(instructions):
                self.next_instr = instructions[position + 1].argval
            self.jump_pos = None
            getattr(self, instruction.opname.lower() + "_op")(instruction.argval)
            if instruction.opname == "RETURN_VALUE":
                # Instructions after a return belong to other branches and
                # must not run.
                break
            if self.jump_pos is None:
                position += 1
                continue
            target = self.jump_pos
            self.jump_pos = None
            # First instruction located at or after the requested offset.
            position = next(
                (index for index, candidate in enumerate(instructions)
                 if candidate.offset >= target),
                len(instructions),
            )
        return self.return_value

    # Calls

    def call_function_op(self, arg: int) -> None:
        """Call a function with ``arg`` positional arguments."""
        arguments = self.popn(arg)
        f = self.pop()
        self.push(f(*arguments))

    def call_function_kw_op(self, args: int) -> None:
        """Call a function; the top of stack is the tuple of keyword names.

        ``args`` counts positional and keyword arguments together.
        """
        keys = self.pop()
        values = self.popn(len(keys))
        arguments = self.popn(args - len(keys))
        kwargs = dict(zip(keys, values))
        func = self.pop()
        self.push(func(*arguments, **kwargs))

    # Names

    def load_name_op(self, arg: str) -> None:
        """Push ``arg`` looked up in locals, then globals, then builtins."""
        for namespace in (self.locals, self.globals, self.builtins):
            if arg in namespace:
                self.push(namespace[arg])
                return
        raise NameError(f"name '{arg}' is not defined")

    def load_global_op(self, arg: str) -> None:
        """Push ``arg`` looked up in globals, then builtins."""
        for namespace in (self.globals, self.builtins):
            if arg in namespace:
                self.push(namespace[arg])
                return
        raise NameError(f"name '{arg}' is not defined")

    def load_const_op(self, arg: tp.Any) -> None:
        """Push the constant ``arg``."""
        self.push(arg)

    def load_fast_op(self, name: str) -> None:
        """Push the local ``name``; UnboundLocalError if it is not set."""
        try:
            value = self.locals[name]
        except KeyError:
            raise UnboundLocalError(
                f"local variable '{name}' referenced before assignment") from None
        self.push(value)

    def store_fast_op(self, name: str) -> None:
        """Pop the top of stack into the local ``name``.

        The name does not have to exist yet: a first assignment inside a
        function body creates the local.
        """
        self.locals[name] = self.pop()

    def delete_fast_op(self, name: str) -> None:
        """Delete the local ``name``; UnboundLocalError if it is not set."""
        if name not in self.locals:
            raise UnboundLocalError(
                f"local variable '{name}' referenced before assignment")
        del self.locals[name]

    def load_attr_op(self, attr: str) -> None:
        """Replace the top of stack with its attribute ``attr``."""
        obj = self.pop()
        self.push(getattr(obj, attr))

    def store_attr_op(self, name: str) -> None:
        """Pop object (top) and value (below it); set ``object.name = value``."""
        val, obj = self.popn(2)
        setattr(obj, name, val)

    def delete_attr_op(self, name: str) -> None:
        """Pop an object and delete its attribute ``name``."""
        obj = self.pop()
        delattr(obj, name)

    def delete_global_op(self, name: str) -> None:
        """Delete ``name`` from the globals."""
        del self.globals[name]

    def delete_name_op(self, name: str) -> None:
        """Delete ``name`` from the locals."""
        del self.locals[name]

    def store_name_op(self, arg: str) -> None:
        """Pop the top of stack into the local namespace under ``arg``."""
        self.locals[arg] = self.pop()

    def store_global_op(self, name: str) -> None:
        """Pop the top of stack into the globals under ``name``."""
        self.globals[name] = self.pop()

    def return_value_op(self, *args: tp.Any) -> None:
        """Pop the top of stack and record it as the frame's return value."""
        self.return_value = self.pop()

    def pop_top_op(self, *args: tp.Any) -> None:
        """Discard the top of stack."""
        self.pop()

    def pop_block_op(self, *args: tp.Any) -> None:
        """Discard the innermost block."""
        self.pop_block()

    # Imports

    def import_name_op(self, name: str) -> None:
        """Pop fromlist (top) and level, import module ``name``, push it."""
        lvl, froml = self.popn(2)
        self.push(__import__(name, self.globals, self.locals, froml, lvl))

    def import_from_op(self, name: str) -> None:
        """Push attribute ``name`` of the module on top of the stack."""
        self.push(getattr(self.top(), name))

    def import_star_op(self, arg: tp.Any = None) -> None:
        """Pop a module and copy its non-underscore names into the locals."""
        mod = self.pop()
        for attr in dir(mod):
            if attr[0] != '_':
                self.locals[attr] = getattr(mod, attr)

    # Functions and methods

    def make_function_op(self, arg: int) -> None:
        """Build a Python callable that runs a code object in a new Frame.

        ``arg`` is a bit mask telling which optional values sit below the
        code object: 0x08 closure, 0x04 annotations, 0x02 keyword-only
        defaults, 0x01 positional defaults.  They are popped in that order.
        """
        name = self.pop()
        code = self.pop()
        # Closure cells and annotations are not used by this VM, but they
        # must still be removed or the defaults below would be read from the
        # wrong stack slots.
        if arg & 0x08:
            self.pop()
        if arg & 0x04:
            self.pop()
        kwonly_defaults = self.pop() if arg & 0x02 else {}
        defaults = self.pop() if arg & 0x01 else ()
        local = self.locals

        def f(*args: tp.Any, **kwargs: tp.Any) -> tp.Any:
            parsed_args: tp.Dict[str, tp.Any] = bind_args(code, defaults, kwonly_defaults, args, kwargs)
            # Names of the defining scope are copied in so the body can read them.
            f_locals = dict(local)
            f_locals.update(parsed_args)
            frame = Frame(code, self.builtins, self.globals, f_locals)  # Run code in prepared environment
            return frame.run()

        f.__name__ = name
        self.push(f)

    def load_method_op(self, method_name: str) -> None:
        """Replace the top of stack with ``None`` and its attribute.

        ``getattr`` already returns something directly callable (a bound
        method, a plain function, a builtin), so the extra slot that
        ``call_method_op`` pops is only a placeholder.
        """
        obj = self.pop()
        self.push(None, getattr(obj, method_name))

    def call_method_op(self, argc: int) -> None:
        """Call the attribute stored by ``load_method_op`` with ``argc`` args."""
        arguments = self.popn(argc)
        _placeholder, method = self.popn(2)
        self.push(method(*arguments))

    # Logic
    comp_ops = {
        '<': lambda x, y: x < y,
        '<=': lambda x, y: x <= y,
        '==': lambda x, y: x == y,
        '!=': lambda x, y: x != y,
        '>': lambda x, y: x > y,
        '>=': lambda x, y: x >= y,
        'is': lambda x, y: x is y,
        'is not': lambda x, y: x is not y,
        'in': lambda x, y: x in y,
        'not in': lambda x, y: x not in y,
        'exception match': lambda x, y: issubclass(x, y),
    }

    def compare_op_op(self, operation: str) -> None:
        """Pop two operands and push the result of comparing them.

        :raises ValueError: for an operator missing from ``comp_ops``
            (silently skipping it would leave the stack two items short)
        """
        x, y = self.popn(2)
        try:
            compare = self.comp_ops[operation]
        except KeyError:
            raise ValueError(f"unsupported comparison: {operation!r}") from None
        self.push(compare(x, y))

    # Unary Operations

    def unary_negative_op(self, *args: tp.Any) -> None:
        """Replace the top of stack with its negation."""
        self.push(-self.pop())

    def unary_positive_op(self, *args: tp.Any) -> None:
        """Replace the top of stack with ``+value`` (not ``abs``)."""
        self.push(+self.pop())

    def unary_not_op(self, *args: tp.Any) -> None:
        """Replace the top of stack with its logical negation."""
        self.push(not self.pop())

    def unary_invert_op(self, *args: tp.Any) -> None:
        """Replace the top of stack with its bitwise inversion."""
        self.push(~self.pop())

    def _apply_binary(self, func: tp.Callable[[tp.Any, tp.Any], tp.Any]) -> None:
        """Pop two operands and push ``func(left, right)``."""
        left, right = self.popn(2)
        self.push(func(left, right))

    # Inplace Operations: the ``operator.i*`` functions update mutable left
    # operands in place, as the augmented assignment statements do.

    def inplace_power_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.ipow)

    def inplace_multiply_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.imul)

    def inplace_matrix_multiply_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.imatmul)

    def inplace_floor_divide_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.ifloordiv)

    def inplace_true_divide_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.itruediv)

    def inplace_modulo_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.imod)

    def inplace_add_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.iadd)

    def inplace_subtract_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.isub)

    def inplace_lshift_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.ilshift)

    def inplace_rshift_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.irshift)

    def inplace_and_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.iand)

    def inplace_xor_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.ixor)

    def inplace_or_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.ior)

    # Binary Operations: never modify their operands.

    def binary_power_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.pow)

    def binary_multiply_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.mul)

    def binary_matrix_multiply_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.matmul)

    def binary_floor_divide_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.floordiv)

    def binary_true_divide_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.truediv)

    def binary_modulo_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.mod)

    def binary_add_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.add)

    def binary_subtract_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.sub)

    def binary_lshift_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.lshift)

    def binary_rshift_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.rshift)

    def binary_and_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.and_)

    def binary_xor_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.xor)

    def binary_or_op(self, *args: tp.Any) -> None:
        self._apply_binary(operator.or_)

    def binary_subscr_op(self, *args: tp.Any) -> None:
        """Pop container and key; push ``container[key]``."""
        self._apply_binary(operator.getitem)

    def store_subscr_op(self, *args: tp.Any) -> None:
        """Pop key (top), container and value; do ``container[key] = value``.

        Nothing is pushed back: the instruction consumes all three operands.
        """
        container, key = self.popn(2)
        container[key] = self.pop()

    def delete_subscr_op(self, *args: tp.Any) -> None:
        """Pop key (top) and container; do ``del container[key]``."""
        container, key = self.popn(2)
        del container[key]

    # Stack

    def dup_top_op(self, *args: tp.Any) -> None:
        """Duplicate the top of stack."""
        self.push(self.top())

    def dup_top_two_op(self, *args: tp.Any) -> None:
        """Duplicate the two topmost items, keeping their order."""
        first, second = self.data_stack[-2:]
        self.push(first, second)

    def rot_two_op(self, *args: tp.Any) -> None:
        """Swap the two topmost items."""
        x, y = self.popn(2)
        self.push(y, x)

    def rot_three_op(self, *args: tp.Any) -> None:
        """Move the top item down two places: ``a b c`` becomes ``c a b``."""
        x, y, z = self.popn(3)
        self.push(z, x, y)

    # NOTE: the original author left the following handler disabled inside a
    # string literal; it is kept here as a comment, still disabled.  As
    # written it takes no argument, so run() could not dispatch to it.
    #
    # def pop_except_op(self) -> None:
    #     block = self.pop_block()
    #     if block.type != 'except-handler':
    #         raise Exception("popped block is not an except handler")
    #     self.unwind_block(block)

    # Jumps

    def jump_forward_op(self, ind: int) -> None:
        """Jump to offset ``ind``."""
        self.jump_pos = ind

    def jump_absolute_op(self, ind: int) -> None:
        """Jump to offset ``ind``."""
        self.jump_pos = ind

    def pop_jump_if_true_op(self, ind: int) -> None:
        """Pop the top of stack; jump to ``ind`` if it was true."""
        if self.pop():
            self.jump_pos = ind

    def pop_jump_if_false_op(self, ind: int) -> None:
        """Pop the top of stack; jump to ``ind`` if it was false."""
        if not self.pop():
            self.jump_pos = ind

    def jump_if_true_or_pop_op(self, ind: int) -> None:
        """Jump to ``ind`` keeping a true top of stack, else pop it."""
        if self.top():
            self.jump_pos = ind
        else:
            self.pop()

    def jump_if_false_or_pop_op(self, ind: int) -> None:
        """Jump to ``ind`` keeping a false top of stack, else pop it."""
        if not self.top():
            self.jump_pos = ind
        else:
            self.pop()

    # The handler used to be defined without the ``_op`` suffix that run()
    # appends; the old name stays available for existing callers.
    jump_if_false_or_pop = jump_if_false_or_pop_op

    # Loop

    def setup_loop_op(self, args: tp.Any) -> None:
        """Open a ``'loop'`` block with ``args`` as its handler."""
        self.push_block('loop', args)

    def get_iter_op(self, args: tp.Any) -> None:
        """Replace the top of stack with an iterator over it."""
        self.push(iter(self.pop()))

    def for_iter_op(self, step: int, *args: tp.Any) -> None:
        """Advance the iterator on top of the stack.

        On success the iterator stays and the next item is pushed above it;
        when the iterator is exhausted it is removed and execution jumps to
        offset ``step``.
        """
        iterator = self.pop()
        try:
            item = next(iterator)
        except StopIteration:
            self.jump_pos = step
        else:
            self.push(iterator, item)

    # Building

    def build_list_op(self, size: int) -> None:
        """Push a list of the ``size`` topmost items."""
        self.push(self.popn(size))

    def build_list_unpack_op(self, size: int) -> None:
        """Push one list concatenating the ``size`` topmost iterables."""
        merged: tp.List[tp.Any] = []
        for part in self.popn(size):
            merged.extend(part)
        self.push(merged)

    def list_append_op(self, i: int) -> None:
        """Pop a value and append it to the list ``i`` slots below the top."""
        value = self.pop()
        list.append(self.data_stack[-i], value)

    def build_tuple_op(self, size: int) -> None:
        """Push a tuple of the ``size`` topmost items."""
        self.push(tuple(self.popn(size)))

    def build_tuple_unpack_op(self, size: int) -> None:
        """Push one tuple concatenating the ``size`` topmost iterables."""
        merged: tp.List[tp.Any] = []
        for part in self.popn(size):
            # extend accepts any iterable; tuple ``+=`` would reject lists.
            merged.extend(part)
        self.push(tuple(merged))

    def build_map_op(self, size: int) -> None:
        """Push a dict built from ``size`` key/value pairs on the stack."""
        elems = self.popn(2 * size)
        self.push(dict(zip(elems[0::2], elems[1::2])))

    def build_map_unpack_op(self, size: int) -> None:
        """Push one dict merging the ``size`` topmost mappings."""
        merged: tp.Dict[tp.Any, tp.Any] = {}
        for part in self.popn(size):
            merged.update(part)
        self.push(merged)

    def build_const_key_map_op(self, size: int) -> None:
        """Pop a tuple of keys and ``size`` values; push the resulting dict."""
        keys = self.pop()
        values = self.popn(size)
        self.push(dict(zip(keys, values)))

    def build_set_op(self, size: int) -> None:
        """Push a set of the ``size`` topmost items."""
        self.push(set(self.popn(size)))

    def build_set_unpack_op(self, size: int) -> None:
        """Push one set uniting the ``size`` topmost iterables."""
        elems = self.popn(size)
        self.push(set().union(*elems))

    def build_slice_op(self, args: int) -> None:
        """Push ``slice(...)`` built from the 2 or 3 topmost items."""
        if args not in (2, 3):
            raise Exception('incorrect set of arguments for slice')
        self.push(slice(*self.popn(args)))

    def build_string_op(self, size: int) -> None:
        """Push the concatenation of the ``size`` topmost strings."""
        self.push(''.join(self.popn(size)))

    def format_value_op(self, flags: tp.Any) -> None:
        """Format the top of stack for an f-string field.

        ``flags`` is either the raw oparg (low two bits select
        ``str``/``repr``/``ascii`` conversion, bit 0x04 means a format spec
        is on top of the stack) or a ``(converter, has_spec)`` pair.
        """
        if isinstance(flags, tuple):
            convert, has_spec = flags
        else:
            convert = (None, str, repr, ascii)[flags & 0x03]
            has_spec = bool(flags & 0x04)
        spec = self.pop() if has_spec else ''
        value = self.pop()
        if convert is not None:
            value = convert(value)
        self.push(format(value, spec))

    def unpack_sequence_op(self, count: int) -> None:
        """Pop an iterable of ``count`` items and push them, first item on top.

        :raises ValueError: if the iterable does not yield ``count`` items
        """
        # Materialise first: iterators and sets do not support reversed().
        items = list(self.pop())
        if len(items) < count:
            raise ValueError(
                f"not enough values to unpack (expected {count}, got {len(items)})")
        if len(items) > count:
            raise ValueError(f"too many values to unpack (expected {count})")
        self.push(*reversed(items))

    def extended_arg_op(self, ext: tp.Any) -> None:
        """Do nothing.

        ``dis.get_instructions`` already merges the extended bits into the
        argument of the following instruction, and ``run`` dispatches with
        that instruction's own ``argval``.
        """

    # NOTE: the original author left the following exception support disabled
    # inside a string literal; it is kept here as a comment, still disabled.
    #
    # def do_raise(self, exc: tp.Any, cause: tp.Any) -> tp.Any:
    #     if exc is None:
    #         exc_type, val, tb = self.last_exception
    #         if exc_type is None:
    #             return 'exception'
    #         else:
    #             return 'reraise'
    #     elif type(exc) == type:
    #         exc_type = exc
    #         val = exc()
    #     elif isinstance(exc, BaseException):
    #         exc_type = type(exc)
    #         val = exc
    #     else:
    #         return 'exception'
    #     if cause:
    #         if type(cause) == type:
    #             cause = cause()
    #         elif not isinstance(cause, BaseException):
    #             return 'exception'
    #         val.__cause__ = cause
    #     self.last_exception = exc_type, val, val.__traceback__
    #     return 'exception'
    #
    # def raise_varargs_op(self, argc: int) -> None:
    #     cause = exc = None
    #     if argc == 2:
    #         cause = self.pop()
    #         exc = self.pop()
    #     elif argc == 1:
    #         exc = self.pop()
    #     return self.do_raise(exc, cause)
class VirtualMachine:
    """Entry point that executes a code object in a fresh top-level frame."""

    def run(self, code_obj: types.CodeType) -> tp.Any:
        """Run ``code_obj`` with an empty namespace shared as globals and locals.

        :param code_obj: code for interpreting
        :return: whatever the frame's ``run`` returns
        """
        globals_context: tp.Dict[str, tp.Any] = {}
        # vars(builtins) is always a dict.  A module's ``__builtins__`` entry
        # may be the builtins module itself, which does not support the
        # ``name in ...`` lookups performed by the frame.
        frame = Frame(code_obj, vars(builtins), globals_context, globals_context)
        return frame.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement