Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import cupy as cp
- import numpy as np
- import copy
- import math
- # Code adapted from
- # https://github.com/numenta/htmresearch/blob/master/htmresearch/algorithms/union_temporal_pooler.py
class TemporalPooler:
    """
    Union temporal pooler operating on CuPy arrays.

    Adapted from the htmresearch ``union_temporal_pooler`` algorithm. Each
    call to ``run`` selects the cells with the highest boosted overlap with
    the input, accumulates their pooling activation, and exposes the most
    active cells as the union SDR.
    """

    # Rate of the exponential decay applied to a cell's pooling activation
    # for every step elapsed since the cell last won inhibition.
    POOLING_DECAY_RATE = 0.1

    def __init__(self, input_size, cells,
                 sparsity=0.05,
                 activeOverlapWeight=1.0,
                 predictedActiveOverlapWeight=10.0,
                 maxUnionActivity=0.20,
                 decayTimeConst=20.0,
                 synPermPredActiveInc=0.3,
                 synPermPreviousPredActiveInc=0.3,
                 historyLength=3,
                 minHistory=0):
        """
        Allocates the pooler state.

        @param input_size: Number of input bits; the permanence matrix has
            shape (cells, input_size)
        @param cells: Number of pooler cells
        @param sparsity: Fraction of cells that win inhibition on each step
            (``int(sparsity * cells)`` winners)
        @param activeOverlapWeight: Weight of the overlap with the active
            input
        @param predictedActiveOverlapWeight: Weight of the overlap with the
            predicted-active input
        @param maxUnionActivity: Maximum fraction of cells in the union SDR
        @param decayTimeConst: Accepted but not used by this implementation;
            the decay rate is fixed by ``POOLING_DECAY_RATE``
        @param synPermPredActiveInc: Permanence increment from
            predicted-active inputs to cells in the union SDR
        @param synPermPreviousPredActiveInc: Permanence increment from
            previously predicted-active inputs to newly active cells
        @param historyLength: Number of past predicted-active inputs kept
        @param minHistory: The union SDR is only reported when the largest
            pooling timer exceeds this value
        """
        self.input_size = input_size
        self.cells = cells
        self.sparsity = sparsity

        # Boosting state: cells that win often get their overlap scaled down.
        self.duty_cycle = cp.zeros(self.cells, dtype=np.float32)
        self.boosting_intensity = 0.3
        self.duty_cycle_inertia = 0.99

        # A synapse counts as connected when its permanence exceeds the
        # threshold.
        self.permanence = cp.random.randn(self.cells, self.input_size)
        self.active_overlap_weight = activeOverlapWeight
        self.predicted_active_overlap_weight = predictedActiveOverlapWeight
        self.max_union_activity = maxUnionActivity
        self.permanence_threshold = 0.0

        # Learning rates.
        self.syn_perm_active_inc = 0.2
        self.syn_perm_inactive_dec = 0.3
        self.syn_perm_pred_active_inc = synPermPredActiveInc
        self.syn_perm_previous_pred_active_inc = synPermPreviousPredActiveInc

        self.history_length = historyLength
        self.min_history = minHistory
        self.max_union_cells = int(cells * self.max_union_activity)

        # Pooling state. The timer counts steps since a cell last won; it
        # starts large so that the decayed activation is effectively zero.
        self.pooling_activation = cp.zeros(cells, dtype=np.float32)
        self.pooling_timer = cp.ones(cells, dtype=np.float32) * 1000
        self.pooling_activation_init_level = cp.zeros(cells, dtype=np.float32)
        self.pooling_activation_tie_breaker = cp.random.randn(cells) * 0.000001

        self.union_SDR = cp.array([], dtype=np.int32)
        self.active_cells = cp.array([], dtype=np.int32)
        self.pred_active_input = cp.zeros(input_size, dtype=np.float32)
        # Column i holds the predicted-active input from i + 1 steps ago.
        self.pre_predicted_active_input = cp.zeros(
            (input_size, self.history_length), dtype=np.int32)
        self.prev_active_cells = cp.zeros(cells, dtype=np.int32)

    def run(self, active_input, predicted_active_input, train=True):
        """
        Runs one pooling step and returns the updated union SDR.

        @param active_input: Active input bits; combined with the boolean
            connection matrix via ``&`` (presumably a boolean or integer
            vector of length input_size -- confirm against callers)
        @param predicted_active_input: Input bits that were both predicted
            and active, same layout as active_input
        @param train: When True, update duty cycles and permanences
        @return: Sorted int32 array of cell indices forming the union SDR
        """
        self.active_cells, overlaps_predicted_active = \
            self._select_active_cells(active_input, predicted_active_input)

        # Decrement pooling activation of all cells
        self.decay_pooling_activation()
        # Update the pooling activation of the current winners
        self.add_to_pooling_activation(self.active_cells,
                                       overlaps_predicted_active)
        # Update the union SDR
        self.get_most_active_cells()

        if train:
            self._learn(predicted_active_input)

        # Save the inputs for the next step; column 0 is the newest entry.
        self.pre_active_input = copy.copy(active_input)
        self.pre_predicted_active_input = cp.roll(
            self.pre_predicted_active_input, 1, 1)
        if self.history_length > 0:
            self.pre_predicted_active_input[:, 0] = predicted_active_input

        # Wait for the work queued on the default CUDA stream to finish.
        cp.cuda.Stream.null.synchronize()
        return self.union_SDR

    def _select_active_cells(self, active_input, predicted_active_input):
        """
        Computes boosted overlaps and performs global inhibition.

        @return: Tuple (winner cell indices, per-cell overlap with the
            predicted-active input)
        """
        weight = self.permanence > self.permanence_threshold
        # Proximal dendrite overlaps with active and predicted-active inputs
        overlaps_active = cp.sum(active_input & weight, axis=1)
        overlaps_predicted_active = cp.sum(predicted_active_input & weight,
                                           axis=1)
        total_overlap = (
            overlaps_active * self.active_overlap_weight +
            overlaps_predicted_active * self.predicted_active_overlap_weight
        ).astype(np.float32)

        # Global inhibition with boosting: frequent winners are damped.
        boosting = cp.exp(
            self.boosting_intensity * -self.duty_cycle / self.sparsity)
        total_overlap *= boosting

        k_winners = int(self.sparsity * self.cells)
        # Slice from an explicit start index: ``[-k_winners:]`` would select
        # every cell when k_winners is 0.
        start = max(total_overlap.shape[0] - k_winners, 0)
        winners = total_overlap.argsort()[start:]
        return winners, overlaps_predicted_active

    def _learn(self, predicted_active_input):
        """
        Updates duty cycles and permanences for the current step.
        """
        # Adjust boosting factor
        self.duty_cycle *= self.duty_cycle_inertia
        self.duty_cycle[self.active_cells] += 1.0 - self.duty_cycle_inertia

        # Spatial pooler learning rule applied to the predicted-active input
        # of the newly active cells: +syn_perm_active_inc where the input bit
        # is set, -syn_perm_inactive_dec where it is not.
        # TODO: should unpredicted active input also be included here?
        self.permanence[self.active_cells] += (
            predicted_active_input *
            (self.syn_perm_active_inc + self.syn_perm_inactive_dec) -
            self.syn_perm_inactive_dec)

        # Hebbian learning for the current time step: strengthen connections
        # from predicted-active inputs to cells in the union SDR.
        self.permanence[self.union_SDR] += (
            predicted_active_input * self.syn_perm_pred_active_inc)

        # Reinforce connections from previously predicted-active inputs to
        # the newly active cells.
        for i in range(self.history_length):
            self.permanence[self.active_cells] += (
                self.pre_predicted_active_input[:, i] *
                self.syn_perm_previous_pred_active_inc)

    def decay_pooling_activation(self):
        """
        Decrements pooling activation of all cells.

        The activation becomes the level stored when the cell last won,
        decayed exponentially by the cell's pooling timer.
        @return: current pooling activation
        """
        self.pooling_activation = (
            cp.exp(-self.POOLING_DECAY_RATE * self.pooling_timer) *
            self.pooling_activation_init_level)
        return self.pooling_activation

    def add_to_pooling_activation(self, active_cells, overlaps):
        """
        Adds overlaps from specified active cells to cells' pooling
        activation.

        @param active_cells: Indices of those cells winning the inhibition
            step
        @param overlaps: A current set of overlap values for each cell
        @return: current pooling activation
        """
        # Linear accumulation. An earlier alternative used a sigmoid of the
        # overlap (baseline persistence 10, extra persistence 10,
        # threshold 5) instead.
        self.pooling_activation[active_cells] += overlaps[active_cells]

        # Advance every timer, then restart the timers of the winners and
        # remember the level their decay starts from.
        self.pooling_timer[self.pooling_timer >= 0] += 1
        self.pooling_timer[active_cells] = 0
        self.pooling_activation_init_level[active_cells] = \
            self.pooling_activation[active_cells]
        return self.pooling_activation

    def get_most_active_cells(self):
        """
        Gets the most active cells in the Union SDR having non-zero
        activation and stores them in ``union_SDR``.

        At most ``max_union_cells`` cells with the highest pooling activation
        are kept. The result is empty while the largest pooling timer does
        not exceed ``min_history``.
        @return: int32 array of cell indices in ascending order
        """
        pooling_activation = self.pooling_activation
        non_zero_cells = cp.nonzero(pooling_activation)[0]

        # Include a tie-breaker before sorting
        pooling_activation_subset = (
            pooling_activation[non_zero_cells] +
            self.pooling_activation_tie_breaker[non_zero_cells])

        # The subset is already aligned with non_zero_cells, so its argsort
        # indexes non_zero_cells directly.
        sorted_cells = non_zero_cells[
            pooling_activation_subset.argsort()[::-1]]
        top_cells = sorted_cells[:self.max_union_cells]

        # Reduce on the array itself rather than iterating it element by
        # element with the builtin max().
        if float(self.pooling_timer.max()) > self.min_history:
            self.union_SDR = cp.sort(top_cells).astype(np.int32)
        else:
            # Keep the same array type as the non-empty case.
            self.union_SDR = cp.array([], dtype=np.int32)
        return self.union_SDR
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement