# -*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function

import re
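

# Lexer and recursive-descent parser for a small boolean query language:
# values (optionally double-quoted, with at most one '*' wildcard) are combined
# with '|' (OR), implicit AND (whitespace, '&' or ','), '-' (NOT) and parentheses.
# Per-field parse trees are later converted into QueryMatch / QueryLogic nodes.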


class Lexeme(object):
    def __init__(self, lexeme_type, start=-1, end=-1):
        self.lexeme_type = lexeme_type
        self.start = start
        self.end = end

    def debug(self):
        return ('lexeme_type: {}'.format(self.lexeme_type)).encode('utf-8')


class LexemeOperator(Lexeme):
    pass


class LexemeError(Lexeme):
    pass


class LexemeValue(Lexeme):
    def __init__(self,
                 lexeme_type,
                 text_before_asterisk='',
                 start=-1,
                 end=-1,
                 has_asterisk=False,
                 text_after_asterisk='',
                 has_error=False):
        super(LexemeValue, self).__init__(lexeme_type, start, end)
        self.has_asterisk = has_asterisk
        self.text_before_asterisk = text_before_asterisk
        self.text_after_asterisk = text_after_asterisk
        self.has_error = has_error

    def debug(self):
        return ('lexeme_type: {}, '
                'has_asterisk: {}, '
                'text_before_asterisk: {}, '
                'text_after_asterisk: {}, '
                'has_error: {}'.format(self.lexeme_type,
                                       self.has_asterisk,
                                       self.text_before_asterisk,
                                       self.text_after_asterisk,
                                       self.has_error)).encode('utf-8')


# noinspection RegExpRedundantEscape
value_pattern = re.compile(r"[\w'\-:\*]+", re.UNICODE)


def match_value(text, start):
    if start >= len(text):
        return False
    if text[start] == '"':
        end = text.find('"', start + 1)
        if end == -1:
            return LexemeError(lexeme_type='QUOTES_PARITY_ERROR',
                               start=start,
                               end=len(text))
        else:
            return LexemeValue(lexeme_type='QVALUE',
                               start=start,
                               # value is without quotes, but lexeme start and end are with quotes
                               end=end + 1,
                               text_before_asterisk=text[start + 1: end])
    value = value_pattern.match(text[start:])
    if value:
        return LexemeValue(lexeme_type='VALUE',
                           start=start,
                           end=start + len(value.group()),
                           text_before_asterisk=value.group())
    return False
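

# The tokenizer below recognizes '(' / ')' / '|' operators, treats '-' as NOT
# (either before '(' or directly before a value), and skips runs of whitespace,
# '&' and ',' purely as separators between terms.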


def core_lexer(text):
    token_types = {'(': 'LPAREN',
                   ')': 'RPAREN',
                   '|': 'OR'}
    pos = 0
    while pos < len(text):
        whitespaces_beginning = re.match(r'[\s\&\,]+', text[pos:])
        if whitespaces_beginning:
            pos += len(whitespaces_beginning.group())
            if pos >= len(text):  # trailing separators: nothing left to tokenize
                return
        first_symbol = text[pos]
        if first_symbol in '|()':
            yield LexemeOperator(lexeme_type=token_types[first_symbol],
                                 start=pos,
                                 end=pos + 1)
            pos += 1
        elif first_symbol == '-':
            if text[pos:].startswith('-('):
                yield LexemeOperator(lexeme_type='NOT',
                                     start=pos,
                                     end=pos + 1)
                yield LexemeOperator(lexeme_type='LPAREN',
                                     start=pos + 1,
                                     end=pos + 2)
                pos += 2
            else:
                value = match_value(text, pos + 1)
                if value:
                    yield LexemeOperator(lexeme_type='NOT',
                                         start=pos,
                                         end=pos + 1)
                    yield value
                    pos = value.end
                else:
                    yield LexemeError(lexeme_type='MINUS_ERROR',
                                      start=pos,
                                      end=pos + 1)
                    pos += 1
        else:
            value = match_value(text, pos)
            if value:
                yield value
                pos = value.end
            else:
                yield LexemeError(lexeme_type='UNEXPECTED_SYMBOL',
                                  start=pos,
                                  end=pos + 1)
                pos += 1


def find_asterisk(token):
    asterisks = re.findall(r'\*', token.text_before_asterisk)
    if not asterisks:
        return token
    elif len(asterisks) > 1 or len(token.text_before_asterisk) == 1:
        token.has_asterisk = True
        token.has_error = True
        return token
    else:
        value = token.text_before_asterisk
        asterisk_position = re.search(r'\*', value)
        token.has_asterisk = True
        token.text_before_asterisk = value[:asterisk_position.start()]
        token.text_after_asterisk = value[asterisk_position.start() + 1:]
        return token


def parse_value(value_lexeme):
    return find_asterisk(value_lexeme)


class Lexer(object):
    def __init__(self, text):
        self.core_lexer_generator = core_lexer(text)
        self.dup_count = 0
        self.buff = None
        self.lexer_errors = list()

    def __iter__(self):
        # Iterate through the lexer itself so that next()/__next__ processing
        # (asterisk handling, error collection) is not bypassed.
        return self

    def __next__(self):
        return self.next()

    def next(self):
        if self.dup_count > 0:
            self.dup_count -= 1
            return self.buff
        else:
            self.buff = next(self.core_lexer_generator)
            if isinstance(self.buff, LexemeValue):
                self.buff = parse_value(self.buff)
                if self.buff.has_error:
                    self.lexer_errors.append(LexemeError(lexeme_type='MULTIASTERISKS', start=-1, end=-1))
                    return self.next()
            elif isinstance(self.buff, LexemeError):
                self.lexer_errors.append(self.buff)
                return self.next()
            return self.buff

    def dup_lexeme(self):
        # Push the last lexeme back so the next call to next() returns it again.
        self.dup_count += 1
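

# The parser below is a hand-written recursive-descent parser over the lexeme
# stream; it relies on Lexer.dup_lexeme() as a one-token pushback for lookahead.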


class Parser(object):
    def parse(self, lexeme_generator):
        return parse_expression(lexeme_generator)


class ParserError(object):
    def __init__(self, error_type):
        self.error_type = error_type

    def debug(self):
        return 'error_type: {}'.format(self.error_type).encode('utf-8')


class TreeNode(object):
    def __init__(self, value=None, children=None):
        if children:
            self.children = children
        else:
            self.children = list()
        self.value = value

    def debug(self):
        if self.children:
            return '{} -> {}'.format(self.value.lexeme_type,
                                     ','.join([x.value.lexeme_type for x in self.children]))
        else:
            return '{}'.format(self.value.text_before_asterisk + self.value.text_after_asterisk)


def refine_tree_node(tree_node):
    if not tree_node.children:
        return None
    elif len(tree_node.children) == 1 and tree_node.value.lexeme_type != 'NOT':
        # Collapse single-child AND/OR nodes; a NOT node keeps its wrapper.
        return tree_node.children[0]
    else:
        return tree_node
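

# Grammar implemented by the functions below:
#   expression  := alternative ('OR' alternative)*
#   alternative := condition+          (adjacent conditions are implicitly AND-ed)
#   condition   := ['NOT'] term
#   term        := value | '(' expression ')'
# Each function returns a (subtree, errors) pair.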


def parse_expression(lexeme_generator):
    current_node = TreeNode(value=LexemeOperator('OR', -1, -1))
    expression_errors = list()
    while True:
        subtree, alternative_errors = parse_alternative(lexeme_generator)
        if subtree:
            current_node.children.append(subtree)
            expression_errors += alternative_errors
        elif alternative_errors:
            expression_errors += alternative_errors
        else:
            try:
                lexeme = next(lexeme_generator)
            except StopIteration:
                return refine_tree_node(current_node), expression_errors
            if lexeme.lexeme_type != 'OR':
                lexeme_generator.dup_lexeme()
                return refine_tree_node(current_node), expression_errors


def parse_alternative(lexeme_generator):
    current_node = TreeNode(value=LexemeOperator('AND', -1, -1))
    alternative_errors = list()
    while True:
        subtree, condition_errors = parse_condition(lexeme_generator)
        if subtree:
            current_node.children.append(subtree)
            alternative_errors += condition_errors
        elif condition_errors:
            alternative_errors += condition_errors
        else:
            return refine_tree_node(current_node), alternative_errors


def parse_condition(lexeme_generator):
    try:
        lexeme = next(lexeme_generator)
    except StopIteration:
        return None, list()
    if lexeme.lexeme_type == 'NOT':
        subtree, condition_errors = parse_term(lexeme_generator)
        if subtree:
            if subtree.value.lexeme_type == 'NOT':
                current_node = subtree.children[0]
            else:
                current_node = TreeNode(value=lexeme, children=[subtree])
        else:
            current_node = None
            condition_errors.append(ParserError(error_type='EMPTY_NEGATIVE'))
        return current_node, condition_errors
    else:
        lexeme_generator.dup_lexeme()
        subtree, condition_errors = parse_term(lexeme_generator)
        if subtree:
            return subtree, condition_errors
        else:
            return None, condition_errors


def parse_term(lexeme_generator):
    try:
        lexeme = next(lexeme_generator)
    except StopIteration:
        return None, list()
    if isinstance(lexeme, LexemeValue):
        current_node = TreeNode(value=lexeme)
        term_errors = []
        return current_node, term_errors
    elif lexeme.lexeme_type == 'LPAREN':
        subtree, term_errors = parse_expression(lexeme_generator)
        try:
            lexeme = next(lexeme_generator)
        except StopIteration:
            term_errors.append(ParserError(error_type='NO_RPAREN_AFTER_EXPRESSION'))
            return subtree, term_errors
        if lexeme.lexeme_type != 'RPAREN':
            lexeme_generator.dup_lexeme()
        elif not subtree:
            term_errors.append(ParserError(error_type='EMPTY_SUBTREE'))
        return subtree, term_errors
    else:
        lexeme_generator.dup_lexeme()
        return None, list()
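

# The parse tree is translated into repository query nodes: QueryMatch leaves
# (EQUAL / LIKE for 'STRING' fields, INCLUDE for set-valued fields) combined by
# QueryLogic nodes carrying AND / OR / NOT.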


class QueryNode(object):
    pass


class QueryMatch(QueryNode):
    def __init__(self, operation, field, value, quoted=False):
        self.operation = operation
        self.field = field
        self.value = value
        self.quoted = quoted

    def debug(self):
        return '[{} {} {} {}]'.format(self.operation, self.field, self.value, self.quoted)


class QueryLogic(QueryNode):
    def __init__(self, logic_operation, children):
        self.logic_operation = logic_operation
        self.children = children

    def debug(self):
        return '[{} -> {}]'.format(self.logic_operation,
                                   [x.value if isinstance(x, QueryMatch) else x.logic_operation
                                    for x in self.children])


def convert_node(tree_node, field_name='', tree_type=''):
    if tree_node.value.lexeme_type in ['AND', 'OR', 'NOT']:
        converted_children = list()
        for child in tree_node.children:
            converted_children.append(convert_node(child, field_name, tree_type))
        return QueryLogic(tree_node.value.lexeme_type, converted_children)
    elif tree_type == 'STRING':
        if tree_node.value.lexeme_type == 'QVALUE':
            quoted = True
        else:
            quoted = False
        if tree_node.value.has_asterisk:
            operation = 'LIKE'
            value = [tree_node.value.text_before_asterisk, tree_node.value.text_after_asterisk]
        else:
            operation = 'EQUAL'
            value = tree_node.value.text_before_asterisk
        return QueryMatch(operation, field_name, value, quoted)
    else:
        return QueryMatch('INCLUDE', field_name, tree_node.value.text_before_asterisk)


def build_query_tree(tree_root, field_name, tree_type):
    return convert_node(tree_root, field_name, tree_type)


QueryFieldsLG = {'lex': 'STRING', 'gramm': 'SET', 'sem': 'SET', 'flags': 'SET'}


class UserQuery(object):
    pass


class UserQueryLG(UserQuery):
    def __init__(self, word_queries_list):
        self.word_queries_list = word_queries_list
        self.query_inner_forest = list()
        self.repo_forest = list()

    def build_inner_forest(self):
        for word_query in self.word_queries_list:
            word_query_forest = dict()
            for field_name, field_query in word_query.items():
                if field_name not in QueryFieldsLG:
                    word_query_forest[field_name] = field_query
                else:
                    lexeme_generator = Lexer(field_query)
                    root, errors = parse_expression(lexeme_generator)
                    word_query_forest[field_name] = root
            self.query_inner_forest.append(word_query_forest)

    def debug_inner(self):
        inner_forest_representation = list()
        for word_query_forest in self.query_inner_forest:
            word_forest_representation = dict()
            for field_name, field_tree in word_query_forest.items():
                if field_name not in QueryFieldsLG:
                    word_forest_representation[field_name] = field_tree
                else:
                    nodes = list()
                    level = [field_tree]
                    while level:
                        next_level = list()
                        for node in level:
                            if node:  # skip empty subtrees before calling debug()
                                nodes.append(node.debug())
                                next_level += node.children
                        level = next_level
                    word_forest_representation[field_name] = nodes
            inner_forest_representation.append(word_forest_representation)
        return inner_forest_representation

    def build_repo_forest(self):
        repo_forest = list()
        for word_query_forest in self.query_inner_forest:
            word_repo_forest = list()
            word_additional = dict()
            for field_name, field_tree in word_query_forest.items():
                if field_name not in QueryFieldsLG:
                    word_additional[field_name] = field_tree
                else:
                    query_tree = build_query_tree(field_tree, field_name, tree_type=QueryFieldsLG[field_name])
                    word_repo_forest.append(query_tree)
            repo_forest.append({'query_tree': QueryLogic('AND', word_repo_forest),
                                'additional_params': word_additional})
        self.repo_forest = repo_forest
        return repo_forest

    def debug_repo(self):
        repo_forest_representation = list()
        for word_query_forest in self.repo_forest:
            nodes = list()
            level = [word_query_forest['query_tree']]
            while level:
                next_level = list()
                for node in level:
                    nodes.append(node.debug())
                    if node and isinstance(node, QueryLogic):
                        next_level += node.children
                level = next_level
            repo_forest_representation.append(nodes)
        return repo_forest_representation


if __name__ == "__main__":
    query = [{'lex': 'word word | word & word', 'gramm': 'NUM,gen,imper2'},
             {'lex': 'word word | *word & word', 'sem': 'r:concr & t:space & top:contain'}]
    # query = [{'lex': 'NUM,gen,imper2'}]
    user_query_processor = UserQueryLG(query)
    user_query_processor.build_inner_forest()
    print(user_query_processor.debug_inner())
    user_query_processor.build_repo_forest()
    for i, forest in enumerate(user_query_processor.debug_repo()):
        print('word', i)
        print('\n'.join(forest))