# Backprop on the Seeds dataset
from random import randrange
from random import random
from csv import reader
from math import exp
import pickle
import math

class BP:
    def __init__(self, n_inputs, n_outputs, n_neurons, n_folds, l_rate, n_epoch,
                 n_of_network, activation_function="sigmoid", optimizer='none',
                 filename="dataset_", derivated=0, parents=[], bias=0):
        self.n_inputs = n_inputs
        self.n_outputs = n_outputs
        self.n_neurons = n_neurons  # list where each element is the number of neurons in one hidden layer
        self.n_folds = n_folds
        self.l_rate = l_rate
        self.n_epoch = n_epoch
        self.n_hidden_layers = len(self.n_neurons)
        self.filename = filename  # dataset file for this network (the same assignment regardless of n_of_network)
        self.n_gains = list()
        self.n_gains.append(n_inputs)
        self.network_output = list()
        self.n_of_network = n_of_network
        self.accuracy = 0
        self.derivated = derivated  # indicates whether this network takes other networks' outputs as input
        self.parents = parents  # networks whose outputs form part of this network's input
        self.bias = bias  # if nonzero, a constant bias column is added to the dataset
        # record the number of inputs feeding each layer of the network
        for i in range(len(self.n_neurons)):
            self.n_gains.append(self.n_neurons[i])
        self.activation_function = activation_function  # "relu" or "sigmoid"
        # parameters of the Adam optimization algorithm
        self.optimizer = optimizer
        self.alpha = l_rate
        self.beta_1 = 0.9
        self.beta_2 = 0.999
        self.epsilon = 1e-8
        self.m_t = 0
        self.v_t = 0
        self.t = 0

    # Load a CSV file
    def load_csv(self, filename):
        self.dataset = list()
        with open(filename, 'r') as file:
            csv_reader = reader(file)
            for row in csv_reader:
                if not row:
                    continue
                self.dataset.append(row)
                # add a bias input: insert a constant '1' just before the class label
                if self.bias != 0:
                    temp = self.dataset[-1][-1]
                    self.dataset[-1][-1] = '1'
                    self.dataset[-1].append('0')
                    self.dataset[-1][-1] = temp
        return self.dataset
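
    # For example, with bias != 0 a row ['5.1', '3.5', '1.4', '0.2', 'Iris-setosa']
    # is stored as ['5.1', '3.5', '1.4', '0.2', '1', 'Iris-setosa']: the constant
    # '1' becomes an extra input column and the class label stays last.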

    # Load a CSV file and append the parents' outputs (and optionally a bias) to each row
    def load_derivated_csv(self, filename):
        self.dataset = list()
        j = 0
        print(filename)
        with open(filename, 'r') as file:
            csv_reader = reader(file)
            for row in csv_reader:
                if not row:
                    continue
                self.dataset.append(row[:-1])
                for i in range(len(self.parents)):
                    self.dataset[-1].extend([str(h) for h in self.parents[i].network_output[j]])
                j = j + 1
                # add bias
                if self.bias != 0:
                    self.dataset[-1].extend(['1'])
                self.dataset[-1].extend([row[-1]])
        return self.dataset

    # Convert a string column to float
    def str_column_to_float(self, dataset, column):
        for row in dataset:
            row[column] = float(row[column].strip())

    # Convert a string class column to integer labels
    def str_column_to_int(self, dataset, column):
        class_values = [row[column] for row in dataset]
        unique = set(class_values)
        lookup = dict()
        for i, value in enumerate(unique):
            lookup[value] = i
        for row in dataset:
            row[column] = lookup[row[column]]
        return lookup
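
    # Note: iterating over a set makes the label-to-integer mapping depend on
    # hash order, so the same class can map to different integers across runs;
    # enumerating sorted(unique) instead would make the mapping deterministic.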

    # Find the min and max values for each column
    def dataset_minmax(self, dataset):
        stats = [[min(column), max(column)] for column in zip(*dataset)]
        return stats

    # Rescale dataset columns to the range 0-1
    def normalize_dataset(self, dataset, minmax):
        for row in dataset:
            for i in range(len(row) - 1):
                if (minmax[i][1] - minmax[i][0]) != 0:
                    row[i] = (row[i] - minmax[i][0]) / (minmax[i][1] - minmax[i][0])

    # Split a dataset into k folds
    def cross_validation_split(self, dataset, n_folds):
        dataset_split = list()
        dataset_copy = list(dataset)
        fold_size = int(len(dataset) / n_folds)
        for i in range(n_folds):
            fold = list()
            while len(fold) < fold_size:
                index = randrange(len(dataset_copy))
                fold.append(dataset_copy.pop(index))
            dataset_split.append(fold)
        return dataset_split
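
    # For example, 150 rows with n_folds=5 give five folds of 30 randomly drawn
    # rows each; when len(dataset) is not divisible by n_folds, the remainder
    # rows are simply left out of every fold.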

    # Calculate accuracy percentage
    def accuracy_metric(self, actual, predicted):
        correct = 0
        for i in range(len(actual)):
            if actual[i] == predicted[i]:
                correct += 1
        return correct / float(len(actual)) * 100.0

    # Evaluate the algorithm using a cross-validation split
    def evaluate_algorithm(self, dataset, algorithm, n_folds, *args):
        folds = self.cross_validation_split(dataset, n_folds)
        scores = list()
        for fold in folds:
            train_set = list(folds)
            train_set.remove(fold)
            train_set = sum(train_set, [])
            test_set = list()
            for row in fold:
                row_copy = list(row)
                test_set.append(row_copy)
                row_copy[-1] = None  # hide the class label from the algorithm
            predicted = algorithm(train_set, test_set, *args)
            actual = [row[-1] for row in fold]
            accuracy = self.accuracy_metric(actual, predicted)
            scores.append(accuracy)
        return scores

    # Calculate neuron activation for an input: weighted sum plus bias (the last weight)
    def activate(self, weights, inputs):
        activation = weights[-1]
        for i in range(len(weights) - 1):
            activation += weights[i] * inputs[i]
        return activation

    # Transfer neuron activation through the chosen activation function
    def transfer(self, activation):
        if self.activation_function == 'sigmoid':
            return 1.0 / (1.0 + exp(-activation))
        elif self.activation_function == 'relu':
            if activation >= 0:
                return activation
            else:
                return 0.1 * activation  # leaky ReLU with negative slope 0.1
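
    # For example, transfer(0.0) is 0.5 under the sigmoid, while under the
    # "relu" setting transfer(2.0) is 2.0 and transfer(-2.0) is -0.2
    # (a leaky ReLU, despite the name).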

    # Forward propagate an input row to the network output
    def forward_propagate(self, network, row):
        inputs = row
        for layer in network:
            new_inputs = []
            for neuron in layer:
                activation = self.activate(neuron['weights'], inputs)
                neuron['output'] = self.transfer(activation)
                new_inputs.append(neuron['output'])
                neuron['activation'] = activation  # stored so the ReLU derivative can use it
            inputs = new_inputs
        return inputs

    # Calculate the derivative of a neuron output
    def transfer_derivative(self, output):
        if self.activation_function == 'sigmoid':
            return output * (1.0 - output)
        elif self.activation_function == 'relu':
            if output >= 0:
                return 1
            else:
                return 0.1  # leaky ReLU
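
    # Note that the two branches expect different arguments: the sigmoid
    # derivative s'(x) = s(x) * (1 - s(x)) is written in terms of the neuron's
    # *output*, while the leaky ReLU derivative is a function of the
    # pre-activation, which is why backward_propagate_error passes
    # neuron['output'] in one case and neuron['activation'] in the other.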

    # Backpropagate error and store the deltas in the neurons
    def backward_propagate_error(self, network, expected):
        for i in reversed(range(len(network))):
            layer = network[i]
            errors = list()
            # for all layers except the last one
            if i != len(network) - 1:
                for j in range(len(layer)):
                    error = 0.0
                    for neuron in network[i + 1]:
                        error += (neuron['weights'][j] * neuron['delta'])
                    errors.append(error)
            # for the last layer
            else:
                for j in range(len(layer)):
                    neuron = layer[j]
                    errors.append(expected[j] - neuron['output'])
            # the loop above accumulated weight * delta (or the output error);
            # now multiply by the activation derivative to obtain each neuron's delta
            for j in range(len(layer)):
                neuron = layer[j]
                if self.optimizer == 'none':
                    if self.activation_function == 'sigmoid':
                        neuron['delta'] = errors[j] * self.transfer_derivative(neuron['output'])
                    elif self.activation_function == 'relu':
                        neuron['delta'] = errors[j] * self.transfer_derivative(neuron['activation'])
                elif self.optimizer == 'adam':
                    if self.activation_function == 'sigmoid':
                        neuron['delta'] = self.transfer_derivative(neuron['output'])
                    elif self.activation_function == 'relu':
                        neuron['delta'] = self.transfer_derivative(neuron['activation'])
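
    # With optimizer == 'adam', the delta stored above is only the activation
    # derivative: the backpropagated error term errors[j] is not folded in, so
    # update_weights treats the derivative itself as the gradient g_t.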

    # Update network weights with the stored deltas
    def update_weights(self, network, row, l_rate):
        for i in range(len(network)):
            inputs = row[:-1]
            if i != 0:
                inputs = [neuron['output'] for neuron in network[i - 1]]
            for neuron in network[i]:
                if self.optimizer == 'none':
                    # plain stochastic gradient descent step
                    for j in range(len(inputs)):
                        neuron['weights'][j] += l_rate * neuron['delta'] * inputs[j]
                    neuron['weights'][-1] += l_rate * neuron['delta']  # bias weight
                elif self.optimizer == 'adam':
                    for j in range(len(inputs)):
                        neuron['weights'][j] += neuron['delta'] * inputs[j]
                    g_t = neuron['delta']  # gradient of the stochastic objective
                    self.m_t = self.beta_1 * self.m_t + (1 - self.beta_1) * g_t  # moving average of the gradient
                    self.v_t = self.beta_2 * self.v_t + (1 - self.beta_2) * (g_t * g_t)  # moving average of the squared gradient
                    m_cap = self.m_t / (1 - (self.beta_1 ** self.t))  # bias-corrected first moment
                    v_cap = self.v_t / (1 - (self.beta_2 ** self.t))  # bias-corrected second moment
                    neuron['weights'][-1] += (self.alpha * m_cap) / (math.sqrt(v_cap) + self.epsilon)  # bias weight
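
    # The Adam step above follows the standard update (Kingma & Ba, 2014):
    #   m_t = beta_1 * m_{t-1} + (1 - beta_1) * g_t
    #   v_t = beta_2 * v_{t-1} + (1 - beta_2) * g_t^2
    #   m_hat = m_t / (1 - beta_1^t),  v_hat = v_t / (1 - beta_2^t)
    #   theta += alpha * m_hat / (sqrt(v_hat) + epsilon)
    # with the simplification that a single scalar (m_t, v_t) pair is shared
    # across all weights of the network rather than kept per parameter.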

    # Train a network for a fixed number of epochs
    def train_network(self, network, train, l_rate, n_epoch, n_outputs):
        for epoch in range(n_epoch):
            self.t += 1  # step counter used by the Adam bias correction
            for row in train:
                outputs = self.forward_propagate(network, row)
                expected = [0 for i in range(n_outputs)]
                expected[row[-1]] = 1  # one-hot encode the class label
                self.backward_propagate_error(network, expected)
                self.update_weights(network, row, l_rate)

    # Initialize a network with random weights
    def initialize_network(self, n_inputs, n_hidden, n_outputs):
        network = list()
        # create the desired number of neurons in each hidden layer;
        # self.n_gains[j] is the number of inputs feeding layer j
        hidden_layers = [[{'weights': [random() for l in range(self.n_gains[j])]}
                          for i in range(self.n_neurons[j])]
                         for j in range(self.n_hidden_layers)]
        for layer in hidden_layers:
            network.append(layer)
        output_layer = [{'weights': [random() for l in range(self.n_gains[-1])]} for i in range(n_outputs)]
        network.append(output_layer)
        return network
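
    # For example, with n_inputs=4 and n_neurons=[4], n_gains is [4, 4]: the
    # hidden layer holds 4 neurons with 4 weights each and the output layer
    # n_outputs neurons with 4 weights each. Since activate() reserves the last
    # weight as the bias, each neuron effectively weights n_gains[j] - 1 inputs,
    # which the optional '1' bias column appended by load_csv can offset.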

    # Make a prediction with a network (index of the largest output)
    def predict(self, network, row):
        outputs = self.forward_propagate(network, row)
        self.network_output.append(outputs)
        return outputs.index(max(outputs))

    # Backpropagation algorithm with stochastic gradient descent
    def back_propagation(self, train, test, l_rate, n_epoch, n_hidden):
        self.network = self.initialize_network(self.n_inputs, self.n_hidden_layers, self.n_outputs)
        self.train_network(self.network, train, l_rate, n_epoch, self.n_outputs)
        self.save_nnt()
        predictions = list()
        for row in test:
            prediction = self.predict(self.network, row)
            predictions.append(prediction)
        return predictions

    # Save the trained network to disk
    def save_nnt(self):
        save_nnt_name = "trained_network_" + str(self.n_of_network)
        with open(save_nnt_name, 'wb') as fp:
            pickle.dump(self.network, fp)

    # Load a trained network from disk
    def load_nnt(self):
        load_nnt_name = "trained_network_" + str(self.n_of_network)
        with open(load_nnt_name, 'rb') as fp:
            self.network = pickle.load(fp)

    # Print all weights, for debugging purposes
    def print_weights(self):
        for i in range(len(self.network)):
            for neuron in self.network[i]:
                print("Layer: {}\tWeights: {}".format(i, neuron['weights']))

    def main_program(self):
        # load the dataset, either directly from file or combined with the parents' outputs
        if self.derivated == 0:
            self.dataset = self.load_csv(self.filename)
        else:
            self.dataset = self.load_derivated_csv(self.filename)
        for i in range(len(self.dataset[0]) - 1):
            self.str_column_to_float(self.dataset, i)
        # convert the class column to integers
        self.str_column_to_int(self.dataset, len(self.dataset[0]) - 1)
        # normalize the input variables
        minmax = self.dataset_minmax(self.dataset)
        self.normalize_dataset(self.dataset, minmax)
        scores = self.evaluate_algorithm(self.dataset, self.back_propagation, self.n_folds,
                                         self.l_rate, self.n_epoch, self.n_hidden_layers)
        #print('Scores: %s' % scores)
        self.accuracy = sum(scores) / float(len(scores))
        print('Mean Accuracy: %.3f%%' % self.accuracy)
        return self.accuracy


bp = list()
bp.append(BP(4, 3, [4], 5, 0.3, 500, 0, "sigmoid", "none", "IrisDataTrain.csv", 0, [], 0))
bp[0].main_program()
bp[0].print_weights()
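
# A sketch of chaining networks through the 'derivated'/'parents' parameters
# (an assumed usage pattern; "IrisDataTrain2.csv" is a hypothetical file whose
# rows must line up with the outputs recorded in parent.network_output):
#
#   parent = BP(4, 3, [4], 5, 0.3, 500, 1, "sigmoid", "none", "IrisDataTrain.csv", 0, [], 0)
#   parent.main_program()
#   # the child sees the 4 raw features plus the parent's 3 outputs, i.e. 7 inputs
#   child = BP(7, 3, [5], 5, 0.3, 500, 2, "sigmoid", "none", "IrisDataTrain2.csv", 1, [parent], 0)
#   child.main_program()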