Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Simplified VM code which works for some cases.
- You need to extend/rewrite the code to pass all cases.
- """
- import builtins
- import dis
- import types
- import typing as tp
- import operator
- import sys
class Frame:
    """Execution state for one code object being interpreted.

    A frame bundles the code object, the namespaces used for name lookup,
    an operand stack, a block stack, and the decoded instruction list
    together with the index of the next instruction to execute.
    """

    def __init__(self, f_code: types.CodeType,
                 f_locals: tp.Dict[str, tp.Any],
                 f_globals: tp.Dict[str, tp.Any],
                 f_prev: tp.Any):
        """Create a frame.

        :param f_code: code object to execute
        :param f_locals: mapping used as the local namespace
        :param f_globals: mapping used as the global namespace
        :param f_prev: calling frame; any falsy value means "no caller"
        """
        # Code object plus shortcuts to the tables the opcodes index into.
        self.f_code = f_code
        self.co_names = f_code.co_names
        self.co_consts = f_code.co_consts
        self.co_varnames = f_code.co_varnames

        # Namespaces and the link to the calling frame.
        self.f_locals = f_locals
        self.f_globals = f_globals
        self.f_prev = f_prev
        # Builtins are inherited from the caller when there is one; a falsy
        # ``f_prev`` selects the interpreter's own builtins.
        self.f_builtins = f_prev.f_builtins if f_prev else builtins.__dict__

        # Evaluation state.
        self.stack: tp.Any = []
        self.block_stack: tp.Any = []
        self.instructions = list(dis.get_instructions(f_code))
        self.cur_idx = 0  # index into ``instructions`` of the next opcode
def equal_type(l: tp.Any, r: tp.Any) -> bool:
    """Tell whether ``l`` is an instance of the class of ``r``.

    Instances of subclasses of ``type(r)`` count as well, because the
    check is done with ``isinstance``.
    """
    reference_class = type(r)
    return isinstance(l, reference_class)
class Block(object):
    """Entry of a frame's block stack.

    Stores the block kind, the handler value supplied when the block was
    pushed, and the frame the block belongs to.
    """

    def __init__(self, type: tp.Any, handler: tp.Any, frame: tp.Any) -> None:
        # Plain record: the three constructor arguments are kept verbatim.
        self.type, self.handler, self.frame = type, handler, frame
def build_code_object() -> types.CodeType:
    """Compile an empty source string in ``exec`` mode and return the code object."""
    return compile('', '', 'exec')
# Module-level placeholder frame built from an empty code object.  The empty
# dict passed as ``f_prev`` is falsy, so the frame takes the interpreter's own
# builtins instead of inheriting them from a caller.
frame_object = Frame(
    f_code=build_code_object(),
    f_locals={},
    f_globals={},
    f_prev={},
)
class VirtualMachine:
    """Small stack-based interpreter for a subset of CPython bytecode.

    An opcode ``NAME`` is dispatched to the method ``name_op``; opcodes
    starting with ``BINARY_`` / ``INPLACE_`` go through operator tables
    instead.  An opcode without a matching method fails with
    ``AttributeError`` when it is executed.

    Opcode handlers signal special outcomes by returning the strings
    ``"exception"`` (an error was recorded in ``last_exception``) or
    ``"yield"``.
    """

    def __init__(self) -> None:
        self.frames: tp.Any = []
        # No frame is active until run_frame() pushes one.  Starting from
        # None (instead of a shared module-level placeholder frame) keeps
        # separate VM instances from sharing an operand stack.
        self.frame: tp.Any = None
        self.last_exception: tp.Any = ()
        self.value = None
        # BINARY_<NAME> opcodes.
        self.operations = {
            'POWER': operator.pow,
            'MULTIPLY': operator.mul,
            'MATRIX_MULTIPLY': operator.matmul,
            'FLOOR_DIVIDE': operator.floordiv,
            'TRUE_DIVIDE': operator.truediv,
            'MODULO': operator.mod,
            'ADD': operator.add,
            'SUBTRACT': operator.sub,
            'SUBSCR': operator.getitem,
            'LSHIFT': operator.lshift,
            'RSHIFT': operator.rshift,
            'AND': operator.and_,
            'XOR': operator.xor,
            'OR': operator.or_,
        }
        # INPLACE_<NAME> opcodes: the in-place operators mutate mutable
        # operands (``lst += [...]``) and fall back to the plain operation
        # for immutable ones.
        self.inplace_operations = {
            'POWER': operator.ipow,
            'MULTIPLY': operator.imul,
            'MATRIX_MULTIPLY': operator.imatmul,
            'FLOOR_DIVIDE': operator.ifloordiv,
            'TRUE_DIVIDE': operator.itruediv,
            'MODULO': operator.imod,
            'ADD': operator.iadd,
            'SUBTRACT': operator.isub,
            'LSHIFT': operator.ilshift,
            'RSHIFT': operator.irshift,
            'AND': operator.iand,
            'XOR': operator.ixor,
            'OR': operator.ior,
        }
        # Keys are the entries of ``dis.cmp_op`` used by COMPARE_OP.
        self.compare_funcs = {
            '<': operator.lt,
            '<=': operator.le,
            '==': operator.eq,
            '!=': operator.ne,
            '>=': operator.ge,
            '>': operator.gt,
            'in': lambda x, y: x in y,
            'not in': lambda x, y: x not in y,
            'is': lambda x, y: x is y,
            'is not': lambda x, y: x is not y,
            'exception match': self._exception_matches,
        }

    @staticmethod
    def _exception_matches(exc: tp.Any, target: tp.Any) -> bool:
        """Return True when ``exc`` would be caught by ``except target``.

        :param exc: exception class or exception instance
        :param target: exception class or tuple of exception classes
        """
        exc_type = exc if isinstance(exc, type) else type(exc)
        return issubclass(exc_type, target)

    def run(self, code: types.CodeType) -> None:
        """
        :param code: code for interpreting
        """
        frame = Frame(code, {}, {}, None)
        self.run_frame(frame)

    # ----------------------------------------------------------- frames

    def pop_frame(self) -> tp.Any:
        """Drop the innermost frame and make its caller current (or None)."""
        self.frames.pop()
        self.frame = self.frames[-1] if self.frames else None

    def push_frame(self, frame: tp.Any) -> None:
        """Make ``frame`` the current frame."""
        self.frames.append(frame)
        self.frame = frame

    # ---------------------------------------------------- operand stack

    def push(self, *args: tp.Any) -> tp.Any:
        """Push the given values, left to right, onto the current stack."""
        self.frame.stack.extend(args)

    def pop(self) -> tp.Any:
        """Remove and return the top of the current stack."""
        return self.frame.stack.pop()

    def popn(self, n: int) -> tp.Any:
        """Pop ``n`` values and return them as a list, deepest first."""
        if not n:
            return []
        popped = [self.pop() for _ in range(n)]
        popped.reverse()
        return popped

    def top(self) -> tp.Any:
        """Return the top of the current stack without removing it."""
        return self.frame.stack[-1]

    # ------------------------------------------------------ block stack

    def pop_block(self) -> tp.Any:
        """Remove and return the innermost block of the current frame."""
        return self.frame.block_stack.pop()

    def push_block(self, type: tp.Any, delta: tp.Any = None) -> tp.Any:
        """Push a new block; ``delta`` is stored as the block's ``handler``."""
        self.frame.block_stack.append(Block(type, delta, self.frame))

    # ------------------------------------------------------ interpreter

    def parse_byte_and_args(self) -> tp.Tuple[str, tp.Optional[int]]:
        """Fetch the next instruction of the current frame and advance.

        :return: ``(opname, arg)`` of the fetched instruction
        """
        instruction = self.frame.instructions[self.frame.cur_idx]
        self.frame.cur_idx += 1
        return instruction.opname, instruction.arg

    def run_func(self, byte_name: str, argument: tp.Optional[int]) -> tp.Any:
        """Execute one opcode.

        :param byte_name: opcode name, e.g. ``"LOAD_CONST"``
        :param argument: opcode argument, or None when it has none
        :return: whatever the handler returned, or the string
            ``"exception"`` after recording the error in
            ``self.last_exception`` (traceback slot set to None)
        """
        try:
            result = None
            family = byte_name.split("_")[0]
            if family == "BINARY":
                self.run_binary_operation(byte_name)
            elif family == "INPLACE":
                self.run_inplace_operation(byte_name)
            else:
                bytecode_fn = getattr(self, byte_name.lower() + '_op')
                if argument is None:
                    result = bytecode_fn()
                else:
                    result = bytecode_fn(argument)
        except Exception:
            result = 'exception'
            self.last_exception = sys.exc_info()[:2] + (None,)
        return result

    def handle_exception(self) -> bool:
        """Try to route the recorded exception to an ``except`` block.

        The innermost block is popped; if it is of type ``"except"`` the
        exception triple is pushed and execution jumps to the block's
        handler.

        :return: True when a handler took over, False otherwise
        """
        if self.frame.block_stack:
            cur_block = self.pop_block()
            if cur_block.type == "except":
                self.push_block('except-handler')
                exc_type, value, tb = self.last_exception
                # The triple is pushed twice, exactly as before; presumably
                # one copy is for the handler body and one for the block
                # cleanup -- TODO confirm against the handler opcodes.
                self.push(tb, value, exc_type)
                self.push(tb, value, exc_type)
                # Block stores the jump target under ``handler``.
                self.frame.cur_idx = cur_block.handler
                return True
        return False

    def run_frame(self, frame: tp.Any) -> tp.Any:
        """Run ``frame`` to completion.

        :param frame: frame to execute
        :return: the value produced by ``RETURN_VALUE``, the result of
            the last executed opcode if the instructions run out, or
            False when an opcode reported ``"yield"``
        :raises Exception: the original exception of a failing opcode
            when no ``except`` block handles it

        NOTE(review): because outcomes are signalled with plain strings,
        a function that returns the string ``"exception"`` is still
        mistaken for a failure.
        """
        self.push_frame(frame)
        func_result: tp.Any = None
        try:
            while frame.cur_idx < len(frame.instructions):
                byte_name, arg = self.parse_byte_and_args()
                func_result = self.run_func(byte_name, arg)
                # Guard with isinstance so operands with an exotic ``__eq__``
                # are never compared against the marker strings.
                is_marker = isinstance(func_result, str)
                if is_marker and func_result == "exception":
                    if self.handle_exception():
                        continue
                    exc_type, value, traceback = self.last_exception
                    if isinstance(value, BaseException):
                        # Re-raise the exception that actually occurred
                        # rather than wrapping it in a second instance.
                        raise value
                    exc = exc_type(value)
                    exc.__traceback__ = traceback
                    raise exc
                if byte_name == "RETURN_VALUE":
                    # A return ends the frame even when more instructions
                    # follow it.
                    break
                if is_marker and func_result == "yield":
                    func_result = False
                    break
        finally:
            # Always restore the caller's frame, also when an exception
            # propagates, so outer handlers see their own frame.
            self.pop_frame()
        return func_result

    # --------------------------------------------------------- operators

    def run_binary_operation(self, operation: str) -> None:
        """Apply a ``BINARY_*`` opcode to the two topmost stack values."""
        arg1, arg2 = self.popn(2)
        operation_name = operation.replace("BINARY_", "")
        self.push(self.operations[operation_name](arg1, arg2))

    def run_inplace_operation(self, operation: str) -> None:
        """Apply an ``INPLACE_*`` opcode to the two topmost stack values."""
        arg1, arg2 = self.popn(2)
        operation_name = operation.replace("INPLACE_", "")
        handler = self.inplace_operations.get(operation_name)
        if handler is None:
            # Names only registered in ``operations`` keep working.
            handler = self.operations[operation_name]
        self.push(handler(arg1, arg2))

    # ----------------------------------------------------------- opcodes

    def call_function_op(self, arg: int) -> None:
        """CALL_FUNCTION: call the callable below ``arg`` positional args."""
        args = self.popn(arg)
        f = self.pop()
        self.push(f(*args))

    def load_name_op(self, arg: tp.Any) -> None:
        """LOAD_NAME: look a name up in locals, globals, then builtins."""
        name = self.frame.co_names[arg]
        frame = self.frame
        for namespace in (frame.f_locals, frame.f_globals, frame.f_builtins):
            if name in namespace:
                self.push(namespace[name])
                return
        raise NameError("name '%s' is not defined" % name)

    def load_global_op(self, arg: tp.Any) -> None:
        """LOAD_GLOBAL: look a name up in globals, then builtins."""
        name = self.frame.co_names[arg]
        frame = self.frame
        for namespace in (frame.f_globals, frame.f_builtins):
            if name in namespace:
                self.push(namespace[name])
                return
        raise NameError("global name '%s' is not defined" % name)

    def load_const_op(self, arg: tp.Any) -> None:
        """LOAD_CONST: push constant number ``arg``."""
        self.push(self.frame.co_consts[arg])

    def return_value_op(self) -> tp.Any:
        """RETURN_VALUE: pop the top of the stack and hand it back."""
        return self.pop()

    def pop_top_op(self) -> None:
        """POP_TOP: discard the top of the stack."""
        self.pop()

    def make_function_op(self, arg: int) -> None:
        """MAKE_FUNCTION: build a Function from code, name and extras.

        The name and code object are popped first; the optional values
        selected by the flag bits of ``arg`` are popped afterwards, in
        the order closure, annotations, kwdefaults, defaults.
        """
        func_code, func_name = self.popn(2)
        parsed_args: tp.Dict[str, tp.Any] = {}
        optional_parts = (
            (0x08, 'closure'),
            (0x04, 'annotations'),
            (0x02, 'kwdefaults'),
            (0x01, 'defaults'),
        )
        for flag, key in optional_parts:
            if arg & flag:
                parsed_args[key] = self.pop()
        func = Function(func_name, func_code, self.frame.f_globals, self, arg, parsed_args)
        self.push(func)

    def store_name_op(self, arg: tp.Any) -> None:
        """STORE_NAME: bind the top of the stack in the local namespace."""
        self.frame.f_locals[self.frame.co_names[arg]] = self.pop()

    def store_global_op(self, arg: tp.Any) -> None:
        """STORE_GLOBAL: bind the top of the stack in the global namespace."""
        self.frame.f_globals[self.frame.co_names[arg]] = self.pop()

    def compare_op_op(self, name: tp.Any) -> None:
        """COMPARE_OP: compare the two topmost values with ``dis.cmp_op[name]``."""
        arg1, arg2 = self.popn(2)
        self.push(self.compare_funcs[dis.cmp_op[name]](arg1, arg2))
class Function(object):
    """Callable wrapper around a code object that runs on the VM.

    Calling the object binds the call arguments to the code object's
    parameter names, creates a fresh frame and lets the VM execute it.
    """

    def __init__(self, name: tp.Any, code: tp.Any, globals_nm: tp.Any, vm: tp.Any,
                 argc: tp.Any, kwargs_defaults: tp.Any) -> None:
        """Create the function object.

        :param name: function name
        :param code: code object of the function body
        :param globals_nm: global namespace of the defining frame
        :param vm: virtual machine that will run the body
        :param argc: flag argument of the creating opcode (not used here)
        :param kwargs_defaults: optional ``"closure"``, ``"kwdefaults"``
            and ``"defaults"`` values popped by the creating opcode
        """
        self.__name__ = name
        self.__doc__ = code.co_consts[0] if code.co_consts else None
        self.code = code
        # Snapshot of the names visible at definition time: the defining
        # frame's globals, then its locals, then the function itself.
        self.f_globals = dict(globals_nm)
        self.f_globals.update(vm.frame.f_locals)
        self.f_globals.update({name: self})
        self.var_names = self.code.co_varnames
        # Base local namespace copied into every call frame.
        self.f_locals = {"__name__": self.code.co_name}
        self.f_locals.update(kwargs_defaults.get("closure", {}))
        self.f_locals.update(kwargs_defaults.get("kwdefaults", {}))
        self.defaults = kwargs_defaults.get("defaults", ())
        self.kwdefaults = dict(kwargs_defaults.get("kwdefaults", {}))
        self.vm = vm
        flags = self.code.co_flags
        self.args_used = bool(flags & 0x04)     # function uses the *arguments
        self.kwargs_used = bool(flags & 0x08)   # function uses the **keywords
        self.is_generator = bool(flags & 0x20)  # the function is a generator

    def __call__(self, *args: tp.Any, **kwargs: tp.Any) -> tp.Any:
        """Run the function body on the VM and return its result.

        :raises TypeError: when the arguments do not fit the signature
        """
        # Each call gets its own locals so that recursive or repeated
        # calls cannot overwrite one another's variables.
        frame_locals = dict(self.f_locals)
        frame_locals.update(self._bind_arguments(args, kwargs))
        self.frame = Frame(self.code, frame_locals, self.f_globals, self.vm.frame)
        return self.vm.run_frame(self.frame)

    @staticmethod
    def check_func(args: tp.Any, code: tp.Any, defaults: tp.Any) -> tp.Any:
        """Validate the number of positional arguments.

        Only positional arguments and positional defaults are considered;
        arguments passed by keyword are not taken into account.

        :raises TypeError: on too many or too few positional arguments
        """
        if len(args) > code.co_argcount:
            raise TypeError()
        if len(args) + len(defaults) < code.co_argcount:
            raise TypeError()

    def handle_args_kwargs(self, *args: tp.Any, **kwargs: tp.Any) -> tp.Any:
        """Bind the call arguments and store them in ``self.f_locals``.

        :raises TypeError: when the arguments do not fit the signature
        """
        self.f_locals.update(self._bind_arguments(args, kwargs))

    def _bind_arguments(self, args: tp.Any, kwargs: tp.Any) -> tp.Dict[str, tp.Any]:
        """Map call arguments onto parameter names.

        ``co_varnames`` lists the positional parameters first, then the
        keyword-only ones, then the ``*args`` name and finally the
        ``**kwargs`` name.

        NOTE(review): positional-only parameters are treated like
        ordinary positional parameters.

        :return: new dict ``{parameter name: value}``
        :raises TypeError: on surplus, duplicate, missing or unexpected
            arguments
        """
        func_name = self.__name__
        argcount = self.code.co_argcount
        kwonly_end = argcount + self.code.co_kwonlyargcount
        positional_names = tuple(self.var_names[:argcount])
        kwonly_names = tuple(self.var_names[argcount:kwonly_end])

        if len(args) > argcount and not self.args_used:
            raise TypeError(
                f"{func_name}() takes {argcount} positional arguments "
                f"but {len(args)} were given"
            )

        # zip() stops at the shorter sequence, so surplus positionals are
        # left for *args below.
        bound: tp.Dict[str, tp.Any] = dict(zip(positional_names, args))

        # Keywords that name a declared parameter.
        remaining = dict(kwargs)
        for param in positional_names + kwonly_names:
            if param in remaining:
                if param in bound:
                    raise TypeError(
                        f"{func_name}() got multiple values for argument '{param}'"
                    )
                bound[param] = remaining.pop(param)

        # Defaults belong to the last len(defaults) positional parameters.
        first_default = argcount - len(self.defaults)
        for index, param in enumerate(positional_names):
            if param in bound:
                continue
            if index < first_default:
                raise TypeError(
                    f"{func_name}() missing required positional argument: '{param}'"
                )
            bound[param] = self.defaults[index - first_default]

        for param in kwonly_names:
            if param in bound:
                continue
            if param not in self.kwdefaults:
                raise TypeError(
                    f"{func_name}() missing required keyword-only argument: '{param}'"
                )
            bound[param] = self.kwdefaults[param]

        slot = kwonly_end
        if self.args_used:
            bound[self.var_names[slot]] = tuple(args[argcount:])
            slot += 1
        if self.kwargs_used:
            bound[self.var_names[slot]] = remaining
        elif remaining:
            unexpected = next(iter(remaining))
            raise TypeError(
                f"{func_name}() got an unexpected keyword argument '{unexpected}'"
            )
        return bound
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement