Guest User

Untitled

a guest
Nov 22nd, 2017
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.96 KB | None | 0 0
  1. {"cells":[{"cell_type":"code","source":["import pyspark.sql.functions as Fn\nimport pyspark.sql.types as TP\n\n\nimport pandas as pd\nfrom random import randint\nfrom sklearn.ensemble import RandomForestClassifier, RandomForestRegressor\nimport numpy as np\nimport pickle\nimport math\nimport lightgbm as lgb\nfrom pyspark.ml.pipeline import Estimator, Model, Pipeline\nfrom pyspark.ml.param.shared import *\nfrom pyspark.sql.functions import avg, stddev_samp"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["##loading the data and very basic data imputing \ndf=sqlContext.read.format(\"csv\").option(\"header\", \"true\").option(\"inferSchema\",\"true\").option(\"sep\",\",\").option(\"charset\",\"iso-8859-1\").load(\"/FileStore/tables/o1ckeyso1507654725878/cs_training-d35cb.csv\")\ndf=df.withColumn(\"MonthlyIncome\",Fn.col(\"MonthlyIncome\").cast(\"double\"))\ndf=df.withColumn(\"label\",Fn.lit(1)-Fn.col(\"SeriousDlqin2yrs\"))\ndf=df.withColumn(\"NumberOfDependents\",Fn.col(\"NumberOfDependents\").cast(\"int\"))\ndf=df.fillna(0)\n"],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":["#!!!! when creating a class, use only the camel case, for the creation all variables\n\nclass HasListFeatures(Params):\n\n list_features = Param(Params._dummy(),\n \"list_features\", \"list_features\")\n\n def __init__(self):\n super(HasListFeatures, self).__init__()\n\n def setListFeatures(self, value):\n return self._set(list_features=value)\n\n def getListFeatures(self):\n return self.getOrDefault(self.list_features)\n\nclass HasPartitionNumber(Params):\n\n partition_number = Param(Params._dummy(),\n \"partition_number\", \"partition_number\")\n\n def __init__(self):\n super(HasPartitionNumber, self).__init__()\n\n def setPartitionNumber(self, value):\n return self._set(partition_number=value)\n\n def getPartitionNumber(self):\n return self.getOrDefault(self.partition_number)\n \nclass HasSklearnModel(Params):\n\n sklearn_model = Param(Params._dummy(),\n \"sklearn_model\", \"sklearn_model\")\n\n def __init__(self):\n super(HasSklearnModel, self).__init__()\n\n def setSklearnModel(self, value):\n return self._set(sklearn_model=value)\n\n def getSklearnModel(self):\n return self.getOrDefault(self.sklearn_model) \n\n \nclass HasListModels(Params):\n\n list_models = Param(Params._dummy(),\n \"list_models\", \"list_models\")\n\n def __init__(self):\n super(HasListModels, self).__init__()\n\n def setListModels(self, value):\n return self._set(list_models=value)\n\n def getListModels(self):\n return self.getOrDefault(self.list_models) \n \n "],"metadata":{},"outputs":[],"execution_count":3},{"cell_type":"code","source":["class DataPrepPartition(Estimator, HasInputCol,\n HasPartitionNumber):\n\n def _fit(self, dataset):\n target=self.getInputCol()\n list_features=dataset.columns\n list_features.remove(target)\n return (DataPrepPartitionModel()\n .setInputCol(target)\n .setListFeatures(list_features)\n .setPartitionNumber(self.getPartitionNumber()))\n \nclass DataPrepPartitionModel(Model, HasInputCol,\n HasPartitionNumber,HasListFeatures):\n \n def _transform(self, dataset):\n partition_number= self.getPartitionNumber()\n target=self.getInputCol()\n list_features=self.getListFeatures()\n \n def build_model_rf_list(y,features, depth, trees):\n # !! spark does not accept numpy format\n #y=pandas_df[0]\n #features=pandas_df[range(1,pandas_df.shape[1])]\n clf=RandomForestRegressor(n_estimators=trees,max_depth=depth,min_samples_split=2,random_state=None,n_jobs=3)\n model_clf=clf.fit(features, y)\n return model_clf\n \n def create_rand_int(x):\n return randint(0,x)\n rand_int_udf=Fn.udf(create_rand_int,TP.IntegerType())\n \n return dataset.withColumn(\"partition\",rand_int_udf(Fn.lit(partition_number))).repartition(\"partition\").select( [\"partition\", target]+list_features).rdd.map(lambda tuple_: (tuple_[0],(tuple_[1:])))\n \n \n"],"metadata":{},"outputs":[],"execution_count":4},{"cell_type":"code","source":["class DataModelling(Model, HasSklearnModel):\n \n def _transform(self, dataset):\n sklearn_model=self.getSklearnModel()\n def build_model_rf_list(y,features, sklearn_model):\n model_clf=sklearn_model.fit(features, y)\n return model_clf\n \n return dataset.combineByKey(lambda x: ([x[0]],[x[1:]]), lambda x, y: (x[0]+[y[0]],x[1]+[y[1:]]), lambda x,y: x + y).map(lambda (k,v):(k,build_model_rf_list(v[0],v[1],sklearn_model)))\n"],"metadata":{},"outputs":[],"execution_count":5},{"cell_type":"code","source":["class DataTransforming(Model, HasListModels):\n\n def _transform(self, dataset):\n list_models=self.getListModels()\n #broadcast_list=sc.broadcast(list_models)\n def apply_model(xx,model):\n x=np.array(xx).reshape(1,len(xx))\n return model.predict(x)[0].item()\n\n return dataset.map(lambda (k,v): (v[0],[apply_model(v[1:],mod) for mod in list_models])).map(lambda (y,y_pred): (y,np.mean(y_pred).item(), np.std(y_pred).item()))\n"],"metadata":{},"outputs":[],"execution_count":6},{"cell_type":"code","source":["#data preparation\ndata_prep=DataPrepPartition().setInputCol(\"MonthlyIncome\").setPartitionNumber(3)\n\nmodel_prep=data_prep.fit(df)\n\nrdd_transformed_data=model_prep.transform(df)\n \nrdd_transformed_data.take(5)"],"metadata":{},"outputs":[],"execution_count":7},{"cell_type":"code","source":["#training to get the models\nclf=RandomForestRegressor(n_estimators=10,max_depth=50,min_samples_split=2,random_state=None,n_jobs=3)\ndm=DataModelling().setSklearnModel(clf)\n\nlist_model=dm.transform(rdd_transformed_data).collect()"],"metadata":{},"outputs":[],"execution_count":8},{"cell_type":"code","source":["# saving our models\nfor i in range(len(list_model)):\n filename = '/dbfs/FileStore/tables/model_test'+str(i)+'.sav'\n pickle.dump(list_model[i][1], open(filename, 'wb+'))"],"metadata":{},"outputs":[],"execution_count":9},{"cell_type":"code","source":["# now doing loading the models, that we will apply on our new data\nlist_models=[]\nfor i in range(len(list_model)):\n filename = '/dbfs/FileStore/tables/model_test'+str(i)+'.sav'\n list_models.append(pickle.load(open(filename, 'rb')))\nlist_models"],"metadata":{},"outputs":[],"execution_count":10},{"cell_type":"code","source":["#transforming the data\ndt=DataTransforming().setListModels(list_models)\n\nrdd_output=dt.transform(rdd_transformed_data)\nrdd_output.take(5)\n"],"metadata":{},"outputs":[],"execution_count":11},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":12}],"metadata":{"name":"distribute_sklearn_classes","notebookId":780316462238092},"nbformat":4,"nbformat_minor":0}
Add Comment
Please, Sign In to add comment