Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # var s='выравнять разделитель экрана по этой стрелке -->';prompt('Ctrl+C:',`#${' '.repeat(79-s.length)+s}\n`)
- # выравнять разделитель экрана по этой стрелке -->
class ValidationError(Exception):
    """Raised when a value (or a whole document) fails validation.

    The exception message carries the human-readable reason.
    """
- import re
- from abc import ABC, abstractmethod
def not_none(f):
    """Decorate a ``validate(self, field, value)`` method to skip ``None``.

    The wrapped method is only invoked when ``value`` is not ``None``;
    otherwise the wrapper returns ``None`` without calling it.

    Args:
        f: a callable with the signature ``(self, field, value)``.

    Returns:
        A wrapper with the same metadata as ``f`` that forwards all
        arguments and returns whatever ``f`` returns.
    """
    import functools

    @functools.wraps(f)
    def decorator(*args, **kw):
        # ``value`` is the third positional argument (after self and field),
        # but callers may also pass it by keyword.
        value = args[2] if len(args) > 2 else kw.get('value')
        if value is None:
            return None
        return f(*args, **kw)

    return decorator
class BaseValidator(ABC):
    """Abstract interface that every field validator implements."""

    @abstractmethod
    def validate(self, field, value):
        """Check ``value`` against the constraints declared on ``field``.

        Concrete validators in this module signal failure by raising
        ``ValidationError``; the base implementation does nothing.
        """
class RequiredValidator(BaseValidator):
    """Rejects a missing value on a required, non-nullable field."""

    def validate(self, field, value):
        """Raise ``ValidationError('required')`` for a forbidden ``None``.

        A ``None`` value is accepted when the field is not required or when
        it is nullable.
        """
        if value is not None:
            return
        if not field.required or field.nullable:
            return
        raise ValidationError('required')
class RangeValidator(BaseValidator):
    """Checks a value against the field's optional ``min``/``max`` bounds."""

    @not_none
    def validate(self, field, value):
        """Raise ``ValidationError`` when ``value`` is outside the bounds.

        Both bounds are inclusive; a bound set to ``None`` is not checked.
        """
        lower = field.min
        if lower is not None and value < lower:
            message = 'must be greater than or equal to {}'.format(lower)
            raise ValidationError(message)

        upper = field.max
        if upper is not None and value > upper:
            message = 'must be less than or equal to {}'.format(upper)
            raise ValidationError(message)
class LengthValidator(BaseValidator):
    """Checks ``len(value)`` against ``minlength``/``maxlength``."""

    @not_none
    def validate(self, field, value):
        """Raise ``ValidationError`` when the length is out of bounds.

        Both limits are inclusive; a limit set to ``None`` is not checked.
        """
        size = len(value)

        shortest = field.minlength
        if shortest is not None and size < shortest:
            message = 'must have at least {} character(s)'.format(shortest)
            raise ValidationError(message)

        longest = field.maxlength
        if longest is not None and size > longest:
            message = 'must have no more than {} character(s)'.format(longest)
            raise ValidationError(message)
class RegexValidator(BaseValidator):
    """Checks a value against the field's optional ``regex``."""

    @not_none
    def validate(self, field, value):
        """Raise ``ValidationError`` unless ``value`` matches ``field.regex``.

        ``field.regex`` may be ``None`` (no check), a pattern string, or an
        already compiled pattern. Matching uses ``match``, i.e. it is
        anchored at the start of ``value`` only.
        """
        regex = field.regex
        if regex is None:
            return

        # The private ``re._pattern_type`` alias was removed in Python 3.7.
        # Deriving the type from a compiled pattern works on every version.
        compiled_type = type(re.compile(''))
        if not isinstance(regex, compiled_type):
            regex = re.compile(regex)

        if not regex.match(value):
            raise ValidationError(
                'does not match pattern {!r}'.format(regex.pattern)
            )
