kesha1225

Untitled

Jun 30th, 2020
1,001
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import copy
  2. import os
  3. import typing
  4.  
  5. import orjson
  6. from pydantic.fields import ModelField
  7. from pydantic.main import ModelMetaclass
  8. from pydantic.schema import schema
  9.  
  10. from ._types import TeensyData, TeensyTable, TeensyTableData, TeensyObject, TeensySchema
  11. from .exceptions import TeensyError
  12. from .filters import TEENSY_FILTERS
  13. from .models import TeensyModel
  14.  
  15. DOUBLE_UNDERSCORE = "__"
  16. TEENSY_OBJECT_ID_FIELD = "__id"
  17.  
  18.  
  19. class TeensyDB:
  20.     def __init__(self, path: str):
  21.         if not path.endswith(".json"):
  22.             raise TeensyError(f"Path must has .json file extension, got <<{path}>>")
  23.         self.path = path
  24.         self._data: typing.Optional[TeensyData] = None
  25.  
  26.     @property
  27.     def data(self) -> TeensyData:
  28.         if self._data is None:
  29.             self._data = self._loads_data()
  30.         return self._data
  31.  
  32.     def drop(self):
  33.         self.__save(
  34.             [{"__teensydb__": "generated"}, {"__table__": "__other", "__data__": {}}]
  35.         )
  36.         return self.data
  37.  
  38.     def _loads_data(self):
  39.         if not os.path.exists(self.path):
  40.             return self.drop()
  41.  
  42.         with open(self.path) as file:
  43.             data = orjson.loads(file.read())
  44.             if not {"__teensydb__": "generated"} in data:
  45.                 raise TeensyError(
  46.                     "File is not generated by TeensyDB, clear it or delete or use db.drop() and rerun"
  47.                 )
  48.         return data
  49.  
  50.     def __save(self, data: TeensyData):
  51.         self._data = data
  52.         with open(self.path, "wb") as file:
  53.             file.write(orjson.dumps(self._data))
  54.  
  55.     def _get_table(self, table_name: str) -> TeensyTable:
  56.         for _table in self.data[1:]:
  57.             if _table["__table__"] == table_name:
  58.                 return _table
  59.         _table = {"__table__": table_name, "__data__": []}
  60.         self.data.append(_table)
  61.         return _table
  62.  
  63.     def _get_table_data(self, table_name: str) -> TeensyTableData:
  64.         current_table = self._get_table(table_name)
  65.         current_table_data: TeensyTableData = current_table["__data__"]
  66.         return current_table_data
  67.  
  68.     def _get_object_from_str_reference(self, str_reference: str):
  69.         ref_table, ref_id = str_reference.split("_")[3:5]
  70.         obj = self.find_one(ref_table, __id=int(ref_id), return_raw=True)
  71.         return obj
  72.  
  73.     def _get_all_obj_refs(self, obj):
  74.         for obj_key, obj_value in obj.copy().items():
  75.             found_obj_values = obj_value if isinstance(obj_value, list) else [obj_value]
  76.             obj[obj_key] = []
  77.             for found_obj_value in found_obj_values:
  78.                 if isinstance(found_obj_value, str) and found_obj_value.startswith(
  79.                         "__reference"
  80.                 ):
  81.                     obj[obj_key].append(
  82.                         self._get_object_from_str_reference(found_obj_value)
  83.                     )
  84.                 else:
  85.                     obj[obj_key].append(found_obj_value)
  86.             obj[obj_key] = (
  87.                 obj[obj_key] if isinstance(obj_value, list) else obj[obj_key][0]
  88.             )
  89.         return obj
  90.  
  91.     def _get_table_data_from_obj(
  92.             self, find_obj: typing.Union[typing.Type[TeensyModel], str]
  93.     ):
  94.         return self._get_table_data(
  95.             find_obj if isinstance(find_obj, str) else find_obj.__name__
  96.         )
  97.  
  98.     def _check_obj(self, obj: dict, check_kwargs: dict):
  99.         # todo: че делать если там лист с рефами
  100.         # todo: sort skip limit
  101.         found = []
  102.         checked_reference_keys = []
  103.         for obj_key, obj_value in obj.items():
  104.             for find_key, find_value in check_kwargs.items():
  105.                 if (
  106.                         DOUBLE_UNDERSCORE in find_key
  107.                         and find_key != TEENSY_OBJECT_ID_FIELD
  108.                         and find_key not in checked_reference_keys
  109.                 ):
  110.                     obj = self._get_all_obj_refs(obj)
  111.                     # хз может как то по кускам доставать
  112.  
  113.                     query: typing.List[str] = find_key.split(DOUBLE_UNDERSCORE)
  114.                     query_filter = None
  115.                     if query[-1] in TEENSY_FILTERS:
  116.                         query_filter = query.pop()
  117.                     for element_number in range(len(query) - 1):
  118.                         child_field = query[element_number + 1]
  119.                         child_fields = query[:element_number + 2]
  120.                         child_obj = obj
  121.                         for field in child_fields[:-1]:
  122.                             child_obj = child_obj[field]
  123.                         if query_filter is not None:
  124.                             found.append(
  125.                                 TEENSY_FILTERS[query_filter](
  126.                                     child_obj, child_field, find_value
  127.                                 )
  128.                             )
  129.                         elif self._check_obj(child_obj, {child_field: find_value}):
  130.                             found.append(True)
  131.                         checked_reference_keys.append(find_key)
  132.                 if find_key == obj_key and find_value == obj_value:
  133.                     found.append(True)
  134.         return found and all(found) and len(found) == len(check_kwargs)
  135.  
  136.     @staticmethod
  137.     def _check_return_raw_and_get_refs(return_raw: bool, with_refs: bool):
  138.         if not return_raw and not with_refs:
  139.             raise TeensyError(
  140.                 "'return_raw=False' can work only with 'get_all_refs=True'. Can work if:\n"
  141.                 "db.find_one(..., return_raw=True, with_refs=True),\n"
  142.                 "db.find_one(..., return_raw=False, with_refs=True),\n"
  143.                 "db.find_one(..., return_raw=True, with_refs=False),\n"
  144.             )
  145.  
  146.     def find_one(
  147.             self,
  148.             find_obj: typing.Union[typing.Type[TeensyModel], str],
  149.             return_raw: bool = False,
  150.             with_refs: bool = True,
  151.             **kwargs,
  152.     ) -> typing.Optional[typing.Union[TeensyObject, typing.Any]]:
  153.  
  154.         self._check_return_raw_and_get_refs(return_raw, with_refs)
  155.         current_table_data = self._get_table_data_from_obj(find_obj)
  156.         if not kwargs:
  157.             if with_refs:
  158.                 return (
  159.                     self._get_all_obj_refs(current_table_data)[0]
  160.                     if current_table_data
  161.                     else []
  162.                 )
  163.             return current_table_data[0] if current_table_data else []
  164.  
  165.         for obj in current_table_data:
  166.             if self._check_obj(obj, kwargs):
  167.                 break
  168.         else:
  169.             return None
  170.  
  171.         if with_refs:
  172.             obj = self._get_all_obj_refs(obj)
  173.         if return_raw:
  174.             return obj
  175.         return find_obj(**obj) if issubclass(find_obj, TeensyModel) else obj
  176.  
  177.     def find_all(
  178.             self,
  179.             find_obj: typing.Union[typing.Type[TeensyModel], str],
  180.             return_raw: bool = False,
  181.             with_refs: bool = True,
  182.             **kwargs,
  183.     ) -> typing.Optional[
  184.         typing.Union[typing.List[typing.Union[TeensyObject, typing.Any]]]
  185.     ]:
  186.         self._check_return_raw_and_get_refs(return_raw, with_refs)
  187.  
  188.         current_table_data = self._get_table_data_from_obj(find_obj)
  189.  
  190.         if not kwargs:
  191.             if with_refs:
  192.                 return [self._get_all_obj_refs(obj) for obj in current_table_data]
  193.             return current_table_data
  194.  
  195.         found_objs = []
  196.         for obj in current_table_data:
  197.             if self._check_obj(obj, kwargs):
  198.                 if with_refs:
  199.                     obj = self._get_all_obj_refs(obj)
  200.                 found_objs.append(obj)
  201.  
  202.         if return_raw:
  203.             return found_objs
  204.  
  205.         return [
  206.             find_obj(**obj) for obj in found_objs if issubclass(find_obj, TeensyModel)
  207.         ]
  208.  
  209.     def _save_element(self, table_name: str, obj: TeensyObject) -> int:
  210.         current_table_data = self._get_table_data(table_name)
  211.         obj_id = len(current_table_data)
  212.         obj.update({"__id": obj_id})
  213.         current_table_data.append(obj)
  214.         self.__save(self.data)
  215.         return obj_id
  216.  
  217.     @staticmethod
  218.     def _get_reference_name(reference_table_title: str, reference_object_id: int):
  219.         return f"__reference_{reference_table_title}_{reference_object_id}__"
  220.  
  221.     @staticmethod
  222.     def _get_property_reference_name(field_schema: dict) -> typing.Optional[str]:
  223.         if field_schema.get("$ref") is None or (
  224.                 field_schema.get("items") is not None
  225.                 and field_schema["items"].get("$ref") is None
  226.         ):
  227.             return None
  228.  
  229.         reference_name: str = (
  230.             field_schema["items"]["$ref"]
  231.             if field_schema.get("items")
  232.             else field_schema["$ref"]
  233.         ).lstrip("/")
  234.         return reference_name
  235.  
  236.     def _get_object_references_fields(self, obj_schemas: TeensySchema):
  237.         found_references = {}
  238.         if len(obj_schemas["definitions"]) <= 1:
  239.             return found_references
  240.  
  241.         # todo: заменить на model_schema тогда удалять ниче не надо
  242.         copied_schemas = copy.deepcopy(obj_schemas)
  243.         original_object_schema = list(copied_schemas["definitions"])[-1]
  244.  
  245.         del copied_schemas["definitions"][original_object_schema]
  246.         for obj_name, obj_schema in obj_schemas["definitions"].items():
  247.             # хз надо как то вынести
  248.             for prop_name, prop_schema in obj_schema["properties"].items():
  249.                 ref_name = self._get_property_reference_name(prop_schema)
  250.                 if ref_name is not None and ref_name not in found_references:
  251.                     found_references[prop_name] = ref_name
  252.         return found_references
  253.  
  254.     def _add_to_table(
  255.             self, table_name: str, obj: TeensyObject, obj_schemas: TeensySchema
  256.     ):
  257.         found_references = self._get_object_references_fields(obj_schemas)
  258.         for ref_title in list(set(list(found_references.values()))):
  259.             for ref_key, ref_table_title in found_references.items():
  260.                 if ref_table_title != ref_title or ref_key not in obj:
  261.                     continue
  262.                 refs_data_to_find = (
  263.                     obj[ref_key] if isinstance(obj[ref_key], list) else [obj[ref_key]]
  264.                 )
  265.  
  266.                 obj[ref_key] = []
  267.  
  268.                 for ref_data_to_find in refs_data_to_find:
  269.                     reference_in_db: list = self.find_all(
  270.                         ref_title, **ref_data_to_find, return_raw=True
  271.                     )
  272.                     if not reference_in_db:
  273.                         reference_obj_id = self._add_to_table(
  274.                             ref_title, ref_data_to_find, obj_schemas,
  275.                         )
  276.                         obj[ref_key].append(
  277.                             self._get_reference_name(ref_table_title, reference_obj_id)
  278.                         )
  279.                     else:
  280.                         obj[ref_key] = [
  281.                             self._get_reference_name(ref_table_title, found_ref["__id"])
  282.                             for found_ref in reference_in_db
  283.                         ]
  284.                 obj[ref_key] = (
  285.                     obj[ref_key] if len(obj[ref_key]) > 1 else obj[ref_key][0]
  286.                 )
  287.         return self._save_element(table_name, obj)
  288.  
  289.     def write(self, *obj: typing.Union[TeensyModel]):
  290.         # todo: мб все через __fields__
  291.         [
  292.             self._add_to_table(
  293.                 table_name=_obj.__class__.__name__,
  294.                 obj=_obj.dict(),
  295.                 obj_schemas=schema([_obj.__class__], ref_prefix="/"),
  296.             )
  297.             for _obj in obj
  298.         ]
RAW Paste Data