import tensorflow as tf
import numpy as np
from matplotlib.pyplot import cm
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
def categorical_scatter_2d(X2D, class_idxs, ms=3, ax=None, alpha=1.0):
    ## Plot a 2D matrix with corresponding class labels: each class a different colour
    if ax is None:
        fig, ax = plt.subplots()
    classes = np.unique(class_idxs)
    colors = cm.rainbow(np.linspace(0, 1, len(classes)))
    for i, cls in enumerate(classes):
        ax.scatter(X2D[class_idxs == cls, 0], X2D[class_idxs == cls, 1],
                   label=str(cls), alpha=alpha, color=colors[i])
    return ax
def cos_sim(A):
    similarity = np.dot(A, A.T)
    # squared magnitude of preference vectors (number of occurrences)
    square_mag = np.diag(similarity)
    # inverse squared magnitude
    inv_square_mag = 1. / square_mag
    # if it doesn't occur, set its inverse magnitude to zero (instead of inf)
    inv_square_mag[np.isinf(inv_square_mag)] = 0
    # inverse of the magnitude
    inv_mag = np.sqrt(inv_square_mag)
    # cosine similarity (elementwise multiply by inverse magnitudes)
    cosine = similarity * inv_mag
    cosine = cosine.T * inv_mag
    return cosine
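# Quick sanity check of cos_sim (illustrative, not part of the original paste):
# for nonzero rows, the result should match a direct row-normalised computation,
# with ones on the diagonal.
_A = np.random.randn(5, 3)
_norms = np.linalg.norm(_A, axis=1)
_direct = np.dot(_A, _A.T) / np.outer(_norms, _norms)
assert np.allclose(cos_sim(_A), _direct)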
# First we make the 'true' latent space for generating our observed pair
# similarities
N = 500
# This all still works if the latent space is higher-dimensional
D = 2
z_true, labels = make_blobs(n_samples=N, cluster_std=1., centers=10,
                            n_features=D, random_state=2)
categorical_scatter_2d(z_true, labels)
plt.show()
# Create synthetic 'pair probabilities' based on cosine similarity or the
# sigmoid of the dot product. I found the latter works better. You might need
# to ensure your input data is spiky, or play with other aspects, to get it to
# work if it isn't.
#sims = np.minimum(np.maximum(cos_sim(z_true) / 2. + 0.5, 0.0), 1.0) ** 10
sims = 1. / (1. + np.exp(-np.dot(z_true, z_true.T)))
plt.hist(sims.reshape(-1))
plt.show()
# Now we make a model to find latent vectors whose similarity predicts our
# pair probabilities well
tf.reset_default_graph()
# Dimensionality of our estimated latent space
D_hat = 2
# Initialize a latent vector for each of our examples
zhat = tf.get_variable('zhat', [N, D_hat],
                       initializer=tf.random_normal_initializer(0, 1.0))
# We have the target probabilities in a matrix.
# Note there's no means here to deal with random censoring in your data; you
# could just eliminate those pairs from the loss calculation with a mask
# (see the sketch after the loss below).
p_target = tf.constant(sims.astype(np.float32), name='target')
# Estimated probability of a matching pair
# (note this divides by the *squared* norm; tf.sqrt(norm) would give unit vectors)
norm = tf.reduce_sum(tf.square(zhat), 1, keep_dims=True)
zhat_normd = zhat / norm
p_hat = tf.nn.sigmoid(tf.matmul(zhat_normd, zhat_normd, transpose_b=True))
# Cross-entropy cost
xent_mat = p_target * tf.log(p_hat + 1e-8) + \
           (1 - p_target) * tf.log(1 - p_hat + 1e-8)
# Mean cross-entropy, ignoring the diagonal, whose self-pairs have prob 1.0;
# the (1 - I) mask zeroes those elements
off_diag = -(tf.diag(tf.ones([tf.shape(p_hat)[0]])) - 1)
xent = -tf.reduce_mean(xent_mat * off_diag)
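# A minimal sketch (not in the original paste) of the censoring mask mentioned
# above: observed_mask is a hypothetical 0/1 matrix marking which pair
# probabilities were actually observed. It is all-ones here, so this is a
# no-op; the train op below still uses the unmasked xent.
observed_mask = tf.constant(np.ones((N, N), dtype=np.float32))
masked_weight = observed_mask * off_diag
xent_masked = -tf.reduce_sum(xent_mat * masked_weight) / tf.reduce_sum(masked_weight)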
# Train op + init
train_op = tf.train.AdamOptimizer(0.01).minimize(xent)
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
# Train, then plot the latent space
for i in range(1000):
    if i % 100 == 0:
        print(sess.run([xent, train_op])[0])
    else:
        sess.run(train_op)
zhat_ = sess.run(zhat_normd)
# You obviously won't know the classes for the latent space, but maybe you can
# run k-means or other clustering on it (see the sketch below)...
categorical_scatter_2d(zhat_, labels)
plt.show()
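# A hedged sketch (not in the original paste) of the clustering idea above:
# k-means on the learned latent vectors. n_clusters=10 matches the number of
# blobs we generated, which you wouldn't know in practice.
from sklearn.cluster import KMeans
kmeans_labels = KMeans(n_clusters=10, random_state=0).fit_predict(zhat_)
categorical_scatter_2d(zhat_, kmeans_labels)
plt.show()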
#==============================================================================
# # Below is a minibatch version that doesn't seem to work - not sure why;
# # one possible fix is sketched after this block.
# perm = np.random.permutation(len(sims))
# # Permute rows *and* columns so the pairwise matrix stays aligned
# sims, labels = sims[perm][:, perm], labels[perm]
#
# batch_size = 32
# tf.reset_default_graph()
#
# D_hat = 2
# zhat = tf.get_variable('zhat', [N, D_hat],
#                        initializer=tf.random_normal_initializer(0, 1.0))
#
# target = tf.constant(sims.astype(np.float32), name='target')
#
# selection_i = tf.random_uniform([batch_size], 0, tf.shape(target)[0], tf.int32)
# selection_j = tf.random_uniform([batch_size], 0, tf.shape(target)[0], tf.int32)
# #selection_i = tf.range(0, batch_size)
# #selection_j = tf.range(0, batch_size)
#
# z_i = tf.gather(zhat, selection_i)
# z_j = tf.gather(zhat, selection_j)
#
# p_hat_sub = tf.nn.sigmoid(tf.matmul(z_i, z_j, transpose_b=True))
# # Transpose so t_sub[a, b] = target[selection_i[a], selection_j[b]], lining
# # up elementwise with p_hat_sub
# t_sub = tf.transpose(tf.gather(tf.transpose(tf.gather(target, selection_i)),
#                                selection_j))
#
# xent_mat = t_sub * tf.log(p_hat_sub + 1e-8) + \
#            (1 - t_sub) * tf.log(1 - p_hat_sub + 1e-8)
# xent_sub = -tf.reduce_mean(xent_mat * -(tf.diag(tf.ones([batch_size])) - 1))
#
# train_op = tf.train.AdamOptimizer(0.01).minimize(xent_sub)
#
# sess = tf.InteractiveSession()
# sess.run(tf.global_variables_initializer())
#
# zhat_ = sess.run(zhat)
# categorical_scatter_2d(zhat_, labels)
#
# #%%
# j = 0
# for i in range(int(1000 / (batch_size / 500.))):
#     if i % int(100 / (batch_size / 500.)) == 0:
#         print(sess.run([xent_sub, train_op])[0])
#     else:
#         sess.run(train_op)
#     j += 1
#     if j >= 500 / batch_size:
#         j = 0
#         zhat_ = sess.run(zhat)
#         categorical_scatter_2d(zhat_, labels)
#==============================================================================
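# A hedged sketch (not in the original paste, and untested as a replacement
# for the block above) of one way to make minibatching line up: sample index
# pairs and fetch the matching targets with tf.gather_nd, so each predicted
# probability is compared against the target for the same (i, j) pair. All
# names below (idx_i, p_pair, xent_mb, ...) are mine; zhat, p_target and N
# come from the full-batch model, so this only adds ops to the live graph.
batch_size = 32
idx_i = tf.random_uniform([batch_size], 0, N, tf.int32)
idx_j = tf.random_uniform([batch_size], 0, N, tf.int32)
z_i = tf.gather(zhat, idx_i)
z_j = tf.gather(zhat, idx_j)
p_pair = tf.nn.sigmoid(tf.reduce_sum(z_i * z_j, 1))            # [batch_size]
t_pair = tf.gather_nd(p_target, tf.stack([idx_i, idx_j], 1))   # [batch_size]
not_self = tf.cast(tf.not_equal(idx_i, idx_j), tf.float32)     # drop i == j pairs
xent_pair = -(t_pair * tf.log(p_pair + 1e-8) +
              (1 - t_pair) * tf.log(1 - p_pair + 1e-8))
# Mean over the sampled non-self pairs; this could be minimized with its own
# AdamOptimizer, analogously to the full-batch train_op
xent_mb = tf.reduce_sum(xent_pair * not_self) / tf.reduce_sum(not_self)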