SHARE
TWEET

PySpark Qualitative_Bankruptcy

a guest Jul 20th, 2015 278 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1.  
  2. # coding: utf-8
  3.  
  4. # First import packages and classes that we will need throughout
  5.  
  6. # In[182]:
  7.  
  8. import numpy as np
  9.  
  10. from pyspark.mllib.regression import LabeledPoint
  11.  
  12. from pyspark.mllib.classification import LogisticRegressionWithLBFGS
  13. from pyspark.mllib.classification import LogisticRegressionWithSGD
  14.  
  15.  
  16. # Make sure that we have the files in the right place
  17.  
  18. # In[183]:
  19.  
  20. get_ipython().run_cell_magic(u'sh', u'', u'ls playground/')
  21.  
  22.  
  23. # Peek at the data set
  24.  
  25. # In[184]:
  26.  
  27. get_ipython().run_cell_magic(u'sh', u'', u'head playground/Qualitative_Bankruptcy.data.txt')
  28.  
  29.  
  30. # Load the data set into a Spark RDD
  31.  
  32. # In[206]:
  33.  
  34. data = sc.textFile('playground/Qualitative_Bankruptcy.data.txt')
  35.  
  36.  
  37. # Check to make sure the data set is loaded correctly
  38.  
  39. # In[207]:
  40.  
  41. print data.count()
  42.  
  43. assert data.count() == 250
  44.  
  45.  
  46. # Read few lines
  47.  
  48. # In[208]:
  49.  
# Display the first two raw CSV lines (rendered as the notebook cell's output).
data.take(2)
  51.  
  52.  
  53. # The dictionary `getDoubleValue` maps the categorical features into a numerical representation
  54. #
  55. # The function `line_parser()` transforms each line into a `LabeledPoint` object
  56. #
  57. # Finally, try the function on some examples
  58.  
  59. # In[209]:
  60.  
  61. getDoubleValue = { 'P' : 3.0, 'A' : 2.0, 'N' : 1.0, 'NB': 1.0, 'B': 0.0 }
  62.  
  63. def line_parser(line):
  64.     tokens = line.split(',')
  65.     label = getDoubleValue[tokens[-1]]
  66.     features = map(lambda t: getDoubleValue[t], tokens[:-1])
  67.     return LabeledPoint(label, features)
  68.  
  69. lp = line_parser(example_line)
  70. print lp
  71.  
  72. assert lp.label == 1.0
  73. assert np.allclose(lp.features, [3.0, 3.0, 2.0, 2.0, 2.0, 3.0])
  74.  
  75.  
  76. # Map the data set into a data set of `LabeledPoint`s
  77.  
  78. # In[210]:
  79.  
  80. parsedData = data.map(line_parser)
  81.  
  82. print parsedData.take(1)[0]
  83.  
  84. # Integrity check
  85. assert parsedData.filter(lambda lp: lp.label != 1.0 and lp.label != 0.0).isEmpty()
  86.  
  87.  
  88. # Split the data into training and test (we're missing the validation set)
  89.  
  90. # In[211]:
  91.  
  92. trainingData, testData = parsedData.randomSplit([0.6, 0.4], seed = 434)
  93.  
  94.  
  95. # Train two logistic regression models with two different optimizers (LBFGS and SGD).
  96.  
  97. # In[212]:
  98.  
  99. model1 = LogisticRegressionWithLBFGS.train(trainingData, iterations = 100, intercept = True, numClasses = 2)
  100. model2 = LogisticRegressionWithSGD.train(trainingData, iterations = 100, intercept = True)
  101.  
  102. # Print the model parameters
  103. print model1
  104. print model2
  105.  
  106.  
  107. # Compare the models on few random samples
  108.  
  109. # In[213]:
  110.  
  111. samples = trainingData.sample(False, 10.0 / 250.0).collect()
  112. for point in samples:
  113.     print point, model1.predict(point.features), model2.predict(point.features)
  114.  
  115.  
  116. # Evaluate the training and test errors
  117.  
  118. # In[217]:
  119.  
  120. trainingLabelAndPreds1 = trainingData.map(lambda point: (point.label, model1.predict(point.features)))
  121. trainingError1 = trainingLabelAndPreds1.map(lambda (r1, r2): float(r1 != r2)).mean()
  122. print 'LBFGS training error =', trainingError1
  123.  
  124. testLabelAndPreds1 = testData.map(lambda point: (point.label, model1.predict(point.features)))
  125. testError1 = testLabelAndPreds1.map(lambda (r1, r2): float(r1 != r2)).mean()
  126. print 'LBFGS test error =',testError1
  127.  
  128. trainingLabelAndPreds2 = trainingData.map(lambda point: (point.label, model2.predict(point.features)))
  129. trainingError2 = trainingLabelAndPreds2.map(lambda (r1, r2): float(r1 != r2)).mean()
  130. print 'SGD training error =', trainingError2
  131.  
  132. testLabelAndPreds2 = testData.map(lambda point: (point.label, model2.predict(point.features)))
  133. testError2 = testLabelAndPreds2.map(lambda (r1, r2): float(r1 != r2)).mean()
  134. print 'SGD test error =', testError2
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Not a member of Pastebin yet?
Sign Up, it unlocks many cool features!
 
Top