class EnumValidator(BaseValidator):
    """Checks that a value is a member of ``field.enum``."""

    @not_none
    def validate(self, field, value):
        """Raise ``ValidationError`` when ``value`` is not in ``field.enum``."""
        allowed = field.enum
        if value in allowed:
            return
        raise ValidationError('must be one of {}'.format(allowed))
class IterableValidator(BaseValidator):
    """Validates each element of an iterable with ``field.base_field``."""

    @not_none
    def validate(self, field, value):
        """Run ``field.base_field.validate`` on every element of ``value``.

        Any exception raised for an element propagates immediately.
        """
        for element in value:
            field.base_field.validate(element)
class MetaField(type):
    """Metaclass that makes ``validators`` accumulate down the hierarchy.

    A class created with this metaclass ends up with
    ``validators == <validators of its MetaField parents> + <its own>``,
    where "its own" means the list declared in the class body. Each
    validator appears once, at its first position.
    """

    def __new__(mcls, name, bases, attrs):
        cls = super().__new__(mcls, name, bases, attrs)

        parents = [base for base in bases if isinstance(base, mcls)]
        if not parents:
            # Root of the hierarchy: keep ``validators`` exactly as declared.
            return cls

        inherited = []
        for parent in reversed(parents):
            inherited.extend(getattr(parent, 'validators', []))

        # Read the class body, not ``cls.validators``. On a subclass that
        # declares nothing, attribute lookup would return the parent's list
        # and every validator would be added a second time.
        declared = list(attrs.get('validators', []))

        merged = []
        for validator in inherited + declared:
            # Identity comparison: validators may be classes or instances.
            if not any(validator is seen for seen in merged):
                merged.append(validator)

        cls.validators = merged
        return cls
- import inspect
- # TODO: добавить отслеживание изменений
class BaseField(metaclass=MetaField):
    """Descriptor describing one document field.

    Values are stored on the owning instance through its ``set_data`` /
    ``get_data`` / ``delete_data`` methods, keyed by ``self.name``.
    """

    validators = [RequiredValidator]

    def __init__(
        self,
        name=None,
        default=None,
        required=False,
        # https://docs.mongodb.com/manual/indexes/#index-types
        index_type=None,
        sparse=False,
        unique=False,
        # list of field names
        unique_with=None,
        nullable=False
    ):
        """Store the field options on the instance.

        ``name`` goes through the validating ``name`` property; every other
        argument is stored as given.
        """
        self.name = name
        self.default = default
        self.required = required
        self.index_type = index_type
        self.sparse = sparse
        self.unique = unique
        self.unique_with = unique_with
        self.nullable = nullable

    @property
    def name(self):
        """The key under which the value is stored; may be ``None``."""
        return self._name

    @name.setter
    def name(self, value):
        """Set the name, rejecting non-strings and the characters ``$ . NUL``.

        ``None`` is accepted and stored without any checks.
        """
        if value is None:
            self._name = value
            return
        if not isinstance(value, str):
            raise TypeError('must be a string')
        if re.search(r'[$.\x00]', value):
            raise TypeError('name contains illegal characters')
        self._name = value

    # __set__, __get__ and __delete__ only fire when the field is declared
    # as a class attribute.
    def __set__(self, instance, value):
        """Store ``value`` on ``instance``, applying the default for ``None``.

        Assigning ``None`` behaves like setting NULL on a MySQL update: the
        default is used instead. The default itself may also be ``None``.
        """
        if value is None:
            value = self.get_default()

        if value is not None:
            prepared = self.prepare_value(value)
            instance.set_data(self.name, prepared)
        elif self.nullable:
            # A nullable field keeps an explicit None.
            instance.set_data(self.name, value)
        else:
            # A non-nullable field is removed when it ends up as None.
            instance.delete_data(self.name)

    def __get__(self, instance, owner):
        """Return the stored value, or the field itself on class access."""
        if instance is not None:
            return instance.get_data(self.name)
        return self

    def __delete__(self, instance):
        """Deleting a field attribute is not supported."""
        raise TypeError('cannot delete dependency')

    def prepare_value(self, value):
        """Convert an assigned value to the stored type; identity here."""
        return value

    def has_default(self):
        """Return True when ``default`` is anything other than ``None``."""
        return not (self.default is None)

    def get_default(self):
        """Return the default, calling it first when it is callable."""
        default = self.default
        if callable(default):
            return default()
        return default

    def validate(self, value):
        """Run every entry of ``self.validators`` against ``value``.

        Validator classes are instantiated on the fly. An entry that is not
        a ``BaseValidator`` raises ``ValueError``.
        """
        for candidate in self.validators:
            checker = candidate() if inspect.isclass(candidate) else candidate
            if not isinstance(checker, BaseValidator):
                raise ValueError()
            checker.validate(self, value)

    def to_mongo(self, value):
        """Used before insert/update; returns ``value`` unchanged here."""
        return value

    def to_json(self, value):
        """Our API outputs data as JSON; returns ``value`` unchanged here."""
        return value
- import bson
- import dateutil
- import datetime
- import uuid
class ObjectIdField(BaseField):
    """Field holding a ``bson.ObjectId``."""

    def prepare_value(self, value):
        """Return ``value`` as a ``bson.ObjectId``, converting if needed."""
        if isinstance(value, bson.ObjectId):
            return value
        return bson.ObjectId(value)

    def to_json(self, value):
        """Return ``str(value)``."""
        return str(value)
class UUIDField(BaseField):
    """Field holding a ``uuid.UUID``."""

    def prepare_value(self, value):
        """Return ``value`` as a ``uuid.UUID``, converting if needed."""
        if isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)

    def to_json(self, value):
        """Return ``str(value)``."""
        return str(value)
class DateTimeField(BaseField):
    """Field holding a ``datetime.datetime``."""

    def prepare_value(self, value):
        """Convert ``value`` to a ``datetime.datetime``.

        Accepted inputs:
          * ``datetime.datetime`` - returned unchanged;
          * ``datetime.date`` - converted to midnight of that day;
          * ``int`` / ``float`` - passed to ``utcfromtimestamp``;
          * ``str`` - parsed with ``dateutil.parser.parse``.

        Raises:
            ValueError: for any other type.
        """
        # datetime is a subclass of date, so it must be tested first.
        if isinstance(value, datetime.datetime):
            return value
        if isinstance(value, datetime.date):
            return datetime.datetime(value.year, value.month, value.day)
        if isinstance(value, (float, int)):
            return datetime.datetime.utcfromtimestamp(value)
        if isinstance(value, str):
            # ``import dateutil`` alone does not guarantee that the
            # ``parser`` submodule is loaded, so import it explicitly.
            # https://stackoverflow.com/questions/127803/how-to-parse-an-iso-8601-formatted-date
            import dateutil.parser
            return dateutil.parser.parse(value)
        raise ValueError(value)

    def to_json(self, value):
        """Return ``value.isoformat()``."""
        return value.isoformat()
class BooleanField(BaseField):
    """Field holding a ``bool``."""

    def prepare_value(self, value):
        """Return the truthiness of ``value`` as a ``bool``."""
        return True if value else False
class FloatField(BaseField):
    """Numeric field stored as ``float`` with optional ``min``/``max``."""

    validators = [RangeValidator]

    def __init__(
        self,
        name=None,
        default=None,
        required=False,
        min=None,
        max=None,
        index_type=None,
        sparse=False,
        unique=False,
        unique_with=None,
        nullable=False
    ):
        """Forward the common options to the parent and store the bounds."""
        super().__init__(
            name=name,
            default=default,
            required=required,
            index_type=index_type,
            sparse=sparse,
            unique=unique,
            unique_with=unique_with,
            nullable=nullable,
        )
        # Read by RangeValidator; None disables the bound.
        self.min = min
        self.max = max

    def prepare_value(self, value):
        """Return ``float(value)``."""
        return float(value)
class IntField(FloatField):
    """Same options as ``FloatField``, but values are stored as ``int``."""

    def prepare_value(self, value):
        """Return ``int(value)``."""
        converted = int(value)
        return converted
class StringField(BaseField):
    """Text field with optional length limits and a regex constraint."""

    validators = [LengthValidator, RegexValidator]

    def __init__(
        self,
        name=None,
        default=None,
        required=False,
        minlength=None,
        maxlength=None,
        regex=None,
        index_type=None,
        sparse=False,
        unique=False,
        unique_with=None,
        nullable=False
    ):
        """Forward the common options to the parent and store the limits."""
        super().__init__(
            name=name,
            default=default,
            required=required,
            index_type=index_type,
            sparse=sparse,
            unique=unique,
            unique_with=unique_with,
            nullable=nullable,
        )
        # Read by LengthValidator; None disables the limit.
        self.minlength = minlength
        self.maxlength = maxlength
        # Read by RegexValidator; None disables the check.
        self.regex = regex

    def prepare_value(self, value):
        """Return ``str(value)``."""
        return str(value)
class BytesField(StringField):
    """Same options as ``StringField``, but values are stored as ``bytes``."""

    def prepare_value(self, value):
        """Convert ``value`` to ``bytes``.

        Text is encoded as UTF-8, because ``bytes(str)`` without an encoding
        raises ``TypeError``. Every other input goes through ``bytes(value)``.
        """
        if isinstance(value, str):
            return value.encode('utf-8')
        return bytes(value)
class EnumField(BaseField):
    """Field whose value must be one of the members of ``enum``."""

    validators = [EnumValidator]

    def __init__(
        self,
        enum,
        name=None,
        default=None,
        required=False,
        index_type=None,
        sparse=False,
        unique=False,
        unique_with=None,
        nullable=False
    ):
        """Forward the common options to the parent and store ``enum``."""
        super().__init__(
            name=name,
            default=default,
            required=required,
            index_type=index_type,
            sparse=sparse,
            unique=unique,
            unique_with=unique_with,
            nullable=nullable,
        )
        # Container of allowed values, read by EnumValidator.
        self.enum = enum
class ListField(BaseField):
    """Field holding a list whose elements are described by ``base_field``."""

    validators = [LengthValidator, IterableValidator]

    def __init__(
        self,
        base_field,
        name=None,
        default=None,
        required=False,
        minlength=None,
        maxlength=None,
        index_type=None,
        sparse=False,
        unique=False,
        unique_with=None,
        nullable=False
    ):
        """Forward the common options to the parent and store list options."""
        super().__init__(
            name=name,
            default=default,
            required=required,
            index_type=index_type,
            sparse=sparse,
            unique=unique,
            unique_with=unique_with,
            nullable=nullable,
        )
        # Field used to prepare and validate each element.
        self.base_field = base_field
        # Read by LengthValidator; None disables the limit.
        self.minlength = minlength
        self.maxlength = maxlength

    def prepare_value(self, value):
        """Wrap ``value`` in a ``List`` bound to ``base_field``."""
        wrapped = List(self.base_field, value)
        return wrapped
- # https://docs.python.org/3/tutorial/datastructures.html#more-on-lists
class List(list):
    """A ``list`` that passes every stored element through a field.

    Each element added via the constructor, ``append``, ``insert``,
    ``extend`` or item/slice assignment is first converted with
    ``base_field.prepare_value``.
    """

    def __init__(self, base_field, *args, **kw):
        """Create the list and fill it from ``list(*args, **kw)``.

        Args:
            base_field: object exposing ``prepare_value(value)``.
            *args, **kw: forwarded to the built-in ``list`` constructor.
        """
        super().__init__()
        self.base_field = base_field
        self.extend(list(*args, **kw))

    def prepare_value(self, v):
        """Convert a single element using ``base_field``."""
        return self.base_field.prepare_value(v)

    def append(self, v):
        """Append the prepared form of ``v``."""
        super().append(self.prepare_value(v))
        # TODO: mark as changed

    def insert(self, i, v):
        """Insert the prepared form of ``v`` before index ``i``."""
        super().insert(i, self.prepare_value(v))
        # TODO: mark as changed

    def extend(self, L):
        """Append the prepared form of every element of ``L``.

        A falsy ``L`` (empty or ``None``) is ignored.
        """
        if not L:
            return
        # Build the new items first so that ``x.extend(x)`` terminates.
        super().extend([self.prepare_value(v) for v in L])
        # TODO: mark as changed

    def pop(self, *args, **kw):
        """Remove and return an element, exactly like ``list.pop``."""
        item = super().pop(*args, **kw)
        # TODO: mark as changed
        return item

    def sort(self, *args, **kw):
        """Sort in place; arguments are forwarded to ``list.sort``."""
        super().sort(*args, **kw)
        # TODO: mark as changed

    def __setitem__(self, i, v):
        """Assign prepared value(s) to an index or a slice."""
        if isinstance(i, slice):
            # Slice assignment takes an iterable: prepare each element
            # rather than the iterable as a whole.
            v = [self.prepare_value(item) for item in v]
        else:
            v = self.prepare_value(v)
        super().__setitem__(i, v)
        # TODO: mark as changed

    def __delitem__(self, i):
        """Delete the element(s) at index or slice ``i``."""
        super().__delitem__(i)
        # TODO: mark as changed
- import re
def snake_case(s):
    """Convert a CamelCase identifier to snake_case.

    A run of capitals is treated as one word, so ``'HTTPServer'`` becomes
    ``'http_server'`` and ``'ABC'`` becomes ``'abc'``.

    Args:
        s: the identifier to convert.

    Returns:
        The lower-cased identifier with ``_`` between words.
    """
    # Split an acronym from a following capitalised word: HTTPServer ->
    # HTTP_Server. The second group requires a lowercase letter so that a
    # pure run of capitals ("ABC") is left in one piece.
    s = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', s)
    # Split a lowercase letter from a following capital: fooBar -> foo_Bar.
    s = re.sub(r'([a-z])([A-Z])', r'\1_\2', s)
    return s.lower()
class Meta:
    """Default per-document options, used as the base of every ``_meta``."""

    # Collection name; MetaDocument fills it in when left as None.
    collection = None
    # Index specifications declared for the document.
    indexes = []
    # TODO: what else is needed?
class MetaDocument(type):
    """Metaclass that collects fields and builds ``_meta`` for documents.

    For every class it creates:
      * ``_fields`` - mapping of field name to field object, made of the
        fields inherited from the bases plus those in the class body;
      * ``_meta`` (top-level documents only) - an options object derived
        from the class's ``Meta`` and the module-level ``Meta`` defaults.
        Its ``indexes`` are the bases' indexes followed by the class's own,
        and ``collection`` defaults to the snake_case class name.
    """

    def __new__(mcls, name, bases, attrs):
        cls = super().__new__(mcls, name, bases, attrs)

        # Start from the fields of the bases so that subclasses keep them
        # (e.g. ``Document._id``). Earlier bases win, like in the MRO.
        fields = {}
        for base in reversed(bases):
            fields.update(getattr(base, '_fields', {}))
        for attr_name, attr in attrs.items():
            if isinstance(attr, BaseField):
                if attr.name is None:
                    # An unnamed field takes the attribute name.
                    attr.name = attr_name
                fields[attr.name] = attr
        cls._fields = fields

        # Read the class-level flag directly. ``cls.is_toplevel`` would
        # return the property object itself, which is always truthy.
        if getattr(cls, '_toplevel', False):
            if hasattr(cls, 'Meta'):
                class _Meta(cls.Meta, Meta):
                    pass
                meta = _Meta()
            else:
                meta = Meta()

            # Inherit indexes: bases first (in declaration order), then own.
            candidates = []
            for base in bases:
                if isinstance(base, mcls) and getattr(base, '_toplevel', False):
                    base_meta = getattr(base, '_meta', None)
                    if base_meta is not None:
                        candidates.extend(base_meta.indexes)
            candidates.extend(meta.indexes)

            # Drop repeats. They appear when ``Meta`` itself is inherited,
            # because its indexes are then also in the base's ``_meta``.
            # Index specs may be unhashable, hence the list membership test.
            merged = []
            for index in candidates:
                if index not in merged:
                    merged.append(index)
            meta.indexes = merged

            if meta.collection is None:
                meta.collection = snake_case(cls.__name__)
            cls._meta = meta

        return cls
- # TODO: добавить поддержку вложенных документов
class EmbeddedDocument(metaclass=MetaDocument):
    """Base document: a bag of field values with validation and export.

    Values live in ``self._data``, keyed by field name. The field
    descriptors read and write it through ``get_data`` / ``set_data`` /
    ``delete_data``.
    """

    _toplevel = False

    def __init__(self, *args, **kw):
        """Initialise every known field from ``dict(*args, **kw)``.

        Fields missing from the input are assigned ``None``, which lets the
        field descriptor apply its default.
        """
        data = dict(*args, **kw)
        self._data = {}
        for key in self._fields:
            setattr(self, key, data.get(key))

    @property
    def is_toplevel(self):
        """The class-level ``_toplevel`` flag."""
        return self._toplevel

    def get_field(self, name):
        """Return the field object registered as ``name``, or ``None``."""
        return self._fields.get(name)

    def get_data(self, name):
        """Return the stored value for ``name``, or ``None`` when unset."""
        return self._data.get(name)

    def set_data(self, name, value):
        """Store ``value`` under ``name``."""
        self._data[name] = value

    def delete_data(self, name):
        """Remove the value stored under ``name``; a missing key is ignored."""
        self._data.pop(name, None)

    def clean(self):
        """Hook called at the start of ``validate``; does nothing here."""
        pass

    def validate(self):
        """Validate every field and report all failures together.

        Raises:
            ValidationError: when at least one field fails. The message
                lists every failure as ``name: reason``.
        """
        self.clean()
        errors = []
        for name, field in self._fields.items():
            try:
                field.validate(self.get_data(name))
            except ValidationError as e:
                errors.append('{}: {}'.format(name, str(e)))
        if errors:
            raise ValidationError(
                '{} validation failed: {}'.format(
                    self.__class__.__qualname__, '; '.join(errors)
                )
            )

    def to_mongo(self):
        """Return the stored values converted with each field's ``to_mongo``."""
        return {
            name: self.get_field(name).to_mongo(value)
            for name, value in self._data.items()
        }

    def to_json(self):
        """Return the stored values converted with each field's ``to_json``."""
        return {
            name: self.get_field(name).to_json(value)
            for name, value in self._data.items()
        }
class Document(EmbeddedDocument):
    """Top-level document; adds the ``_id`` field."""

    # Primary key field.
    _id = ObjectIdField()
    # Marks the class (and its subclasses) as top-level for MetaDocument.
    _toplevel = True
    # TODO: the database access methods go here.
# Demo: indexes declared in nested Meta classes accumulate along the
# inheritance chain Foo -> Bar -> Baz.
class Foo(Document):
    class Meta:
        indexes = ['foo']


class Bar(Foo):
    class Meta:
        indexes = ['bar']


class Baz(Bar):
    class Meta:
        indexes = ['baz']


print(Baz._meta.indexes)
assert Baz._meta.indexes == ['foo', 'bar', 'baz']
# Demo: declare a document, build an instance, validate it and print it.
class User(Document):
    """Example document exercising string, datetime and list fields."""

    username = StringField(regex=r'\w{3,10}$', required=True)
    password = StringField(minlength=6, maxlength=255, required=False)
    registration = DateTimeField(
        default=datetime.datetime.utcnow,
        required=True,
    )
    skills = ListField(
        StringField(regex=r'\w{1,31}$'),
        minlength=3,
        maxlength=10,
        required=True,
    )


user = User(username='tester', skills=['foo', 'bar', 'baz'])
user.validate()
print(user.to_json())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement