# NOTE: this module targets the TensorFlow 1.x API
# (tf.layers, tf.placeholder, tf.python_io, tf.InteractiveSession).
import re
import os
from collections import Counter

import nltk
import numpy as np
import pymorphy2
import tensorflow as tf

DESCRIPTION = [
    {
        'input_type': 'bag_of_char_ngrams', 'n_min': 3, 'n_max': 3, 'n_tokens_per_language': 4000,
        'zero_digit_flag': True, 'normalize_word_flag': False,
        'authorized_char_re': r'[^а-яёЁa-z0\-$?.]', 'lower_case_flag': True
    }
]
BATCH_SIZE = 1000
LANGUAGE_PATH = '/tmp/export/language'
MODEL_FILE_NAME = '/tmp/export/tensorflow/model'


def list_directory(path):
    return sorted([os.path.join(path, file_name) for file_name in os.listdir(path)])


class DenseSemanticLayers:
    @staticmethod
    def run(input_tensor):
        # Inference-only graph: tf.layers.dropout defaults to training=False,
        # so both dropout layers are pass-through at serving time.
        dropout00 = tf.layers.dropout(input_tensor[0], 0.2)
        dense0 = tf.layers.dense(dropout00, 100)
        dropout01 = tf.layers.dropout(dense0, 0.2)
        return tf.nn.l2_normalize(dropout01, 1)


class SentencesToVectors:
    def __init__(
        self, description=DESCRIPTION, batch_size=BATCH_SIZE, language_path=LANGUAGE_PATH,
        model_file_name=MODEL_FILE_NAME
    ):
        self.components = {'semantic_layers': DenseSemanticLayers()}
        self.session = tf.InteractiveSession()
        self.components['string_to_numpy'] = StringToNumpy(
            description=description, language_path=language_path, batch_size=batch_size
        )
        self.components['numpy_filter'] = NumpyFilter()
        self.components['numpy_to_placeholder'] = _NumpyToPlaceholder(description=description)
        self.placeholders = []
        self.values = []
        for entry in description:
            self.placeholders.append(tf.placeholder(
                dtype=tf.float32, shape=(batch_size, entry['n_tokens_per_language'])
            ))
            # np.zeros so that batch rows left unfilled stay zero.
            self.values.append(np.zeros(
                shape=(batch_size, entry['n_tokens_per_language']), dtype=np.float32
            ))
        with tf.variable_scope('question_semantics', reuse=tf.AUTO_REUSE):
            self.vectors = self.components['semantic_layers'].run(self.placeholders)
        tf.train.Saver(tf.global_variables()).restore(self.session, model_file_name)

    def run(self, sentences):
        """Returns (ids, vectors): ids are the indices of the sentences that
        survived filtering; vectors[k] embeds sentences[ids[k]]."""
        ids = []
        for sentence_index, sentence in enumerate(sentences):
            arrays = self.components['string_to_numpy'].run(sentence)
            if not self.components['numpy_filter'].run(arrays):
                continue
            for i, value in enumerate(self.components['numpy_to_placeholder'].run(arrays)):
                self.values[i][len(ids)] = value
            ids.append(sentence_index)
        # Feed the full fixed-size arrays: the placeholders have a static batch
        # dimension, so the batch must not be truncated here.
        feed_dict = {
            placeholder: value
            for placeholder, value in zip(self.placeholders, self.values)
        }
        vectors = self.session.run(self.vectors, feed_dict=feed_dict)
        # Keep only the rows that were actually filled.
        return ids, vectors[:len(ids)]
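
# A hypothetical illustration of the run() contract: sentences whose processed
# bag of n-grams is empty are filtered out, and ids maps the returned rows
# back to the input, e.g.
#   ids, vectors = converter.run(['хорошая фраза', '!!!', 'ещё фраза'])
# gives ids == [0, 2] ('!!!' contains no authorized characters) and
# vectors.shape == (2, 100).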


class StringToNumpy:
    def __init__(self, description, language_path, batch_size):
        """
        :param description: [{'input_type': string, **params}, ...]
        """
        self.components = {}
        self.description = description
        for entry_index, entry in enumerate(self.description):
            self.components[entry_index] = _BagOfCharNgramsToNumpy(
                description=entry, batch_size=batch_size,
                path=os.path.join(language_path, str(entry_index))
            )

    def run(self, string):
        arrays = []
        for i in range(len(self.description)):
            arrays.append(self.components[i].run(string))
        return arrays


class _BagOfCharNgramsToNumpy:
    def __init__(self, description, path, batch_size):
        """
        :param description: {'n_min': int, 'n_max': int, **string_processor_params}
        """
        self.components = {}
        self.description = description
        self.tokens = TokenRecordsToDict(path=path, batch_size=batch_size).run()
        self.components['string_to_tokens'] = StringToCharNgrams(
            description['n_min'], description['n_max']
        )
        self.components['string_processor'] = StringProcessor(
            zero_digit_flag=description['zero_digit_flag'],
            normalize_word_flag=description['normalize_word_flag'],
            authorized_char_re=description['authorized_char_re'],
            lower_case_flag=description['lower_case_flag']
        )

    def run(self, string):
        tokens = self.components['string_to_tokens'].run(
            self.components['string_processor'].run(string)
        )
        # Count only n-grams present in the exported vocabulary; unknown
        # n-grams are silently dropped.
        counter = Counter()
        for word_tokens in tokens:
            for token in word_tokens:
                index = self.tokens.get(token, None)
                if index is not None:
                    counter[index] += 1
        if len(counter) == 0:
            array = np.zeros(shape=(0, 2), dtype=np.int64)
        else:
            array = np.asarray(list(counter.items()))
        # Sparse bag representation: (vocabulary indices, counts).
        return array[:, 0], array[:, 1]
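
# For illustration (hypothetical vocabulary): if '#ко' maps to index 7 and
# 'кот' to index 42, and each trigram occurs once in the processed string,
# run() returns (np.array([7, 42]), np.array([1, 1])).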


class WordToCharNgrams:
    def __init__(self, n_min, n_max):
        self.n_min = n_min
        self.n_max = n_max
        self.pad_symbol = '#'

    def run(self, word):
        return [''.join(ngram) for ngram in nltk.everygrams(
            word, self.n_min, self.n_max, pad_left=True, pad_right=True,
            left_pad_symbol=self.pad_symbol, right_pad_symbol=self.pad_symbol
        )]
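
# Example with n_min = n_max = 3: the word is padded with n - 1 pad symbols on
# each side before trigram extraction, so WordToCharNgrams(3, 3).run('кот')
# yields ['##к', '#ко', 'кот', 'от#', 'т##'].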


class StringToWordNgrams:
    def __init__(self, n_min, n_max):
        self.n_min = n_min
        self.n_max = n_max

    def run(self, string):
        # Requires the NLTK 'punkt' tokenizer data (nltk.download('punkt')).
        words = nltk.word_tokenize(string, language='russian')
        return [' '.join(ngram) for ngram in nltk.everygrams(words, self.n_min, self.n_max)]


class StringToCharNgrams:
    def __init__(self, n_min, n_max):
        self.n_min = n_min
        self.n_max = n_max
        self.components = dict()
        self.components['String->WordNgrams'] = StringToWordNgrams(1, 1)
        self.components['Word->CharNgrams'] = WordToCharNgrams(n_min, n_max)

    def run(self, string):
        return [
            self.components['Word->CharNgrams'].run(word)
            for word in self.components['String->WordNgrams'].run(string)
        ]


class StringProcessor:
    def __init__(self, zero_digit_flag, normalize_word_flag, authorized_char_re, lower_case_flag):
        self.zero_digit_flag = zero_digit_flag
        self.normalize_word_flag = normalize_word_flag
        self.lower_case_flag = lower_case_flag
        self.components = dict()
        self.components['string_to_word_ngrams'] = StringToWordNgrams(1, 1)
        self.authorized_char_compiled_re = re.compile(authorized_char_re)
        if self.zero_digit_flag:
            self.zero_digit_compiled_re = re.compile(r'[0-9]')
        if self.normalize_word_flag:
            self.analyzer = pymorphy2.MorphAnalyzer()

    def run(self, string):
        words = []
        for word in self.components['string_to_word_ngrams'].run(string):
            if self.lower_case_flag:
                word = word.lower()
            if self.zero_digit_flag:
                word = self.zero_digit_compiled_re.sub('0', word)
            if self.normalize_word_flag:
                # Lemmatize with pymorphy2, taking the first normal form.
                word = self.analyzer.normal_forms(word)[0]
            # Drop every character not allowed by authorized_char_re.
            word = self.authorized_char_compiled_re.sub('', word)
            if len(word) > 0:
                words.append(word)
        return ' '.join(words)
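
# Example with the DESCRIPTION settings above (lower-casing on, digits zeroed,
# lemmatization off): punctuation outside the authorized set is dropped, so
# StringProcessor(True, False, r'[^а-яёЁa-z0\-$?.]', True).run('Видел 3 кота!')
# returns 'видел 0 кота'.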


class NumpyFilter:
    @staticmethod
    def run(arrays):
        # True iff at least one input produced a non-empty bag of n-grams.
        value = 0
        for array_list in arrays:
            for array in array_list:
                value += np.sum(array)
        return value != 0


class _NumpyToPlaceholder:
    def __init__(self, description):
        """
        :param description: [{'input_type': string, **params}, ...]
        """
        self.components = dict()
        self.description = description
        for entry_index, entry in enumerate(self.description):
            self.components[entry_index] = _BagOfCharNgramsNumpyToPlaceholder(description=entry)

    def run(self, arrays):
        values = []
        for i in range(len(self.description)):
            values.append(self.components[i].run(arrays[i]))
        return values


class _BagOfCharNgramsNumpyToPlaceholder:
    def __init__(self, description):
        self.description = description

    def run(self, arrays):
        # Scatter the sparse (indices, counts) pair into a dense float vector.
        value = np.zeros((self.description['n_tokens_per_language'],), dtype=np.float32)
        value[arrays[0]] = arrays[1]
        return value
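
# For illustration: with a hypothetical n_tokens_per_language of 6 (the real
# config uses 4000), the sparse pair (np.array([1, 4]), np.array([2, 1]))
# densifies to [0., 2., 0., 0., 1., 0.].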


class TokenRecordsToDict:
    def __init__(self, path, batch_size):
        self.components = dict()
        self.path = path
        self.batch_size = batch_size
        self.components['record_to_values'] = TfRecordToDictValues()

    def run(self):
        # Build a {token: vocabulary index} dict from the exported TFRecords.
        tokens = {}
        for records in TfRecordIterator(batch_size=self.batch_size, path=self.path).run():
            for record in records:
                token, index = self.components['record_to_values'].run(
                    description=(('token', 'bytes'), ('index', 'int')), record=record
                )
                tokens[token[0].decode()] = index[0]
        return tokens


class TfRecordIterator:
    def __init__(self, path, batch_size):
        self.path = path
        self.batch_size = batch_size

    def run(self):
        # Yield serialized records in batches of at most batch_size.
        records = []
        for file_name in list_directory(self.path):
            for record in tf.python_io.tf_record_iterator(file_name):
                records.append(record)
                if len(records) == self.batch_size:
                    yield records
                    records = []
        if len(records) > 0:
            yield records


class TfRecordToDictValues:
    @staticmethod
    def run(description, record):
        values = []
        example = tf.train.Example()
        example.ParseFromString(record)
        for name, dtype in description:
            feature = example.features.feature[name]
            if dtype == 'int':
                value_list = feature.int64_list
            elif dtype == 'float':
                value_list = feature.float_list
            else:  # dtype == 'bytes'
                value_list = feature.bytes_list
            values.append(value_list.value)
        return values
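

# A minimal smoke-test sketch: it assumes the exported artifacts referenced by
# LANGUAGE_PATH and MODEL_FILE_NAME already exist on disk, and the sample
# sentences are hypothetical placeholders.
if __name__ == '__main__':
    converter = SentencesToVectors()
    sentence_ids, sentence_vectors = converter.run(
        ['как оплатить заказ?', 'сколько стоит доставка?']
    )
    print(sentence_ids, sentence_vectors.shape)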