ArcheontPB

Spark commands

# withColumn helps you create a new column based on the values of an old one
from pyspark.sql.functions import round
df = df.withColumn('percentagescaleddays', round((df['DAYSONMARKET'] - min_days) / (max_days - min_days) * 100))

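# min_days and max_days above are plain scalars; a sketch (an assumption, not part of the
# original snippet) of how they could be computed with agg before that call:
min_days = df.agg({'DAYSONMARKET': 'min'}).collect()[0][0]
max_days = df.agg({'DAYSONMARKET': 'max'}).collect()[0][0]
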
# returns a list of (column name, data type) tuples
df.dtypes

# Drop columns given in a list
cols_to_drop = ['LOTSIZEDIMENSION', 'LISTTYPE']
df = df.drop(*cols_to_drop)

# where is used to keep only the rows that pass the specified condition
text_filter = ~df['ASSUMABLEMORTGAGE'].isin(yes_values) | df['ASSUMABLEMORTGAGE'].isNull()
df = df.where(text_filter)

# import of aggregate functions
from pyspark.sql.functions import mean, stddev

# with agg we can apply an aggregate function; collect()[0][0] retrieves the calculated value
mean_val = df.agg({'log_SalesClosePrice': 'mean'}).collect()[0][0]
stddev_val = df.agg({'log_SalesClosePrice': 'stddev'}).collect()[0][0]

# Joins (the condition can be a list of column-equality expressions;
# the third argument is the join type given as a string, e.g. 'inner' or 'left')
condition = [walk_df['latitude'] == df['latitude'], walk_df['longitude'] == df['longitude']]
df_1.join(df_2, condition, 'type_in_string')

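# For example (a hypothetical concrete call using the condition above):
joined = df.join(walk_df, on=condition, how='left')
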
# register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView('df')

# run a SQL query (the query is passed as a string)
spark.sql('query in string')

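# For example (a hypothetical query against the view registered above):
longest_listed = spark.sql('SELECT * FROM df ORDER BY DAYSONMARKET DESC LIMIT 10')
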
# we can cast a column to a type given as a string
walk_df['latitude'].cast('double')

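# cast only builds a Column expression; one way to apply it is to assign it back with withColumn:
walk_df = walk_df.withColumn('latitude', walk_df['latitude'].cast('double'))
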
# we can convert a column holding dates to a Spark date using to_date
df = df.withColumn('date', to_date(df['LISTDATE']))

# pyspark.sql.functions then provides the functions to manipulate dates, for example:
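# (a small sketch with hypothetical new column names, pulling parts out of the converted date)
from pyspark.sql.functions import year, month
df = df.withColumn('list_year', year(df['date']))
df = df.withColumn('list_month', month(df['date']))
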
## Using lag and a window you can create a column that holds the value from a previous row

from pyspark.sql.functions import lag, datediff, to_date
from pyspark.sql.window import Window

# Cast data type
mort_df = mort_df.withColumn('DATE', to_date(mort_df['DATE']))

# Create window
w = Window.orderBy(mort_df['DATE'])
# Create lag column
mort_df = mort_df.withColumn('DATE-1', lag('DATE', 1).over(w))

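# datediff (imported above) can then measure the gap between the current and lagged dates,
# e.g. (hypothetical column name):
mort_df = mort_df.withColumn('days_since_last', datediff(mort_df['DATE'], mort_df['DATE-1']))
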
# use the like function to find a pattern in a string
has_attached_garage = df['GARAGEDESCRIPTION'].like('%Attached Garage%')

# for conditionals use when and otherwise
from pyspark.sql.functions import when
df = df.withColumn('has_attached_garage', (when(has_attached_garage, 1)
                                           .when(has_detached_garage, 0)
                                           .otherwise(None)))

##############################################
from pyspark.sql.functions import coalesce, first

# Pivot: one column per value of ex_garage_list, filled with constant_val where present
piv_df = ex_df.groupBy('NO').pivot('ex_garage_list').agg(coalesce(first('constant_val')))

# Join the dataframes together and fill nulls
joined_df = df.join(piv_df, on='NO', how='left')

# Columns to zero-fill
zfill_cols = piv_df.columns

# Zero-fill the pivoted values
zfilled_df = joined_df.fillna(0, subset=zfill_cols)
##############################################

# Lasso model (α = 1, i.e. elasticNetParam=1 for pure L1 regularisation; regParam sets its strength)
from pyspark.ml.regression import LinearRegression
regression = LinearRegression(labelCol='duration', regParam=1, elasticNetParam=1)
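# A sketch of fitting and predicting with it (assumes train_df/test_df already contain
# an assembled 'features' vector column and the 'duration' label):
model = regression.fit(train_df)
predictions = model.transform(test_df)
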
# Pipelines

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer, onehot, assemble, regression])
pipeline = pipeline.fit(train_df)  # fit runs all stages in order
pipeline.stages[2]                 # access a single fitted stage (here the assembler)
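
# For example (a hypothetical follow-up), the coefficients of the fitted regression stage:
fitted_regression = pipeline.stages[3]
print(fitted_regression.coefficients)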