import sys, os, time
import datetime

import aztk.spark
from aztk.error import AztkError
from memory_profiler import profile  # only needed if you decorate functions with @profile


# set your secrets
secrets_config = aztk.spark.models.SecretsConfiguration(
    service_principal=aztk.spark.models.ServicePrincipalConfiguration(
        tenant_id="",
        client_id="",
        credential="",
        batch_account_resource_id="",
        storage_account_resource_id="",
    ),
    # ssh_pub_key="../../rsa_key.pub"
)
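# the tenant_id, client_id, and credential above come from an Azure AD service
# principal, and the *_resource_id values are the full ARM resource IDs of your
# Batch and Storage accounts (see the aztk getting-started docs)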

# set path to the root of the repository to reference files
ROOT_PATH = os.path.normpath(os.path.dirname(__file__))

print(ROOT_PATH)

# create a spark client (the entry point for all cluster and application operations)
client = aztk.spark.Client(secrets_config)

# list available clusters
clusters = client.list_clusters()

# define a custom script
custom_script = aztk.spark.models.CustomScript(
    name="pythondependencies.sh",
    script=os.path.join(ROOT_PATH, 'custom-scripts', 'pythondependencies.sh'),
    run_on="all-nodes")
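# custom scripts run on the cluster's nodes while the cluster is provisioned;
# run_on="all-nodes" executes pythondependencies.sh on the master and every worker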

print("The jars path is", os.path.join(ROOT_PATH, 'aztk_cli', 'config', 'jars'))

# define spark configuration
spark_conf = aztk.spark.models.SparkConfiguration(
    spark_defaults_conf=os.path.join(ROOT_PATH, 'aztk_cli', 'config', 'spark-defaults.conf'),
    spark_env_sh=os.path.join(ROOT_PATH, 'aztk_cli', 'config', 'spark-env.sh'),
    core_site_xml=os.path.join(ROOT_PATH, 'aztk_cli', 'config', 'core-site.xml'),
    jars=[os.path.join(ROOT_PATH, 'aztk_cli', 'config', 'jars', jar)
          for jar in os.listdir(os.path.join(ROOT_PATH, 'aztk_cli', 'config', 'jars'))]
)
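# every jar found in aztk_cli/config/jars is uploaded to the cluster and added to
# Spark's classpath, alongside the spark-defaults.conf / spark-env.sh / core-site.xml
# files above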

userconfig = aztk.spark.models.UserConfiguration(
    username="spark",
    ssh_key="~/.ssh/id_rsa.pub",
    password="spark"
)
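# either an SSH public key or a password is enough to let the "spark" user SSH
# into the cluster's nodes; here both are supplied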

toolkit = aztk.models.Toolkit(software="spark", version="2.3.0")

# configure my cluster
cluster_config = aztk.spark.models.ClusterConfiguration(
    cluster_id="myfirstcluster",  # cluster IDs must not contain capital letters
    vm_count=4,
    vm_size="Standard_E4s_v3",  # Standard_G2: 4 vCPUs, 56 GiB; Standard_E8s_v3: 8 vCPUs, 64 GiB
    custom_scripts=[custom_script],
    spark_configuration=spark_conf,
    user_configuration=userconfig,
    toolkit=toolkit
)
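# Standard_E4s_v3 gives each of the 4 nodes 4 vCPUs and 32 GiB of RAM, enough for
# the 30g driver/executor memory requested below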

# create a cluster, and wait until it is ready
try:
    print("Cluster configured, creating cluster now:", datetime.datetime.now())
    cluster = client.create_cluster(cluster_config)
    cluster = client.get_cluster(cluster_config.cluster_id)
    print("Getting cluster for", cluster_config.cluster_id)

    # the cluster takes 5 to 10 minutes to be ready, depending on its size and on
    # availability in the batch account
    cluster = client.wait_until_cluster_is_ready(cluster.id)
except AztkError as e:
    print(e.message)
    sys.exit()

print("Cluster is ready now:", datetime.datetime.now())

# create a user for the cluster if you need to connect to it
# client.create_user(cluster.id, "loblaw", "loblaw2018")
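# (not needed here: user_configuration above already creates the "spark" user when
# the cluster is provisioned)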

# create the app to run
app1 = aztk.spark.models.ApplicationConfiguration(
    name="workplz",
    application=os.path.join(ROOT_PATH, 'main_scoring.py'),
    driver_memory="30g",
    executor_memory="30g",  # the memory you think your app needs; make sure each node has at least that much
    py_files=['utils.py', 'datastorage.py']
)
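# py_files are shipped with the application and placed on the PYTHONPATH of the
# driver and executors, like spark-submit's --py-files option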

# submit the app and wait until it has finished running
client.submit(cluster.id, app1)
print("Submitted app. Waiting ...")

# block until the application's task completes
client.wait_until_application_done(cluster.id, app1.name)

# get the full log for the app and print it to the console
# app1_logs1 = client.get_application_log(cluster_id=cluster_config.cluster_id, application_name=app1.name)
# print(app1_logs1.log)

print("Done waiting, time now is:", datetime.datetime.now())

# get status of the app
status = client.get_application_status(cluster_config.cluster_id, app1.name)

# stream the app's logs, printing to the console as it runs
current_bytes = 0
while True:
    app1_logs = client.get_application_log(
        cluster_id=cluster_config.cluster_id,
        application_name=app1.name,
        tail=True,
        current_bytes=current_bytes)

    print(app1_logs.log, end="")

    if app1_logs.application_state == 'completed':
        break
    current_bytes = app1_logs.total_bytes
    time.sleep(1)
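# with tail=True, get_application_log returns only the output written after
# current_bytes, so each iteration prints just the new log lines; the loop exits
# once the application reaches the 'completed' state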

# wait until all jobs finish, then delete the cluster
client.wait_until_applications_done(cluster.id)

print("All jobs finished, let's delete the cluster")
client.delete_cluster(cluster.id)