Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os, sys, time, subprocess, threading, collections
- import psutil
- import tkinter as tk
- from tkinter import ttk, messagebox
- from tkinter.scrolledtext import ScrolledText
- from matplotlib.figure import Figure
- from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
- # ---- Configuration ----
- REFRESH_MS = 1000 # background sampling interval (ms)
- UI_UPDATE_MS = 800 # UI update interval (ms)
- HISTORY_POINTS = 60 # history length for charts
- # ---- Helpers ----
- def safe_run(cmd, timeout=1.0):
- try:
- return subprocess.check_output(cmd, shell=True, stderr=subprocess.DEVNULL, universal_newlines=True, timeout=timeout).strip()
- except Exception:
- return ""
- def size_fmt(n):
- try:
- n = float(n)
- except Exception:
- return str(n)
- for unit in ['B', 'KB', 'MB', 'GB', 'TB', 'PB']:
- if abs(n) < 1024.0:
- return f"{n:3.1f} {unit}"
- n /= 1024.0
- return f"{n:.1f} EB"
- def cpu_name():
- try:
- with open("/proc/cpuinfo", "r") as f:
- for line in f:
- if line.lower().startswith("model name"):
- return line.split(":", 1)[1].strip()
- except Exception:
- pass
- out = safe_run("lscpu | grep 'Model name' || true")
- if out and ":" in out:
- return out.split(":", 1)[1].strip()
- return "CPU"
- def detect_nvidia():
- return bool(safe_run("which nvidia-smi"))
- def query_nvidia():
- out = safe_run("nvidia-smi --query-gpu=index,name,utilization.gpu,memory.total,memory.used --format=csv,noheader,nounits")
- gpus = []
- if not out:
- return gpus
- for line in out.splitlines():
- parts = [p.strip() for p in line.split(",")]
- if len(parts) >= 5:
- try:
- gpus.append({
- "index": int(parts[0]),
- "name": parts[1],
- "util": float(parts[2]),
- "mem_total": float(parts[3]),
- "mem_used": float(parts[4])
- })
- except Exception:
- pass
- return gpus
- # ---- Background sampler thread ----
- class Sampler(threading.Thread):
- def __init__(self, interval_ms=REFRESH_MS):
- super().__init__(daemon=True)
- self.interval = max(50, interval_ms) / 1000.0
- self.lock = threading.Lock()
- self.running = True
- # histories
- self.cpu_hist = collections.deque([0] * HISTORY_POINTS, maxlen=HISTORY_POINTS)
- self.mem_hist = collections.deque([0] * HISTORY_POINTS, maxlen=HISTORY_POINTS)
- self.net_rx_hist = collections.deque([0] * HISTORY_POINTS, maxlen=HISTORY_POINTS)
- self.net_tx_hist = collections.deque([0] * HISTORY_POINTS, maxlen=HISTORY_POINTS)
- self.disk_read_rate = {} # per-device B/s
- self.disk_write_rate = {}
- # last counters
- self.last_net = psutil.net_io_counters()
- self.last_disk = psutil.disk_io_counters(perdisk=True)
- self.nvidia = detect_nvidia()
- self.nvidia_info = []
- self.sampled = {}
- self.start()
- def run(self):
- while self.running:
- try:
- cpu = psutil.cpu_percent(interval=None)
- mem = psutil.virtual_memory().percent
- now_net = psutil.net_io_counters()
- rx = now_net.bytes_recv - self.last_net.bytes_recv
- tx = now_net.bytes_sent - self.last_net.bytes_sent
- sec = max(self.interval, 0.001)
- rx_rate = rx / sec; tx_rate = tx / sec
- self.last_net = now_net
- # disk io rates
- cur_disk = psutil.disk_io_counters(perdisk=True)
- dr = {}; dw = {}
- for k, v in cur_disk.items():
- pv = self.last_disk.get(k)
- if pv:
- dr[k] = (v.read_bytes - pv.read_bytes) / sec
- dw[k] = (v.write_bytes - pv.write_bytes) / sec
- else:
- dr[k] = 0.0; dw[k] = 0.0
- self.last_disk = cur_disk
- # nvidia
- ninfo = []
- if self.nvidia:
- ninfo = query_nvidia()
- # write into sampled with lock
- with self.lock:
- self.cpu_hist.append(cpu); self.mem_hist.append(mem)
- self.net_rx_hist.append(rx_rate); self.net_tx_hist.append(tx_rate)
- self.disk_read_rate = dr; self.disk_write_rate = dw
- self.nvidia_info = ninfo
- self.sampled['cpu'] = cpu; self.sampled['mem'] = mem
- self.sampled['rx_rate'] = rx_rate; self.sampled['tx_rate'] = tx_rate
- self.sampled['timestamp'] = time.time()
- except Exception:
- pass
- time.sleep(self.interval)
- def stop(self):
- self.running = False
- # ---- Main App ----
- class TaskManagerApp(tk.Tk):
- def __init__(self):
- super().__init__()
- self.title("Task Manager - Win11 Dark (Optimized)")
- self.geometry("1200x750")
- self.configure(bg="#141414")
- self.style = ttk.Style(self)
- try:
- self.style.theme_use("clam")
- except Exception:
- pass
- self.style.configure("TNotebook", background="#141414")
- self.style.configure("TNotebook.Tab", background="#1f1f1f", foreground="white", padding=[10, 6])
- self.style.map("TNotebook.Tab", background=[("selected", "#2b2b2b")])
- self.style.configure("Treeview", background="#1b1b1b", foreground="white", fieldbackground="#1b1b1b", rowheight=20)
- self.style.configure("Treeview.Heading", background="#262626", foreground="white")
- self.refresh_ms = REFRESH_MS
- self.sampler = Sampler(self.refresh_ms)
- self.create_widgets()
- self.after(UI_UPDATE_MS, self.ui_update_loop)
- self.protocol("WM_DELETE_WINDOW", self.on_close)
- def create_widgets(self):
- nb = ttk.Notebook(self)
- nb.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
- # tabs
- self.tab_processes = ttk.Frame(nb); nb.add(self.tab_processes, text="Processes")
- self.tab_performance = ttk.Frame(nb); nb.add(self.tab_performance, text="Performance")
- self.tab_startup = ttk.Frame(nb); nb.add(self.tab_startup, text="Startup")
- self.tab_users = ttk.Frame(nb); nb.add(self.tab_users, text="Users")
- self.tab_details = ttk.Frame(nb); nb.add(self.tab_details, text="Details")
- # build each tab
- self.build_processes_tab(self.tab_processes)
- self.build_performance_tab(self.tab_performance)
- self.build_startup_tab(self.tab_startup)
- self.build_users_tab(self.tab_users)
- self.build_details_tab(self.tab_details)
- # bottom controls
- ctrl = tk.Frame(self, bg="#141414"); ctrl.pack(fill=tk.X, padx=8, pady=(0, 8))
- ttk.Button(ctrl, text="Refresh Now", command=self.force_refresh).pack(side=tk.LEFT, padx=4)
- ttk.Button(ctrl, text="End Task", command=self.end_task).pack(side=tk.LEFT, padx=4)
- ttk.Button(ctrl, text="Kill (SIGKILL)", command=self.kill_task).pack(side=tk.LEFT, padx=4)
- ttk.Button(ctrl, text="Force-Kill (xkill mode)", command=self.xkill_mode).pack(side=tk.LEFT, padx=4)
- ttk.Button(ctrl, text="Show Details", command=self.show_selected_details).pack(side=tk.LEFT, padx=4)
- ttk.Label(ctrl, text="Auto-refresh:", background="#141414", foreground="white").pack(side=tk.LEFT, padx=(16, 4))
- self.auto_var = tk.BooleanVar(value=True)
- ttk.Checkbutton(ctrl, text="On/Off", variable=self.auto_var).pack(side=tk.LEFT)
- ttk.Label(ctrl, text="UI(ms):", background="#141414", foreground="white").pack(side=tk.LEFT, padx=(16, 4))
- self.ui_interval_var = tk.IntVar(value=UI_UPDATE_MS)
- ttk.Entry(ctrl, textvariable=self.ui_interval_var, width=6).pack(side=tk.LEFT, padx=4)
- ttk.Button(ctrl, text="Set UI Interval", command=self.set_ui_interval).pack(side=tk.LEFT, padx=4)
- # ---- Processes tab ----
- def build_processes_tab(self, parent):
- f = tk.Frame(parent, bg="#141414"); f.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
- cols = ("pid", "name", "user", "cpu", "mem", "status")
- self.proc_tree = ttk.Treeview(f, columns=cols, show="headings", selectmode="browse")
- for c, h in (("pid", "PID"), ("name", "Name"), ("user", "User"), ("cpu", "CPU %"), ("mem", "Mem %"), ("status", "Status")):
- self.proc_tree.heading(c, text=h); self.proc_tree.column(c, width=120 if c != "name" else 420, anchor="w")
- vsb = ttk.Scrollbar(f, orient="vertical", command=self.proc_tree.yview); self.proc_tree.configure(yscroll=vsb.set)
- self.proc_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True); vsb.pack(side=tk.LEFT, fill=tk.Y)
- self.proc_tree.bind("<Double-1>", lambda e: self.show_selected_details())
- # search box and refresh button
- right = tk.Frame(f, bg="#141414"); right.pack(side=tk.LEFT, fill=tk.Y, padx=(8, 0))
- ttk.Label(right, text="Filter:", background="#141414", foreground="white").pack(anchor="nw")
- self.filter_var = tk.StringVar(value="")
- ttk.Entry(right, textvariable=self.filter_var, width=30).pack(anchor="nw", pady=(0, 8))
- ttk.Button(right, text="Refresh", command=self.refresh_processes_now).pack(anchor="nw")
- ttk.Button(right, text="Kill selected", command=self.kill_task).pack(anchor="nw", pady=(8, 0))
- def refresh_processes_now(self):
- # lightweight iteration
- sel_pid = None
- sel = self.proc_tree.selection()
- if sel: sel_pid = self.proc_tree.item(sel[0])["values"][0]
- for r in self.proc_tree.get_children(): self.proc_tree.delete(r)
- keyword = self.filter_var.get().lower().strip()
- # prime cpu
- for p in psutil.process_iter():
- try:
- p.cpu_percent(interval=None)
- except Exception:
- pass
- for p in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_percent', 'status']):
- try:
- info = p.info
- name = (info.get('name') or "")
- if keyword and keyword not in name.lower():
- continue
- self.proc_tree.insert("", "end", values=(info.get('pid'), name, info.get('username') or "", f"{(info.get('cpu_percent') or 0):.1f}", f"{(info.get('memory_percent') or 0):.1f}", info.get('status') or ""))
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- continue
- # restore selection
- if sel_pid:
- for iid in self.proc_tree.get_children():
- if str(self.proc_tree.item(iid)["values"][0]) == str(sel_pid):
- self.proc_tree.selection_set(iid); break
- # ---- Performance tab ----
- def build_performance_tab(self, parent):
- frame = tk.Frame(parent, bg="#141414"); frame.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
- left = tk.Frame(frame, bg="#141414"); left.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
- right = tk.Frame(frame, bg="#141414", width=380); right.pack(side=tk.RIGHT, fill=tk.Y, padx=(8, 0))
- # CPU block (big)
- cpu_block = ttk.LabelFrame(left, text="CPU", padding=6); cpu_block.pack(fill=tk.X, padx=4, pady=4)
- self.cpu_name_lbl = ttk.Label(cpu_block, text=cpu_name()); self.cpu_name_lbl.pack(anchor="w")
- self.cpu_freq_lbl = ttk.Label(cpu_block, text="Freq: N/A"); self.cpu_freq_lbl.pack(anchor="w")
- self.cpu_bar = ttk.Progressbar(cpu_block, orient="horizontal", length=800, mode="determinate", maximum=100); self.cpu_bar.pack(fill=tk.X, pady=(4, 4))
- self.cpu_chart_fig = Figure(figsize=(6, 1.6), dpi=100, facecolor="#141414")
- self.cpu_ax = self.cpu_chart_fig.add_subplot(111); self.cpu_ax.set_facecolor("#141414"); self.cpu_canvas = FigureCanvasTkAgg(self.cpu_chart_fig, master=cpu_block); self.cpu_canvas.get_tk_widget().pack(fill=tk.X)
- # GPU block
- gpu_block = ttk.LabelFrame(left, text="GPU", padding=6); gpu_block.pack(fill=tk.X, padx=4, pady=4)
- self.gpu_text = ttk.Label(gpu_block, text="GPU: N/A"); self.gpu_text.pack(anchor="w")
- self.gpu_bar = ttk.Progressbar(gpu_block, orient="horizontal", length=800, mode="determinate", maximum=100); self.gpu_bar.pack(fill=tk.X, pady=(4, 4))
- self.gpu_chart_fig = Figure(figsize=(6, 1), dpi=90, facecolor="#141414"); self.gpu_ax = self.gpu_chart_fig.add_subplot(111); self.gpu_ax.set_facecolor("#141414"); self.gpu_canvas = FigureCanvasTkAgg(self.gpu_chart_fig, master=gpu_block); self.gpu_canvas.get_tk_widget().pack(fill=tk.X)
- # Memory block
- mem_block = ttk.LabelFrame(left, text="Memory", padding=6); mem_block.pack(fill=tk.X, padx=4, pady=4)
- self.mem_lbl = ttk.Label(mem_block, text="Memory: N/A"); self.mem_lbl.pack(anchor="w")
- self.mem_bar = ttk.Progressbar(mem_block, orient="horizontal", length=800, mode="determinate", maximum=100); self.mem_bar.pack(fill=tk.X, pady=(4, 4))
- self.mem_chart_fig = Figure(figsize=(6, 1), dpi=90, facecolor="#141414"); self.mem_ax = self.mem_chart_fig.add_subplot(111); self.mem_ax.set_facecolor("#141414"); self.mem_canvas = FigureCanvasTkAgg(self.mem_chart_fig, master=mem_block); self.mem_canvas.get_tk_widget().pack(fill=tk.X)
- # Storage block (list + small chart)
- disk_block = ttk.LabelFrame(left, text="Storage", padding=6); disk_block.pack(fill=tk.BOTH, padx=4, pady=4, expand=True)
- cols = ("device", "mount", "model", "total", "used", "free", "%", "r/s", "w/s")
- self.disk_tree = ttk.Treeview(disk_block, columns=cols, show="headings", height=6)
- for c, h in (("device", "Device"), ("mount", "Mount"), ("model", "Model"), ("total", "Total"), ("used", "Used"), ("free", "Free"), ("%", "% Used"), ("r/s", "Read/s"), ("w/s", "Write/s")):
- self.disk_tree.heading(c, text=h); self.disk_tree.column(c, width=120 if c in ("device", "mount", "model") else 90, anchor="center")
- vsb = ttk.Scrollbar(disk_block, orient="vertical", command=self.disk_tree.yview); self.disk_tree.configure(yscroll=vsb.set)
- self.disk_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True); vsb.pack(side=tk.LEFT, fill=tk.Y)
- self.disk_chart_fig = Figure(figsize=(6, 1.2), dpi=90, facecolor="#141414"); self.disk_ax = self.disk_chart_fig.add_subplot(111); self.disk_ax.set_facecolor("#141414"); self.disk_canvas = FigureCanvasTkAgg(self.disk_chart_fig, master=disk_block); self.disk_canvas.get_tk_widget().pack(fill=tk.X, padx=6, pady=4)
- # Right column: Network + small summary
- net_block = ttk.LabelFrame(right, text="Network", padding=6); net_block.pack(fill=tk.X, padx=4, pady=4)
- self.net_lbl = ttk.Label(net_block, text="RX: 0/s | TX: 0/s"); self.net_lbl.pack(anchor="w")
- self.net_chart_fig = Figure(figsize=(3.2, 3), dpi=100, facecolor="#141414")
- self.net_ax = self.net_chart_fig.add_subplot(111); self.net_ax.set_facecolor("#141414"); self.net_canvas = FigureCanvasTkAgg(self.net_chart_fig, master=net_block); self.net_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
- # GPU details on right
- gpu_info_block = ttk.LabelFrame(right, text="GPU Details", padding=6); gpu_info_block.pack(fill=tk.BOTH, expand=False, padx=4, pady=4)
- self.gpu_info_text = ScrolledText(gpu_info_block, height=6, bg="#111111", fg="white"); self.gpu_info_text.pack(fill=tk.BOTH, expand=True)
- # prepare disk models mapping
- self.disk_models = self._disk_model_map()
- def _disk_model_map(self):
- out = safe_run("lsblk -ndo NAME,MODEL 2>/dev/null")
- m = {}
- for line in out.splitlines():
- parts = line.split(None, 1)
- if not parts:
- continue
- name = parts[0]
- model = parts[1] if len(parts) > 1 else ""
- m["/dev/" + name] = model
- return m
- # ---- Startup tab ----
- def build_startup_tab(self, parent):
- f = tk.Frame(parent, bg="#141414"); f.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
- cols = ("name", "exec", "path", "enabled")
- self.start_tree = ttk.Treeview(f, columns=cols, show="headings")
- for c, h in (("name", "Name"), ("exec", "Exec"), ("path", "File"), ("enabled", "Enabled")):
- self.start_tree.heading(c, text=h); self.start_tree.column(c, width=300 if c == "path" else 140)
- vsb = ttk.Scrollbar(f, orient="vertical", command=self.start_tree.yview); self.start_tree.configure(yscroll=vsb.set)
- self.start_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True); vsb.pack(side=tk.LEFT, fill=tk.Y)
- btns = tk.Frame(parent, bg="#141414"); btns.pack(fill=tk.X, padx=8, pady=(0, 8))
- ttk.Button(btns, text="Refresh Startup", command=self.refresh_startup).pack(side=tk.LEFT, padx=4)
- ttk.Button(btns, text="Open Autostart Folder", command=self.open_autostart).pack(side=tk.LEFT, padx=4)
- ttk.Button(btns, text="Disable (move .disabled)", command=self.disable_startup).pack(side=tk.LEFT, padx=4)
- self.refresh_startup()
- def refresh_startup(self):
- # list desktop autostart + systemd enabled services
- def parse_desktop(path):
- name = ""; execv = ""; enabled = "Yes"
- try:
- with open(path, "r", errors="ignore") as f:
- for L in f:
- if "=" in L:
- k, v = L.split("=", 1); k = k.strip(); v = v.strip()
- if k.lower() == "name":
- name = v
- if k.lower() == "exec":
- execv = v
- if path.endswith(".disabled"):
- enabled = "No"
- except Exception:
- pass
- return (name or os.path.basename(path), execv, path, enabled)
- self.start_tree.delete(*self.start_tree.get_children())
- home = os.path.expanduser("~")
- paths = [os.path.join(home, ".config", "autostart"), "/etc/xdg/autostart"]
- for p in paths:
- if os.path.isdir(p):
- for fn in sorted(os.listdir(p)):
- if fn.endswith(".desktop") or fn.endswith(".desktop.disabled"):
- self.start_tree.insert("", "end", values=parse_desktop(os.path.join(p, fn)))
- # systemd user
- out = safe_run("systemctl --user list-unit-files --type=service --state=enabled 2>/dev/null")
- if out:
- for line in out.splitlines():
- if line.strip() and not line.startswith("UNIT"):
- svc = line.split()[0]
- self.start_tree.insert("", "end", values=(svc, "systemd --user", "(systemd user)", "Yes"))
- # system services (may require permission)
- out2 = safe_run("systemctl list-unit-files --type=service --state=enabled 2>/dev/null")
- if out2:
- for line in out2.splitlines():
- if line.strip() and not line.startswith("UNIT"):
- svc = line.split()[0]
- self.start_tree.insert("", "end", values=(svc, "systemd", "(system)", "Yes"))
- def open_autostart(self):
- path = os.path.expanduser("~/.config/autostart"); os.makedirs(path, exist_ok=True)
- os.system(f'xdg-open "{path}" &')
- def disable_startup(self):
- sel = self.start_tree.selection()
- if not sel:
- messagebox.showwarning("No selection", "Select a startup entry."); return
- path = self.start_tree.item(sel[0])["values"][2]
- try:
- new = path + ".disabled"; os.rename(path, new); messagebox.showinfo("Disabled", f"Moved to {new}"); self.refresh_startup()
- except Exception as e:
- messagebox.showerror("Error", str(e))
- # ---- Users tab ----
- def build_users_tab(self, parent):
- f = tk.Frame(parent, bg="#141414"); f.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
- cols = ("user", "terminal", "host", "started")
- self.user_tree = ttk.Treeview(f, columns=cols, show="headings")
- for c, h in (("user", "User"), ("terminal", "Terminal"), ("host", "Host"), ("started", "Started")):
- self.user_tree.heading(c, text=h); self.user_tree.column(c, width=220)
- vsb = ttk.Scrollbar(f, orient="vertical", command=self.user_tree.yview); self.user_tree.configure(yscroll=vsb.set)
- self.user_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True); vsb.pack(side=tk.LEFT, fill=tk.Y)
- self.refresh_users()
- def refresh_users(self):
- self.user_tree.delete(*self.user_tree.get_children())
- for u in psutil.users():
- started = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(u.started)) if getattr(u, "started", None) else ""
- self.user_tree.insert("", "end", values=(u.name, getattr(u, "terminal", ""), getattr(u, "host", ""), started))
- # ---- Details tab ----
- def build_details_tab(self, parent):
- f = tk.Frame(parent, bg="#141414"); f.pack(fill=tk.BOTH, expand=True, padx=8, pady=8)
- cols = ("pid", "name", "cpu", "mem", "cmd")
- self.details_tree = ttk.Treeview(f, columns=cols, show="headings")
- for c, h in (("pid", "PID"), ("name", "Name"), ("cpu", "CPU%"), ("mem", "Mem%"), ("cmd", "Cmdline")):
- self.details_tree.heading(c, text=h); self.details_tree.column(c, width=140 if c != "cmd" else 520)
- vsb = ttk.Scrollbar(f, orient="vertical", command=self.details_tree.yview); self.details_tree.configure(yscroll=vsb.set)
- self.details_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True); vsb.pack(side=tk.LEFT, fill=tk.Y)
- self.details_tree.bind("<Double-1>", lambda e: self.open_detail_window())
- def refresh_details(self):
- self.details_tree.delete(*self.details_tree.get_children())
- for p in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'cmdline']):
- try:
- cmd = " ".join(p.info.get('cmdline') or [])
- self.details_tree.insert("", "end", values=(p.info.get('pid'), p.info.get('name') or "", f"{(p.info.get('cpu_percent') or 0):.1f}", f"{(p.info.get('memory_percent') or 0):.1f}", cmd))
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- continue
- def open_detail_window(self):
- sel = self.details_tree.selection()
- if not sel:
- return
- pid = int(self.details_tree.item(sel[0])['values'][0])
- try:
- p = psutil.Process(pid)
- info = p.as_dict(attrs=['pid', 'name', 'exe', 'cmdline', 'cwd', 'username', 'create_time', 'status', 'cpu_percent', 'memory_percent', 'num_threads', 'io_counters'], ad_value="N/A")
- txt = []
- txt.append(f"PID: {info.get('pid')}"); txt.append(f"Name: {info.get('name')}"); txt.append(f"Exe: {info.get('exe')}")
- txt.append(f"Cmdline: {' '.join(info.get('cmdline') or [])}"); txt.append(f"CWD: {info.get('cwd')}"); txt.append(f"User: {info.get('username')}")
- txt.append(f"Started: {time.ctime(info.get('create_time')) if info.get('create_time') not in (None, 'N/A') else 'N/A'}"); txt.append(f"Status: {info.get('status')}")
- txt.append(f"CPU%: {info.get('cpu_percent')}"); txt.append(f"Memory%: {info.get('memory_percent')}")
- io = info.get('io_counters');
- if io and io != "N/A":
- txt.append(f"I/O: read={getattr(io, 'read_bytes', 'N/A')}, write={getattr(io, 'write_bytes', 'N/A')}")
- txt.append(f"Threads: {info.get('num_threads')}")
- win = tk.Toplevel(self); win.title(f"Details - PID {pid}"); win.configure(bg="#141414")
- st = ScrolledText(win, width=100, height=20, bg="#111111", fg="white"); st.pack(fill=tk.BOTH, expand=True, padx=8, pady=8); st.insert("1.0", "\n".join(txt))
- except Exception as e:
- messagebox.showerror("Error", str(e))
- # ---- Actions: End / Kill / xkill ----
- def end_task(self):
- pid = self._get_selected_pid_from_proc()
- if not pid:
- return
- try:
- psutil.Process(pid).terminate()
- messagebox.showinfo("Terminated", f"Sent TERM to PID {pid}")
- self.refresh_processes_now()
- except Exception as e:
- messagebox.showerror("Error", str(e))
- def kill_task(self):
- pid = self._get_selected_pid_from_proc()
- if not pid:
- return
- try:
- psutil.Process(pid).kill()
- messagebox.showinfo("Killed", f"Sent KILL to PID {pid}")
- self.refresh_processes_now()
- except Exception as e:
- messagebox.showerror("Error", str(e))
- def xkill_mode(self):
- # try system xkill first
- if safe_run("which xkill"):
- try:
- # launch xkill in background; user will click window to kill it
- subprocess.Popen(["xkill"])
- messagebox.showinfo("xkill", "xkill started. Click a window to kill it.")
- return
- except Exception as e:
- messagebox.showerror("Error launching xkill", str(e))
- return
- # fallback: ask user to select a process to kill (already available) or use xdotool to get window under cursor
- if safe_run("which xdotool"):
- try:
- # instruct user to move cursor and press Enter
- messagebox.showinfo("xkill fallback", "Move mouse over window to kill, then press OK.")
- out = safe_run("xdotool getwindowfocus getwindowpid 2>/dev/null || xdotool getmouselocation --shell && xprop -root _NET_ACTIVE_WINDOW")
- # we will attempt to get window pid by window id under cursor - best-effort
- # simpler approach: call xdotool getwindowfocus getwindowpid
- pid_str = safe_run("xdotool getwindowfocus getwindowpid 2>/dev/null")
- if pid_str:
- pid = int(pid_str.strip())
- psutil.Process(pid).kill()
- messagebox.showinfo("Killed", f"Killed PID {pid} (from window under cursor)")
- return
- except Exception:
- pass
- messagebox.showinfo("xkill unavailable", "xkill and xdotool not available. Use End Task / Kill on selected process.")
- def _get_selected_pid_from_proc(self):
- sel = self.proc_tree.selection()
- if not sel:
- messagebox.showwarning("No selection", "Select a process first in Processes tab.")
- return None
- try:
- return int(self.proc_tree.item(sel[0])["values"][0])
- except Exception:
- return None
- # ---- UI update loop ----
- def ui_update_loop(self):
- if self.auto_var.get():
- # update processes, details, users, performance displays
- try:
- self.refresh_processes_now()
- self.refresh_details()
- self.refresh_users()
- self.update_performance_ui()
- # self.refresh_startup() # Removed as refresh_startup is called on its own
- except Exception:
- pass
- # schedule next
- try:
- ms = int(self.ui_interval_var.get())
- if ms < 200:
- ms = UI_UPDATE_MS
- self.after(ms, self.ui_update_loop)
- except Exception:
- self.after(UI_UPDATE_MS, self.ui_update_loop)
- def force_refresh(self):
- self.refresh_processes_now(); self.refresh_details(); self.refresh_users(); self.update_performance_ui(); self.refresh_startup()
- # ---- Performance UI updater (reads sampler) ----
- def update_performance_ui(self):
- s = self.sampler
- with s.lock:
- cpu = s.sampled.get('cpu', 0)
- mem = s.sampled.get('mem', 0)
- rx_rate = s.sampled.get('rx_rate', 0)
- tx_rate = s.sampled.get('tx_rate', 0)
- cpu_hist = list(s.cpu_hist)
- mem_hist = list(s.mem_hist)
- rx_hist = list(s.net_rx_hist)
- tx_hist = list(s.net_tx_hist)
- disk_r = dict(s.disk_read_rate)
- disk_w = dict(s.disk_write_rate)
- ninfo = list(s.nvidia_info)
- # CPU stats
- try:
- freq = psutil.cpu_freq()
- freq_text = f"Freq: {freq.current:.0f} MHz" if freq else "Freq: N/A"
- except Exception:
- freq_text = "Freq: N/A"
- self.cpu_name_lbl.config(text=cpu_name())
- self.cpu_freq_lbl.config(text=freq_text)
- self.cpu_bar['value'] = cpu
- # draw cpu chart
- self.cpu_ax.cla()
- self.cpu_ax.plot(cpu_hist, color='cyan')
- self.cpu_ax.set_ylim(0, 100)
- self.cpu_ax.set_facecolor('#141414'); self.cpu_ax.tick_params(colors='white')
- self.cpu_canvas.draw_idle()
- # GPU
- if ninfo:
- g = ninfo[0]
- self.gpu_text.config(text=f"{g['name']} | Util {g['util']:.0f}% | VRAM {g['mem_used']}/{g['mem_total']} MiB")
- self.gpu_bar['value'] = g['util']
- self.gpu_ax.cla(); self.gpu_ax.plot([g['util']] * len(cpu_hist), color='magenta'); self.gpu_ax.set_ylim(0, 100); self.gpu_canvas.draw_idle()
- # detailed text
- self.gpu_info_text.delete('1.0', tk.END)
- for g in ninfo:
- self.gpu_info_text.insert(tk.END, f"{g['index']}: {g['name']} - Util {g['util']:.0f}% | Mem {g['mem_used']}/{g['mem_total']} MiB\n")
- else:
- self.gpu_text.config(text="GPU: not available or unsupported"); self.gpu_bar['value'] = 0
- # Memory
- self.mem_lbl.config(text=f"Memory: {mem:.1f}%")
- self.mem_bar['value'] = mem
- self.mem_ax.cla(); self.mem_ax.plot(mem_hist, color='lime'); self.mem_ax.set_ylim(0, 100); self.mem_canvas.draw_idle()
- # Disks - show partitions and per-device rates
- self.disk_tree.delete(*self.disk_tree.get_children())
- parts = psutil.disk_partitions(all=False)
- seen_mounts = set()
- for part in parts:
- try:
- if part.mountpoint in seen_mounts:
- continue
- seen_mounts.add(part.mountpoint)
- usage = psutil.disk_usage(part.mountpoint)
- dev = part.device
- model = self.disk_models.get(dev, "")
- key = os.path.basename(dev)
- r = disk_r.get(key, 0.0); w = disk_w.get(key, 0.0)
- self.disk_tree.insert("", "end", values=(dev, part.mountpoint, model, size_fmt(usage.total), size_fmt(usage.used), size_fmt(usage.free), f"{usage.percent:.1f}%", size_fmt(r) + "/s", size_fmt(w) + "/s"))
- except Exception:
- continue
- # disk chart: top read+write combined
- top_r = sorted(disk_r.items(), key=lambda kv: kv[1], reverse=True)[:5]
- top_w = sorted(disk_w.items(), key=lambda kv: kv[1], reverse=True)[:5]
- labels = [k for k, _ in top_r] or ['-']
- values = [v for _, v in top_r] or [0]
- self.disk_ax.cla()
- self.disk_ax.bar(range(len(values)), [v / 1024.0 for v in values])
- self.disk_ax.set_ylabel("KB/s"); self.disk_ax.set_xticks(range(len(values))); self.disk_ax.set_xticklabels(labels, rotation=30, color='white')
- self.disk_ax.set_facecolor('#141414'); self.disk_canvas.draw_idle()
- # Network
- self.net_lbl.config(text=f"RX: {size_fmt(rx_rate)}/s | TX: {size_fmt(tx_rate)}/s")
- net_series = [(rx + tx) / 1024.0 for rx, tx in zip(rx_hist, tx_hist)]
- self.net_ax.cla(); self.net_ax.plot([r / 1024.0 for r in rx_hist], label='RX KB/s'); self.net_ax.plot([t / 1024.0 for t in tx_hist], label='TX KB/s')
- self.net_ax.legend(loc='upper right', facecolor='#141414', labelcolor='white'); self.net_ax.set_facecolor('#141414'); self.net_canvas.draw_idle()
- # ---- Misc refresh helpers ----
- def refresh_startup(self):
- try:
- self.start_tree.delete(*self.start_tree.get_children())
- home = os.path.expanduser("~")
- pths = [os.path.join(home, ".config", "autostart"), "/etc/xdg/autostart"]
- for p in pths:
- if os.path.isdir(p):
- for fn in sorted(os.listdir(p)):
- if fn.endswith(".desktop") or fn.endswith(".desktop.disabled"):
- path = os.path.join(p, fn)
- name = ""; execv = ""; enabled = "Yes"
- try:
- with open(path, "r", errors="ignore") as f:
- for L in f:
- if "=" in L:
- k, v = L.split("=", 1); k = k.strip().lower(); v = v.strip()
- if k == "name":
- name = v
- if k == "exec":
- execv = v
- if path.endswith(".disabled"):
- enabled = "No"
- except Exception:
- pass
- self.start_tree.insert("", "end", values=(name or fn, execv, path, enabled))
- # systemd user/system enabled
- out = safe_run("systemctl --user list-unit-files --type=service --state=enabled 2>/dev/null")
- if out:
- for line in out.splitlines():
- if line.strip() and not line.startswith("UNIT"):
- svc = line.split()[0]; self.start_tree.insert("", "end", values=(svc, "systemd --user", "(systemd user)", "Yes"))
- out2 = safe_run("systemctl list-unit-files --type=service --state=enabled 2>/dev/null")
- if out2:
- for line in out2.splitlines():
- if line.strip() and not line.startswith("UNIT"):
- svc = line.split()[0]; self.start_tree.insert("", "end", values=(svc, "systemd", "(system)", "Yes"))
- except Exception:
- pass
- def refresh_users(self):
- try:
- self.user_tree.delete(*self.user_tree.get_children())
- for u in psutil.users():
- started = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(u.started)) if getattr(u, "started", None) else ""
- self.user_tree.insert("", "end", values=(u.name, getattr(u, "terminal", ""), getattr(u, "host", ""), started))
- except Exception:
- pass
- # Fix recursion: implement actual refresh wrapper
- def refresh_processes_now(self):
- try:
- sel_pid = None; sel = self.proc_tree.selection()
- if sel:
- sel_pid = self.proc_tree.item(sel[0])["values"][0]
- self.proc_tree.delete(*self.proc_tree.get_children())
- keyword = self.filter_var.get().lower().strip() if hasattr(self, 'filter_var') else ""
- # prime cpu
- for p in psutil.process_iter():
- try:
- p.cpu_percent(interval=None)
- except Exception:
- pass
- for p in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_percent', 'status']):
- try:
- info = p.info
- name = (info.get('name') or "")
- if keyword and keyword not in name.lower():
- continue
- self.proc_tree.insert("", "end", values=(info.get('pid'), name, info.get('username') or "", f"{(info.get('cpu_percent') or 0):.1f}", f"{(info.get('memory_percent') or 0):.1f}", info.get('status') or ""))
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- continue
- if sel_pid:
- for iid in self.proc_tree.get_children():
- if str(self.proc_tree.item(iid)["values"][0]) == str(sel_pid):
- self.proc_tree.selection_set(iid); break
- except Exception:
- pass
- def set_ui_interval(self):
- try:
- v = int(self.ui_interval_var.get())
- if v < 200:
- raise ValueError
- # schedule uses variable in ui_update_loop
- messagebox.showinfo("Set", f"UI interval set to {v} ms")
- except Exception:
- messagebox.showerror("Invalid", "Enter integer >=200")
- def show_selected_details(self):
- sel = self.proc_tree.selection()
- if not sel:
- messagebox.showwarning("No selection", "Select a process first.")
- return
- pid = int(self.proc_tree.item(sel[0])["values"][0])
- try:
- p = psutil.Process(pid)
- info = p.as_dict(attrs=['pid', 'name', 'exe', 'cmdline', 'cwd', 'username', 'create_time', 'status', 'cpu_percent', 'memory_percent', 'num_threads', 'io_counters'], ad_value="N/A")
- lines = []
- lines.append(f"PID: {info.get('pid')}")
- lines.append(f"Name: {info.get('name')}")
- lines.append(f"Exe: {info.get('exe')}")
- lines.append(f"Cmdline: {' '.join(info.get('cmdline') or [])}")
- lines.append(f"CWD: {info.get('cwd')}")
- lines.append(f"User: {info.get('username')}")
- lines.append(f"Started: {time.ctime(info.get('create_time')) if info.get('create_time') not in (None, 'N/A') else 'N/A'}")
- lines.append(f"Status: {info.get('status')}")
- lines.append(f"CPU%: {info.get('cpu_percent')}")
- lines.append(f"Memory%: {info.get('memory_percent')}")
- io = info.get('io_counters')
- if io and io != "N/A":
- lines.append(f"I/O: read={getattr(io, 'read_bytes', 'N/A')}, write={getattr(io, 'write_bytes', 'N/A')}")
- lines.append(f"Threads: {info.get('num_threads')}")
- win = tk.Toplevel(self); win.title(f"Details - PID {pid}"); win.configure(bg="#141414")
- st = ScrolledText(win, width=100, height=20, bg="#111111", fg="white"); st.pack(fill=tk.BOTH, expand=True, padx=8, pady=8); st.insert("1.0", "\n".join(lines))
- except Exception as e:
- messagebox.showerror("Error", str(e))
- def on_close(self):
- try:
- self.sampler.stop()
- except Exception:
- pass
- self.destroy()
- # ---- Run ----
- def main():
- try:
- import psutil
- except Exception:
- print("psutil missing. Install: sudo apt install python3-psutil")
- return
- app = TaskManagerApp()
- app.mainloop()
- if __name__ == "__main__":
- main()
Advertisement
Comments
-
- https://www.reddit.com/r/pythonhelp/comments/1osf9zm/comment/nnwtfiu/?context=1
Add Comment
Please, Sign In to add comment