Guest User

Untitled

a guest
Jan 15th, 2019
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.98 KB | None | 0 0
  1. import pickle
  2. import typing
  3. import socket
  4. import struct
  5. import asyncio
  6. import argparse
  7. import inspect
  8.  
  9.  
  10. class CommandMap(dict):
  11. """
  12. args
  13. default values
  14. *args
  15. **kwargs
  16. annotations
  17. """
  18. def __init__(self, *args, **kwargs):
  19. super().__init__(*args, **kwargs)
  20. self.p = argparse.ArgumentParser(description="DESCRIPTION")
  21. self.subp = self.p.add_subparsers(dest="fm_command")
  22. self.subp.required = True
  23.  
  24. def register(self, name=None, _help=None):
  25. name2 = name
  26. _help2 = _help
  27.  
  28. def decorator(call):
  29. fa = inspect.getfullargspec(call)
  30. name = name2 or call.__name__
  31. self[name] = call
  32.  
  33. _help = _help2 or inspect.getdoc(call)
  34. call_parser = self.subp.add_parser(name, description=_help)
  35. for arg in fa.args:
  36. _type, _def = None, None
  37. if fa.annotations:
  38. _type = fa.annotations.get(arg, None)
  39. if fa.defaults:
  40. _def = fa.defaults.get(arg, None)
  41. call_parser.add_argument(arg, type=_type, default=_def, help=f"({_type!s})")
  42. return call
  43.  
  44. return decorator
  45.  
  46. def parse_args(self, args=None, namespace=None):
  47. self.args = self.p.parse_args(args=args, namespace=namespace)
  48. return self.args
  49.  
  50. def launch(self):
  51. dargs = vars(self.args) # make dict copy of namespace
  52. name = dargs.pop('fm_command')
  53. return self[name](**dargs)
  54.  
  55.  
  56. cm = CommandMap()
  57.  
  58.  
  59. @cm.register()
  60. def sum_int(arg1: int, arg2: int):
  61. """
  62. example of function with type annotations
  63. """
  64. print("sum_int:", arg1, arg2, arg1 + arg2)
  65. return arg1 + arg2
  66.  
  67.  
  68. @cm.register()
  69. def sum_any(a1, a2):
  70. """
  71. this description will be shown as a help message for the cmd option
  72. """
  73. print("sum_any:", a1, a2, a1+a2)
  74. return a1 + a2
  75.  
  76.  
  77. @cm.register("sum_float")
  78. def sum_two_float_numbers(d1: float, d2: float):
  79. """
  80. overwrite function name with a shorter one
  81. :param d1: d1 help message
  82. :param d2: d2 help message
  83. """
  84. print("sum_two_float_numbers:", d1, d2, d1 + d2)
  85. return d1 + d2
  86.  
  87.  
  88. @cm.register()
  89. def test_command_map():
  90. def test_one(cm, func, inputs, result):
  91. cm.parse_args(inputs)
  92. assert cm.launch() == result
  93. # assert inspect.getdoc(func) in cm.p.format_help()
  94.  
  95. cm = CommandMap()
  96. cm.register()(sum_int)
  97. cm.register()(sum_any)
  98. cm.register("sum_float1")(sum_two_float_numbers)
  99. cm.register("sum_float2")(sum_two_float_numbers)
  100. tests = [
  101. (cm, sum_int, ("sum_int", "1", "2"), 3),
  102. (cm, sum_any, ("sum_any", "abc", "123"), "abc123"),
  103. (cm, sum_two_float_numbers, ("sum_float1", "1", "2"), 3.0),
  104. (cm, sum_two_float_numbers, ("sum_float2", "1", "2"), 3.0),
  105. ]
  106. for test in tests:
  107. test_one(*test)
  108.  
  109.  
  110. class Serializer:
  111. """
  112. all the __slots__ bounds except the __ignores__ will be serialized to bytes
  113. overwrite __slots__ and __ignores__ in children classes
  114.  
  115. example:
  116. class A(Serializer):
  117. __slots__ = ['a']
  118.  
  119. def __init__(a):
  120. self.a = a
  121.  
  122. a = A(123)
  123. b = a.dumps()
  124. a2 = b.loads(b)
  125. assert a.a == a2.a
  126. """
  127. __slots__ = []
  128. __ignores__ = []
  129.  
  130. def __init__(self, **kwargs):
  131. pass
  132.  
  133. @classmethod
  134. def to_serialize(cls):
  135. return set(cls.__slots__) - set(cls.__ignores__)
  136.  
  137. @classmethod
  138. def loads(cls, bstr: bytes):
  139. d = pickle.loads(bstr)
  140. if not all(item in cls.to_serialize() for item in d):
  141. raise ValueError("Incorrect string passed")
  142. inst = cls.__new__(cls)
  143. for name, value in d.items():
  144. setattr(inst, name, value)
  145. return inst
  146.  
  147. def dumps(self):
  148. # TODO: for item in self.__dict__ if not isinstance(item, bound method)
  149. d = {}
  150. for item in self.to_serialize():
  151. d[item] = getattr(self, item)
  152. return pickle.dumps(d)
  153.  
  154.  
  155. class Serializable(Serializer):
  156. __slots__ = ['int_a', 'str_b', 'list_c']
  157.  
  158. def __init__(self, a: int, b: str, c: typing.List):
  159. super().__init__()
  160. self.int_a = a
  161. self.str_b = b
  162. self.list_c = c
  163.  
  164. def double(self):
  165. self.int_a *= 2
  166. self.str_b *= 2
  167. self.list_c *= 2
  168.  
  169.  
  170. @cm.register()
  171. def test_serializable():
  172. s1 = Serializable(1, "2", ["hello", "world"])
  173. b = s1.dumps()
  174. s1 = Serializable.loads(b)
  175. assert s1.int_a == 1
  176. assert s1.str_b == "2"
  177. assert s1.list_c == ["hello", "world"]
  178. s1.double()
  179. b = s1.dumps()
  180. s1 = Serializable.loads(b)
  181. assert s1.int_a == 1 * 2
  182. assert s1.str_b == "2" * 2
  183. assert s1.list_c == ["hello", "world"] * 2
  184.  
  185.  
  186. async def wait_cancel(coro, timeout):
  187. task = asyncio.ensure_future(coro)
  188. try:
  189. res = await asyncio.wait_for(task, timeout=timeout)
  190. return res, False
  191. except asyncio.TimeoutError as e:
  192. await task.cancel()
  193. return None, True
  194.  
  195.  
  196. def get_dev_ip(ifname):
  197. """
  198. linux only
  199. :param ifname: ifconfig's device name
  200. :return: ip address of the device
  201. """
  202. import fcntl
  203. if isinstance(ifname, str):
  204. ifname = bytes(ifname, 'ascii')
  205. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  206. f = fcntl.ioctl(sock.fileno(), 0x8915, struct.pack('256s', ifname))
  207. return socket.inet_ntoa(f[20:24])
  208.  
  209.  
  210. def input_choice(question: str, options: typing.List):
  211. """
  212. A package installer's like question input with choice
  213. case sensitive
  214. :return: selected option
  215. """
  216. prompt = "[{}]".format("/".join(options))
  217. while True:
  218. choice = input(f"{question} {prompt}")
  219. if choice in options:
  220. return choice
  221. print("invalid choice")
  222.  
  223.  
  224. @cm.register()
  225. def check_input_choice():
  226. for _ in range(2):
  227. choices = ["y", "n"]
  228. res = input_choice("install package?", ["y", "n"])
  229. assert res in choices
  230.  
  231.  
  232. if __name__ == "__main__":
  233. cm.register()(sum_int)
  234. cm.parse_args()
  235. cm.launch()
Add Comment
Please, Sign In to add comment