import pandas
import numpy
import pickle
import math
from sklearn.cluster import MiniBatchKMeans, KMeans
from matplotlib import pyplot as plt
from sklearn.metrics import silhouette_score
from pas.algorithm.som_with_tensorflow import SOM
class ClusterWithSomNKmeans(object):
    def __init__(self):
        pass
    def run_som(self, _n, _m, _dim_features, _nitr, _alpha, _nparray):
        # Train an _n x _m self-organizing map over the input vectors.
        som_object = SOM(_n,
                         _m,
                         _dim_features,
                         _nitr,
                         _alpha)
        som_object.train(_nparray)
        # Get output grid of trained centroids.
        image_grid = som_object.get_centroids()
        # Winner node of each input vector.
        mapped = som_object.map_vects(_nparray)
        return som_object
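    # run_som usage sketch (hypothetical values): a 10x10 map over
    # 4-dimensional features, 400 iterations, learning rate 0.3.
    #
    #   som = self.run_som(10, 10, 4, 400, 0.3, data)
    #   som.get_centroids()  # trained centroid grid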
    def getBestPointInCurve(self, arr_curve):
        # Knee detection: the best point is the one with the largest
        # perpendicular distance to the line joining the curve's endpoints.
        nPoints = len(arr_curve)
        allCoord = numpy.vstack((range(nPoints), arr_curve)).T
        firstPoint = allCoord[0]
        lineVec = allCoord[-1] - allCoord[0]
        lineVecNorm = lineVec / numpy.sqrt(numpy.sum(lineVec ** 2))
        vecFromFirst = allCoord - firstPoint
        # Project each point onto the endpoint line via broadcasting.
        scalarProduct = numpy.sum(vecFromFirst * lineVecNorm, axis=1)
        vecFromFirstParallel = numpy.outer(scalarProduct, lineVecNorm)
        vecToLine = vecFromFirst - vecFromFirstParallel
        distToLine = numpy.sqrt(numpy.sum(vecToLine ** 2, axis=1))
        idxOfBestPoint = numpy.argmax(distToLine)
        return idxOfBestPoint
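    # Worked example (hypothetical values): for the curve below, index 1 has
    # the largest perpendicular distance to the line from (0, 10) to (5, 1.7),
    # so it is returned as the knee.
    #
    #   curve = numpy.asarray([10.0, 4.0, 2.5, 2.0, 1.8, 1.7])
    #   self.getBestPointInCurve(curve)  # -> 1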
    def findOptimalNumberOfGroup(self, X, min_number_of_group, max_number_of_group, is_plot=False):
        # Score candidate cluster counts with MiniBatchKMeans, then pick the
        # knee of the silhouette and elbow curves. The upper bound is
        # exclusive, matching range().
        arr_sil_coeff = []
        arr_sil_coeff_k = []
        arr_elbow = []
        for n_test_cluster in range(min_number_of_group, max_number_of_group):
            algorithm = MiniBatchKMeans(init='k-means++', n_clusters=n_test_cluster, batch_size=100)
            # score() returns the negated inertia, so larger is better.
            test_elbow = algorithm.fit(X).score(X)
            test_labels = algorithm.labels_
            test_sil_coeff = silhouette_score(X, test_labels, metric='euclidean')
            arr_sil_coeff.append(test_sil_coeff)
            arr_elbow.append(test_elbow)
            arr_sil_coeff_k.append(n_test_cluster)
        arr_sil_coeff_k = numpy.asarray(arr_sil_coeff_k)
        idx_best_sil = self.getBestPointInCurve(numpy.asarray(arr_sil_coeff))
        idx_best_elbow = self.getBestPointInCurve(numpy.asarray(arr_elbow))
        print("optimal k: sil_coeff -> " + str(arr_sil_coeff_k[idx_best_sil]) + ", elbow -> " + str(
            arr_sil_coeff_k[idx_best_elbow]))
        if is_plot:
            # Plot each score against its cluster count so the x axis shows k.
            plt.plot(arr_sil_coeff_k, arr_sil_coeff)
            plt.xticks(arr_sil_coeff_k)
            plt.title("Sil_Coeff")
            plt.show()
            plt.plot(arr_sil_coeff_k, arr_elbow)
            plt.xticks(arr_sil_coeff_k)
            plt.title("Elbow")
            plt.show()
        return arr_sil_coeff_k[idx_best_sil], arr_sil_coeff_k[idx_best_elbow]
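    # Usage sketch (hypothetical data): scan k = 2..14 over random points.
    #
    #   X = numpy.random.rand(200, 2)
    #   k_sil, k_elbow = self.findOptimalNumberOfGroup(X, 2, 15, is_plot=False)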
    def make_cluster_with_kmeans(self, _som_object, _n_som, _m_som, _dim_som_feature, _nparray):
        assert (len(_nparray) > 0)
        assert (_dim_som_feature == len(_nparray[0]))
        # Flatten the SOM grid into an (_n_som * _m_som, dim) centroid matrix
        # and search it for a reasonable number of groups.
        centroids = _som_object.get_centroids()
        centroids = numpy.array(centroids)
        centroids.shape = (_n_som * _m_som, _dim_som_feature)
        # n_clusters cannot exceed the number of samples being scored, which
        # here is the number of SOM nodes, so cap the search range accordingly.
        max_number_of_group = min(len(_nparray), _n_som * _m_som)
        som_sil, som_elbow = self.findOptimalNumberOfGroup(centroids, 2, max_number_of_group, is_plot=False)
        print("SOM Sil: " + str(som_sil) + " - SOM Elbow: " + str(som_elbow))
        n_cluster = min(som_sil, som_elbow)
        # Cluster the original input vectors with the chosen k.
        X = _nparray.reshape(len(_nparray), _dim_som_feature)
        clustering_algorithm = MiniBatchKMeans(init='k-means++', n_clusters=n_cluster, batch_size=100)
        clustering_algorithm.fit(X)
        return clustering_algorithm
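    # Sketch (hypothetical shapes): a trained 5x5 SOM over 4-d features yields
    # a (25, 4) centroid matrix; the selected k then clusters the raw data.
    #
    #   kmeans = self.make_cluster_with_kmeans(som, 5, 5, 4, data)
    #   kmeans.cluster_centers_  # (k, 4) array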
    # input:
    #   _x : input vectors
    #   _clustering_algorithm : a fitted clustering algorithm such as KMeans
    # returns: a list of three values per vector: label, distance, and similarity
    """
    def get_relationships_vector_and_clusters(self, _x, _clustering_algorithm):
        labels = _clustering_algorithm.predict(_x)
        label = labels[0]
        result = []
        center = _clustering_algorithm.cluster_centers_[label]
        distance = numpy.linalg.norm(_x - center)
        similarity = numpy.inner(_x, center) \
            / (numpy.linalg.norm(_x) * numpy.linalg.norm(center))
        return result
    """
    # _nparray : 2D array of input vectors
    def get_relationships_2DArray_and_clusters(self, _nparray, _clustering_algorithm):
        labels = _clustering_algorithm.predict(_nparray)
        assert (len(_nparray) == len(labels))
        result = []
        # Look up the assigned cluster center for every input vector.
        center_list = []
        for i in range(0, len(_nparray)):
            center_list.append(_clustering_algorithm.cluster_centers_[labels[i]])
        center_list = numpy.asarray(center_list)
        # For each vector, report its label, the Euclidean distance to its
        # center, and the cosine similarity with that center.
        index = 0
        for vector in _nparray:
            distance = numpy.linalg.norm(vector - center_list[index])
            similarity = numpy.inner(vector, center_list[index]) \
                / (numpy.linalg.norm(vector) * numpy.linalg.norm(center_list[index]))
            label = labels[index]
            result.append([label, distance, similarity])
            index = index + 1
        return result
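
if __name__ == "__main__":
    # End-to-end sketch, assuming the project-local SOM class imported above
    # is available. The grid size (5x5), iteration count, and learning rate
    # are illustrative values, not tuned defaults, and the data is random
    # placeholder input.
    data = numpy.random.rand(100, 4)
    pipeline = ClusterWithSomNKmeans()
    som = pipeline.run_som(5, 5, 4, 100, 0.3, data)
    kmeans = pipeline.make_cluster_with_kmeans(som, 5, 5, 4, data)
    relationships = pipeline.get_relationships_2DArray_and_clusters(data, kmeans)
    for label, distance, similarity in relationships[:5]:
        print(str(label) + " - " + str(distance) + " - " + str(similarity))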