Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- os.environ['THEANO_FLAGS'] = ",".join('%s=%s' % (key, value) for key, value in {
- 'device': 'cpu',
- 'mode': 'FAST_RUN',
- 'floatX': 'float32',
- #'exception_verbosity': 'high',
- 'optimizer': 'fast_compile',
- }.items())
- import random
- from keras.utils.np_utils import *
- from keras.models import *
- from keras.layers import *
- import numpy as np; np.random.seed(1337)
- ########### CONSTANTS / CONFIG ###########
- NUM_EPOCHS = 300
- ENCODE_MAPPER = {
- 'a': (1, 0, 0, 0),
- 'c': (0, 1, 0, 0),
- 'g': (0, 0, 1, 0),
- 't': (0, 0, 0, 1),
- # 'n': (1, 1, 1, 1),
- }
- NUM_CLASSES = 2
- PRINT_WIDTH = 200
- MATCH_SEQ = 'ac'
- ONE_HOT_DIMENSION = len(list(ENCODE_MAPPER.values())[0])
- ALPHABET = list(ENCODE_MAPPER.keys())
- ALPHABET_SIZE = len(ALPHABET)
- DECODE_MAPPER = {
- one_hot: nucleotide
- for nucleotide, one_hot in ENCODE_MAPPER.items()
- }
- class Main(object):
- def __init__(self):
- self.model = None
- self.build_model()
- self.perform_training()
- self.perform_testing()
- def build_model(self):
- sequence = Input(shape=(None, ONE_HOT_DIMENSION), dtype='float32')
- # convolution = Convolution1D(filter_length=6, nb_filter=10)(sequence)
- # max_pooling = MaxPooling1D(pool_length=2)(convolution)
- # dropout = Dropout(0.2)(max_pooling)
- dropout = Dropout(0.2)(sequence)
- # bidirectional LSTM
- forward_lstm = LSTM(
- output_dim=50, init='uniform', inner_init='uniform', forget_bias_init='one', return_sequences=True,
- activation='tanh', inner_activation='sigmoid',
- )(dropout)
- backward_lstm = LSTM(
- output_dim=50, init='uniform', inner_init='uniform', forget_bias_init='one', return_sequences=True,
- activation='tanh', inner_activation='sigmoid', go_backwards=True,
- )(dropout)
- blstm = merge([forward_lstm, backward_lstm], mode='concat', concat_axis=-1)
- dense = TimeDistributed(Dense(NUM_CLASSES))(blstm)
- self.model = Model(input=sequence, output=dense)
- print 'Compiling model...'
- self.model.compile(
- loss='binary_crossentropy',
- optimizer='adam',
- metrics=['accuracy']
- )
- def perform_training(self):
- print 'Training...'
- sequence_list, y_list = self.generate_sequences((100, 150), 10 )
- X_list = [
- self.encode_sequence(seq)
- for seq in sequence_list
- ]
- for sample_i, (X, y) in enumerate(zip(X_list, y_list)):
- print '\t X.shape=%s y.shape=%s' % (X.shape, y.shape)
- self.model.fit(
- X, y,
- verbose=0,
- batch_size=1,
- nb_epoch=NUM_EPOCHS,
- #callbacks=[ EarlyStopping(monitor='val_loss', patience=3, verbose=1) ]
- )
- def perform_testing(self):
- print 'Testing...'
- sequence_list, y_list = self.generate_sequences((100, 150), 10)
- X_list = [
- self.encode_sequence(seq)
- for seq in sequence_list
- ]
- for sample_i, (X, y) in enumerate(zip(X_list, y_list)):
- print '\t X.shape=%s y.shape=%s' % (X.shape, y.shape)
- y_predicted = np.round(np.array(
- self.model.predict(X, batch_size=1)
- ))
- print '\t y_predicted.shape=%s' % str(y.shape)
- print 'Test accuracy:', accuracy(y, y_predicted)
- print '\nSample #%d' % sample_i
- self.print_seq_labels(
- sequence_list[sample_i],
- "".join(
- '2' if sum(l) == 2 else '0' if sum(l) == 0 else '1' if l[1] else '-'
- for l in y[0]
- ),
- "".join(
- '2' if sum(l) == 2 else '0' if sum(l) == 0 else '1' if l[1] else '-'
- for l in y_predicted[0]
- )
- )
- def generate_sequences(self, length_range, num_sequences):
- print 'Generating data...'
- sequence_list, y_list = [], []
- for sequence_i in range(num_sequences):
- length = random.randint(*length_range)
- # Generate a random DNA sequence, while ensuring that the gene occurs in the sequence at least once
- sequence = "".join(
- random.choice(ALPHABET) for i in range(length)
- )
- sequence_list.append(sequence)
- y = np.zeros((len(sequence), 2), dtype=int)
- # Find all the positions that the gene matches on the sequence and label these regions
- # matches = [match.start() for match in re.finditer(self.target_gene, sequence)]
- matches = self.find_all_substrings(MATCH_SEQ, sequence)
- print 'Labeled %d genes...' % len(matches)
- y[:, 0] = 1
- for match_index in matches:
- y[:, 0][match_index : match_index + len(MATCH_SEQ)] = 0
- y[:, 1][match_index : match_index + len(MATCH_SEQ)] = 1
- y_list.append(np.array([y]))
- return sequence_list, y_list
- @staticmethod
- def find_all_substrings(sub, string):
- index = 0
- matches = []
- try:
- while True:
- index = string.index(sub, index + 1)
- matches.append(index)
- except ValueError:
- pass
- return matches
- @staticmethod
- def print_seq_labels(*row_list):
- assert len(set(len(row) for row in row_list)) == 1, 'not all rows are of equal length'
- length = len(row_list[0])
- for i in range(0, length, PRINT_WIDTH):
- for row in row_list:
- print "".join(map(str, row[i:i + PRINT_WIDTH]))
- print
- @staticmethod
- def encode_sequence(sequence):
- return np.array([[
- ENCODE_MAPPER[nucleotide]
- for nucleotide in sequence
- ]])
- if __name__ == '__main__':
- Main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement