Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from sklearn.feature_extraction import DictVectorizer
- from tqdm import tqdm_notebook
- from sklearn.metrics import confusion_matrix
- from sklearn.metrics import precision_recall_fscore_support
- from sklearn.model_selection import KFold, StratifiedKFold
- from sklearn import svm
- from sklearn.ensemble import RandomForestClassifier
- from sklearn.neighbors import KNeighborsClassifier
- from sklearn.utils import shuffle
- import glob
- import os
- from msbase.utils import load_json
- from matplotlib.pyplot import cm
- import pandas as pd
- import numpy as np
- import json
def load_vectors(vectors_dir: str, labels):
    """Load the per-label feature vectors stored under ``vectors_dir``.

    For every ``label`` in ``labels`` the file
    ``<vectors_dir>/<label>-vectors.json`` is read. Each file is assumed to
    contain a list of ``[apk, vector]`` pairs (TODO confirm against the
    producer of these files).

    Args:
        vectors_dir: Directory containing the ``*-vectors.json`` files.
        labels: Iterable of label names; a label's position in this iterable
            becomes its class index in ``DY``.

    Returns:
        A 5-tuple ``(DX, DY, DZ, feature_names, DAPKs)``:

        * ``DX`` -- feature matrix. Dict vectors are expanded through
          ``DictVectorizer(sparse=False)``; anything else goes through
          ``np.array``.
        * ``DY`` -- ``np.ndarray`` of class indices (position in ``labels``).
        * ``DZ`` -- plain list of binary flags: ``0`` for the ``"benign"``
          label, ``1`` for every other label.
        * ``feature_names`` -- the vectorizer's feature names when the
          vectors are dicts, otherwise ``None``.
        * ``DAPKs`` -- list of the APK identifiers, aligned with ``DX``.
    """
    DX = []
    DY = []
    DZ = []
    DAPKs = []
    for i, label in enumerate(labels):
        path = os.path.join(vectors_dir, label + "-vectors.json")
        # Context manager so the file handle is closed deterministically.
        with open(path, "r") as f:
            vectors = json.load(f)
        for apk, vector in vectors:
            DAPKs.append(apk)
            DX.append(vector)
        DY += [i] * len(vectors)
        DZ += [0 if label == "benign" else 1] * len(vectors)

    # Only dict-shaped vectors yield feature names; default to None so the
    # return statement is valid for list-shaped (or empty) input as well.
    feature_names = None
    if DX and isinstance(DX[0], dict):
        vectorizer = DictVectorizer(sparse=False)
        DX = vectorizer.fit_transform(DX)
        feature_names = vectorizer.feature_names_
    else:
        DX = np.array(DX)
    return DX, np.array(DY), DZ, feature_names, DAPKs
def classify_fold(train_X, train_Y, test_X, test_Y,
                  labels, feature_names,
                  n_estimators, max_features, max_depth, report=False):
    """Fit a random forest on one train/test fold and evaluate it.

    Args:
        train_X, train_Y: Training samples and their targets.
        test_X, test_Y: Held-out samples and their targets.
        labels: Only its length is used, as the number of probability
            columns the result is padded to.
        feature_names: Index for the feature-importance table (used only
            when ``report`` is true).
        n_estimators, max_features, max_depth: Forwarded to
            ``RandomForestClassifier``.
        report: Selects the return shape, see below.

    Returns:
        With ``report=False``: the classifier's ``score`` on the test fold.
        With ``report=True``: an 8-tuple
        ``(None, None, None, None, feature_importances, test_Y, pred_Y,
        pred_proba_Y)``. The four ``None`` slots are placeholders; the
        ``precision_recall_fscore_support`` call that appears to have filled
        them is disabled in this file.
    """
    forest = RandomForestClassifier(
        n_estimators=n_estimators,
        max_features=max_features,
        max_depth=max_depth,
        n_jobs=6,
        random_state=33,
    )
    forest.fit(train_X, train_Y)

    pred_Y = forest.predict(test_X)
    pred_proba_Y = forest.predict_proba(test_X)

    # Pad with trailing zero columns until there is one column per label.
    # NOTE(review): this assumes any class absent from the training fold is
    # one of the last labels; a missing middle class would shift columns --
    # confirm against how callers encode the targets.
    n_missing = len(labels) - pred_proba_Y.shape[1]
    if n_missing > 0:
        zero_cols = np.zeros((pred_proba_Y.shape[0], n_missing),
                             dtype=pred_proba_Y.dtype)
        pred_proba_Y = np.hstack([pred_proba_Y, zero_cols])

    if not report:
        return forest.score(test_X, test_Y)

    importance_table = pd.DataFrame(
        forest.feature_importances_,
        index=feature_names,
        columns=['importance'],
    )
    feature_importances = importance_table.sort_values('importance', ascending=False)
    return (None, None, None, None,
            feature_importances, test_Y, pred_Y, pred_proba_Y)
def classify(DX, DY, labels, feature_names, DAPKs, n_estimators, max_features, split_ratio, max_depth, report=False):
    """Shuffle the data, split it once, and evaluate a random forest.

    The first ``int(len(DY) * split_ratio)`` shuffled samples form the
    training set; the remainder is the test set. The shuffle is not seeded
    here, so results vary between calls.

    Args:
        DX, DY, DAPKs: Samples, targets and APK identifiers; shuffled
            together so they stay aligned.
        labels: Passed as ``labels=`` to ``precision_recall_fscore_support``.
        feature_names: Index for the feature-importance table (used only
            when ``report`` is true).
        n_estimators, max_features, max_depth: Forwarded to
            ``RandomForestClassifier``.
        split_ratio: Fraction of the samples used for training.
        report: Selects the return shape, see below.

    Returns:
        With ``report=False``: the classifier's ``score`` on the test split.
        With ``report=True``: the ``precision_recall_fscore_support`` tuple
        extended with ``(feature_importances, test_Y, pred_Y, pred_proba_Y,
        test_APKs)``.
    """
    X, Y, APKs = shuffle(DX, DY, DAPKs)
    cut = int(len(Y) * split_ratio)

    forest = RandomForestClassifier(
        n_estimators=n_estimators,
        max_features=max_features,
        max_depth=max_depth,
        n_jobs=6,
    )
    forest.fit(X[:cut], Y[:cut])

    test_X, test_Y, test_APKs = X[cut:], Y[cut:], APKs[cut:]
    pred_Y = forest.predict(test_X)
    pred_proba_Y = forest.predict_proba(test_X)

    if not report:
        return forest.score(test_X, test_Y)

    importance_table = pd.DataFrame(
        forest.feature_importances_,
        index=feature_names,
        columns=['importance'],
    )
    feature_importances = importance_table.sort_values('importance', ascending=False)
    metrics = precision_recall_fscore_support(test_Y, pred_Y, labels=labels)
    return metrics + (feature_importances, test_Y, pred_Y, pred_proba_Y, test_APKs)
def classify_knn(DX, DY, labels, split_ratio=0.7, n_neighbors=3, report=False):
    """Shuffle the data, split it once, and evaluate a k-nearest-neighbours model.

    Args:
        DX, DY: Samples and targets; shuffled together (unseeded).
        labels: Passed as ``labels=`` to ``precision_recall_fscore_support``.
        split_ratio: Fraction of the shuffled samples used for training.
        n_neighbors: Forwarded to ``KNeighborsClassifier``.
        report: Selects the return shape, see below.

    Returns:
        With ``report=True``: the ``precision_recall_fscore_support`` tuple
        for the test split. Otherwise: the classifier's ``score`` on it.
    """
    X, Y = shuffle(DX, DY)
    cut = int(len(Y) * split_ratio)

    knn = KNeighborsClassifier(n_neighbors=n_neighbors)
    knn.fit(X[:cut], Y[:cut])

    held_out_X, held_out_Y = X[cut:], Y[cut:]
    predicted = knn.predict(held_out_X)

    if not report:
        return knn.score(held_out_X, held_out_Y)
    return precision_recall_fscore_support(held_out_Y, predicted, labels=labels)
def classify_svm(DX, DY, labels, kernel="rbf", split_ratio=0.7, report=False):
    """Shuffle the data, split it once, and evaluate a support-vector classifier.

    Args:
        DX, DY: Samples and targets; shuffled together (unseeded).
        labels: Passed as ``labels=`` to ``precision_recall_fscore_support``.
        kernel: Forwarded to ``svm.SVC``.
        split_ratio: Fraction of the shuffled samples used for training.
        report: Selects the return shape, see below.

    Returns:
        With ``report=True``: a pair ``(metrics, classifier)`` where
        ``metrics`` is the ``precision_recall_fscore_support`` tuple for the
        test split and ``classifier`` is the fitted ``SVC``.
        Otherwise: the classifier's ``score`` on the test split.
    """
    X, Y = shuffle(DX, DY)
    cut = int(len(Y) * split_ratio)

    svc = svm.SVC(kernel=kernel)
    svc.fit(X[:cut], Y[:cut])

    held_out_X, held_out_Y = X[cut:], Y[cut:]
    predicted = svc.predict(held_out_X)

    if not report:
        return svc.score(held_out_X, held_out_Y)
    metrics = precision_recall_fscore_support(held_out_Y, predicted, labels=labels)
    return metrics, svc
def matrix(DX, DY, labels):
    """Grid-search ``n_estimators`` x ``max_features`` and plot the scores.

    For every combination drawn from ``[2, 20, 60, 80, 100, 160, 200]`` the
    score returned by ``classify`` (70/30 split) is averaged over 10 runs.
    One line per ``n_estimators`` value is then plotted against
    ``max_features`` and shown with ``plt.show()``.

    Args:
        DX: 2-D feature matrix; ``DX.shape`` must unpack to
            ``(n_samples, n_features)``.
        DY: Targets aligned with ``DX``.
        labels: Forwarded to ``classify``.

    Returns:
        None. The function only displays a figure.
    """
    # The file imports only `cm` from matplotlib.pyplot, so bring `plt` into
    # scope here; without it the plotting calls raise NameError.
    import matplotlib.pyplot as plt

    grid = [2, 20, 60, 80, 100, 160, 200]
    _, n_feats = DX.shape

    # `classify` requires DAPKs (shuffled alongside the data) even though the
    # score-only path never returns them; pass aligned placeholders.
    placeholder_apks = list(range(len(DY)))

    estimate_scores = {}
    for n_estimators in grid:
        # NOTE(review): skipping n_estimators values above the feature count
        # is kept from the original; confirm it is intentional.
        if n_estimators > n_feats:
            continue
        estimate_scores[n_estimators] = {}
        for max_features in grid:
            if max_features > n_feats:
                continue
            scores = [
                classify(DX, DY, labels=labels,
                         feature_names=None,  # unused when report=False
                         DAPKs=placeholder_apks,
                         n_estimators=n_estimators,
                         max_features=max_features,
                         split_ratio=0.7,
                         max_depth=None)  # scikit-learn's default: unlimited depth
                for _ in range(10)
            ]
            estimate_scores[n_estimators][max_features] = np.mean(scores)

    color = cm.rainbow(np.linspace(0, 1, len(estimate_scores)))
    for idx, (n_estimators, scores) in enumerate(estimate_scores.items()):
        xs, ys = zip(*scores.items())
        plt.plot(xs, ys, c=color[idx], label=str(n_estimators))
    plt.xlabel("max_features")
    plt.ylabel("accuracy")
    plt.legend()
    plt.show()
def avg_eval(DX, DY, DAPKs, combined_labels, combined_labels_index, feature_names,
             max_features, n_estimators, n_fold, max_depth):
    """Run ``classify_fold`` over a shuffled K-fold split and pool the results.

    Args:
        DX, DY: Samples and targets; indexed with the integer index arrays
            produced by ``KFold.split``.
        DAPKs: APK identifiers aligned with ``DX``.
        combined_labels: Unused by this function; kept for interface
            compatibility.
        combined_labels_index: Forwarded to ``classify_fold`` as ``labels``.
        feature_names: Forwarded to ``classify_fold``.
        max_features, n_estimators, max_depth: Random-forest
            hyper-parameters forwarded to ``classify_fold``.
        n_fold: Number of folds.

    Returns:
        A 6-tuple ``(None, feature_importances_s, y_true, y_pred,
        y_pred_proba, APKs_test_all)``. ``feature_importances_s`` is a list
        with one entry per fold. The three ``y_*`` values are the per-fold
        arrays concatenated in fold order. ``APKs_test_all`` lists the test
        APKs in the same order. The leading ``None`` is a placeholder.
    """
    feature_importances_s = []
    y_true_all = []
    y_pred_all = []
    y_pred_proba_all = []
    APKs_test_all = []

    # Built once: the APK lookup table does not change between folds.
    apk_column = pd.DataFrame(DAPKs, columns=["APK"])["APK"]

    # Fixed random_state keeps the fold assignment reproducible.
    kf = KFold(n_splits=n_fold, shuffle=True, random_state=36)
    for train_index, test_index in kf.split(DX):
        _, _, _, _, feature_importances, y_true, y_pred, y_pred_proba = \
            classify_fold(DX[train_index], DY[train_index],
                          DX[test_index], DY[test_index],
                          combined_labels_index, feature_names,
                          max_features=max_features, n_estimators=n_estimators,
                          max_depth=max_depth,
                          report=True)
        feature_importances_s.append(feature_importances)
        y_true_all.append(y_true)
        y_pred_all.append(y_pred)
        y_pred_proba_all.append(y_pred_proba)
        APKs_test_all.extend(apk_column.loc[test_index])

    return None, feature_importances_s, \
        np.concatenate(y_true_all), np.concatenate(y_pred_all), \
        np.concatenate(y_pred_proba_all), APKs_test_all
# NOTE: KMeans isn't very good
- # kmeans = KMeans(n_clusters=len(combined_labels))
- # y_pred = kmeans.fit_predict(DX)
- # mat = confusion_matrix(DY, y_pred).T
- # mat
- # size_array = np.array([n for l, n in label_stat])
- # size_array
- # mat = (mat / size_array)
- # sn.heatmap(mat,
- # xticklabels=labels,
- # yticklabels=range(len(labels)))
- # plt.xlabel('true label')
- # plt.ylabel('predicted label')
def load_vt_stat(apks):
    """Look up two malicious/benign flags per APK from the sample metadata.

    Metadata files are matched with ``samples_metadata/*/*.test.json``,
    resolved two directory levels above the current working directory. Each
    file is read with ``load_json`` and iterated as a sequence of records;
    records whose ``"apk"`` is in ``apks`` must carry a ``"virustotal"``
    report with ``"positives"``, ``"scans"`` and ``"total"`` entries.

    Args:
        apks: Sequence of APK identifiers (hashable, e.g. strings).

    Returns:
        A pair ``(ret, ret_major)`` of lists aligned with ``apks``:
        ``ret[i]`` is ``1`` when the report has at least one positive, and
        ``ret_major[i]`` is ``1`` when positives exceed
        ``int(len(scans) * 0.5)``; otherwise ``0``.

    Raises:
        AssertionError: A matching record lacks the expected report fields,
            or ``len(scans)`` disagrees with ``total``.
        KeyError: An entry of ``apks`` was not found in any metadata file.
    """
    wanted = set(apks)  # O(1) membership instead of scanning the list per record
    apks_is_malicious = {}
    apks_is_malicious_major = {}

    # Remember where we started so the working directory is restored even if
    # an assertion or lookup fails, and without assuming the caller's
    # directory is named "eval/ase19".
    original_cwd = os.getcwd()
    os.chdir("../..")
    try:
        for metadata_path in glob.glob("samples_metadata/*/*.test.json"):
            for test_data in load_json(metadata_path):
                apk = test_data["apk"]
                if apk not in wanted:
                    continue
                assert "virustotal" in test_data, apk
                vt_report = test_data["virustotal"]
                assert "positives" in vt_report and "scans" in vt_report
                n_scans = len(vt_report["scans"])
                assert n_scans == vt_report["total"]
                positives = vt_report["positives"]
                apks_is_malicious[apk] = positives >= 1
                apks_is_malicious_major[apk] = positives > int(n_scans * 0.5)

        ret = [int(apks_is_malicious[apk]) for apk in apks]
        ret_major = [int(apks_is_malicious_major[apk]) for apk in apks]
    finally:
        os.chdir(original_cwd)
    return ret, ret_major
- # vt_total += 1
- # if vt_total > 0:
- # vt_stat[label] = {
- # "vt_frac": vt_frac_positives / vt_total,
- # "vt_exist": vt_exist_positives / vt_total,
- # "vt_major": vt_major_positives / vt_total,
- # "vt_support": vt_total,
- # }
- # for label, samples in bin_samples.items():
- # vt_frac_positives = 0
- # vt_exist_positives = 0
- # vt_major_positives = 0
- # vt_total = 0
- # for test_data in samples:
- # if "virustotal" in test_data:
- # vt_report = test_data["virustotal"]
- # if "positives" in vt_report and "scans" in vt_report:
- # vt_frac_positives += int(vt_report["positives"]) / len(vt_report["scans"])
- # vt_exist_positives += int(vt_report["positives"] > 1)
- # vt_major_positives += int(vt_report["positives"] > len(vt_report["scans"]) * 0.5)
- # vt_total += 1
- # if vt_total > 0:
- # vt_stat_bin[label] = {
- # "vt_frac": vt_frac_positives / vt_total,
- # "vt_exist": vt_exist_positives / vt_total,
- # "vt_major": vt_major_positives / vt_total,
- # "vt_support": vt_total,
- # }
- # vt_result_df = pd.DataFrame(vt_stat).T
- # vt_stat_bin_df = pd.DataFrame(vt_stat_bin).T
- # vt_stat_bin_df = vt_stat_bin_df.rename(index={True: "benign", False: "malicious"}).drop(["vt_frac", "vt_major"], axis=1)
- # return vt_result_df, vt_stat_bin_df
- # FIXME: PCA is not good
- # from sklearn.decomposition import PCA
- # from mpl_toolkits.mplot3d import Axes3D
- # pca = PCA(n_components=3)
- # pca_2 = PCA(n_components=2)
- # components = pca.fit_transform(DX)
- # components_2 = pca_2.fit_transform(DX)
- # result = pd.DataFrame(components, columns=['PCA%i' % i for i in range(3)])
- # print(result.shape)
- # result_2 = pd.DataFrame(components_2, columns=['PCA%i' % i for i in range(2)])
- # print(result_2.shape)
- # def plot(color_map, DY, labels):
- # colors = [color_map[y] for y in DY]
- ## Plot initialisation
- # fig = plt.figure(figsize=(8, 6))
- # ax = Axes3D(fig)
- # ax.scatter(result['PCA0'], result['PCA1'], result['PCA2'], c=colors, cmap="Set2_r", s=60)
- ## make simple, bare axis lines through space:
- # xAxisLine = ((min(result['PCA0']), max(result['PCA0'])), (0, 0), (0,0))
- # ax.plot(xAxisLine[0], xAxisLine[1], xAxisLine[2], 'r')
- # yAxisLine = ((0, 0), (min(result['PCA1']), max(result['PCA1'])), (0,0))
- # ax.plot(yAxisLine[0], yAxisLine[1], yAxisLine[2], 'r')
- # zAxisLine = ((0, 0), (0,0), (min(result['PCA2']), max(result['PCA2'])))
- # ax.plot(zAxisLine[0], zAxisLine[1], zAxisLine[2], 'r')
- ## label the axes
- # ax.set_xlabel("PC1")
- # ax.set_ylabel("PC2")
- # ax.set_zlabel("PC3")
- # markers = [plt.Line2D([0,0],[0,0], color=color, marker='o', linestyle='') for color in color_map]
- # plt.legend(markers, labels, numpoints=1)
- # color_map = cm.rainbow(np.linspace(0, 1, len(combined_labels)))
- # plot(color_map, DY, combined_labels)
- # color_map_2 = cm.rainbow(np.linspace(0, 1, 2))
- # plot(color_map_2, DZ, ["benign", "malicous"])
- # DX_pca = result
- # DX_pca.shape
- # results, classifier = classify_svm(result_2, DZ, labels=[0,1], report=True)
- # results
- # plt.figure(1, figsize=(4, 3))
- # colors_2 = [color_map_2[y] for y in DZ]
- # plt.scatter(result_2['PCA0'], result['PCA1'], c=colors_2, zorder=10, cmap=plt.cm.Paired,
- # edgecolors='k')
- # plt.scatter(classifier.support_vectors_[:, 0], classifier.support_vectors_[:, 1], s=80,
- # facecolors='none', zorder=10, edgecolors='k')
- # plt.axis('tight')
- # x_min = -4
- # x_max = 4
- # y_min = -4
- # y_max = 4
- # XX, YY = np.mgrid[x_min:x_max:200j, y_min:y_max:200j]
- # Z = classifier.decision_function(np.c_[XX.ravel(), YY.ravel()])
- # Put the result into a color plot
- # Z = Z.reshape(XX.shape)
- # plt.figure(1, figsize=(4, 3))
- # plt.pcolormesh(XX, YY, Z > 0, cmap=plt.cm.Paired)
- # plt.contour(XX, YY, Z, colors=['k', 'k', 'k'], linestyles=['--', '-', '--'],
- # levels=[-1, -.5, 0, .5, 1])
- # plt.xlim(x_min, x_max)
- # plt.ylim(y_min, y_max)
- # plt.xticks(())
- # plt.yticks(())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement