Not a member of Pastebin yet? Sign up — it unlocks many cool features!
import argparse
import os
import pickle
import sys
import time

import numpy as np
import tensorflow as tf
from tensorflow.contrib import learn
from tflearn.data_utils import to_categorical, pad_sequences
from tqdm import tqdm  # for progress bars

from cnn_url_classifier.utils import *  # utils file in cnn_url_classifier
from url_utils import format_url
# Class to get a URL safety score for a single URL (can also be used for a
# subject score by changing the constructor parameters).
# Easier to use and faster than running the full batch evaluation script.
class CNNURLScore():
    """Score URLs for maliciousness with a pre-trained URLNet-style CNN.

    The trained model's vocabularies are loaded once at construction time;
    each call to ``test_url`` / ``test_url_lst`` restores the TensorFlow
    graph from the checkpoint directory and runs inference.  The defaults
    target the character-level CNN (``emb_mode`` 1); the other modes need
    word/ngram inputs that are not built here (see NOTEs below).
    """

    def __init__(self, max_length_words=200, max_length_chars=200, root_dir='',
                 max_length_subwords=20, data_directory="test_1000.txt",
                 delimit_mode_no=1, subword_dict_directory="runs/10000/subwords_dict.p",
                 word_dict_directory="runs/10000/words_dict.p",
                 char_dict_directory="runs/10000/chars_dict.p",
                 emb_dimension=32, emb_mode_no=1, batch_size_no=128,
                 log_output_directory="runs/url/urleval.txt",
                 log_checkpoint_directory="runs/10000/checkpoints/"):
        """Store configuration and load the trained model's dictionaries.

        :param root_dir: prefix prepended to every file/directory argument
        :raises OSError: if a dictionary file cannot be opened
        """
        # data args
        self.max_len_words = max_length_words        # maximum URL length in words
        self.max_len_chars = max_length_chars        # maximum URL length in characters
        self.max_len_subwords = max_length_subwords  # maximum word length in subwords/characters
        self.data_dir = root_dir + data_directory    # data file location (unused for single-URL scoring)
        self.delimit_mode = delimit_mode_no          # 0: split on special chars; 1: special chars + each char as a word
        self.subword_dict_dir = root_dir + subword_dict_directory  # subword dictionary path
        self.word_dict_dir = root_dir + word_dict_directory        # word dictionary path
        self.char_dict_dir = root_dir + char_dict_directory        # character dictionary path
        # model args
        self.emb_dim = emb_dimension  # embedding dimension size
        self.emb_mode = emb_mode_no   # embedding mode; see test_step for the meaning of each value
        # test args
        self.batch_size = batch_size_no  # batch size when scoring a list of URLs
        # log args
        self.log_output_dir = root_dir + log_output_directory          # test-result output (unused for single test)
        self.log_checkpoint_dir = root_dir + log_checkpoint_directory  # trained-model checkpoint directory
        # Load the trained model's vocabularies.  `with` closes the handles
        # promptly (the original left the files open).
        # NOTE(review): the subword (ngram) dictionary load is disabled, yet
        # emb modes 4/5 would need it — confirm before enabling those modes.
        # with open(self.subword_dict_dir, "rb") as f:
        #     self.ngram_dict = pickle.load(f)
        with open(self.word_dict_dir, "rb") as f:
            self.word_dict = pickle.load(f)   # trained model's word dictionary
        with open(self.char_dict_dir, "rb") as f:
            self.chars_dict = pickle.load(f)  # trained model's character dictionary

    def test_step(self, x, emb_mode):
        """Run one inference step on an already-padded batch.

        :param x: (list) model inputs in the order assembled by _assemble_batch
        :param emb_mode: (int) 1: only character-based CNN, 2: only word-based
            CNN, 3: character and word CNN, 4: character-level word CNN,
            5: character and character-level word CNN
        :return: (predictions, scores) as produced by the restored graph
        """
        p = 1.0  # dropout keep-probability: disable dropout at inference time
        if emb_mode == 1:
            feed_dict = {
                self.input_x_char_seq: x[0],
                self.dropout_keep_prob: p}
        elif emb_mode == 2:
            feed_dict = {
                self.input_x_word: x[0],
                self.dropout_keep_prob: p}
        elif emb_mode == 3:
            # BUGFIX: originally referenced bare `input_x_word` (NameError in
            # this scope); the placeholder is stored on self by the callers.
            feed_dict = {
                self.input_x_char_seq: x[0],
                self.input_x_word: x[1],
                self.dropout_keep_prob: p}
        elif emb_mode == 4:
            feed_dict = {
                self.input_x_word: x[0],
                self.input_x_char: x[1],
                self.input_x_char_pad_idx: x[2],
                self.dropout_keep_prob: p}
        elif emb_mode == 5:
            feed_dict = {
                self.input_x_char_seq: x[0],
                self.input_x_word: x[1],
                self.input_x_char: x[2],
                self.input_x_char_pad_idx: x[3],
                self.dropout_keep_prob: p}
        preds, s = self.sess.run([self.predictions, self.scores], feed_dict)
        return preds, s

    def _assemble_batch(self, batch):
        """Unpack one raw batch for the current emb_mode and pad each input.

        Returns the list of padded inputs in the order test_step expects.
        (Extracted from the previously duplicated code in test_url and
        test_url_lst.)
        """
        x_char_seq = x_word = x_char = None
        if self.emb_mode == 1:
            x_char_seq = batch
        elif self.emb_mode == 2:
            x_word = batch
        elif self.emb_mode == 3:
            x_char_seq, x_word = zip(*batch)
        elif self.emb_mode == 4:
            x_char, x_word = zip(*batch)
        elif self.emb_mode == 5:
            x_char, x_word, x_char_seq = zip(*batch)
        x_batch = []
        if self.emb_mode in [1, 3, 5]:
            x_batch.append(pad_seq_in_word(x_char_seq, self.max_len_chars))
        if self.emb_mode in [2, 3, 4, 5]:
            x_batch.append(pad_seq_in_word(x_word, self.max_len_words))
        if self.emb_mode in [4, 5]:
            x_char, x_char_pad_idx = pad_seq(x_char, self.max_len_words,
                                             self.max_len_subwords, self.emb_dim)
            x_batch.extend([x_char, x_char_pad_idx])
        return x_batch

    def test_url(self, url_test):
        """Score a single URL.

        :param url_test: (str) URL to be tested
        :return: (float) softmax maliciousness score in [0, 1]
        """
        # Silence prints from the helpers / checkpoint restore; the redirect
        # was previously applied twice and never restored on error, leaking
        # the devnull handle — fixed with a single try/finally.
        devnull = open(os.devnull, 'w')
        sys.stdout = devnull
        try:
            urls, labels = [url_test], [0]  # dummy label; only scores are used
            # NOTE(review): unlike test_url_lst, the URL is NOT normalized
            # with format_url here — confirm whether that is intended.
            x, word_reverse_dict = get_word_vocab(urls, self.max_len_words)
            word_x = get_words(x, word_reverse_dict, self.delimit_mode, urls)  # only needed by word modes
            chared_id_x = char_id_x(urls, self.chars_dict, self.max_len_chars)
            checkpoint_file = tf.train.latest_checkpoint(self.log_checkpoint_dir)
            graph = tf.Graph()
            with graph.as_default():
                session_conf = tf.ConfigProto(allow_soft_placement=True,
                                              log_device_placement=False)
                session_conf.gpu_options.allow_growth = True
                self.sess = tf.Session(config=session_conf)
                with self.sess.as_default():
                    saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
                    saver.restore(self.sess, checkpoint_file)
                    if self.emb_mode in [1, 3, 5]:
                        self.input_x_char_seq = graph.get_operation_by_name("input_x_char_seq").outputs[0]
                    if self.emb_mode in [2, 3, 4, 5]:
                        # BUGFIX: stored on self (was a local) so test_step can reach it
                        self.input_x_word = graph.get_operation_by_name("input_x_word").outputs[0]
                    if self.emb_mode in [4, 5]:
                        self.input_x_char = graph.get_operation_by_name("input_x_char").outputs[0]
                        self.input_x_char_pad_idx = graph.get_operation_by_name("input_x_char_pad_idx").outputs[0]
                    self.dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
                    self.predictions = graph.get_operation_by_name("output/predictions").outputs[0]
                    self.scores = graph.get_operation_by_name("output/scores").outputs[0]
                    # NOTE(review): `worded_id_x`/`ngramed_id_x` are never built
                    # in this method, so emb modes 2-5 raise NameError
                    # (pre-existing); only mode 1 is functional here.
                    if self.emb_mode == 1:
                        batches = batch_iter(list(chared_id_x), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 2:
                        batches = batch_iter(list(worded_id_x), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 3:
                        batches = batch_iter(list(zip(chared_id_x, worded_id_x)), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 4:
                        batches = batch_iter(list(zip(ngramed_id_x, worded_id_x)), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 5:
                        batches = batch_iter(list(zip(ngramed_id_x, worded_id_x, chared_id_x)),
                                             self.batch_size, 1, shuffle=False)
                    all_predictions = []
                    all_scores = []
                    batch = next(batches)  # a single URL fits in exactly one batch
                    x_batch = self._assemble_batch(batch)
                    batch_predictions, batch_scores = self.test_step(x_batch, self.emb_mode)
                    all_predictions = np.concatenate([all_predictions, batch_predictions])
                    all_scores.extend(batch_scores)
        finally:
            sys.stdout = sys.__stdout__  # re-enable printing even on error
            devnull.close()
        # Malicious-class probability of the single URL (softmax over logits).
        return softmax(all_scores)[0][1]

    def test_url_lst(self, url_lst):
        """Score a list of URLs.

        :param url_lst: (list of str) URLs to be tested
        :return: (list of float) softmax maliciousness score per URL
        """
        devnull = open(os.devnull, 'w')
        sys.stdout = devnull  # silence helper/restore prints; restored in finally
        try:
            url_lst = [format_url(url) for url in url_lst]
            urls, labels = url_lst, [0 for url in url_lst]  # dummy labels
            x, word_reverse_dict = get_word_vocab(urls, self.max_len_words)
            word_x = get_words(x, word_reverse_dict, self.delimit_mode, urls)  # only needed by word modes
            chared_id_x = char_id_x(urls, self.chars_dict, self.max_len_chars)
            checkpoint_file = tf.train.latest_checkpoint(self.log_checkpoint_dir)
            graph = tf.Graph()
            with graph.as_default():
                session_conf = tf.ConfigProto(allow_soft_placement=True,
                                              log_device_placement=False)
                session_conf.gpu_options.allow_growth = True
                self.sess = tf.Session(config=session_conf)
                with self.sess.as_default():
                    saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
                    saver.restore(self.sess, checkpoint_file)
                    if self.emb_mode in [1, 3, 5]:
                        self.input_x_char_seq = graph.get_operation_by_name("input_x_char_seq").outputs[0]
                    if self.emb_mode in [2, 3, 4, 5]:
                        # BUGFIX: stored on self (was a local) so test_step can reach it
                        self.input_x_word = graph.get_operation_by_name("input_x_word").outputs[0]
                    if self.emb_mode in [4, 5]:
                        self.input_x_char = graph.get_operation_by_name("input_x_char").outputs[0]
                        self.input_x_char_pad_idx = graph.get_operation_by_name("input_x_char_pad_idx").outputs[0]
                    self.dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]
                    self.predictions = graph.get_operation_by_name("output/predictions").outputs[0]
                    self.scores = graph.get_operation_by_name("output/scores").outputs[0]
                    # NOTE(review): `worded_id_x`/`ngramed_id_x` are never built
                    # in this method, so emb modes 2-5 raise NameError
                    # (pre-existing); only mode 1 is functional here.
                    if self.emb_mode == 1:
                        batches = batch_iter(list(chared_id_x), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 2:
                        batches = batch_iter(list(worded_id_x), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 3:
                        batches = batch_iter(list(zip(chared_id_x, worded_id_x)), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 4:
                        batches = batch_iter(list(zip(ngramed_id_x, worded_id_x)), self.batch_size, 1, shuffle=False)
                    elif self.emb_mode == 5:
                        batches = batch_iter(list(zip(ngramed_id_x, worded_id_x, chared_id_x)),
                                             self.batch_size, 1, shuffle=False)
                    all_predictions = []
                    all_scores = []
                    # ceil(len(labels) / batch_size) batches in total
                    nb_batches = int(len(labels) / self.batch_size)
                    if len(labels) % self.batch_size != 0:
                        nb_batches += 1
                    for i in range(nb_batches):
                        batch = next(batches)
                        x_batch = self._assemble_batch(batch)
                        batch_predictions, batch_scores = self.test_step(x_batch, self.emb_mode)
                        all_predictions = np.concatenate([all_predictions, batch_predictions])
                        all_scores.extend(batch_scores)
        finally:
            sys.stdout = sys.__stdout__  # re-enable printing even on error
            devnull.close()
        # Per-URL malicious-class probability (softmax over each logit pair).
        softmax_scores = [softmax(i) for i in all_scores]
        return [i[1] for i in softmax_scores]
####### Examples #######
if __name__ == "__main__":
    # Smoke-test the scorer against two known-benign domains.
    scorer = CNNURLScore()
    start = time.time()
    print("Score: " + "apple.com " + str(scorer.test_url("apple.com")))
    print("Score: " + "google.com " + str(scorer.test_url("google.com")))
    end = time.time()
    #print("Time: " + str(end - start))
    # Batch scoring of the same two URLs.
    s = time.time()
    print(scorer.test_url_lst(["apple.com", "google.com"]))
    e = time.time()
    #print("Time 2: " + str(e - s))
    # Example of subject scoring (word-based model) instead of URL scoring:
    #s = CNNURLScore(delimit_mode_no = 0, subword_dict_directory = "runs/subjects/subwords_dict.p", word_dict_directory = "runs/subjects/words_dict.p",
    #char_dict_directory = "runs/subjects/chars_dict.p", emb_mode_no = 2)
    #print(s.emb_mode)
    #print(s.test_url("Subject testing"))
Add Comment
Please, Sign In to add comment