Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# file name: model.py
import math
import sys
from typing import Generic, List, TypeVar

import attribute
import dataParser
import pgConnector
import prim

T = TypeVar('T')  # Generics
# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
# NB
class NaiveBayes(Generic[T]):
    """Naive Bayes classifier trained from counts stored in a database table.

    Class priors p(C_i) are estimated from row counts; the conditionals
    p(x_j | C_i) are obtained via Bayes' rule from Laplace-smoothed estimates
    of p(C_i | x_j) and p(x_j), and cached in ``self.attr_probs`` during
    training.
    """

    def __init__(self, classes: List[T], db_connector: pgConnector.PgConnector,
                 table_name: str, class_field: str = "class",
                 excluded_fields: "List[str] | None" = None):
        """Set up model state and read the training-table size.

        :param classes: all class labels the model can predict
        :param db_connector: interface between the model and the database
        :param table_name: table where the (labeled) data is located
        :param class_field: name of the column holding the class label
        :param excluded_fields: columns ignored during training (default: none;
            ``None`` is used instead of a shared mutable ``[]`` default)
        """
        # NOTE(review): queries in this class are built by string
        # concatenation from table/column names and class labels -- safe only
        # if those values are trusted; prefer parameterized queries if
        # pgConnector supports them.
        self.classes = classes          # list of all classes
        self.class_probs = {}           # class label -> prior p(C_i)
        self.attr_probs = {}            # attr -> {class -> p(x_j | C_i)}, main model output
        self.db_connector = db_connector
        self.table_name = table_name
        # First entry is always the class column; these fields are not
        # considered by the model in the training process.
        self.excluded_fields = [class_field] + list(excluded_fields or [])
        self.attr_amt = {}              # attr -> number of rows with that attribute
        self.c_i_attr_amt = {}          # (attr, class) -> rows with attr classified as class
        self.table_len = self.db_connector.do_query(
            "select count(*) from " + self.table_name)[0][0]
        for c_i in self.classes:
            self.class_probs[c_i] = 0

    def _calculate_class_probs(self):
        """Estimate the prior p(C_i) of every class from row counts.

        :raises ValueError: if a class has no training instances (its prior
            would be 0 and later divisions by p(C_i) would fail).
        """
        for c_i in self.classes:
            c_i_amt = self.db_connector.do_query(
                "select count(*) from " + self.table_name
                + " where " + self.excluded_fields[0] + " = " + str(c_i))[0][0]
            self.class_probs[c_i] = c_i_amt / self.table_len
            if self.class_probs[c_i] == 0:
                raise ValueError('Bad training sample, no instances of class "'
                                 + str(c_i) + '" found!')

    # Laplace smoothing is used to estimate both probabilities below.
    # NOTE(review): the smoothing cardinality is len(self.attr_probs), i.e.
    # the number of *attributes* seen so far, although the local name says
    # "amt_classes" -- confirm against the intended formula.
    def _smoothed_p_c_i_attr(self, c_i_attr_amt: int, attr_amt: int) -> float:
        """Smoothed estimate of p(C_i | x_j) from raw counts."""
        pseudocount = 1
        amt_classes = len(self.attr_probs)
        return (c_i_attr_amt + pseudocount) / (attr_amt + amt_classes * pseudocount)

    def _smoothed_p_attr(self, attr_amt: int, amt_rows: int) -> float:
        """Smoothed estimate of p(x_j) from raw counts."""
        pseudocount = 1
        amt_classes = len(self.attr_probs)
        return (attr_amt + pseudocount * amt_classes) / (amt_rows + pseudocount * amt_classes)

    def _attr_given_c(self, attr: attribute.Attribute, c_i: T, is_training: bool) -> float:
        """Return p(x_j | C_i) for one attribute x_j and one class C_i.

        Uses Bayes' rule, p(x_j | C_i) = p(C_i | x_j) * p(x_j) / p(C_i), with
        both numerator terms Laplace-smoothed; counts default to 0 for
        attributes never seen in training.  Results are cached in
        ``self.attr_probs`` only while training.
        """
        # Reuse the cached value when it exists.
        if attr in self.attr_probs and c_i in self.attr_probs[attr]:
            return self.attr_probs[attr][c_i]
        if is_training and attr not in self.attr_probs:
            self.attr_probs[attr] = {}
        c_i_attr_amt = self.c_i_attr_amt.get((attr, c_i), 0)
        attr_amt = self.attr_amt.get(attr, 0)
        # p(C_i | x_j)
        p_c_i_attr = self._smoothed_p_c_i_attr(c_i_attr_amt, attr_amt)
        # p(x_j)
        p_attr = self._smoothed_p_attr(attr_amt, self.table_len)
        # Bayes' rule: p(x_j | C_i) itself
        prob = (p_c_i_attr * p_attr) / self.class_probs[c_i]
        if is_training:
            self.attr_probs[attr][c_i] = prob
        return prob

    # Here, @self.table_name must be a table containing labeled data.
    # The labels must coincide in name with the ones in @self.class_probs
    def train(self):
        """Estimate class priors, then cache p(x_j | C_i) for every
        (attribute, class) pair produced by the data parser."""
        print("Naive Bayes Classifier training started...")
        self._calculate_class_probs()
        dp = dataParser.DataParser(self.db_connector)
        _, parsed_data = dp.parse_count(self.table_name, self.excluded_fields[0],
                                        self.excluded_fields)
        total = len(parsed_data)
        for i, (attr, c_i, c_i_attr_amt, attr_amt) in enumerate(parsed_data, start=1):
            self.attr_amt[attr] = attr_amt
            self.c_i_attr_amt[attr, c_i] = c_i_attr_amt
            self._attr_given_c(attr, c_i, True)
            # Progress indicator: rewrites the same console line.
            print("\r Currently working on attribute number " + str(i)
                  + " of " + str(total), end='')
            sys.stdout.flush()
        print("")
        print("Naive Bayes Classifier training finished!")

    def classify(self, obj: List[attribute.Attribute]) -> dict:
        """Return {class: unnormalized posterior} for the attribute list ``obj``.

        Each posterior is p(C_i) multiplied by p(x_j | C_i) for every
        attribute in ``obj``; no normalization is applied.
        """
        probs = {}
        for c_i in self.classes:
            probs[c_i] = self.class_probs[c_i]
            for attr in obj:
                probs[c_i] *= self._attr_given_c(attr, c_i, False)
        return probs
# ---------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------------
# TAN
class TreeAugmentedNB(NaiveBayes, Generic[T]):
    """Tree Augmented Naive Bayes classifier.

    Extends NaiveBayes with pairwise attribute statistics: conditional
    mutual information I(X; Y | C) is accumulated for every attribute pair,
    and a maximum-weight spanning tree (``prim.MaxTree``) over the attribute
    fields is built for each class.  At classification time each non-root
    attribute is conditioned on its parent in the class's tree.
    """

    def __init__(self, classes: List[T], db_connector: pgConnector.PgConnector,
                 table_name: str, class_field: str = "class",
                 excluded_fields: "List[str] | None" = None):
        # None instead of a shared mutable [] default; the base class takes a
        # concrete list, so materialize it here.
        super().__init__(classes, db_connector, table_name, class_field,
                         list(excluded_fields or []))
        self.temp_pairs = {}   # times-seen -> how many pairs appear that many times
        self.pair_amt = {}     # (x_i, y_j) -> #rows containing the pair (both key orders stored)
        self.pair_ck_amt = {}  # (x_i, y_j) -> {class -> #rows with the pair and that class}
        self.mutInfo = {}      # (field_i, field_j) -> {class -> I(X; Y | C)}
        self.trees = {}        # class -> maximum-weight tree (the TAN) for that class

    def __update_mutual_info(self,
                             pair_data: "tuple[attribute.Attribute, attribute.Attribute, T, int, int]",
                             dict_attr_ck: dict):
        """Fold one (x_i, y_j, c_k, pair-and-class count, pair count) record
        into the running conditional mutual information I(X_i; Y_j | C_k).

        Both key orders, (field_i, field_j) and (field_j, field_i), are kept
        in sync so later lookups are order-independent.
        """
        xi, yj, ck, pair_ck_amt, pair_amt = pair_data
        xi_field = xi.field_name
        yj_field = yj.field_name
        # Per-class count of each attribute alone; 0 when unseen.
        xi_amt = dict_attr_ck[(xi, ck)][1] if (xi, ck) in dict_attr_ck else 0
        yj_amt = dict_attr_ck[(yj, ck)][1] if (yj, ck) in dict_attr_ck else 0
        if (xi_field, yj_field) not in self.mutInfo:
            self.mutInfo[(yj_field, xi_field)] = {}
            self.mutInfo[(xi_field, yj_field)] = {}
        if ck not in self.mutInfo[(xi_field, yj_field)]:
            self.mutInfo[(xi_field, yj_field)][ck] = 0
            self.mutInfo[(yj_field, xi_field)][ck] = 0
        # P(x_i, y_j | c_k), from the same smoothed factors the base model uses
        prob_xi_yj_given_ck = (self._smoothed_p_c_i_attr(pair_ck_amt, pair_amt)
                               * self._smoothed_p_attr(xi_amt, self.table_len)
                               * self._smoothed_p_attr(yj_amt, self.table_len)) / self.class_probs[ck]
        # P(x_i, y_j, c_k)
        p_x_y_z = prob_xi_yj_given_ck * self.class_probs[ck]
        # P(x_i | c_k)
        prob_xi_given_ck = self._attr_given_c(xi, ck, False)
        # P(y_j | c_k)
        prob_yj_given_ck = self._attr_given_c(yj, ck, False)
        # Contribution of this record, computed once and added under both key
        # orders (the original evaluated the identical log term twice).
        contribution = p_x_y_z * math.log(
            prob_xi_yj_given_ck / (prob_xi_given_ck * prob_yj_given_ck))
        self.mutInfo[(xi_field, yj_field)][ck] += contribution
        self.mutInfo[(yj_field, xi_field)][ck] += contribution

    def train(self):
        """Train the underlying NB model, accumulate pairwise statistics,
        then build one maximum-weight tree per class."""
        print("Tree Augmented Naive Bayes Classifier training started...")
        # super().train() computes the class priors itself; the original
        # additionally called _calculate_class_probs() first, repeating the
        # same per-class count queries for no gain.
        super().train()
        dp = dataParser.DataParser(self.db_connector)
        # Each record: (xi, yj, ck, #(xi, yj) rows that are ck, #(xi, yj) rows)
        pair_parsed_data = dp.parse_count_tan(self.table_name, self.excluded_fields[0],
                                              self.excluded_fields)
        dict_attr_ck = dp.parse_count(self.table_name, self.excluded_fields[0],
                                      self.excluded_fields, True)[1]
        total = len(pair_parsed_data)
        # Calculating mutual info between each pair of attributes
        for i, pair in enumerate(pair_parsed_data):
            xi, yj, ck, ck_pair_amt, pair_amt = pair
            # Pair counts are symmetric: store both key orders.
            self.pair_amt[(xi, yj)] = pair_amt
            self.pair_amt[(yj, xi)] = pair_amt
            if (xi, yj) not in self.pair_ck_amt:
                self.pair_ck_amt[(xi, yj)] = {}
                self.pair_ck_amt[(yj, xi)] = {}
            self.pair_ck_amt[(xi, yj)][ck] = ck_pair_amt
            self.pair_ck_amt[(yj, xi)][ck] = ck_pair_amt
            self.__update_mutual_info(pair, dict_attr_ck)
            # Progress indicator: rewrites the same console line.
            print("\r Currently working on pair #{} of {}".format(i + 1, total), end='')
            sys.stdout.flush()
        print("")
        # Building a maximum weight tree for each class
        nodes = list({field for key in self.mutInfo for field in key})
        for c in self.classes:
            weights = {key: self.mutInfo[key][c] for key in self.mutInfo}
            self.trees[c] = prim.MaxTree(nodes, weights)
        print("Tree Augmented Naive Bayes Classifier training finished!\n")

    def classify(self, obj: List[attribute.Attribute]) -> dict:
        """Return {class: unnormalized posterior} for ``obj``.

        For each class c_k: the tree-root attribute contributes the plain NB
        factor p(x | c_k); every other attribute contributes a factor
        conditioned on its parent in ``self.trees[c_k]``, built from the
        smoothed pair statistics gathered during training.
        """
        probs = {}
        for c_k in self.classes:
            probs[c_k] = self.class_probs[c_k]
            for attr in obj:
                if attr.field_name == self.trees[c_k].tree_root:
                    # Root of the tree: plain NB factor p(x | C).
                    probs[c_k] *= self._attr_given_c(attr, c_k, False)
                else:
                    parent = self.trees[c_k].attr_parent_of(attr, obj)
                    attr_amt = self.attr_amt.get(attr, 0)
                    parent_amt = self.attr_amt.get(parent, 0)
                    pair_amt = self.pair_amt.get((attr, parent), 0)
                    pair_ck_amt = (self.pair_ck_amt[(attr, parent)].get(c_k, 0)
                                   if (attr, parent) in self.pair_ck_amt else 0)
                    # Bookkeeping: histogram of how often each pair count occurs
                    # (inspectable via print_temp_pairs).
                    self.temp_pairs[pair_amt] = self.temp_pairs.get(pair_amt, 0) + 1
                    # Smoothed p(c_k | x, parent) and p(x)p(parent).
                    prob_c_given_pair = self._smoothed_p_c_i_attr(pair_ck_amt, pair_amt)
                    prob_pair = (self._smoothed_p_attr(attr_amt, self.table_len)
                                 * self._smoothed_p_attr(parent_amt, self.table_len))
                    numerator = prob_c_given_pair * prob_pair
                    denominator = self.class_probs[c_k] * self._attr_given_c(parent, c_k, False)
                    probs[c_k] *= numerator / denominator
        return probs

    # returns the plain NB output for the given obj (ignores the trees)
    def classify_super(self, obj: List[attribute.Attribute]) -> dict:
        return super().classify(obj)

    def print_temp_pairs(self):
        """Debug helper: dump the pair-count histogram gathered by classify."""
        print(self.temp_pairs)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement