Advertisement
Guest User

Untitled

a guest
Jul 20th, 2017
54
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.80 KB | None | 0 0
import math
import time

import numpy as np
from bokeh.io import output_notebook, push_notebook
from bokeh.layouts import gridplot
from bokeh.models import Range1d, ColumnDataSource
from bokeh.plotting import figure, show, helpers
from bokeh.resources import CDN
  7. output_notebook(resources=CDN)
  8.  
  9. def helper(d):
  10. if 'NumCPUs' in d:
  11. yield d['NumCPUs']
  12. for k in d:
  13. if isinstance(d[k], list):
  14. for i in d[k]:
  15. for j in helper(i):
  16. yield j
  17. client_table = ray.global_state.client_table()
  18. num_cpus = list(helper(client_table))[0]
  19.  
  20. # Function to generate a task time series plot
  21. def task_time_series():
  22.  
  23. # Create the Bokeh plot
  24. time_series_fig = figure(title="Task Time Series",
  25. tools=["save", "hover", "wheel_zoom", "box_zoom", "pan"],
  26. background_fill_color="#FFFFFF", x_range=[0, 1], y_range=[0, 1])
  27.  
  28. # Create the data source that the plot will pull from
  29. time_series_source = ColumnDataSource(data=dict(
  30. left=[],
  31. right=[],
  32. top=[]
  33. ))
  34.  
  35. # Plot the rectangles representing the distribution
  36. time_series_fig.quad(left="left", right="right", top="top", bottom=0,
  37. source=time_series_source, fill_color="#B3B3B3", line_color="#033649")
  38.  
  39. # Label the plot axes
  40. time_series_fig.xaxis.axis_label = "Time in seconds"
  41. time_series_fig.yaxis.axis_label = "% of total capacity"
  42.  
  43. handle = show(gridplot(time_series_fig, ncols=1, plot_width=500, plot_height=500, toolbar_location="below"),
  44. notebook_handle=True)
  45.  
  46. # Generate the histogram that will be plotted
  47. def time_series_data(abs_earliest, abs_latest, abs_num_tasks, tasks):
  48. granularity = 1
  49. earliest = time.time()
  50. latest = 0
  51.  
  52. for task_id, data in tasks.items():
  53. if data["score"] > latest:
  54. latest = data["score"]
  55. if data["score"] < earliest:
  56. earliest = data["score"]
  57.  
  58. num_buckets = math.ceil((latest - earliest) / granularity)
  59. buckets = []
  60.  
  61. # Create a distribution (buckets)
  62. buffer = []
  63. for i in range(0, num_buckets, granularity):
  64. start = i * granularity + earliest
  65. end = ((i + 1) * granularity) + earliest
  66. t = ray.global_state.task_profiles(start=start, end=end)
  67. total_usage = 0
  68. for task_id, data in t.items():
  69. if data["store_outputs_end"] > end:
  70. task_end = end
  71. buffer.append(data)
  72. else:
  73. task_end = data["store_outputs_end"]
  74. if data["get_arguments_start"] < start:
  75. task_start = start
  76. else:
  77. task_start = data["get_arguments_start"]
  78. total_usage += task_end - task_start
  79. temp_buff = []
  80. for data in buffer:
  81. if data["store_outputs_end"] > end:
  82. temp_buff.append(data)
  83. total_usage += 1
  84. buffer = temp_buff
  85.  
  86. if total_usage is 0:
  87. continue
  88.  
  89. total_capacity = num_cpus * granularity
  90. buckets.append(int(total_usage/total_capacity * 8))
  91.  
  92. if len(buckets) == 0:
  93. return [], [], []
  94.  
  95. distr = []
  96.  
  97. for x in range(len(buckets)):
  98. distr.extend([earliest - abs_earliest + granularity * x] * buckets[x])
  99.  
  100. # Create a histogram from the distribution data
  101. bins = [earliest - abs_earliest + (i - 1) * granularity for i in range(len(buckets) + 2)]
  102. hist, bin_edges = np.histogram(distr, bins=bins)
  103.  
  104. left = bin_edges[:-1]
  105. right = bin_edges[1:]
  106. top = hist
  107.  
  108. return left, right, top
  109.  
  110. # Update the plot based on the sliders
  111. def time_series_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
  112. left, right, top = time_series_data(abs_earliest, abs_latest, abs_num_tasks, tasks)
  113.  
  114. time_series_source.data = {"left": left, "right": right, "top": top}
  115.  
  116. x_range = (max(0, min(left)) if len(left) else 0, max(right) if len(right) else 1)
  117. y_range = (0, max(top) + 1 if len(top) else 1)
  118.  
  119. # Define the axis ranges
  120. x_range = helpers._get_range(x_range)
  121. time_series_fig.x_range.start = x_range.start
  122. time_series_fig.x_range.end = x_range.end
  123.  
  124. y_range = helpers._get_range(y_range)
  125. time_series_fig.y_range.start = y_range.start
  126. time_series_fig.y_range.end = num_cpus
  127.  
  128. # Push the updated data to the notebook
  129. push_notebook(handle=handle)
  130.  
  131. get_sliders(time_series_update)
  132.  
  133. task_time_series()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement