Guest User

PySpark Qualitative_Bankruptcy

a guest
Jul 20th, 2015
348
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.46 KB | None | 0 0
  1.  
  2. # coding: utf-8
  3.  
  4. # First import packages and classes that we will need throughout
  5.  
  6. # In[182]:
  7.  
  8. import numpy as np
  9.  
  10. from pyspark.mllib.regression import LabeledPoint
  11.  
  12. from pyspark.mllib.classification import LogisticRegressionWithLBFGS
  13. from pyspark.mllib.classification import LogisticRegressionWithSGD
  14.  
  15.  
  16. # Make sure that we have the files in the right place
  17.  
  18. # In[183]:
  19.  
  20. get_ipython().run_cell_magic(u'sh', u'', u'ls playground/')
  21.  
  22.  
  23. # Peek at the data set
  24.  
  25. # In[184]:
  26.  
  27. get_ipython().run_cell_magic(u'sh', u'', u'head playground/Qualitative_Bankruptcy.data.txt')
  28.  
  29.  
  30. # Load the data set into a Spark RDD
  31.  
  32. # In[206]:
  33.  
  34. data = sc.textFile('playground/Qualitative_Bankruptcy.data.txt')
  35.  
  36.  
  37. # Check to makes sure the data set is loaded correctly
  38.  
  39. # In[207]:
  40.  
  41. print data.count()
  42.  
  43. assert data.count() == 250
  44.  
  45.  
  46. # Read few lines
  47.  
  48. # In[208]:
  49.  
  50. data.take(2)
  51.  
  52.  
  53. # The dictionary `getDoubleValue` map the categorical features into numerial representation
  54. #
  55. # The function `line_parser()` transform each line into a `LabeledPoint` object
  56. #
  57. # Finally perform try the function on some examples
  58.  
  59. # In[209]:
  60.  
  61. getDoubleValue = { 'P' : 3.0, 'A' : 2.0, 'N' : 1.0, 'NB': 1.0, 'B': 0.0 }
  62.  
  63. def line_parser(line):
  64.     tokens = line.split(',')
  65.     label = getDoubleValue[tokens[-1]]
  66.     features = map(lambda t: getDoubleValue[t], tokens[:-1])
  67.     return LabeledPoint(label, features)
  68.  
  69. lp = line_parser(example_line)
  70. print lp
  71.  
  72. assert lp.label == 1.0
  73. assert np.allclose(lp.features, [3.0, 3.0, 2.0, 2.0, 2.0, 3.0])
  74.  
  75.  
  76. # Map the data set into a data set of `LabeledPoint`s
  77.  
  78. # In[210]:
  79.  
  80. parsedData = data.map(line_parser)
  81.  
  82. print parsedData.take(1)[0]
  83.  
  84. # Integrity check
  85. assert parsedData.filter(lambda lp: lp.label != 1.0 and lp.label != 0.0).isEmpty()
  86.  
  87.  
  88. # Split the data into training and test (we're missing the validation set)
  89.  
  90. # In[211]:
  91.  
  92. trainingData, testData = parsedData.randomSplit([0.6, 0.4], seed = 434)
  93.  
  94.  
  95. # Train two logistic regression models with two different optimizers (LBFGS and SGD).
  96.  
  97. # In[212]:
  98.  
  99. model1 = LogisticRegressionWithLBFGS.train(trainingData, iterations = 100, intercept = True, numClasses = 2)
  100. model2 = LogisticRegressionWithSGD.train(trainingData, iterations = 100, intercept = True)
  101.  
  102. # Print the model parameters
  103. print model1
  104. print model2
  105.  
  106.  
  107. # Compare the models on few random samples
  108.  
  109. # In[213]:
  110.  
  111. samples = trainingData.sample(False, 10.0 / 250.0).collect()
  112. for point in samples:
  113.     print point, model1.predict(point.features), model2.predict(point.features)
  114.  
  115.  
  116. # Evaluate the training and test errors
  117.  
  118. # In[217]:
  119.  
  120. trainingLabelAndPreds1 = trainingData.map(lambda point: (point.label, model1.predict(point.features)))
  121. trainingError1 = trainingLabelAndPreds1.map(lambda (r1, r2): float(r1 != r2)).mean()
  122. print 'LBFGS training error =', trainingError1
  123.  
  124. testLabelAndPreds1 = testData.map(lambda point: (point.label, model1.predict(point.features)))
  125. testError1 = testLabelAndPreds1.map(lambda (r1, r2): float(r1 != r2)).mean()
  126. print 'LBFGS test error =',testError1
  127.  
  128. trainingLabelAndPreds2 = trainingData.map(lambda point: (point.label, model2.predict(point.features)))
  129. trainingError2 = trainingLabelAndPreds2.map(lambda (r1, r2): float(r1 != r2)).mean()
  130. print 'SGD training error =', trainingError2
  131.  
  132. testLabelAndPreds2 = testData.map(lambda point: (point.label, model2.predict(point.features)))
  133. testError2 = testLabelAndPreds2.map(lambda (r1, r2): float(r1 != r2)).mean()
  134. print 'SGD test error =', testError2
Add Comment
Please, Sign In to add comment