Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pickle
- import typing
- import socket
- import struct
- import asyncio
- import argparse
- import inspect
- class CommandMap(dict):
- """
- args
- default values
- *args
- **kwargs
- annotations
- """
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.p = argparse.ArgumentParser(description="DESCRIPTION")
- self.subp = self.p.add_subparsers(dest="fm_command")
- self.subp.required = True
- def register(self, name=None, _help=None):
- name2 = name
- _help2 = _help
- def decorator(call):
- fa = inspect.getfullargspec(call)
- name = name2 or call.__name__
- self[name] = call
- _help = _help2 or inspect.getdoc(call)
- call_parser = self.subp.add_parser(name, description=_help)
- for arg in fa.args:
- _type, _def = None, None
- if fa.annotations:
- _type = fa.annotations.get(arg, None)
- if fa.defaults:
- _def = fa.defaults.get(arg, None)
- call_parser.add_argument(arg, type=_type, default=_def, help=f"({_type!s})")
- return call
- return decorator
- def parse_args(self, args=None, namespace=None):
- self.args = self.p.parse_args(args=args, namespace=namespace)
- return self.args
- def launch(self):
- dargs = vars(self.args) # make dict copy of namespace
- name = dargs.pop('fm_command')
- return self[name](**dargs)
- cm = CommandMap()
- @cm.register()
- def sum_int(arg1: int, arg2: int):
- """
- example of function with type annotations
- """
- print("sum_int:", arg1, arg2, arg1 + arg2)
- return arg1 + arg2
- @cm.register()
- def sum_any(a1, a2):
- """
- this description will be shown as a help message for the cmd option
- """
- print("sum_any:", a1, a2, a1+a2)
- return a1 + a2
- @cm.register("sum_float")
- def sum_two_float_numbers(d1: float, d2: float):
- """
- overwrite function name with a shorter one
- :param d1: d1 help message
- :param d2: d2 help message
- """
- print("sum_two_float_numbers:", d1, d2, d1 + d2)
- return d1 + d2
- @cm.register()
- def test_command_map():
- def test_one(cm, func, inputs, result):
- cm.parse_args(inputs)
- assert cm.launch() == result
- # assert inspect.getdoc(func) in cm.p.format_help()
- cm = CommandMap()
- cm.register()(sum_int)
- cm.register()(sum_any)
- cm.register("sum_float1")(sum_two_float_numbers)
- cm.register("sum_float2")(sum_two_float_numbers)
- tests = [
- (cm, sum_int, ("sum_int", "1", "2"), 3),
- (cm, sum_any, ("sum_any", "abc", "123"), "abc123"),
- (cm, sum_two_float_numbers, ("sum_float1", "1", "2"), 3.0),
- (cm, sum_two_float_numbers, ("sum_float2", "1", "2"), 3.0),
- ]
- for test in tests:
- test_one(*test)
- class Serializer:
- """
- all the __slots__ bounds except the __ignores__ will be serialized to bytes
- overwrite __slots__ and __ignores__ in children classes
- example:
- class A(Serializer):
- __slots__ = ['a']
- def __init__(a):
- self.a = a
- a = A(123)
- b = a.dumps()
- a2 = b.loads(b)
- assert a.a == a2.a
- """
- __slots__ = []
- __ignores__ = []
- def __init__(self, **kwargs):
- pass
- @classmethod
- def to_serialize(cls):
- return set(cls.__slots__) - set(cls.__ignores__)
- @classmethod
- def loads(cls, bstr: bytes):
- d = pickle.loads(bstr)
- if not all(item in cls.to_serialize() for item in d):
- raise ValueError("Incorrect string passed")
- inst = cls.__new__(cls)
- for name, value in d.items():
- setattr(inst, name, value)
- return inst
- def dumps(self):
- # TODO: for item in self.__dict__ if not isinstance(item, bound method)
- d = {}
- for item in self.to_serialize():
- d[item] = getattr(self, item)
- return pickle.dumps(d)
- class Serializable(Serializer):
- __slots__ = ['int_a', 'str_b', 'list_c']
- def __init__(self, a: int, b: str, c: typing.List):
- super().__init__()
- self.int_a = a
- self.str_b = b
- self.list_c = c
- def double(self):
- self.int_a *= 2
- self.str_b *= 2
- self.list_c *= 2
- @cm.register()
- def test_serializable():
- s1 = Serializable(1, "2", ["hello", "world"])
- b = s1.dumps()
- s1 = Serializable.loads(b)
- assert s1.int_a == 1
- assert s1.str_b == "2"
- assert s1.list_c == ["hello", "world"]
- s1.double()
- b = s1.dumps()
- s1 = Serializable.loads(b)
- assert s1.int_a == 1 * 2
- assert s1.str_b == "2" * 2
- assert s1.list_c == ["hello", "world"] * 2
- async def wait_cancel(coro, timeout):
- task = asyncio.ensure_future(coro)
- try:
- res = await asyncio.wait_for(task, timeout=timeout)
- return res, False
- except asyncio.TimeoutError as e:
- await task.cancel()
- return None, True
- def get_dev_ip(ifname):
- """
- linux only
- :param ifname: ifconfig's device name
- :return: ip address of the device
- """
- import fcntl
- if isinstance(ifname, str):
- ifname = bytes(ifname, 'ascii')
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- f = fcntl.ioctl(sock.fileno(), 0x8915, struct.pack('256s', ifname))
- return socket.inet_ntoa(f[20:24])
- def input_choice(question: str, options: typing.List):
- """
- A package installer's like question input with choice
- case sensitive
- :return: selected option
- """
- prompt = "[{}]".format("/".join(options))
- while True:
- choice = input(f"{question} {prompt}")
- if choice in options:
- return choice
- print("invalid choice")
- @cm.register()
- def check_input_choice():
- for _ in range(2):
- choices = ["y", "n"]
- res = input_choice("install package?", ["y", "n"])
- assert res in choices
- if __name__ == "__main__":
- cm.register()(sum_int)
- cm.parse_args()
- cm.launch()
Add Comment
Please, Sign In to add comment