Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import re
- __all__ = ["Header", "IPParseError", "IPRange", "IPRecord", "Report", "ServerBlacklist"]
class Header(object):
    """A free-standing block of comment text in a blacklist file.

    ``text`` may span several lines; ``indent`` is the string inserted
    between the ``//`` marker and each line when the header is dumped.
    """

    def __init__(self, text, indent=" "):
        self.indent = indent
        self.text = text

    def _dump_to_lines(self):
        """Return one ``//``-prefixed output line per line of ``text``."""
        prefix = "//{}".format(self.indent)
        rendered = []
        for row in self.text.splitlines():
            rendered.append("{}{}".format(prefix, row))
        return rendered
class IPParseError(Exception):
    """Raised when a string is not a valid IPv4 address or address range."""


class IPRange(object):
    """An inclusive range of IPv4 addresses parsed from text.

    Three notations are accepted:

    * a single address, e.g. ``"1.2.3.4"``;
    * a subnet with a prefix length, e.g. ``"1.2.3.0/24"``;
    * an explicit range, e.g. ``"1.2.3.4 - 1.2.3.9"``.

    The original string is kept in ``text``. The bounds are stored as
    integers in ``_low_address`` and ``_high_address``.

    Raises:
        IPParseError: if ``text`` matches none of the notations, if an octet
            is greater than 255, or if a subnet prefix is greater than 32.
    """

    def __init__(self, text):
        self.text = text
        self._low_address, self._high_address = IPRange._parse_ip_range(text)

    @staticmethod
    def _parse_ip(ip):
        """Convert a dotted-quad string to its integer value.

        Raises:
            IPParseError: if ``ip`` is not four dot-separated decimal numbers
                or any of them exceeds 255.
        """
        match = re.match(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)$", ip)
        if match is None:
            # Without this check a malformed address would surface as an
            # AttributeError on ``None.groups()``.
            raise IPParseError("{!r} is not a valid IP address".format(ip))
        address = 0
        for octet in map(int, match.groups()):
            if octet > 255:
                raise IPParseError("{!r} is not a valid IP address".format(ip))
            address = (address << 8) + octet
        return address

    @staticmethod
    def _parse_ip_range(text):
        """Return the ``(low, high)`` integer bounds described by ``text``.

        Raises:
            IPParseError: if ``text`` is not a valid address, subnet or range.
        """
        if re.match(r"^\d+\.\d+\.\d+\.\d+$", text) is not None:
            address = IPRange._parse_ip(text)
            return address, address

        subnet_match = re.match(r"^(\d+\.\d+\.\d+\.\d+)\s*/\s*(\d+)$", text)
        if subnet_match is not None:
            ip_string, subnet_string = subnet_match.groups()
            ip_address = IPRange._parse_ip(ip_string)
            subnet = int(subnet_string)
            if subnet > 32:
                # A longer prefix would make the shift count below negative,
                # which raises ValueError instead of a parse error.
                raise IPParseError("{!r} is not a valid IP range".format(text))
            erase_bits = 32 - subnet
            # Clear the host bits to get the first address of the subnet.
            low_address = (ip_address >> erase_bits) << erase_bits
            high_address = low_address + (1 << erase_bits) - 1
            return low_address, high_address

        range_match = re.match(r"^(\d+\.\d+\.\d+\.\d+)\s*\-\s*(\d+\.\d+\.\d+\.\d+)$", text)
        if range_match is not None:
            low_string, high_string = range_match.groups()
            return IPRange._parse_ip(low_string), IPRange._parse_ip(high_string)

        raise IPParseError("{!r} is not a valid IP range".format(text))

    def matches(self, ip):
        """Tell whether ``ip`` falls inside this range.

        ``ip`` is a dotted-quad string. Its last component may be ``x``
        (e.g. ``"1.2.3.x"``), in which case the result is true when any
        address from ``.0`` to ``.255`` lies in the range.

        Raises:
            IPParseError: if ``ip`` is not a valid address.
        """
        normalized_ip = re.sub(r"\.x$", r".0", ip)
        address = self._parse_ip(normalized_ip)
        if ip == normalized_ip:
            return self._low_address <= address <= self._high_address
        # Wildcard: test whether [address, address + 255] overlaps the range.
        return self._low_address <= (address + 255) and address <= self._high_address
