Advertisement
HasteBin0

Python 3.11+ Simple and Efficient Module for Monads and Closures V0.3

Oct 8th, 2023
1,329
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 12.08 KB | Software | 0 0
  1. #!/usr/bin/python3
  2. from collections import deque
  3. from functools import cached_property, partial, singledispatchmethod, wraps
  4. from itertools import islice, pairwise, repeat
  5. from typing import *
  6.  
  7. from frozendict import frozendict
  8. from more_itertools import flatten
  9.  
  10. # Python 3.10+ Two Function Complete Monad Library! :: https://pastebin.com/JP0tdf4s  #python #python3 #mod #monad
  11.  
  12.  
  13. py_struct = NamedTuple
  14.  
  15.  
  16. class CompareResult(py_struct):
  17.     les: bool
  18.     equ: bool
  19.     gtr: bool
  20.  
  21.  
  22. cmp_str_input_t = NewType('cmp_str_input_t', Iterable[SupportsBytes and SupportsIndex and SupportsInt])
  23.  
  24.  
  25. def cmp_iter_str(si1: cmp_str_input_t, si2: cmp_str_input_t) -> CompareResult:
  26.     it1 = iter(si1)
  27.     it2 = iter(si2)
  28.     while True:
  29.         c1 = next(it1, None)
  30.         c2 = next(it2, None)
  31.         n1 = c1 is None
  32.         n2 = c2 is None
  33.         if n1 and n2:
  34.             return CompareResult(False, True, False)
  35.         elif n1 ^ n2:
  36.             return CompareResult(n1, False, n2)
  37.         elif c1 != c2:
  38.             return CompareResult(c1 < c2, False, c1 > c2)
  39.  
  40.  
  41. monad_fxn_callable_t = TypeVar('monad_fxn_callable_t', bound = callable)
  42.  
  43.  
  44. class monad_attr_err_spec_t(py_struct):
  45.     obj_type: Type[Any]
  46.     attr_name: str
  47.     fxn_triggered: monad_fxn_callable_t
  48.  
  49.  
  50. def monad(call_function: monad_fxn_callable_t, *op_types: monad_attr_err_spec_t, pass_types: bool = False) -> monad_fxn_callable_t:
  51.     def _i(*args, **kwargs):
  52.         try:
  53.             if pass_types:
  54.                 return call_function(*args, op_types = op_types, **kwargs)
  55.             return call_function(*args, **kwargs)
  56.         except AttributeError as attr_err:  # https://docs.python.org/3.11/library/exceptions.html#AttributeError
  57.             if attr_err.obj is None:
  58.                 return
  59.             ae_obj, ae_name = attr_err.obj, attr_err.name  # Changed in version 3.10: Added the name and obj attributes.
  60.             x: monad_attr_err_spec_t
  61.             for x in op_types:
  62.                 if x.obj_type is type(ae_obj) and ae_name == x.attr_name:
  63.                     return x.fxn_triggered(*args, ae_obj = ae_obj, ae_name = ae_name, **kwargs)
  64.             raise  # Don't suppress unexpected errors!
  65.    
  66.     return _i
  67.  
  68.  
  69. def mo_decorate(call_function: monad_fxn_callable_t, pass_types: bool = False) -> monad_fxn_callable_t:
  70.     @wraps(call_function)
  71.     def internal(*op_types: monad_attr_err_spec_t):
  72.         return monad(call_function, *op_types, pass_types = pass_types)
  73.    
  74.     return internal
  75.  
  76.  
  77. # Python 3.10+ Two Function Complete Monad Library! V2! :: https://pastebin.com/L37fvrQk #python #python3 #mod #monad
  78.  
  79.  
  80. py_curry_fxn_arg_t = TypeVar('py_curry_fxn_arg_t')
  81. py_curry_fxn_closure_t = NewType('py_curry_fxn_closure_t', Callable[[py_curry_fxn_arg_t, ...], py_curry_fxn_arg_t])
  82.  
  83.  
  84. def py_curry(fxn: py_curry_fxn_closure_t, *args: py_curry_fxn_arg_t, **kwargs: py_curry_fxn_arg_t) -> py_curry_fxn_closure_t:  # Keep It Simple, Stupid!
  85.     """ Like `functools.partial()` but deliberately simpler, faster, memory-lighter, clearer typed; and with a known, simple, fixed, dependable definition.
  86.    >>> return lambda *args2, **kwargs2: fxn(*args, *args2, **kwargs, **kwargs2)  # pass new args2 after old (earlier curried) args and update (earlier) kwargs with kwargs2.
  87.    """
  88.     return lambda *args2, **kwargs2: fxn(*args, *args2, **kwargs, **kwargs2)  # pass new args2 after old (earlier curried) args and update (earlier) kwargs with kwargs2.
  89.  
  90.  
  91. valid_english_character_string: str = (  # the 52 basic Arbegla english letter characters
  92.     "abcdefghijklmnopqrstuvwxyz"    "ABCDEFGHIJKLMNOPQRSTUVWXYZ")
  93.  
  94.  
  95. class PyClojure:
  96.     __driver: py_curry_fxn_closure_t
  97.     __positional_args: Tuple[py_curry_fxn_arg_t, ...]
  98.     __variadic_kwargs: Mapping[str, py_curry_fxn_arg_t] | MutableMapping[str, py_curry_fxn_arg_t]
  99.     __c_fxn_name: str
  100.    
  101.     repo_key_function_t = NewType('repo_key_function_t', str | Iterable[py_curry_fxn_arg_t] | py_curry_fxn_closure_t)
  102.     repo: Dict[repo_key_function_t, Self] = {}  # global
  103.     valid_name_closure_str: str = valid_english_character_string + ''
  104.     valid_name_closure_char: FrozenSet[int] = frozenset((map(ord, valid_name_closure_str)))
  105.    
  106.     def __init__(self, driver: Optional[py_curry_fxn_closure_t], name: str, *args, **kwargs):
  107.         self.__driver = driver
  108.         self.__positional_args = (*args,)
  109.         self.__variadic_kwargs = {**kwargs}
  110.         self.__c_fxn_name = name
  111.    
  112.     def __new__(cls, *args, **kwargs) -> 'PyClojure':
  113.         driver = args[0] if len(args) > 0 else None
  114.         name = args[1] if len(args) > 1 else ''
  115.         illegal_cs: Set[str] = set()
  116.         if not (len(args) >= 2 and isinstance(name, str) and len(name) >= 5 and 0 == len((illegal_cs := (set(map(ord, name)).symmetric_difference(cls.valid_name_closure_char)))) and (driver is None or callable(driver))):  # …. @({()})
  117.             raise TypeError(  # @({()})
  118.                 f'For all {cls.__name__}(), an optional driver @({str(driver)}) must be given as None @({driver is None}) or a callable @({(callable(driver))}) and '
  119.                 f'the closure must next be given a name of ≥5 characters in @({repr(cls.valid_name_closure_str)}) given the name @({repr(name)} with illegal characters @({tuple(illegal_cs)}).')
  120.         del illegal_cs  # reduce clutter
  121.         repo = cls.repo
  122.         found_name: str = repo.get(name, '')
  123.         found_driver: str = repo.get(driver, None)
  124.         if found_driver or found_name:
  125.             return found_driver or found_name  # already exists
  126.         else:
  127.             new = object.__new__(cls)
  128.             new.__init__(*args, **kwargs)
  129.             repo[name] = new
  130.             repo[driver] = new
  131.             del repo, name, driver  # fix weird scope persistence…
  132.             return new
  133.    
  134.     def __bool__(self) -> bool:
  135.         return True
  136.    
  137.     def refine_kwargs(self, *tables: Optional[Tuple[str, py_curry_fxn_arg_t]], freeze: bool = False, thaw: bool = False) -> None:
  138.         self.__variadic_kwargs = default_kwargs(self.__variadic_kwargs.items(), *tables, freeze = freeze, thaw = thaw)
  139.    
  140.     def refine_p_args(self, *amendments: Optional[Tuple[int, py_curry_fxn_arg_t]], start_length: Optional[int] = None, max_length: Optional[int] = None, fill_value: py_curry_fxn_arg_t = None, more_copies_than_insertions: bool = True) -> None:
  141.         self.__positional_args = gen_tuple_diff(self.__positional_args, *amendments, start_length = start_length, max_length = max_length, fill_value = fill_value, more_copies_than_insertions = more_copies_than_insertions)
  142.    
  143.     @singledispatchmethod
  144.     def __contains__(self, item: Any) -> bool:
  145.         raise NotImplementedError(f'The type {repr(type(item))} is unsupported on {repr(self)}.')
  146.    
  147.     @__contains__.register(str)
  148.     def __contains__(self, item: str) -> bool:
  149.         return item in self.__variadic_kwargs
  150.    
  151.     @__contains__.register(int)
  152.     def __contains__(self, item: int) -> bool:
  153.         return item in self.__positional_args
  154.    
  155.     def __repr__(self) -> str:
  156.         pos_args = (', '.join(map(repr, self.__positional_args)))
  157.         pos_args = (', ' + pos_args if pos_args else '')
  158.         v_kwargs = tuple(((str(k), repr(v)) for (k, v) in self.__variadic_kwargs.items()))
  159.         v_kwargs = (', ' + ', '.join(f'{k} = {v}' for (k, v) in v_kwargs) if v_kwargs else '')
  160.         return f'({self.__class__.__name__}({repr(self.__driver), repr(self.__c_fxn_name)}{pos_args}{v_kwargs}))'
  161.    
  162.     def __str__(self) -> str:
  163.         pos_args = (', '.join(map(str, self.__positional_args)))
  164.         pos_args = (', ' + pos_args if pos_args else '')
  165.         v_kwargs = tuple(((str(k), str(v)) for (k, v) in self.__variadic_kwargs.items()))
  166.         v_kwargs = (', ' + ', '.join(f'{k} = {v}' for (k, v) in v_kwargs) if v_kwargs else '')
  167.         return f'({self.__class__.__name__}({str(self.__driver), str(self.__c_fxn_name)}{pos_args}{v_kwargs}))'
  168.    
  169.     def __iter__(self) -> Iterable[py_curry_fxn_arg_t | Tuple[str, py_curry_fxn_arg_t]]:
  170.         yield from self.__positional_args
  171.         yield from self.__variadic_kwargs.items()
  172.    
  173.     @property
  174.     def closure_name(self) -> str:
  175.         return self.__c_fxn_name
  176.    
  177.     @property
  178.     def closure_driver_fxn(self) -> py_curry_fxn_closure_t:
  179.         return self.__driver
  180.    
  181.     @property
  182.     def closure_driver_str(self) -> str:
  183.         return str(self.__driver)
  184.    
  185.     @property
  186.     def closure_driver_repr(self) -> str:
  187.         return repr(self.__driver)
  188.    
  189.     def __call__(self, *args, **kwargs):
  190.         return self.__driver(*(self.__positional_args + args), **default_kwargs(*(tuple(self.__variadic_kwargs.items()) + tuple(kwargs.items()))))
  191.    
  192.     def call_bare(self):
  193.         return self()
  194.    
  195.     @cached_property
  196.     def call_save(self):
  197.         return self()
  198.  
  199.  
  200. KVT = TypeVar('KVT')
  201.  
  202.  
  203. def default_kwargs(*tables: Optional[Tuple[str, KVT]], freeze: bool = False, thaw: bool = False) -> Mapping[str, KVT] | Dict[str, KVT]:
  204.     tmp: Dict[str, KVT] = {}
  205.     for tbl in tables:
  206.         if tbl:
  207.             tmp.update(tbl)
  208.     if freeze:
  209.         tmp = frozendict(tmp)
  210.     if thaw:
  211.         tmp = dict(tmp)
  212.     return tmp
  213.  
  214.  
  215. G_KEY_T = TypeVar('G_KEY_T')
  216.  
  217.  
  218. def gen_diff_table(*tables: Optional[Tuple[G_KEY_T, KVT]], freeze: bool = False, thaw: bool = False) -> Mapping[G_KEY_T, KVT] | Dict[G_KEY_T, KVT]:
  219.     tmp: Dict[G_KEY_T, KVT] = {}
  220.     for tbl in tables:
  221.         if tbl:
  222.             tmp.update(tbl)
  223.     if freeze:
  224.         tmp = frozendict(tmp)
  225.     if thaw:
  226.         tmp = dict(tmp)
  227.     return tmp
  228.  
  229.  
  230. # do_this assure gen_tuple_diff is fast.
  231. def gen_tuple_diff(old_array: Tuple[KVT, ...], *amendments: Optional[Tuple[int, KVT]],
  232.                    start_length: Optional[int] = None, max_length: Optional[int] = None, fill_value: KVT = None,
  233.                    more_copies_than_insertions: bool = True) -> Tuple[KVT, ...]:
  234.     final_length = len(old_array)
  235.     if start_length:
  236.         final_length = max(final_length, start_length)
  237.     if max_length:
  238.         final_length = min(final_length, max_length)
  239.    
  240.     fill_scalar: int = max((start_length or max_length or len(old_array)) - len(old_array), 0)
  241.     merge_diff: Mapping[int, KVT] = gen_diff_table(*tuple(
  242.         ((k, v) if 0 <= k < final_length else
  243.          ((final_length + k, v) if 0 < -k <= final_length else None)
  244.          ) for (k, v) in amendments), freeze = True)
  245.    
  246.     pair_gen = pairwise(sorted(merge_diff.keys()))
  247.    
  248.     if more_copies_than_insertions:
  249.         merged = deque()  # fast .append()
  250.         last_index = 0
  251.         for k1, k2 in pair_gen:
  252.             if k1 > last_index:  # Add the segment from old_array if there's a gap
  253.                 merged.append(islice(old_array, last_index, k1))
  254.             merged.append(merge_diff[k1])
  255.             last_index = k2
  256.         if last_index < final_length:
  257.             merged.append(islice(old_array, last_index, final_length))
  258.         merged.append(repeat(fill_value, fill_scalar))
  259.         return tuple(flatten(merged))
  260.    
  261.     else:
  262.         return tuple(flatten(
  263.             (flatten(portion for k1, k2 in pair_gen for portion in
  264.                      ((merge_diff[k1],), (islice(old_array, k1, k2)))),
  265.              repeat(fill_value, fill_scalar))))
  266.  
  267.  
  268. generic_py_closure_typ = NewType('generic_py_closure_typ', py_curry_fxn_closure_t | PyClojure)
  269. generic_py_closure_tva = TypeVar('generic_py_closure_tva', generic_py_closure_typ, py_curry_fxn_closure_t, PyClojure)
  270.  
  271.  
  272. def py_closure(fxn: py_curry_fxn_closure_t | PyClojure, *args: py_curry_fxn_arg_t, **kwargs: py_curry_fxn_arg_t) -> py_curry_fxn_closure_t:  # Keep It Simple, Stupid!
  273.     """ Like `functools.partial()` but deliberately simpler, faster, memory-lighter, clearer typed; and with a known, simple, fixed, dependable definition.
  274.    >>> return lambda *args2, **kwargs2: fxn(*args, *args2, **kwargs, **kwargs2)  # pass new args2 after old (earlier curried) args and update (earlier) kwargs with kwargs2.
  275.    """
  276.     return lambda *args2, **kwargs2: fxn(*args, *args2, **kwargs, **kwargs2)  # pass new args2 after old (earlier curried) args and update (earlier) kwargs with kwargs2.
  277.  
  278.  
# Short module-level alias for functools.partial.
f_part = partial
  280.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement