Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- from collections import deque
- from functools import cached_property, partial, singledispatchmethod, wraps
- from itertools import islice, pairwise, repeat
- from typing import *
- from frozendict import frozendict
- from more_itertools import flatten
- # Python 3.10+ Two Function Complete Monad Library! :: https://pastebin.com/JP0tdf4s #python #python3 #mod #monad
py_struct = NamedTuple  # short alias; this file uses it as the base class of its record types


class CompareResult(py_struct):
    """Outcome of a three-way comparison, one boolean per possible ordering."""
    les: bool  # the first operand sorts before the second
    equ: bool  # both operands compare equal
    gtr: bool  # the first operand sorts after the second


# The earlier spelling `SupportsBytes and SupportsIndex and SupportsInt` is a
# boolean expression that evaluates to its last operand, so this alias has
# always meant `Iterable[SupportsInt]`; it is now written that way explicitly.
cmp_str_input_t = NewType('cmp_str_input_t', Iterable[SupportsInt])


def cmp_iter_str(si1: cmp_str_input_t, si2: cmp_str_input_t) -> CompareResult:
    """Compare two iterables item by item, the way strings are ordered.

    Items are consumed pairwise until the first difference or until either
    iterable runs out.  When one iterable is a strict prefix of the other, the
    shorter one sorts first.

    Args:
        si1: First iterable of mutually comparable items.
        si2: Second iterable of mutually comparable items.

    Returns:
        A CompareResult describing how ``si1`` relates to ``si2``.

    Raises:
        TypeError: If two differing items do not support ``<`` / ``>``.
    """
    # A private sentinel marks exhaustion so that a legitimate ``None`` item is
    # never mistaken for the end of an iterable.
    exhausted = object()
    it1, it2 = iter(si1), iter(si2)
    while True:
        c1 = next(it1, exhausted)
        c2 = next(it2, exhausted)
        end1 = c1 is exhausted
        end2 = c2 is exhausted
        if end1 or end2:
            # Both ended -> equal; otherwise whichever ended first is the lesser.
            return CompareResult(end1 and not end2, end1 and end2, end2 and not end1)
        if c1 != c2:
            return CompareResult(c1 < c2, False, c1 > c2)
# Bound to the `Callable` type; the builtin function `callable` is a predicate,
# not a type, and is not a meaningful TypeVar bound.
monad_fxn_callable_t = TypeVar('monad_fxn_callable_t', bound=Callable[..., Any])


class monad_attr_err_spec_t(py_struct):
    """Describes one AttributeError that `monad` should recover from.

    A spec matches when the object that raised has exactly the type
    ``obj_type`` and the missing attribute is called ``attr_name``.
    """
    obj_type: Type[Any]  # exact type of the object whose attribute lookup failed
    attr_name: str  # name of the attribute that was missing
    fxn_triggered: Callable[..., Any]  # handler called in place of re-raising the error
- def monad(call_function: monad_fxn_callable_t, *op_types: monad_attr_err_spec_t, pass_types: bool = False) -> monad_fxn_callable_t:
- def _i(*args, **kwargs):
- try:
- if pass_types:
- return call_function(*args, op_types = op_types, **kwargs)
- return call_function(*args, **kwargs)
- except AttributeError as attr_err: # https://docs.python.org/3.11/library/exceptions.html#AttributeError
- if attr_err.obj is None:
- return
- ae_obj, ae_name = attr_err.obj, attr_err.name # Changed in version 3.10: Added the name and obj attributes.
- x: monad_attr_err_spec_t
- for x in op_types:
- if x.obj_type is type(ae_obj) and ae_name == x.attr_name:
- return x.fxn_triggered(*args, ae_obj = ae_obj, ae_name = ae_name, **kwargs)
- raise # Don't suppress unexpected errors!
- return _i
def mo_decorate(call_function: monad_fxn_callable_t, pass_types: bool = False) -> monad_fxn_callable_t:
    """Two-step form of `monad`: fix the function now, supply the specs later.

    Args:
        call_function: The callable that will eventually be wrapped.
        pass_types: Forwarded to `monad` unchanged.

    Returns:
        A callable that accepts the error specs and returns
        ``monad(call_function, *specs, pass_types=pass_types)``.  It carries
        the name and docstring of ``call_function``.
    """
    def bind_specs(*op_types: monad_attr_err_spec_t):
        return monad(call_function, *op_types, pass_types=pass_types)

    # Applying wraps() as a plain call is equivalent to using it as a decorator.
    return wraps(call_function)(bind_specs)
- # Python 3.10+ Two Function Complete Monad Library! V2! :: https://pastebin.com/L37fvrQk #python #python3 #mod #monad
py_curry_fxn_arg_t = TypeVar('py_curry_fxn_arg_t')
# `Callable[..., T]` is the valid spelling for "any arguments"; an ellipsis
# inside an argument list (`Callable[[T, ...], T]`) is not a valid Callable form.
py_curry_fxn_closure_t = NewType('py_curry_fxn_closure_t', Callable[..., py_curry_fxn_arg_t])


def py_curry(fxn: py_curry_fxn_closure_t, *args: py_curry_fxn_arg_t, **kwargs: py_curry_fxn_arg_t) -> py_curry_fxn_closure_t:  # Keep It Simple, Stupid!
    """Bind leading positional and default keyword arguments to ``fxn``.

    Like `functools.partial()` but deliberately minimal, with a small, fixed
    definition.  Arguments given at call time are appended after the bound
    positional arguments; keyword arguments given at call time override bound
    keyword arguments of the same name.

    Args:
        fxn: The callable to bind arguments to.
        *args: Positional arguments placed before any call-time ones.
        **kwargs: Keyword defaults that call-time keywords may override.

    Returns:
        A callable that forwards to ``fxn`` with the combined arguments.

    >>> py_curry(divmod, 7)(2)
    (3, 1)
    >>> py_curry(dict, a=1)(a=2, b=3)
    {'a': 2, 'b': 3}
    """
    def curried(*args2, **kwargs2):
        # Merge into one mapping first: unpacking both with `**` in the call
        # raises TypeError as soon as the same keyword appears in each.
        return fxn(*args, *args2, **{**kwargs, **kwargs2})
    return curried