class IPRecord(object):
    """One address-range line of a report.

    Holds the range object, an optional trailing comment, and a flag telling
    whether the line is active or commented out.
    """

    def __init__(self, ip_range, comment=None, enabled=True):
        self.enabled = enabled
        self.comment = comment
        self.ip_range = ip_range

    def _dump_to_lines(self):
        """Return the record as a one-element list of output lines."""
        rendered = self.ip_range.text
        if self.comment is not None:
            rendered = "{} // {}".format(rendered, self.comment)
        if self.enabled:
            return [rendered]
        # Disabled records are written commented out.
        return ["// {}".format(rendered)]
class Report(object):
    """A set of named fields followed by the IP records they describe.

    ``fields`` maps field names to (possibly multi-line) string values and
    ``ip_records`` is a collection of record objects.
    """

    def __init__(self, fields, ip_records):
        self.ip_records = ip_records
        self.fields = fields

    def _dump_field_to_lines(self, field):
        """Return one ``// field: line`` entry per line of the field value."""
        rendered = []
        for row in self.fields[field].splitlines():
            rendered.append("// {}: {}".format(field, row))
        return rendered

    def _dump_to_lines(self):
        """Return the whole report as a list of output lines.

        Well-known fields come first in a fixed order, then any remaining
        fields in alphabetical order, then the records sorted by the low end
        of their address range.
        """
        main_fields = ["Name", "Ban Date", "Ban Reason", "Proof", "Server", "Added by", "Extra"]
        ordered_names = [name for name in main_fields if name in self.fields]
        ordered_names += [name for name in sorted(self.fields.keys()) if name not in main_fields]

        output = []
        for name in ordered_names:
            output.extend(self._dump_field_to_lines(name))

        records_by_address = sorted(self.ip_records, key=lambda record: record.ip_range._low_address)
        for record in records_by_address:
            output.extend(record._dump_to_lines())
        return output
class ServerBlacklistParser(object):
    """Line-by-line parser that turns blacklist text into items.

    The parser keeps a cursor on the current line and exposes that line as
    three attributes: ``line_content`` (text before any ``//`` comment),
    ``line_comment`` (text after ``//``, or ``None``) and ``line_enabled``
    (``False`` when the line is an IP range that has been commented out).
    All three are ``None`` once the cursor is past the last line.
    """

    # Title-cased field names as they may appear in comments, mapped to the
    # canonical spellings used by Report's field ordering. "Added By" is
    # listed because title-casing "added by" would otherwise never produce
    # the canonical "Added by".
    _FIELD_ALIASES = {
        "Date": "Ban Date",
        "Reason": "Ban Reason",
        "Evidence": "Proof",
        "Added By": "Added by",
    }

    def __init__(self, text):
        self.lines = text.splitlines()
        self.line_index = -1
        self.line_content = None
        self.line_comment = None
        self.line_enabled = None
        # Position the cursor on the first line (or at EOF for empty text).
        self.skip_line()

    @staticmethod
    def _try_parse_ip_range(text):
        """Return ``IPRange(text)``, or ``None`` when ``text`` is not a range.

        ValueError is caught alongside IPParseError so that a range whose
        numeric parts are out of bounds (for example an oversized subnet
        prefix) counts as unparseable instead of aborting the whole parse.
        """
        try:
            return IPRange(text)
        except (IPParseError, ValueError):
            return None

    @staticmethod
    def split_line(line):
        """Split a raw line into ``(content, comment, enabled)``.

        * No ``//``: the stripped line is the content, comment is ``None``.
        * Text before ``//``: that text (stripped) is the content and the
          remainder, unstripped, is the comment.
        * Line starting with ``//``: if the text up to the next ``//`` parses
          as an IP range, it is returned as disabled content together with
          the nested comment (or ``None``). Otherwise the content is ``""``
          and everything after the first ``//`` is the comment.
        """
        if "//" not in line:
            return line.strip(), None, True

        content, comment = line.split("//", 1)
        content = content.strip()
        if content != "":
            return content, comment, True

        # The line is only a comment; check for a commented-out IP range.
        candidate, separator, nested_comment = comment.partition("//")
        candidate = candidate.strip()
        if ServerBlacklistParser._try_parse_ip_range(candidate) is None:
            return "", comment, True
        return candidate, (nested_comment if separator else None), False

    def skip_line(self):
        """Advance the cursor one line and refresh the ``line_*`` attributes."""
        self.line_index += 1
        if self.at_eof():
            self.line_content = None
            self.line_comment = None
            self.line_enabled = None
            return
        line = self.lines[self.line_index]
        self.line_content, self.line_comment, self.line_enabled = ServerBlacklistParser.split_line(line)

    def at_eof(self):
        """Return ``True`` when the cursor is past the last line."""
        return self.line_index >= len(self.lines)

    def line_is_empty(self):
        """Return ``True`` for a line with neither content nor comment."""
        return self.line_content == "" and self.line_comment is None

    def line_is_pure_comment(self):
        """Return ``True`` for a line that holds only a comment."""
        return self.line_content == "" and self.line_comment is not None

    def line_has_content(self):
        """Return ``True`` for a line with non-empty content (never at EOF)."""
        return self.line_content != "" and self.line_content is not None

    @staticmethod
    def comments_to_fields(comments):
        """Build a ``{field: value}`` dict from a list of comment strings.

        A comment containing ``:`` is split at the first colon into a field
        name (stripped, title-cased, then mapped through the known aliases)
        and a stripped value. A comment without a colon goes to the
        ``"Extra"`` field. Empty values become ``"(empty)"``, and repeated
        fields are joined with newlines in order of appearance.
        """
        fields = {}
        for comment in comments:
            if ":" in comment:
                field, value = comment.split(":", 1)
                field = field.strip().title()
                field = ServerBlacklistParser._FIELD_ALIASES.get(field, field)
                value = value.strip()
            else:
                field = "Extra"
                value = comment.strip()
            if value == "":
                value = "(empty)"
            if field in fields:
                fields[field] = "{}\n{}".format(fields[field], value)
            else:
                fields[field] = value
        return fields

    def parse_item(self):
        """Parse and return the next item, or ``None`` if there is none.

        Leading empty lines are skipped, then a run of pure-comment lines is
        collected. If that run is followed by an empty line or the end of the
        input, it is returned as a ``Header``. Otherwise the following
        content lines are read as IP records and returned as a ``Report``
        whose fields come from the collected comments.

        Content lines that are not IP ranges are skipped. If none of the
        content lines is an IP range, ``None`` is returned and the collected
        comments are discarded.
        """
        while self.line_is_empty():
            self.skip_line()
        if self.at_eof():
            return None

        pure_comments = []
        while self.line_is_pure_comment():
            pure_comments.append(self.line_comment)
            self.skip_line()

        if self.at_eof() or self.line_is_empty():
            if pure_comments:
                return Header("\n".join(pure_comments), indent="")
            return None

        ip_records = []
        while self.line_has_content():
            ip_range = ServerBlacklistParser._try_parse_ip_range(self.line_content)
            if ip_range is not None:
                record_comment = None if self.line_comment is None else self.line_comment.strip()
                ip_records.append(IPRecord(ip_range, record_comment, self.line_enabled))
            self.skip_line()

        if not ip_records:
            return None
        return Report(ServerBlacklistParser.comments_to_fields(pure_comments), ip_records)

    def parse_items(self):
        """Parse the remaining input and return the list of items found."""
        items = []
        while not self.at_eof():
            item = self.parse_item()
            if item is not None:
                items.append(item)
        return items
class ServerBlacklist(object):
    """An ordered collection of blacklist items that can be dumped to text."""

    def __init__(self, items):
        self.items = items

    def dump_to_string(self):
        """Serialise all items to one string.

        Each item's lines are newline-joined and newline-terminated, and
        consecutive items are separated by one blank line.
        """
        chunks = []
        for item in self.items:
            chunks.append("\n".join(item._dump_to_lines()) + "\n")
        return "\n".join(chunks)

    @classmethod
    def load_from_string(cls, text):
        """Alternate constructor: parse ``text`` and wrap the resulting items."""
        parser = ServerBlacklistParser(text)
        return cls(parser.parse_items())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement