## Create Date Parameter
dbutils.widgets.text("varReportDate", "19000101")
ReportDate = dbutils.widgets.get("varReportDate")
print(ReportDate)
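# A minimal sketch of how a caller could pass the date parameter in from
# another notebook; the notebook path is a hypothetical placeholder, not
# part of the original.
# dbutils.notebook.run("/path/to/this_notebook", 3600,
#                      {"varReportDate": "20230101"})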
## Connect to Storage
storage_account_name = "mystorage"
storage_account_access_key = ""  # left blank here; supply your own key
file_location = "wasbs://<container>@mystorage.blob.core.windows.net/myfiles/data_" + ReportDate + ".csv"
file_type = "csv"
spark.conf.set(
    "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
    storage_account_access_key)
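# Rather than hardcoding the key, a safer sketch, assuming a Databricks
# secret scope named "myscope" with a key "storage-key" already exists:
# storage_account_access_key = dbutils.secrets.get(scope="myscope", key="storage-key")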
## Define your Input Schema
from pyspark.sql.types import *

schema = StructType([
    StructField("ReportingDate", DateType(), True),
    StructField("id", StringType(), True),
    StructField("x1", IntegerType(), True),
    StructField("x2", DoubleType(), True)
])
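# For reference, a CSV row matching this schema would look like the
# following (hypothetical values):
# 2023-01-01,abc123,42,3.14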
## Read in Data
dataset = spark.read \
    .format(file_type) \
    .option("header", "true") \
    .schema(schema) \
    .load(file_location)
# Alternatively, drop .schema(schema) and use .option("inferSchema", "true")
# to let Spark guess the column types.
# dataset = dataset.na.fill(0)  # optional: replace nulls with 0
display(dataset)
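# Optional sanity checks before scoring, in the spirit of the commented-out
# lines above; both are standard Spark DataFrame calls.
# dataset.printSchema()
# print(dataset.count())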
## Load in Model and Transformation Pipeline
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col

mypipeline = PipelineModel.load("/mnt/trainedmodels/pipeline/")
# Apply the fitted pipeline to the new data (transform only; no refitting)
mydataset = mypipeline.transform(dataset)
# Score the data using the best model found by cross-validation
mymodel = CrossValidatorModel.load("/mnt/trainedmodels/lr")
mystamp = mymodel.bestModel.transform(mydataset)
mystamp = mystamp.select(col("id"),
                         col("ReportingDate"),
                         col("prediction").alias("MyForecast"))
display(mystamp)
## Write out Data
fileloc = "/mnt/output" + str(ReportDate)  # + ".csv"
mystamp.write.mode("overwrite").option("header", "true").csv(fileloc)
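# A minimal read-back check, assuming the same cluster can reach the mount;
# confirms the CSV landed at fileloc.
# check = spark.read.option("header", "true").csv(fileloc)
# display(check)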