Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- import gc
- import os
- import shutil
- import tracemalloc
- from pathlib import Path
- import numpy as np
- from keras import backend as K
- from keras import callbacks
- from keras.layers import BatchNormalization
- from keras.layers import Input, Dense, Dropout, Layer
- from keras.models import Model
- from keras.utils import np_utils
- from keras.utils.generic_utils import to_list
- from keras.utils.generic_utils import unpack_singleton
- from keras.utils.test_utils import get_test_data
# Toy-problem dimensions: a tiny 2-feature, 2-class classification task so
# that each of the repeated training runs below finishes quickly.
input_dim = 2        # number of input features
num_hidden = 4       # width of the single hidden Dense layer
num_classes = 2      # binary classification, one-hot encoded later
batch_size = 5       # divides train_samples/test_samples evenly
train_samples = 20
test_samples = 20
- def data_generator(x, y, batch_size):
- x = to_list(x)
- y = to_list(y)
- max_batch_index = len(x[0]) // batch_size
- i = 0
- while 1:
- x_batch = [array[i * batch_size: (i + 1) * batch_size] for array in x]
- x_batch = unpack_singleton(x_batch)
- y_batch = [array[i * batch_size: (i + 1) * batch_size] for array in y]
- y_batch = unpack_singleton(y_batch)
- yield x_batch, y_batch
- i += 1
- i = i % max_batch_index
# Thin wrapper that bakes this script's toy-problem defaults into
# keras' get_test_data helper.
def get_data_callbacks(num_train=train_samples,
                       num_test=test_samples,
                       input_shape=(input_dim,),
                       classification=True,
                       num_classes=num_classes):
    """Return ``((X_train, y_train), (X_test, y_test))`` for the toy task."""
    options = dict(num_train=num_train,
                   num_test=num_test,
                   input_shape=input_shape,
                   classification=classification,
                   num_classes=num_classes)
    return get_test_data(**options)
def investigate_TensorBoard(tmpdir, update_freq):
    """Train a small model several ways with TensorBoard callbacks attached.

    Mirrors Keras' own TensorBoard callback test: runs fit / fit with
    validation / fit_generator with and without validation, each with a
    freshly built TensorBoard callback, then asserts the log directory
    exists and deletes it.  ``update_freq`` is forwarded to
    callbacks.TensorBoard (e.g. 'batch').  Intended to be called in a
    loop while tracemalloc watches for leaked allocations.
    """
    np.random.seed(np.random.randint(1, 1e7))
    filepath = str(tmpdir / 'logs')
    (X_train, y_train), (X_test, y_test) = get_data_callbacks()
    y_test = np_utils.to_categorical(y_test)
    y_train = np_utils.to_categorical(y_train)

    class DummyStatefulMetric(Layer):
        # Minimal stateful metric: holds a constant 0 state so the
        # stateful-metric code path in the callbacks is exercised.
        def __init__(self, name='dummy_stateful_metric', **kwargs):
            super(DummyStatefulMetric, self).__init__(name=name, **kwargs)
            self.stateful = True
            self.state = K.variable(value=0, dtype='int32')

        def reset_states(self):
            pass

        def __call__(self, y_true, y_pred):
            return self.state

    inp = Input((input_dim,))
    hidden = Dense(num_hidden, activation='relu')(inp)
    hidden = Dropout(0.1)(hidden)
    hidden = BatchNormalization()(hidden)
    output = Dense(num_classes, activation='softmax')(hidden)
    model = Model(inputs=inp, outputs=output)
    model.compile(loss='categorical_crossentropy',
                  optimizer='sgd',
                  metrics=['accuracy', DummyStatefulMetric()])

    # we must generate new callbacks for each test, as they aren't stateless
    def callbacks_factory(histogram_freq, embeddings_freq=1):
        # NOTE(review): 'dense_1' relies on Keras' auto-generated layer
        # naming; renaming/reordering layers above would break this.
        return [callbacks.TensorBoard(log_dir=filepath,
                                      histogram_freq=histogram_freq,
                                      write_images=True, write_grads=True,
                                      embeddings_freq=embeddings_freq,
                                      embeddings_layer_names=['dense_1'],
                                      embeddings_data=X_test,
                                      batch_size=5,
                                      update_freq=update_freq)]

    # fit without validation data
    model.fit(X_train, y_train, batch_size=batch_size,
              callbacks=callbacks_factory(histogram_freq=0, embeddings_freq=0),
              epochs=3)

    # fit with validation data and accuracy
    model.fit(X_train, y_train, batch_size=batch_size,
              validation_data=(X_test, y_test),
              callbacks=callbacks_factory(histogram_freq=0), epochs=2)

    # fit generator without validation data
    train_generator = data_generator(X_train, y_train, batch_size)
    model.fit_generator(train_generator, len(X_train), epochs=2,
                        callbacks=callbacks_factory(histogram_freq=0,
                                                    embeddings_freq=0))

    # fit generator with validation data and accuracy
    train_generator = data_generator(X_train, y_train, batch_size)
    model.fit_generator(train_generator, len(X_train), epochs=2,
                        validation_data=(X_test, y_test),
                        callbacks=callbacks_factory(histogram_freq=1))
    # close the infinite generator explicitly so it is not kept alive
    train_generator.close()

    assert os.path.isdir(filepath)
    shutil.rmtree(filepath)
class Testing:
    """Accumulates tracemalloc snapshots and prints per-call diffs."""

    def __init__(self):
        # Snapshots taken so far, in chronological order.
        self.snapshots = []

    def collect_stats(self):
        """Take a snapshot and report the top-10 diffs vs. the previous one.

        Uses the module-level ``filters`` list to restrict the comparison.
        The very first call only records a baseline and prints nothing.
        """
        self.snapshots.append(tracemalloc.take_snapshot())
        if len(self.snapshots) < 2:
            return
        previous = self.snapshots[-2]
        current = self.snapshots[-1]
        diffs = current.filter_traces(filters).compare_to(previous, 'filename')
        for entry in diffs[:10]:
            summary = "{} new KiB {} total KiB {} new {} total memory blocks: ".format(
                entry.size_diff / 1024,
                entry.size / 1024,
                entry.count_diff, entry.count)
            print(summary)
            for frame_line in entry.traceback.format():
                print(frame_line)
# ---- Driver: run the training routine repeatedly, watching for leaks. ----
tmpdir = Path('temp')
# Scratch directory for the TensorBoard logs; race-free creation.
os.makedirs('temp', exist_ok=True)

# Keep 10 frames of traceback per allocation.
tracemalloc.start(10)

# We are looking for everything at first (no filters).
filters = []
t = Testing()
for _ in range(10):
    K.clear_session()
    investigate_TensorBoard(tmpdir, 'batch')
    gc.collect()
    K.clear_session()
    t.collect_stats()

# Now filter for allocations originating in tensorflow files only.
filters = [tracemalloc.Filter(inclusive=True, filename_pattern="*tensorflow*")]
snapshot = t.snapshots[-1]
old_snapshot = t.snapshots[-2]
stats = snapshot.filter_traces(filters).compare_to(old_snapshot.filter_traces(filters), 'traceback')
# Largest positive growth first.  NOTE: tracemalloc reports size_diff in
# BYTES; the original printed the raw value labelled "KB" — convert first.
top_k = sorted((s for s in stats if s.size_diff > 0),
               key=lambda s: s.size_diff, reverse=True)[:10]
for k in top_k:
    print('Leaked', k.size_diff / 1024, 'KiB')
    for f in k.traceback:
        print(f)
Add a comment
Please sign in to add a comment.