Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # encoding: utf-8
- """
- Module for testing code on outlier detection.
- """
- from collections import defaultdict
- import math
- import sys
- import csv
# Start-of-sequence and end-of-sequence boundary char for internal
# computation; this should ideally be a character not used in the
# sequences themselves.
_BOUNDARY = '#'


def build_model(seqs, context_len=3):
    """
    Build a sequence (word) model and return it in a data structure.

    Args:
        seqs (list): The list of sequences upon which the modelling
            is performed. Sequences can be either strings
            or lists/tuples (types can be mixed).
        context_len (int): The length of the largest context for
            which probabilities are collected
            (whenever possible). Defaults to 3.

    Returns:
        dict: a dictionary with the sequence model, holding two keys:
            'char' maps each observed context string to a dictionary
            of {transition_char: log_probability}, and 'seqlen' maps
            each observed sequence length (boundaries included) to
            its log-probability.
    """
    # `counts_char` accumulates context -> transition counts;
    # `counts_seqlen` accumulates sequence-length counts
    counts_char = defaultdict(lambda: defaultdict(int))
    counts_seqlen = defaultdict(int)

    # join lists/tuples into strings, if needed, and then
    # add start-of-sequence and end-of-sequence boundaries
    seqs = [''.join(seq) if isinstance(seq, (list, tuple)) else seq
            for seq in seqs]
    seqs = ['%s%s%s' % (_BOUNDARY, seq, _BOUNDARY) for seq in seqs]

    for seq in seqs:
        # collect sequence length information (this intentionally
        # includes the boundaries, so the minimum length is 3); cache
        # the length, which is used many times in the loops below
        len_seq = len(seq)
        counts_seqlen[len_seq] += 1

        # iterate over all context lengths up to the maximum one;
        # `context` is an empty string for contexts of length 1 (the
        # intended behaviour), so the idx == 0 iteration with
        # clength == 1 records the NULL -> START_BOUNDARY transition
        # common to all sequences -- it contributes a constant term
        # and is looked up symmetrically by seq_logprob()
        for clength in range(1, context_len + 1):
            for idx in range(len_seq - clength + 1):
                context = seq[idx:idx + clength - 1]
                transition = seq[idx + clength - 1]
                counts_char[context][transition] += 1

    # turn transition counts into log-probabilities per context; the
    # denominator is cached as a float to speed the computation (and
    # to guarantee true division); note the inner comprehension
    # variable is `transition`, NOT `context`, so it no longer
    # shadows the outer loop variable
    logprobs_char = {}
    for context in counts_char:
        total = float(sum(counts_char[context].values()))
        logprobs_char[context] = {
            transition: math.log(count / total)
            for transition, count in counts_char[context].items()}

    # turn sequence-length counts into log-probabilities; again the
    # denominator is cached as a float
    total = float(sum(counts_seqlen.values()))
    logprobs_seqlen = {
        seqlen: math.log(count / total)
        for seqlen, count in counts_seqlen.items()}

    return {'char': logprobs_char, 'seqlen': logprobs_seqlen}
def seq_logprob(seq, model, use_length_prob=True, length_normalize=True):
    """
    Calculate the log-probability of a sequence according to a model.

    Args:
        seq (str or list): The sequence whose probability will be calculated.
        model (dict): A sequence probability model, as returned by
            build_model().
        use_length_prob (bool): Whether to add a probability based on
            sequence length. Defaults to True.
        length_normalize (bool): Whether to normalize end probabilities
            dividing by sequence length. Defaults to True.

    Returns:
        float: The log-probability of the sequence, or None if the
            sequence cannot be scored because it contains a
            context/transition (or, when `use_length_prob` is set,
            a sequence length) never observed when the model was
            built.
    """
    # join list/tuple into a string, if needed, and then
    # add start-of-sequence and end-of-sequence boundaries
    if isinstance(seq, (list, tuple)):
        seq = ''.join(seq)
    seq = '%s%s%s' % (_BOUNDARY, seq, _BOUNDARY)

    # extract the length of the largest context in the model, i.e.,
    # the parameter that was used with `build_model()`; the +1
    # is due to the need of including the transition
    context_len = max(map(len, model['char'])) + 1

    # collect transition probabilities, iterating over all context
    # lengths up to the maximum one; we also cache the current
    # sequence length to speed computation; an unseen context or
    # transition has probability zero (log-probability -inf), which
    # is signalled by returning None as documented above (the
    # previous implementation raised KeyError instead)
    seq_prob = 0.0
    len_seq = len(seq)
    for clength in range(1, context_len + 1):
        for idx in range(len_seq - clength + 1):
            context = seq[idx:idx + clength - 1]
            transition = seq[idx + clength - 1]
            transitions = model['char'].get(context)
            if transitions is None or transition not in transitions:
                return None
            seq_prob += transitions[transition]

    # add the probability for sequence length (including boundaries),
    # if requested; an unseen length also makes the sequence unscorable
    if use_length_prob:
        len_logprob = model['seqlen'].get(len_seq)
        if len_logprob is None:
            return None
        seq_prob += len_logprob

    # normalize probabilities based on sequence length (including
    # boundaries), if requested
    if length_normalize:
        seq_prob /= len_seq

    return seq_prob
def test():
    """
    Main test function: build a model from the system word list, score
    every word, and print the entries ordered by log-probability, so
    the most anomalous ones come first.
    """
    # read data; use rstrip('\n') instead of slicing off the last
    # character, which would eat a letter if the final line has no
    # trailing newline
    with open('/usr/share/dict/words') as handler:
        words = [line.rstrip('\n') for line in handler]

    # add an obviously irregular entry, plus the same test word as
    # both a string and a list to exercise both accepted input types
    words.append('+[@$@]xç{jk=~=qwertyuiop')
    words.append('mytest')
    words.append(['m', 'y', 't', 'e', 's', 't'])

    model = build_model(words)

    # collect probabilities and print them ordered; seq_logprob() may
    # return None for unscorable sequences, and a genuine
    # log-probability can be 0.0, so compare against None explicitly
    # instead of relying on truthiness
    probs = {}
    for word in words:
        logp = seq_logprob(word, model)
        if logp is not None:
            # lists are unhashable, so lists are keyed as tuples
            key = tuple(word) if isinstance(word, list) else word
            probs[key] = logp

    for entry in sorted(probs.items(), key=lambda x: x[1]):
        print(entry)
def run_check(filename, column, tsv):
    """
    Build a model from one column of a delimited file and print every
    entry with its log-probability, ordered so the most anomalous
    entries come first.

    Args:
        filename (str): Path of the delimited file to check.
        column (str): Name of the column holding the sequences.
        tsv (bool): If True, the file is tab-delimited; otherwise it
            is comma-delimited.
    """
    # set delimiter
    delimiter = '\t' if tsv else ','

    # collect all entries from the requested column
    seqs = []
    with open(filename) as csvfile:
        reader = csv.DictReader(csvfile, delimiter=delimiter)
        for row in reader:
            seqs.append(row[column])

    # build model
    model = build_model(seqs)

    # collect probabilities and print them ordered; seq_logprob() may
    # return None for unscorable sequences, and a genuine
    # log-probability can be 0.0, so compare against None explicitly
    # instead of relying on truthiness; -99.999 is kept as the
    # sentinel score for unscorable entries
    probs = {}
    for seq in seqs:
        logp = seq_logprob(seq, model)
        probs[seq] = logp if logp is not None else -99.999

    for entry in sorted(probs.items(), key=lambda x: x[1]):
        print(entry)
if __name__ == '__main__':
    # extremely simple argument handling
    import argparse

    cli = argparse.ArgumentParser('check data')
    cli.add_argument('-t', '--tsv',
                     action='store_true',
                     help='use tabs as delimiters')
    cli.add_argument('filename', metavar='filename',
                     help='file to check')
    cli.add_argument('column', metavar='column',
                     help='name of the column to check')
    args = cli.parse_args()
    run_check(args.filename, args.column, args.tsv)
    # test()
Add Comment
Please, Sign In to add comment