import os.path
from collections import Counter

import numpy as np
import pandas as pd
from sklearn.ensemble import AdaBoostClassifier

# [L, R] - bounds on the feature spread; only features whose spread
# (the standard deviation, see StandardScaler.fit) lies in this interval are kept
L = 0.5
R = 10
# pos - indices of the features (columns) whose spread does not lie in [L, R]
pos = [0, 1, 2, 4, 8, 13, 21, 22, 23, 27, 28]

class StandardScaler:
    def __init__(self):
        self.mean = []
        self.var = []

    def fit(self, X):
        for j in range(len(X[0])):
            # column mean
            total = 0.0
            for i in range(len(X)):
                total += X[i][j]
            self.mean.append(total / len(X))
            # sample variance; its square root (the standard deviation)
            # is what self.var actually stores
            total = 0.0
            for i in range(len(X)):
                total += (X[i][j] - self.mean[j]) * (X[i][j] - self.mean[j])
            total /= (len(X) - 1)
            self.var.append(np.sqrt(total))
        #print(self.var)
        # feature filtering happens via delete_features below; np.delete
        # returns a new array, so the caller must capture its result
        return self

    def transform(self, X):
        for i in range(len(X)):
            for j in range(len(X[i])):
                X[i][j] = (X[i][j] - self.mean[j]) / self.var[j]
        return X


# Since there is very little data and the spread is wide, a feature with a
# very large (or very small) variance only degrades accuracy -
# drop such features.
def delete_features(X, var):
    ind = []
    for i in range(len(var)):
        if not (L < var[i] < R):
            ind.append(i)
    #print(ind)
    return np.delete(X, ind, 1)
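
# Illustrative sketch (not part of the original paste, defined but never
# called): with the bounds L = 0.5 and R = 10 above, the second toy column
# (std ~200) is dropped by delete_features while the first (std 1) survives.
def _demo_standard_scaler():
    X = np.array([[1.0, 100.0],
                  [2.0, 300.0],
                  [3.0, 500.0]])
    sc = StandardScaler().fit(X)
    print("std per column:", sc.var)                          # [1.0, 200.0]
    print("kept columns:", delete_features(X, sc.var).shape)  # (3, 1)
    print("scaled:", sc.transform(X.copy()))                  # zero mean, unit std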

class MinMaxScaler:
    def __init__(self):
        self.data_min = []
        self.data_max = []

    def fit(self, X):
        for j in range(len(X[0])):
            self.data_min.append(X[:, j].min())
            self.data_max.append(X[:, j].max())
        # return self so Pipeline.fit can rebind the scaler
        return self

    def transform(self, X):
        for i in range(len(X)):
            #print(i)
            for j in range(len(X[i])):
                X[i][j] = (X[i][j] - self.data_min[j]) / \
                          (self.data_max[j] - self.data_min[j])
        return X
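
# Sketch usage (not part of the original paste, never called): MinMaxScaler
# maps every column onto [0, 1]; fit expects a NumPy array because of the
# X[:, j] column slices.
def _demo_min_max_scaler():
    X = np.array([[0.0, 10.0], [5.0, 20.0], [10.0, 30.0]])
    mm = MinMaxScaler().fit(X)
    print(mm.transform(X.copy()))  # each column becomes 0, 0.5, 1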

def cross_features(X):
    # all pairwise products X[i][j] * X[i][k] for j <= k
    n = len(X[0])
    A = [[] for _ in range(len(X))]
    for i in range(len(X)):
        for j in range(n):
            for k in range(j, n):
                A[i].append(X[i][j] * X[i][k])
    return np.asarray(A)
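
# Sketch (not part of the original paste, never called): n features expand
# into n * (n + 1) / 2 products - e.g. (a, b) becomes (a*a, a*b, b*b).
def _demo_cross_features():
    print(cross_features(np.array([[2.0, 3.0]])))  # [[4. 6. 9.]]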

class LinearRegression:
    def __init__(self):
        self.beta = None
        self.beta0 = None

    def fit(self, X, y):
        # the intercept is the mean of the targets
        self.beta0 = y.sum() / len(y)
        # normal equations on the centered targets:
        # beta = (X^T X)^{-1} X^T (y - beta0)
        T = X.transpose()
        #TX = np.dot(T, X)
        TX = np.matmul(T, X)
        ITX = np.linalg.inv(TX)
        ITXT = np.matmul(ITX, T)
        self.beta = np.matmul(ITXT, y - self.beta0)
        return self

    def predict(self, X):
        p = self.beta
        res = [0] * len(X)
        for k in range(len(res)):
            for i in range(len(p)):
                res[k] += p[i] * X[k][i]
            # use beta0 as the free (intercept) term
            res[k] += self.beta0
        return res
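
# Sketch check (not part of the original paste, never called): taking
# beta0 = mean(y) is a correct intercept only when the columns of X are
# centered (which the Pipeline's StandardScaler guarantees); on a zero-mean
# column with y = 2*x + 5 the fit recovers the slope 2 exactly.
def _demo_linear_regression():
    X = np.array([[-1.5], [-0.5], [0.5], [1.5]])  # zero-mean column
    y = 2 * X[:, 0] + 5
    lr = LinearRegression().fit(X, y)
    print(lr.predict(np.array([[2.0]])))  # [9.0] = 2*2 + 5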

def rmse(y_true, y_pred):
    total = 0.0
    for i in range(len(y_true)):
        total += (y_pred[i] - y_true[i]) * (y_pred[i] - y_true[i])
    return np.sqrt(total / len(y_true))

def learn_test_split(X, y):
    # random half/half split; np.random.randint's upper bound is exclusive,
    # so (0, 2) is a fair coin flip
    XL = []
    XT = []
    yL = []
    yT = []
    for i in range(len(X)):
        a = np.random.randint(0, 2)
        if a == 0 and len(XL) < len(X) / 2:
            XL.append(X[i])
            yL.append(y[i])
        else:
            XT.append(X[i])
            yT.append(y[i])
    X_learn = np.asarray(XL)
    X_test = np.asarray(XT)
    y_learn = np.asarray(yL)
    y_test = np.asarray(yT)
    return X_learn, X_test, y_learn, y_test

def cv_linear_regression(X, y, *, n_iter=1000):
    # Monte Carlo cross-validation: average the RMSE over n_iter random splits
    a = LinearRegression()
    sum_rmse = 0.0
    for i in range(n_iter):
        X_learn, X_test, y_learn, y_test = learn_test_split(X, y)
        a.fit(X_learn, y_learn)
        y_pred = a.predict(X_test)
        sum_rmse += rmse(y_test, y_pred)
    return sum_rmse / n_iter
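
# Sketch (not part of the original paste, never called): Monte Carlo CV on
# synthetic linear data; with noise of scale 0.1 the averaged RMSE should
# come out near 0.1.
def _demo_cv():
    rng = np.random.RandomState(0)
    X = rng.randn(100, 3)
    y = X @ np.array([1.0, -2.0, 0.5]) + 0.1 * rng.randn(100)
    print(cv_linear_regression(X, y, n_iter=50))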

def feature_importance(X, Y, feature_names):
    ada_boost = AdaBoostClassifier(random_state=42, n_estimators=100)
    ada_boost.fit(X, Y)
    importances = ada_boost.feature_importances_
    indices = np.argsort(importances)[::-1]
    # print the feature ranking, most important first
    print("Feature ranking:")
    for f in range(X.shape[1]):
        print("%d. feature %s (%f)" % (f + 1, feature_names[indices[f]], importances[indices[f]]))

class Pipeline:
    def __init__(self):
        #self.scaler = MinMaxScaler()
        self.scaler = StandardScaler()
        self.LR = LinearRegression()

    def fit(self, X, y):
        self.scaler = self.scaler.fit(X)
        X = self.scaler.transform(X)
        self.LR = self.LR.fit(X, y)
        return self

    def predict(self, X):
        X = self.scaler.transform(X)
        y = self.LR.predict(X)
        return y

class CatEncoder:
    def __init__(self):
        self.feature_values = []
        self.rows = []

    def fit(self, X):
        for j in range(len(X[0])):
            if type(X[0][j]) == str:
                # remember the column index and collect (value, row) pairs;
                # sorting them turns each string into its rank
                self.rows.append(j)
                values = []
                for i in range(len(X)):
                    values.append((X[i][j], i))
                values.sort()
                self.feature_values.append(values)
        return self

    def transform(self, X):
        # rank encoding: each string cell is replaced by the position of its
        # (value, row) pair in the sorted order; valid only for the same X
        # that was passed to fit, since row indices are captured at fit time
        for k, j in enumerate(self.rows):
            for i, cur in enumerate(self.feature_values[k]):
                X[cur[1]][j] = i
        return X
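
# Sketch (not part of the original paste, never called): each string column
# is mapped to the codes 0..n-1 by sorted order of its values.
def _demo_cat_encoder():
    X = np.array([["b", 1.0], ["a", 2.0], ["c", 3.0]], dtype=object)
    print(CatEncoder().fit(X).transform(X))  # first column becomes 1, 0, 2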

def clean_data(X):
    # X = np.delete(X, del_columns, 1)
    # impute every NaN cell with the most common value of its column
    for j in range(len(X[0])):
        cnt = Counter()
        empty = []
        for i in range(len(X)):
            if str(X[i][j]) != 'nan':
                cnt[X[i][j]] += 1
            else:
                empty.append(i)
        for v in empty:
            X[v][j] = cnt.most_common(1)[0][0]
    return X
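
# Sketch (not part of the original paste, never called): the NaN in the
# first column is imputed with that column's mode, 1.0.
def _demo_clean_data():
    X = np.array([[1.0, "x"], [float("nan"), "y"], [1.0, "x"]], dtype=object)
    print(clean_data(X))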

def into(X):
    # impute missing values, encode categorical columns, cast to float
    X = clean_data(X)
    CE = CatEncoder().fit(X)
    X = CE.transform(X)
    X = X.astype('float64')
    return X

def main():
    df_learn = pd.read_csv("learn.csv")
    X_learn = df_learn.iloc[:, :-1].to_numpy()
    X_learn = into(X_learn)
    y_learn = df_learn.iloc[:, -1].to_numpy()
    # X = np.delete(X, pos, 1)
    cv_rmse = cv_linear_regression(X_learn, y_learn)
    Pl = Pipeline().fit(X_learn, y_learn)
    df_test = pd.read_csv("test.csv")
    #X_test = pd.get_dummies(df_test.iloc[:, 1:], columns=cat_columns).to_numpy()
    X_test = df_test.iloc[:, 1:].to_numpy()
    X_test = into(X_test)
    y_pred = Pl.predict(X_test)
    solution_name, _ext = os.path.splitext(os.path.basename(__file__))
    pd.DataFrame({"y": y_pred, "id": df_test["id"]}) \
        .to_csv(solution_name + ".csv", index=False, header=True)
    print("rmse:", cv_rmse)


if __name__ == "__main__":
    main()