- valid_english_character_string: str = ( # the 52 basic Arbegla english letter characters
- "abcdefghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ")
- class PyClojure:
- __driver: py_curry_fxn_closure_t
- __positional_args: Tuple[py_curry_fxn_arg_t, ...]
- __variadic_kwargs: Mapping[str, py_curry_fxn_arg_t] | MutableMapping[str, py_curry_fxn_arg_t]
- __c_fxn_name: str
- repo_key_function_t = NewType('repo_key_function_t', str | Iterable[py_curry_fxn_arg_t] | py_curry_fxn_closure_t)
- repo: Dict[repo_key_function_t, Self] = {} # global
- valid_name_closure_str: str = valid_english_character_string + ''
- valid_name_closure_char: FrozenSet[int] = frozenset((map(ord, valid_name_closure_str)))
- def __init__(self, driver: Optional[py_curry_fxn_closure_t], name: str, *args, **kwargs):
- self.__driver = driver
- self.__positional_args = (*args,)
- self.__variadic_kwargs = {**kwargs}
- self.__c_fxn_name = name
- def __new__(cls, *args, **kwargs) -> 'PyClojure':
- driver = args[0] if len(args) > 0 else None
- name = args[1] if len(args) > 1 else ''
- illegal_cs: Set[str] = set()
- if not (len(args) >= 2 and isinstance(name, str) and len(name) >= 5 and 0 == len((illegal_cs := (set(map(ord, name)).symmetric_difference(cls.valid_name_closure_char)))) and (driver is None or callable(driver))): # …. @({()})
- raise TypeError( # @({()})
- f'For all {cls.__name__}(), an optional driver @({str(driver)}) must be given as None @({driver is None}) or a callable @({(callable(driver))}) and '
- f'the closure must next be given a name of ≥5 characters in @({repr(cls.valid_name_closure_str)}) given the name @({repr(name)} with illegal characters @({tuple(illegal_cs)}).')
- del illegal_cs # reduce clutter
- repo = cls.repo
- found_name: str = repo.get(name, '')
- found_driver: str = repo.get(driver, None)
- if found_driver or found_name:
- return found_driver or found_name # already exists
- else:
- new = object.__new__(cls)
- new.__init__(*args, **kwargs)
- repo[name] = new
- repo[driver] = new
- del repo, name, driver # fix weird scope persistence…
- return new
- def __bool__(self) -> bool:
- return True
- def refine_kwargs(self, *tables: Optional[Tuple[str, py_curry_fxn_arg_t]], freeze: bool = False, thaw: bool = False) -> None:
- self.__variadic_kwargs = default_kwargs(self.__variadic_kwargs.items(), *tables, freeze = freeze, thaw = thaw)
- def refine_p_args(self, *amendments: Optional[Tuple[int, py_curry_fxn_arg_t]], start_length: Optional[int] = None, max_length: Optional[int] = None, fill_value: py_curry_fxn_arg_t = None, more_copies_than_insertions: bool = True) -> None:
- self.__positional_args = gen_tuple_diff(self.__positional_args, *amendments, start_length = start_length, max_length = max_length, fill_value = fill_value, more_copies_than_insertions = more_copies_than_insertions)
- @singledispatchmethod
- def __contains__(self, item: Any) -> bool:
- raise NotImplementedError(f'The type {repr(type(item))} is unsupported on {repr(self)}.')
- @__contains__.register(str)
- def __contains__(self, item: str) -> bool:
- return item in self.__variadic_kwargs
- @__contains__.register(int)
- def __contains__(self, item: int) -> bool:
- return item in self.__positional_args
- def __repr__(self) -> str:
- pos_args = (', '.join(map(repr, self.__positional_args)))
- pos_args = (', ' + pos_args if pos_args else '')
- v_kwargs = tuple(((str(k), repr(v)) for (k, v) in self.__variadic_kwargs.items()))
- v_kwargs = (', ' + ', '.join(f'{k} = {v}' for (k, v) in v_kwargs) if v_kwargs else '')
- return f'({self.__class__.__name__}({repr(self.__driver), repr(self.__c_fxn_name)}{pos_args}{v_kwargs}))'
- def __str__(self) -> str:
- pos_args = (', '.join(map(str, self.__positional_args)))
- pos_args = (', ' + pos_args if pos_args else '')
- v_kwargs = tuple(((str(k), str(v)) for (k, v) in self.__variadic_kwargs.items()))
- v_kwargs = (', ' + ', '.join(f'{k} = {v}' for (k, v) in v_kwargs) if v_kwargs else '')
- return f'({self.__class__.__name__}({str(self.__driver), str(self.__c_fxn_name)}{pos_args}{v_kwargs}))'
- def __iter__(self) -> Iterable[py_curry_fxn_arg_t | Tuple[str, py_curry_fxn_arg_t]]:
- yield from self.__positional_args
- yield from self.__variadic_kwargs.items()
- @property
- def closure_name(self) -> str:
- return self.__c_fxn_name
- @property
- def closure_driver_fxn(self) -> py_curry_fxn_closure_t:
- return self.__driver
- @property
- def closure_driver_str(self) -> str:
- return str(self.__driver)
- @property
- def closure_driver_repr(self) -> str:
- return repr(self.__driver)
- def __call__(self, *args, **kwargs):
- return self.__driver(*(self.__positional_args + args), **default_kwargs(*(tuple(self.__variadic_kwargs.items()) + tuple(kwargs.items()))))
- def call_bare(self):
- return self()
- @cached_property
- def call_save(self):
- return self()
- KVT = TypeVar('KVT')
- def default_kwargs(*tables: Optional[Tuple[str, KVT]], freeze: bool = False, thaw: bool = False) -> Mapping[str, KVT] | Dict[str, KVT]:
- tmp: Dict[str, KVT] = {}
- for tbl in tables:
- if tbl:
- tmp.update(tbl)
- if freeze:
- tmp = frozendict(tmp)
- if thaw:
- tmp = dict(tmp)
- return tmp
G_KEY_T = TypeVar('G_KEY_T')


def gen_diff_table(*tables: Optional[Tuple[G_KEY_T, KVT] | Mapping[G_KEY_T, KVT] | Iterable[Tuple[G_KEY_T, KVT]]], freeze: bool = False, thaw: bool = False) -> Mapping[G_KEY_T, KVT] | Dict[G_KEY_T, KVT]:
    """Merge key/value tables left to right; later entries override earlier ones.

    Each table may be a mapping, an iterable of ``(key, value)`` pairs, or a
    single ``(key, value)`` pair.  A 2-tuple counts as a single pair unless its
    first element is itself a tuple or list, in which case it is read as an
    iterable of pairs; to use a tuple as a key, wrap the pair in a list or a
    dict.  Falsy tables (None or empty) are skipped.

    Args:
        *tables: The tables to merge, in order of increasing priority.
        freeze: Convert the result to a ``frozendict``.
        thaw: Convert the result to a plain ``dict``; applied after ``freeze``.

    Returns:
        The merged mapping.
    """
    merged: Dict[G_KEY_T, KVT] = {}
    for tbl in tables:
        if not tbl:
            continue
        if isinstance(tbl, tuple) and len(tbl) == 2 and not isinstance(tbl[0], (tuple, list)):
            # A lone (key, value) pair; dict.update() would instead try to
            # read each of its two elements as a pair and fail.
            key, value = tbl
            merged[key] = value
        else:
            merged.update(tbl)
    if freeze:
        merged = frozendict(merged)
    if thaw:
        merged = dict(merged)
    return merged
# Keep gen_tuple_diff fast: one copy of the input plus one assignment per amendment.
def gen_tuple_diff(old_array: Tuple[KVT, ...], *amendments: Optional[Tuple[int, KVT]],
                   start_length: Optional[int] = None, max_length: Optional[int] = None, fill_value: KVT = None,
                   more_copies_than_insertions: bool = True) -> Tuple[KVT, ...]:
    """Return a copy of ``old_array`` resized and with selected items replaced.

    The result length starts as ``len(old_array)``, is raised to
    ``start_length`` when that is given (truthy), and is then capped at
    ``max_length`` when that is given (truthy).  Positions beyond the old
    items are filled with ``fill_value``.  Each amendment ``(index, value)``
    then overwrites one position; negative indices count from the end of the
    result, indices outside the result are ignored, and when several
    amendments address the same position the last one wins.

    Args:
        old_array: The tuple to start from; it is not modified.
        *amendments: ``(index, value)`` replacements; None entries are skipped.
        start_length: Minimum length of the result.
        max_length: Maximum length of the result.
        fill_value: Value used for padding.
        more_copies_than_insertions: Accepted for compatibility; it used to
            select between two build strategies and has no effect on the
            result.

    Returns:
        The amended tuple.
    """
    final_length = len(old_array)
    if start_length:
        final_length = max(final_length, start_length)
    if max_length:
        final_length = min(final_length, max_length)
    merged = list(old_array[:final_length])
    # Pad up to the final length only, so the result can never exceed max_length.
    merged.extend(repeat(fill_value, final_length - len(merged)))
    for amendment in amendments:
        if amendment is None:
            continue
        index, value = amendment
        if -final_length <= index < final_length:
            merged[index] = value
    return tuple(merged)
# "Any closure": either a plain curried callable or a PyClojure instance.
generic_py_closure_typ = NewType(
    'generic_py_closure_typ',
    py_curry_fxn_closure_t | PyClojure,
)
# Type variable constrained to the closure flavours named above.
generic_py_closure_tva = TypeVar(
    'generic_py_closure_tva',
    generic_py_closure_typ,
    py_curry_fxn_closure_t,
    PyClojure,
)
def py_closure(fxn: py_curry_fxn_closure_t | PyClojure, *args: py_curry_fxn_arg_t, **kwargs: py_curry_fxn_arg_t) -> py_curry_fxn_closure_t:  # Keep It Simple, Stupid!
    """Bind leading positional and default keyword arguments to ``fxn``.

    Like `functools.partial()` but deliberately minimal, with a small, fixed
    definition.  Arguments given at call time are appended after the bound
    positional arguments; keyword arguments given at call time override bound
    keyword arguments of the same name.

    Args:
        fxn: The callable (a plain callable or a PyClojure) to bind to.
        *args: Positional arguments placed before any call-time ones.
        **kwargs: Keyword defaults that call-time keywords may override.

    Returns:
        A callable that forwards to ``fxn`` with the combined arguments.

    >>> py_closure(divmod, 7)(2)
    (3, 1)
    >>> py_closure(dict, a=1)(a=2, b=3)
    {'a': 2, 'b': 3}
    """
    def closure(*args2, **kwargs2):
        # Merge into one mapping first: unpacking both with `**` in the call
        # raises TypeError as soon as the same keyword appears in each.
        return fxn(*args, *args2, **{**kwargs, **kwargs2})
    return closure


f_part = partial  # short alias for functools.partial
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement