Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import math
import time

import numpy as np
from bokeh.io import output_notebook, push_notebook
from bokeh.layouts import gridplot
from bokeh.models import Range1d, ColumnDataSource
from bokeh.plotting import figure, show, helpers
from bokeh.resources import CDN
- output_notebook(resources=CDN)
def helper(d):
    """Recursively yield every ``'NumCPUs'`` value found in *d*.

    *d* is a dict (e.g. an entry of the Ray client table) whose list-valued
    entries may contain further dicts of the same shape; the search descends
    into those lists. Values are yielded in dict-iteration order, parent
    before children.
    """
    if 'NumCPUs' in d:
        yield d['NumCPUs']
    for value in d.values():
        if isinstance(value, list):
            for child in value:
                yield from helper(child)
# Query Ray's global state for the cluster membership table and extract
# the first advertised CPU count from it.
client_table = ray.global_state.client_table()
# Take the first yielded value lazily; the original materialized the whole
# generator with list(...) just to index element 0.
num_cpus = next(helper(client_table))
# Function to generate a task time series plot
def task_time_series():
    """Display a live notebook plot of cluster CPU utilization over time.

    Builds a Bokeh quad-glyph histogram figure, shows it with a notebook
    handle, and registers an update callback (via ``get_sliders``) that
    recomputes the utilization histogram from Ray task profile data and
    pushes the new data to the live plot.
    """
    # Create the Bokeh plot
    time_series_fig = figure(
        title="Task Time Series",
        tools=["save", "hover", "wheel_zoom", "box_zoom", "pan"],
        background_fill_color="#FFFFFF",
        x_range=[0, 1],
        y_range=[0, 1])

    # Create the data source that the plot will pull from
    time_series_source = ColumnDataSource(data=dict(
        left=[],
        right=[],
        top=[]))

    # Plot the rectangles representing the distribution
    time_series_fig.quad(
        left="left", right="right", top="top", bottom=0,
        source=time_series_source, fill_color="#B3B3B3",
        line_color="#033649")

    # Label the plot axes
    time_series_fig.xaxis.axis_label = "Time in seconds"
    time_series_fig.yaxis.axis_label = "% of total capacity"

    # BUG FIX: gridplot with ncols requires a flat *list* of children;
    # passing the bare figure raises in bokeh.layouts.gridplot.
    handle = show(
        gridplot([time_series_fig], ncols=1, plot_width=500,
                 plot_height=500, toolbar_location="below"),
        notebook_handle=True)

    # Generate the histogram that will be plotted
    def time_series_data(abs_earliest, abs_latest, abs_num_tasks, tasks):
        """Bucket task CPU usage into a histogram; return (left, right, top)
        arrays suitable for the quad glyph's data source."""
        granularity = 1  # bucket width in seconds

        # Find the actual [earliest, latest] span of the provided tasks.
        earliest = time.time()
        latest = 0
        for task_id, data in tasks.items():
            if data["score"] > latest:
                latest = data["score"]
            if data["score"] < earliest:
                earliest = data["score"]

        num_buckets = math.ceil((latest - earliest) / granularity)
        buckets = []
        # Tasks whose execution extends past the current bucket's end;
        # carried forward so later buckets also count them.
        buffer = []
        for i in range(0, num_buckets, granularity):
            start = i * granularity + earliest
            end = ((i + 1) * granularity) + earliest
            t = ray.global_state.task_profiles(start=start, end=end)
            total_usage = 0
            for task_id, data in t.items():
                # Clip each task's active interval to this bucket.
                if data["store_outputs_end"] > end:
                    task_end = end
                    buffer.append(data)
                else:
                    task_end = data["store_outputs_end"]
                if data["get_arguments_start"] < start:
                    task_start = start
                else:
                    task_start = data["get_arguments_start"]
                total_usage += task_end - task_start
            # Tasks carried over from earlier buckets occupy this bucket
            # too; keep the ones that are still running past `end`.
            temp_buff = []
            for data in buffer:
                if data["store_outputs_end"] > end:
                    temp_buff.append(data)
                # NOTE(review): charged one full unit per carried task;
                # presumably this should be `granularity` seconds — the two
                # coincide only because granularity == 1. Confirm intent.
                total_usage += 1
            buffer = temp_buff
            # BUG FIX: was `total_usage is 0` — identity comparison on an
            # int; value equality is required here.
            if total_usage == 0:
                continue
            total_capacity = num_cpus * granularity
            # NOTE(review): the `* 8` scale factor is a magic constant
            # (axis claims "% of total capacity") — confirm against the
            # intended rendering.
            buckets.append(int(total_usage / total_capacity * 8))

        if len(buckets) == 0:
            return [], [], []

        # Expand each bucket's weight into repeated sample points so that
        # np.histogram reproduces the bucket heights.
        distr = []
        for x in range(len(buckets)):
            distr.extend(
                [earliest - abs_earliest + granularity * x] * buckets[x])

        # Create a histogram from the distribution data
        bins = [earliest - abs_earliest + (i - 1) * granularity
                for i in range(len(buckets) + 2)]
        hist, bin_edges = np.histogram(distr, bins=bins)
        left = bin_edges[:-1]
        right = bin_edges[1:]
        top = hist
        return left, right, top

    # Update the plot based on the sliders
    def time_series_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
        """Slider callback: recompute the histogram, rescale the axes, and
        push the new data to the live notebook plot."""
        left, right, top = time_series_data(
            abs_earliest, abs_latest, abs_num_tasks, tasks)
        time_series_source.data = {"left": left, "right": right, "top": top}

        x_range = (max(0, min(left)) if len(left) else 0,
                   max(right) if len(right) else 1)
        y_range = (0, max(top) + 1 if len(top) else 1)

        # Define the axis ranges
        # NOTE(review): helpers._get_range is a private Bokeh API; if it
        # disappears in a newer Bokeh, build a Range1d directly instead.
        x_range = helpers._get_range(x_range)
        time_series_fig.x_range.start = x_range.start
        time_series_fig.x_range.end = x_range.end

        y_range = helpers._get_range(y_range)
        time_series_fig.y_range.start = y_range.start
        # NOTE(review): the y-axis top is pinned to num_cpus rather than
        # the computed y_range.end — looks like a deliberate capacity
        # ceiling, but confirm.
        time_series_fig.y_range.end = num_cpus

        # Push the updated data to the notebook
        push_notebook(handle=handle)

    # get_sliders is defined elsewhere in this notebook/file; it wires the
    # update callback to interactive time-range sliders.
    get_sliders(time_series_update)

task_time_series()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement