## Create Date Parameter
dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)
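
## Sketch: one way varReportDate could be supplied when this notebook is called
## from another notebook (the caller path and date value here are assumptions,
## not from the original paste):
# dbutils.notebook.run("/path/to/this_notebook", 600, {"varReportDate": "20190324"})
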
## Connect to Storage
storage_account_name = "mystorage"
storage_account_access_key = ""

file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
file_type = "csv"

spark.conf.set(
    "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
    storage_account_access_key)
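
## Sketch: a quick credential check before the full read; the folder mirrors
## file_location above and <container> is the same placeholder, not a real name:
# dbutils.fs.ls("wasbs://<container>@" + storage_account_name + ".blob.core.windows.net/myfiles/")
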
## Define your Input Schema
from pyspark.sql.types import *

schema = StructType([
    StructField("ReportingDate", DateType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("x2", DoubleType(), True)
])
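
## Sketch: if ReportingDate is stored as e.g. "yyyyMMdd" rather than the ISO
## default, the reader below would also need an explicit date format; the
## pattern here is an assumption about the file, not stated in the original:
# .option("dateFormat", "yyyyMMdd")
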
## Read in Data
dataset = spark.read \
    .format(file_type) \
    .option("header", "true") \
    .schema(schema) \
    .load(file_location)
# Alternative: let Spark infer the schema instead of supplying one
# .option("inferSchema", "true")
# Optionally replace nulls with zero before scoring
# dataset = dataset.na.fill(0)
display(dataset)
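
## Sketch: cheap sanity checks before scoring (standard DataFrame calls, not in
## the original):
# dataset.printSchema()
# print(dataset.count(), "rows for report date", ReportDate)
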
## Load in Model and Transformation Pipeline
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col
from pyspark.ml.regression import GeneralizedLinearRegressionModel

mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
# Apply the already-fitted pipeline's transformations to the new data
mydataset = mypipeline.transform(dataset)

# Score the data using the best model found by cross-validation
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
mystamp = mymodel.bestModel.transform(mydataset)

mystamp = mystamp.select(col("id"),
                         col("ReportingDate"),
                         col("prediction").alias("MyForecast"))
display(mystamp)
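
## Sketch: a quick distribution summary of the forecast column (describe() is a
## standard DataFrame call, but this line is not part of the original):
# display(mystamp.describe("MyForecast"))
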
## Write out Data
fileloc = "/mnt/output" + str(ReportDate)  # + ".csv"
mystamp.write.mode("overwrite").option("header", "true").csv(fileloc)
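
## Sketch: csv() writes a folder of part files; to force a single CSV, coalesce
## to one partition first (a standard pattern, not from the original):
# mystamp.coalesce(1).write.mode("overwrite").option("header", "true").csv(fileloc)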