Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- import math
- from .lib import Graph, operations
- from .lib.utils_functions import get_day_of_week, get_hour
def word_count_graph(input_stream_name: str, text_column: str = 'text', count_column: str = 'count') -> Graph:
    """Constructs graph which counts words in text_column of all rows passed.

    The result has one row per distinct word, carrying the word in
    text_column and its total occurrence count in count_column, ordered
    by count and then by word.
    """
    # Normalize the text: strip punctuation and lower-case it.
    normalized = Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda s: s.lower()))
    # One row per word occurrence, then count occurrences per word.
    counted = normalized \
        .map(operations.Split(text_column)) \
        .sort([text_column]) \
        .reduce(operations.Count(count_column), [text_column])
    # Final ordering: by count, ties broken by the word itself.
    return counted.sort([count_column, text_column])
def inverted_index_graph(input_stream_name: str, doc_column: str = 'doc_id', text_column: str = 'text',
                         result_column: str = 'tf_idf') -> Graph:
    """Constructs graph which calculates tf-idf for every word/document pair.

    For each word the graph keeps the top-3 documents ranked by
    tf-idf = tf(word, doc) * idf(word), written into result_column.
    """
    count_column = 'docs_count'
    idf_column = 'idf'
    tf_column = 'tf'

    # Distinct (doc, word) pairs: normalize text, split into words,
    # then deduplicate via FirstReducer on the (doc, word) key.
    doc_word_pairs_graph = \
        Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda x: x.lower())) \
        .map(operations.Split(text_column)) \
        .sort([doc_column, text_column]) \
        .reduce(operations.FirstReducer(), [doc_column, text_column])

    # Total number of documents (single-row stream, empty key).
    docs_count_graph = Graph.graph_from_iter(input_stream_name).reduce(operations.Count(count_column), [])

    # idf per word: attach the total document count to every pair,
    # then compute idf grouped by word.
    idf_graph = \
        doc_word_pairs_graph.join(operations.OuterJoiner(), docs_count_graph, []) \
        .sort([text_column]) \
        .reduce(operations.IdfCalculator(count_column), [text_column])

    # tf per (doc, word), joined with idf, multiplied into tf-idf.
    tf_idf_graph = \
        Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda x: x.lower())) \
        .map(operations.Split(text_column)) \
        .sort([doc_column]) \
        .reduce(operations.TermFrequency(text_column, tf_column), [doc_column]) \
        .join(operations.LeftJoiner(), idf_graph, [text_column]) \
        .sort([text_column]) \
        .map(operations.BinaryOperation([tf_column, idf_column], result_column, lambda x, y: x * y)) \
        .reduce(operations.TopN(result_column, 3), [text_column])
    # NOTE: the second .sort([text_column]) that preceded TopN in the original
    # was redundant — the stream is already sorted by text_column and the
    # row-wise map does not reorder it.
    return tf_idf_graph
def pmi_graph(input_stream_name: str, doc_column: str = 'doc_id', text_column: str = 'text',
              result_column: str = 'pmi') -> Graph:
    """Constructs graph which gives for every document the top 10 words ranked by pointwise mutual information.

    Words are kept only if they are longer than 4 characters and occur at
    least twice in their document; pmi = log(tf(word, doc) / tf(word, corpus)).
    """
    count_column = 'words_count'
    tf_row_column = 'tf_row'
    tf_document_column = 'tf_document'

    # BUG FIX: the original had a stray trailing line-continuation backslash
    # after .map(operations.Split(...)), which fused this statement with the
    # next assignment and made the whole module fail with a SyntaxError.
    words_graph = \
        Graph.graph_from_iter(input_stream_name) \
        .map(operations.FilterPunctuation(text_column)) \
        .map(operations.UnaryOperation(text_column, text_column, lambda x: x.lower())) \
        .map(operations.Split(text_column))

    # Occurrence count of each word within each document.
    pair_count_graph = \
        words_graph.sort([text_column, doc_column]) \
        .reduce(operations.Count(count_column), [text_column, doc_column])

    # Keep only words appearing >= 2 times in the document and longer than 4 chars.
    filtered_words_graph = \
        words_graph.join(operations.LeftJoiner(), pair_count_graph, [doc_column, text_column]) \
        .map(operations.Filter(lambda row: row[count_column] >= 2 and len(row[text_column]) > 4)) \
        .map(operations.Project([doc_column, text_column]))

    # Corpus-wide term frequency (empty key => one global group).
    total_frequency_graph = \
        filtered_words_graph.reduce(operations.TermFrequency(text_column), [])

    # Per-document tf joined with corpus tf; suffixes disambiguate the two
    # 'tf' columns after the join ('_row' per-document, '_document' corpus-wide).
    pmi_result_graph = \
        filtered_words_graph.sort([doc_column]) \
        .reduce(operations.TermFrequency(text_column), [doc_column]) \
        .join(operations.InnerJoiner(
            suffix_a=tf_row_column[2:],
            suffix_b=tf_document_column[2:]),
            total_frequency_graph,
            [text_column]) \
        .map(operations.BinaryOperation([tf_row_column, tf_document_column], result_column, lambda x, y: x / y)) \
        .map(operations.UnaryOperation(result_column, result_column, math.log)) \
        .reduce(operations.TopN(result_column, 10), []) \
        .sort([doc_column])
    # NOTE(review): TopN is applied with an empty key, i.e. over the whole
    # stream, while the docstring promises top-10 *per document* — confirm
    # TopN's grouping semantics against the operations library.
    return pmi_result_graph
def yandex_maps_graph(input_stream_name_time: str, input_stream_name_length: str,
                      enter_time_column: str = 'enter_time', leave_time_column: str = 'leave_time',
                      edge_id_column: str = 'edge_id',
                      start_coord_column: str = 'start', end_coord_column: str = 'end',
                      weekday_result_column: str = 'weekday', hour_result_column: str = 'hour',
                      speed_result_column: str = 'speed') -> Graph:
    """Constructs graph which measures average speed in km/h depending on the weekday and hour.

    Edge lengths come from the length stream (haversine distance between
    start/end coordinates); travel times come from the time stream. The
    result has one row per (weekday, hour) with the average speed.
    """
    length_column = 'length'

    # Geometry stream: compute each edge's length from its endpoint coordinates.
    edge_lengths = Graph.graph_from_iter(input_stream_name_length)
    edge_lengths = edge_lengths.map(
        operations.HaversineDistance(length_column, start_coord_column, end_coord_column))

    # Traversal stream: derive weekday and hour from the enter timestamp.
    traversals = Graph.graph_from_iter(input_stream_name_time)
    traversals = traversals.map(
        operations.UnaryOperation(enter_time_column, weekday_result_column, get_day_of_week))
    traversals = traversals.map(
        operations.UnaryOperation(enter_time_column, hour_result_column, get_hour))

    # Attach the edge length to every traversal, then average speed per (weekday, hour).
    joined = traversals.sort([edge_id_column]) \
        .join(operations.LeftJoiner(), edge_lengths, [edge_id_column])
    averaged = joined.sort([weekday_result_column, hour_result_column]) \
        .reduce(
            operations.SpeedReducer(enter_time_column, leave_time_column, length_column, speed_result_column),
            [weekday_result_column, hour_result_column]
        )
    return averaged.map(
        operations.Project([hour_result_column, speed_result_column, weekday_result_column]))
Advertisement
Add Comment
Please sign in to add a comment