Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import theano
- import theano.tensor as T
- import numpy as np
- def asfloat(value):
- """ Convert variable to float type configured by theano
- floatX variable.
- Parameters
- ----------
- value : matrix, ndarray or scalar
- Value that could be converted to float type.
- Returns
- -------
- matrix, ndarray or scalar
- Output would be input value converted to float type
- configured by theano floatX variable.
- """
- if isinstance(value, (np.matrix, np.ndarray)):
- return value.astype(theano.config.floatX)
- float_x_type = np.cast[theano.config.floatX]
- return float_x_type(value)
- def clone(instance):
- class_ = instance.__class__
- parameters = instance.get_params()
- return class_(**parameters)
- class FuzzyCMeans(object):
- """
- Fuzzy c-means.
- Paramaters
- ----------
- n_clusters : int
- Number of clusters.
- m : float
- """
- def __init__(self, n_clusters, m=2):
- if n_clusters < 2:
- raise ValueError("Number of clusters should be greater than 2")
- if m < 1:
- raise ValueError("Parameter `m` should be greater than 1")
- self.n_clusters = n_clusters
- self.m = m
- self.centers_ = None
- self.is_initialized = False
- def init_methods(self):
- if self.is_initialized:
- raise AttributeError("Methods have already been initialized.")
- x = T.matrix('x')
- centers = self.centers_
- d = distance_to_centers = (
- x.reshape((x.shape[0], 1, x.shape[1])) -
- centers.reshape((1, centers.shape[0], centers.shape[1]))
- ).norm(L=2, axis=2)
- weights = 1 / (
- (
- distance_to_centers.reshape((d.shape[0], d.shape[1], 1)) /
- distance_to_centers.reshape((d.shape[0], 1, d.shape[1]))
- ) ** asfloat(2. / (self.m - 1))
- ).sum(axis=2)
- proba = weights / T.sum(weights, axis=1).reshape((-1, 1))
- proba_power_m = weights ** self.m
- new_centers = (
- proba_power_m.T.dot(x) /
- T.sum(proba_power_m, axis=0).reshape((-1, 1))
- )
- self.predict_proba = theano.function([x], proba)
- self.train_iteration = theano.function([x], proba, updates=[
- (centers, new_centers),
- ])
- self.is_initialized = True
- def get_params(self, deep=False):
- return dict(n_clusters=self.n_clusters, m=self.m)
- @property
- def centers(self):
- return self.centers_.get_value()
- def fit(self, data, maxiter=100, epsilon=1e-5, verbose=False):
- n_features = data.shape[1]
- if self.centers_ is None:
- data_min = data.min(axis=0)
- data_max = data.max(axis=0)
- random_centers = np.random.random((self.n_clusters, n_features))
- scaled_centers = (data_max - data_min) * random_centers + data_min
- self.centers_ = theano.shared(
- name='centers',
- value=asfloat(scaled_centers)
- )
- n_expected_features = self.centers.shape[1]
- if n_expected_features != n_features:
- raise ValueError("Input data must contain {} features, "
- "found {}".format(n_expected_features,
- n_features))
- if not self.is_initialized:
- self.init_methods()
- i = 1
- proba_update = np.inf
- prev_proba = None
- while (proba_update > epsilon) and (i <= maxiter):
- proba = self.train_iteration(data)
- if prev_proba is not None:
- proba_update = np.linalg.norm(prev_proba - proba)
- prev_proba = proba
- i += 1
- def predict(self, data):
- proba = self.predict_proba(data)
- return proba.argmax(axis=1)
- def select_best_clustering(algorithm, n_trials, data, **fit_kwargs):
- """ Select best clusters using SSE.
- Parameters
- ----------
- algorithm : object
- n_trials : int
- data : matrix
- Raises
- ------
- ValueError
- Exception will raise in case input parameter values
- are invalid.
- Returns
- -------
- object
- Pretrained clustering algorithm that give smallest
- SSE (sum of squared error) score.
- """
- if n_trials < 1:
- raise ValueError("Number of trials should be greater than 1")
- if not isinstance(n_trials, int):
- raise ValueError("Number of tirals should be an integer number")
- algorithms = []
- for trial in range(n_trials):
- algorithm = clone(algorithm)
- algorithm.fit(data, **fit_kwargs)
- clusters = algorithm.predict(data)
- centers = algorithm.centers[clusters, :]
- sse_score = np.sum((data - centers) ** 2)
- # We should use second variable as unique value to prevent
- # object instance comparison when we have exactly the same
- # score values.
- algorithms.append((sse_score, trial, algorithm))
- _, _, best_algorithm = min(algorithms)
- return best_algorithm
- if __name__ == '__main__':
- fcm = FuzzyCMeans(n_clusters=2, m=2)
- fcm.fit(data, maxiter=100)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement