Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- import math
- import pandas as pd
- import random
- import numpy as np
- from bokeh.io import show, output_notebook, push_notebook
- from bokeh.resources import CDN
- from bokeh.models import (
- ColumnDataSource,
- HoverTool,
- LinearColorMapper,
- BasicTicker,
- PrintfTickFormatter,
- ColorBar,
- )
- from bokeh.plotting import figure, helpers
# Direct Bokeh output to the notebook, using the CDN resources object imported above.
- output_notebook(resources=CDN)
def cluster_usage():
    """Show a cluster usage "heat map" in the notebook and keep it updated.

    Builds a Bokeh figure with one row per node IP address and one column per
    time bucket, coloured by the number of tasks counted in that bucket. The
    figure is displayed immediately with a single placeholder cell, and the
    nested ``heat_map_update`` callback is handed to ``get_sliders`` so the
    plot can be refreshed later.

    NOTE(review): ``ray`` and ``get_sliders`` are not imported in this cell;
    they are assumed to be defined earlier in the notebook session.
    """
    # Placeholder row so the figure can be laid out before real data arrives.
    source = ColumnDataSource(data={
        "node_ip_address": ['127.0.0.1'],
        "time": [0],
        "num_tasks": [0],
    })

    # Define the color schema
    colors = ["#75968f", "#a5bab7", "#c9d9d3", "#e2e2e2", "#dfccce",
              "#ddb7b1", "#cc7878", "#933b41", "#550b1d"]
    mapper = LinearColorMapper(palette=colors, low=0, high=2)

    TOOLS = "hover,save,xpan,box_zoom,reset,xwheel_zoom"

    # Create the plot; passing a list of strings as y_range makes the y axis
    # categorical, which is what lets the update callback set its factors.
    p = figure(title="Cluster Usage",
               y_range=list(set(source.data['node_ip_address'])),
               x_axis_location="above", plot_width=900, plot_height=500,
               tools=TOOLS, toolbar_location='below')

    # Format the plot axes
    p.grid.grid_line_color = None
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.axis.major_label_text_font_size = "10pt"
    p.axis.major_label_standoff = 0
    p.xaxis.major_label_orientation = math.pi / 3

    # Plot rectangles: one unit-sized cell per (time bucket, node) pair.
    p.rect(x="time", y="node_ip_address", width=1, height=1,
           source=source,
           fill_color={"field": "num_tasks", "transform": mapper},
           line_color=None)

    # Add legend to the side of the plot
    color_bar = ColorBar(color_mapper=mapper, major_label_text_font_size="8pt",
                         ticker=BasicTicker(desired_num_ticks=len(colors)),
                         label_standoff=6, border_line_color=None,
                         location=(0, 0))
    p.add_layout(color_bar, "right")

    # Define hover tool
    p.select_one(HoverTool).tooltips = [
        ("Node IP Address", "@node_ip_address"),
        ("Number of tasks running", "@num_tasks"),
        ("Time", "@time")
    ]

    # Define the axis labels
    p.xaxis.axis_label = "Time in seconds"
    p.yaxis.axis_label = "Node IP Address"

    handle = show(p, notebook_handle=True)

    def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
        """Recompute the heat map from ``tasks`` and push it to the notebook.

        Args:
            abs_earliest: Reference time subtracted from each bucket start to
                obtain the x coordinate.
            abs_latest: Unused; part of the callback signature.
            abs_num_tasks: Unused; part of the callback signature.
            tasks: Mapping of task id to a dict with a ``"score"`` entry
                (presumably a timestamp in seconds -- confirm against the
                code that calls this callback).

        The plot is left untouched when there is nothing to draw.
        """
        if not tasks:
            return

        granularity = 1  # width of one time bucket

        scores = [data["score"] for data in tasks.values()]
        earliest = min(scores)
        latest = max(scores)
        # int() keeps range() happy where math.ceil returns a float.
        num_buckets = int(math.ceil((latest - earliest) / granularity))

        worker_info = ray.global_state.workers()
        num_tasks = []
        nodes = []
        times = []

        # Count, per bucket, how many task profiles each node reported.
        # Iterating bucket indices directly (rather than stepping the range
        # by granularity) keeps the buckets contiguous for any granularity.
        for i in range(num_buckets):
            start = i * granularity + earliest
            end = (i + 1) * granularity + earliest
            profiles = ray.global_state.task_profiles(
                start=math.floor(start), end=math.ceil(end))

            node_to_num = {}
            for data in profiles.values():
                node = worker_info[data["worker_id"]]["node_ip_address"]
                node_to_num[node] = node_to_num.get(node, 0) + 1

            for node_ip, count in node_to_num.items():
                num_tasks.append(count)
                nodes.append(node_ip)
                times.append(earliest - abs_earliest + i * granularity)

        if not num_tasks:
            return

        # Update the existing categorical range in place instead of swapping
        # in a new range object built by a private Bokeh helper; sorting
        # keeps the row order stable between refreshes.
        p.y_range.factors = sorted(set(nodes))
        mapper.low = min(min(num_tasks), 0)
        mapper.high = max(max(num_tasks), 1)

        # Update notebook
        source.data = {
            "node_ip_address": nodes,
            "time": times,
            "num_tasks": num_tasks,
        }
        push_notebook(handle=handle)

    get_sliders(heat_map_update)
# Build and display the cluster usage heat map.
- cluster_usage()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement