import math

from .lib import Graph, operations
from .lib.utils_functions import get_day_of_week, get_hour


def word_count_graph(input_stream_name: str, text_column: str = 'text', count_column: str = 'count') -> Graph:
    """Constructs a graph which counts words in text_column of all rows passed"""
    words_count_graph = \
        Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda x: x.lower())) \
        .map(operations.Split(text_column)) \
        .sort([text_column]) \
        .reduce(operations.Count(count_column), [text_column]) \
        .sort([count_column, text_column])

    return words_count_graph
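

# The pipeline above is a chain of the library's own operations.  As a sanity
# check, the same word count can be written in plain Python.  This is only an
# illustrative sketch: _word_count_reference and its punctuation handling are
# assumptions, not part of the .lib package, and may differ in detail from
# operations.FilterPunctuation / operations.Split.
def _word_count_reference(rows, text_column='text', count_column='count'):
    import re
    from collections import Counter

    counter = Counter()
    for row in rows:
        # strip punctuation, lower-case and split on whitespace
        cleaned = re.sub(r'[^\w\s]', '', row[text_column]).lower()
        counter.update(cleaned.split())
    # same ordering as the graph: by count, then by word
    return sorted(
        ({text_column: word, count_column: count} for word, count in counter.items()),
        key=lambda r: (r[count_column], r[text_column]),
    )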


def inverted_index_graph(input_stream_name: str, doc_column: str = 'doc_id', text_column: str = 'text',
                         result_column: str = 'tf_idf') -> Graph:
    """Constructs a graph which calculates tf-idf for every word/document pair"""
    count_column = 'docs_count'
    idf_column = 'idf'
    tf_column = 'tf'

    word_counter_graph = \
        Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda x: x.lower())) \
        .map(operations.Split(text_column)) \
        .sort([doc_column, text_column]) \
        .reduce(operations.FirstReducer(), [doc_column, text_column])

    count_columns_graph = Graph.graph_from_iter(input_stream_name).reduce(operations.Count(count_column), [])

    word_counter_graph = \
        word_counter_graph.join(operations.OuterJoiner(), count_columns_graph, []) \
        .sort([text_column]) \
        .reduce(operations.IdfCalculator(count_column), [text_column])

    idf_counter_graph = \
        Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda x: x.lower())) \
        .map(operations.Split(text_column)) \
        .sort([doc_column]) \
        .reduce(operations.TermFrequency(text_column, tf_column), [doc_column]) \
        .join(operations.LeftJoiner(), word_counter_graph, [text_column]) \
        .sort([text_column]) \
        .map(operations.BinaryOperation([tf_column, idf_column], result_column, lambda x, y: x * y)) \
        .sort([text_column]) \
        .reduce(operations.TopN(result_column, 3), [text_column])

    return idf_counter_graph
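

# For reference, the quantity computed above follows the usual definitions:
# tf(w, d) is the share of occurrences of word w among all words of document d,
# idf(w) = log(total number of documents / number of documents containing w),
# and tf-idf(w, d) = tf(w, d) * idf(w); only the top-3 documents per word are
# kept (TopN(result_column, 3)).  The helper below is an illustrative,
# pure-Python sketch under those standard definitions; the exact conventions of
# operations.TermFrequency and operations.IdfCalculator may differ.
def _tf_idf_reference(docs, top_n=3):
    """docs: mapping doc_id -> list of already lower-cased words."""
    from collections import Counter

    doc_count = len(docs)
    docs_with_word = Counter()
    for words in docs.values():
        docs_with_word.update(set(words))

    per_word = {}
    for doc_id, words in docs.items():
        freqs = Counter(words)
        for word, occurrences in freqs.items():
            tf = occurrences / len(words)
            idf = math.log(doc_count / docs_with_word[word])
            per_word.setdefault(word, []).append((doc_id, tf * idf))

    # keep the top_n documents for every word, best score first
    return {word: sorted(pairs, key=lambda p: p[1], reverse=True)[:top_n]
            for word, pairs in per_word.items()}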


def pmi_graph(input_stream_name: str, doc_column: str = 'doc_id', text_column: str = 'text',
              result_column: str = 'pmi') -> Graph:
    """Constructs a graph which gives for every document the top 10 words ranked by pointwise mutual information"""
    count_column = 'words_count'
    tf_row_column = 'tf_row'
    tf_document_column = 'tf_document'

    words_count_graph = \
        Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda x: x.lower())) \
        .map(operations.Split(text_column))

    row_count_graph = \
        words_count_graph.sort([text_column, doc_column]) \
        .reduce(operations.Count(count_column), [text_column, doc_column])

    words_count_graph = \
        words_count_graph.join(operations.LeftJoiner(), row_count_graph, [doc_column, text_column]) \
        .map(operations.Filter(lambda row: row[count_column] >= 2 and len(row[text_column]) > 4)) \
        .map(operations.Project([doc_column, text_column]))

    text_frequency_graph = \
        words_count_graph.reduce(operations.TermFrequency(text_column), [])

    row_frequency_graph = \
        words_count_graph.sort([doc_column]) \
        .reduce(operations.TermFrequency(text_column), [doc_column]) \
        .join(operations.InnerJoiner(
            suffix_a=tf_row_column[2:],
            suffix_b=tf_document_column[2:]),
            text_frequency_graph,
            [text_column]) \
        .map(operations.BinaryOperation([tf_row_column, tf_document_column], result_column, lambda x, y: x / y)) \
        .map(operations.UnaryOperation(result_column, result_column, math.log)) \
        .reduce(operations.TopN(result_column, 10), []).sort([doc_column])

    return row_frequency_graph
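

# Pointwise mutual information here compares how frequent a word is inside one
# document with how frequent it is in the whole (filtered) corpus:
# pmi(w, d) = log( tf(w, d) / tf(w, corpus) ).  Only words longer than four
# characters that occur at least twice in their document take part.  The helper
# below is an illustrative, pure-Python sketch of the computation as described
# in the docstring (top-10 words per document); it is not part of the .lib package.
def _pmi_reference(docs, top_n=10):
    """docs: mapping doc_id -> list of already lower-cased words."""
    from collections import Counter

    # keep only words with len > 4 that occur at least twice in their document
    filtered = {}
    for doc_id, words in docs.items():
        counts = Counter(words)
        filtered[doc_id] = [w for w in words if len(w) > 4 and counts[w] >= 2]

    corpus = [w for words in filtered.values() for w in words]
    corpus_freq = Counter(corpus)
    corpus_total = len(corpus)

    result = {}
    for doc_id, words in filtered.items():
        doc_freq = Counter(words)
        scores = {
            w: math.log((doc_freq[w] / len(words)) / (corpus_freq[w] / corpus_total))
            for w in doc_freq
        }
        result[doc_id] = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    return result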


def yandex_maps_graph(input_stream_name_time: str, input_stream_name_length: str,
                      enter_time_column: str = 'enter_time', leave_time_column: str = 'leave_time',
                      edge_id_column: str = 'edge_id',
                      start_coord_column: str = 'start', end_coord_column: str = 'end',
                      weekday_result_column: str = 'weekday', hour_result_column: str = 'hour',
                      speed_result_column: str = 'speed') -> Graph:
    """Constructs a graph which measures average speed in km/h depending on the weekday and hour"""
    length_column = 'length'

    length_graph = \
        Graph.graph_from_iter(input_stream_name_length) \
        .map(operations.HaversineDistance(length_column, start_coord_column, end_coord_column))

    time_graph = \
        Graph.graph_from_iter(input_stream_name_time) \
        .map(operations.UnaryOperation(enter_time_column, weekday_result_column, get_day_of_week)) \
        .map(operations.UnaryOperation(enter_time_column, hour_result_column, get_hour)) \
        .sort([edge_id_column]) \
        .join(operations.LeftJoiner(), length_graph, [edge_id_column]) \
        .sort([weekday_result_column, hour_result_column]) \
        .reduce(
            operations.SpeedReducer(enter_time_column, leave_time_column, length_column, speed_result_column),
            [weekday_result_column, hour_result_column]
        ) \
        .map(operations.Project([hour_result_column, speed_result_column, weekday_result_column]))

    return time_graph
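

# The two building blocks used above, edge length and speed, are standard:
# the haversine formula gives the great-circle distance between the edge's end
# points, and speed is that length divided by the travel time.  The helpers
# below are illustrative sketches only; they assume coordinates are
# (longitude, latitude) pairs in degrees and times are datetime.datetime
# objects, which operations.HaversineDistance and operations.SpeedReducer may
# handle differently.
def _haversine_km(start, end, earth_radius_km=6371.0):
    lon1, lat1 = map(math.radians, start)
    lon2, lat2 = map(math.radians, end)
    d_lat, d_lon = lat2 - lat1, lon2 - lon1
    a = math.sin(d_lat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(d_lon / 2) ** 2
    return 2 * earth_radius_km * math.asin(math.sqrt(a))


def _speed_kmh(length_km, enter_time, leave_time):
    hours = (leave_time - enter_time).total_seconds() / 3600
    return length_km / hours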