Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import tkinter as tk
- from tkinter import ttk, filedialog, messagebox, scrolledtext
- import requests
- import urllib.parse
- import numpy as np
- from PIL import Image, ImageTk
- import threading
- import time
- import matplotlib.cm as cm
- import struct
class ImageStreamViewer:
    """Tkinter viewer for 16-bit raw frames fetched over HTTP.

    Each frame is requested from ``http://<ip>:5000/stream?...`` and decoded
    from two byte planes (low bytes and high bytes) located at user-editable
    offsets inside the response body. The combined 16-bit image is windowed
    with an offset/range pair, optionally colour-mapped, and drawn on a canvas.

    NOTE(review): the class assumes it is constructed on the thread that runs
    the Tk main loop; the streaming worker touches Tk variables and widgets
    from a background thread, exactly as the original code did.
    """

    # Sleep between frames when the FPS entry is not a positive integer.
    _FALLBACK_FRAME_INTERVAL = 0.05
    # Upper bound on lines kept in the info pane so repeated errors while
    # streaming cannot grow the widget without limit.
    _MAX_LOG_LINES = 1000

    def __init__(self, root):
        """Store the Tk root, initialise state, and build the widget tree.

        Args:
            root: the ``tk.Tk`` (or Toplevel-like) window hosting the viewer.
        """
        self.root = root
        self.root.title("Image Stream Viewer - Fast")
        self.root.geometry("1800x1000")

        self.current_image_array = None
        self.current_pil_image = None
        self.resolution = None
        self.raw_data = None
        self.streaming = False
        self.stream_thread = None
        # Bumped on every start so a worker from a previous start/stop cycle
        # notices it has been superseded and exits.
        self._stream_generation = 0

        # Palette name -> matplotlib colormap (None means plain grayscale).
        self.palettes = {
            'Grayscale': None,
            'Hot': cm.hot,
            'Jet': cm.jet,
            'Cool': cm.cool,
            'Viridis': cm.viridis,
            'Plasma': cm.plasma,
        }
        # True until the first frame of a stream has been auto-ranged.
        self.first = True

        # Geometry of the most recently drawn image, used for pixel readout.
        self.photo = None
        self.scale_factor = 1.0
        self.image_origin = (0, 0)

        # Performance tracking
        self.frame_count = 0
        self.last_fps_time = time.time()

        self.setup_gui()

    # ------------------------------------------------------------------ GUI

    def setup_gui(self):
        """Create all widgets: controls on the left, image canvas on the right."""
        main_frame = ttk.Frame(self.root, padding="1")
        main_frame.grid(row=0, column=0, sticky="nsew")

        left_frame = ttk.Frame(main_frame, width=700)
        left_frame.grid(row=0, column=0, sticky="nsew", padx=(0, 10))
        left_frame.grid_propagate(False)

        right_frame = ttk.Frame(main_frame)
        right_frame.grid(row=0, column=1, sticky="nsew")

        # Panels are built in the same order as before so keyboard focus
        # traversal (which follows creation order) is unchanged.
        self._build_connection_panel(left_frame)
        self._build_format_panel(left_frame)
        self._build_display_panel(left_frame)
        self._build_command_panel(left_frame)
        self._build_info_panel(left_frame)
        self._build_image_panel(right_frame)

        # Only the image side stretches when the window is resized.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=0)
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(0, weight=1)
        left_frame.columnconfigure(0, weight=1)
        right_frame.columnconfigure(0, weight=1)
        right_frame.rowconfigure(0, weight=1)

        self.update_min_max_display()

    def _build_connection_panel(self, parent):
        """Build the IP / start / save / FPS controls and status labels."""
        conn_frame = ttk.LabelFrame(parent, text="Connection", padding="5")
        conn_frame.grid(row=0, column=0, sticky="ew", pady=(0, 5))

        ttk.Label(conn_frame, text="IP:").grid(row=0, column=0, sticky="w")
        self.ip_var = tk.StringVar(value="192.168.178.92")
        ttk.Entry(conn_frame, textvariable=self.ip_var, width=15).grid(
            row=0, column=1, padx=(5, 10))

        self.start_button = ttk.Button(
            conn_frame, text="Start Stream", command=self.toggle_stream)
        self.start_button.grid(row=0, column=2, padx=(0, 5))
        self.save_button = ttk.Button(
            conn_frame, text="Save", command=self.save_image)
        self.save_button.grid(row=0, column=3, padx=(0, 5))

        ttk.Label(conn_frame, text="FPS:").grid(row=0, column=4, padx=(10, 5))
        self.fps_var = tk.StringVar(value="20")
        ttk.Entry(conn_frame, textvariable=self.fps_var, width=4).grid(
            row=0, column=5)

        self.status_label = ttk.Label(conn_frame, text="Status: Stopped")
        self.status_label.grid(row=1, column=0, columnspan=3, sticky="w")
        self.fps_label = ttk.Label(conn_frame, text="Actual FPS: 0")
        self.fps_label.grid(row=1, column=3, columnspan=3, sticky="w")

    def _build_format_panel(self, parent):
        """Build the width/height and low/high plane offset entries."""
        format_frame = ttk.LabelFrame(parent, text="Format", padding="5")
        format_frame.grid(row=1, column=0, sticky="ew", pady=(0, 5))

        ttk.Label(format_frame, text="Width:").grid(row=0, column=0, sticky="w")
        self.width_var = tk.StringVar(value="640")
        ttk.Entry(format_frame, textvariable=self.width_var, width=6).grid(
            row=0, column=1, padx=(5, 10))

        ttk.Label(format_frame, text="Height:").grid(row=0, column=2, sticky="w")
        self.height_var = tk.StringVar(value="480")
        ttk.Entry(format_frame, textvariable=self.height_var, width=6).grid(
            row=0, column=3, padx=(5, 10))

        ttk.Label(format_frame, text="Low offset:").grid(
            row=1, column=0, sticky="w")
        self.low_offset_var = tk.StringVar(value="0x238")
        ttk.Entry(format_frame, textvariable=self.low_offset_var, width=8).grid(
            row=1, column=1, padx=(5, 10))

        ttk.Label(format_frame, text="High offset:").grid(
            row=1, column=2, sticky="w")
        self.high_offset_var = tk.StringVar(value="0x4B238")
        ttk.Entry(format_frame, textvariable=self.high_offset_var, width=8).grid(
            row=1, column=3, padx=(5, 10))

        ttk.Button(format_frame, text="Parse Header",
                   command=self.parse_header).grid(row=0, column=4, padx=(10, 0))

    def _build_display_panel(self, parent):
        """Build palette selection, offset/range sliders and min/max readout."""
        display_frame = ttk.LabelFrame(parent, text="Display", padding="5")
        display_frame.grid(row=2, column=0, sticky="ew", pady=(0, 5))

        ttk.Label(display_frame, text="Palette:").grid(
            row=0, column=0, sticky="w")
        self.palette_var = tk.StringVar(value="Grayscale")
        palette_combo = ttk.Combobox(
            display_frame, textvariable=self.palette_var,
            values=list(self.palettes.keys()), state="readonly", width=10)
        palette_combo.grid(row=0, column=1, padx=(5, 10))
        palette_combo.bind("<<ComboboxSelected>>", self.on_palette_change)

        self.auto_range_var = tk.BooleanVar(value=False)
        ttk.Checkbutton(display_frame, text="Auto Range",
                        variable=self.auto_range_var).grid(row=0, column=2)

        ttk.Button(display_frame, text="Auto Now",
                   command=self.auto_range).grid(row=1, column=0, padx=(10, 0))
        ttk.Button(display_frame, text="Reset",
                   command=self.reset_range).grid(row=2, column=0, padx=(10, 0))

        self.offset_var = tk.IntVar(value=32767)
        self.offset_scale, self.offset_label = self._build_slider_row(
            display_frame, row=1, caption="Offset:", variable=self.offset_var,
            lower=0, on_change=self.on_offset_change, adjust=self.adjust_offset)

        self.range_var = tk.IntVar(value=32767)
        self.range_scale, self.range_label = self._build_slider_row(
            display_frame, row=2, caption="Range:", variable=self.range_var,
            lower=1, on_change=self.on_range_change, adjust=self.adjust_range)

        # Min/Max display (read-only)
        ttk.Label(display_frame, text="Min:").grid(row=3, column=1, sticky="w")
        self.min_display_label = ttk.Label(
            display_frame, text="0", width=6, relief="sunken")
        self.min_display_label.grid(row=3, column=2, sticky="w")
        ttk.Label(display_frame, text="Max:").grid(row=3, column=3, sticky="w")
        self.max_display_label = ttk.Label(
            display_frame, text="65535", width=6, relief="sunken")
        self.max_display_label.grid(row=3, column=4, sticky="w")

    def _build_slider_row(self, parent, row, caption, variable, lower,
                          on_change, adjust):
        """Build one "caption [-] slider [+] value" row; return (scale, label)."""
        ttk.Label(parent, text=caption).grid(row=row, column=1, sticky="w")
        row_frame = ttk.Frame(parent)
        row_frame.grid(row=row, column=2, columnspan=2, sticky="ew")

        ttk.Button(row_frame, text="-", width=3,
                   command=lambda: adjust(-5)).grid(row=0, column=0)
        scale = ttk.Scale(row_frame, from_=lower, to=65535, variable=variable,
                          orient=tk.HORIZONTAL, length=360, command=on_change)
        scale.grid(row=0, column=1, padx=(2, 2))
        ttk.Button(row_frame, text="+", width=3,
                   command=lambda: adjust(5)).grid(row=0, column=2)
        value_label = ttk.Label(row_frame, text="32767", width=6)
        value_label.grid(row=0, column=3, padx=(5, 0))
        return scale, value_label

    def _build_command_panel(self, parent):
        """Build the command/value entries, Send/FFC buttons and response box."""
        cmd_frame = ttk.LabelFrame(parent, text="Commands", padding="5")
        cmd_frame.grid(row=3, column=0, sticky="ew", pady=(0, 5))

        ttk.Label(cmd_frame, text="Cmd:").grid(row=0, column=0, sticky="w")
        self.cmd_entry = ttk.Entry(cmd_frame, width=15)
        self.cmd_entry.grid(row=0, column=1, padx=(5, 10))
        ttk.Label(cmd_frame, text="Value:").grid(row=0, column=2, sticky="w")
        self.value_entry = ttk.Entry(cmd_frame, width=15)
        self.value_entry.grid(row=0, column=3, padx=(5, 10))

        ttk.Button(cmd_frame, text="Send",
                   command=self.send_command).grid(row=0, column=4)
        ttk.Button(cmd_frame, text="FFC",
                   command=self.send_ffc_command).grid(
                       row=0, column=5, padx=(10, 0))

        self.cmd_response = scrolledtext.ScrolledText(
            cmd_frame, height=3, width=60, font=("Courier", 8))
        self.cmd_response.grid(row=1, column=0, columnspan=6, pady=(5, 0),
                               sticky="ew")

        # Pressing Enter in either entry sends the command.
        self.cmd_entry.bind("<Return>", lambda e: self.send_command())
        self.value_entry.bind("<Return>", lambda e: self.send_command())

    def _build_info_panel(self, parent):
        """Build the scrollable info/log text pane."""
        info_frame = ttk.LabelFrame(parent, text="Info", padding="5")
        info_frame.grid(row=4, column=0, sticky="ew", pady=(0, 5))

        self.info_text = tk.Text(info_frame, height=20, width=90,
                                 font=("Courier", 8))
        info_scroll = ttk.Scrollbar(info_frame, orient=tk.VERTICAL,
                                    command=self.info_text.yview)
        self.info_text.configure(yscrollcommand=info_scroll.set)
        self.info_text.grid(row=0, column=0, sticky="ew")
        info_scroll.grid(row=0, column=1, sticky="ns")

    def _build_image_panel(self, parent):
        """Build the image canvas and the pixel readout label beneath it."""
        image_frame = ttk.LabelFrame(parent, text="Image", padding="5")
        image_frame.grid(row=0, column=0, sticky="nsew")

        self.canvas = tk.Canvas(image_frame, bg="black", width=1000, height=800)
        self.canvas.grid(row=0, column=0, sticky="nsew")

        self.pixel_label = ttk.Label(
            image_frame, text="Pixel: Move mouse over image")
        self.pixel_label.grid(row=1, column=0, sticky="w")
        self.canvas.bind("<Motion>", self.on_mouse_move)

        image_frame.columnconfigure(0, weight=1)
        image_frame.rowconfigure(0, weight=1)

    # -------------------------------------------------------------- logging

    def log(self, text):
        """Append one line to the info pane and scroll it into view.

        The pane is trimmed to the newest ``_MAX_LOG_LINES`` lines.
        """
        self.info_text.insert(tk.END, f"{text}\n")
        # "end-1c" sits on the empty line after the final newline, so the
        # number of real lines is one less than its line index.
        line_count = int(self.info_text.index("end-1c").split(".")[0]) - 1
        excess = line_count - self._MAX_LOG_LINES
        if excess > 0:
            self.info_text.delete("1.0", f"{excess + 1}.0")
        self.info_text.see(tk.END)

    # --------------------------------------------------------------- format

    def parse_header(self):
        """Read width and height from the last raw frame into the entries.

        Two little-endian unsigned 32-bit integers are read from byte
        offsets 8 and 12 of ``self.raw_data``. Does nothing when no frame
        has been received; logs a message when the data is too short.
        """
        if self.raw_data is None:
            return
        try:
            width, height = struct.unpack_from('<II', self.raw_data, 8)
        except struct.error as e:
            self.log(f"Parse error: {e}")
            return
        self.width_var.set(str(width))
        self.height_var.set(str(height))
        self.log(f"Parsed: {width}x{height}")

    def get_offset(self, offset_str):
        """Parse a byte offset written in decimal or ``0x``-prefixed hex.

        Surrounding whitespace is ignored and the hex prefix is accepted in
        either case (``0x`` / ``0X``).

        Returns:
            The parsed integer, or 0 when the text cannot be parsed.
        """
        try:
            text = offset_str.strip()
            base = 16 if text.lower().startswith('0x') else 10
            return int(text, base)
        except (AttributeError, TypeError, ValueError):
            return 0

    # ------------------------------------------------------------- commands

    def send_command(self):
        """Send ``<cmd>,<value>`` from the entry fields to the device.

        Nothing is sent when the command entry is empty. The request runs on
        a background thread; its outcome is appended to the response box.
        """
        cmd = self.cmd_entry.get().strip()
        value = self.value_entry.get().strip()
        if not cmd:
            return
        command = f"{cmd},{value}" if value else f"{cmd},"
        self._send_dm_command(command, "OK", "Error")

    def send_ffc_command(self):
        """Send the FFC command (``KBD,C``) to the device."""
        self._send_dm_command("KBD,C", "FFC OK", "FFC Error")

    def _send_dm_command(self, command, ok_prefix, error_prefix):
        """GET the ``dmcmd`` CGI endpoint on a daemon thread and report back."""
        ip = self.ip_var.get()
        url = f"http://{ip}/cgi-bin/dmcmd?Command={urllib.parse.quote(command)}"

        def make_request():
            try:
                response = requests.get(url, timeout=5)
                message = f"{ok_prefix}: {response.text}\n"
            except Exception as e:
                # Format the text now: the name bound by ``except ... as`` is
                # deleted when this block ends, so a deferred callback that
                # referenced it would fail with NameError.
                message = f"{error_prefix}: {e}\n"
            self.root.after(0, self._append_cmd_response, message)

        threading.Thread(target=make_request, daemon=True).start()

    def _append_cmd_response(self, message):
        """Append text to the command response box and keep the tail visible."""
        self.cmd_response.insert(tk.END, message)
        self.cmd_response.see(tk.END)

    # ------------------------------------------------------ window controls

    def adjust_offset(self, delta):
        """Shift the window offset by ``delta``, clamped to 0..65535."""
        current = self.offset_var.get()
        self.offset_var.set(max(0, min(65535, current + delta)))
        self.on_offset_change()

    def adjust_range(self, delta):
        """Change the window range by ``delta``, clamped to 1..65535."""
        current = self.range_var.get()
        self.range_var.set(max(1, min(65535, current + delta)))
        self.on_range_change()

    def on_mouse_move(self, event):
        """Show the raw value of the image pixel under the mouse cursor.

        Canvas coordinates are translated by the drawn image's top-left
        corner before being divided by the display scale, because the image
        is centred in the canvas rather than anchored at (0, 0).
        """
        frame = self.current_image_array  # snapshot; the worker may replace it
        if frame is None or self.photo is None:
            return
        left, top = self.image_origin
        # Floor division keeps positions just left of / above the image
        # negative instead of truncating them onto column/row 0.
        x = int((event.x - left) // self.scale_factor)
        y = int((event.y - top) // self.scale_factor)
        h, w = frame.shape
        if 0 <= x < w and 0 <= y < h:
            self.pixel_label.config(text=f"Pixel ({x},{y}): {frame[y, x]}")

    def on_palette_change(self, event=None):
        """Redraw with the newly selected palette, if an image is loaded."""
        if self.current_image_array is not None:
            self.update_display()

    def offset_range_to_min_max(self):
        """Return ``(min, max)`` display limits for the current offset/range.

        The window is ``offset ± range`` clipped to 0..65535.
        """
        offset = self.offset_var.get()
        range_val = self.range_var.get()
        return max(0, offset - range_val), min(65535, offset + range_val)

    def min_max_to_offset_range(self, min_val, max_val):
        """Convert display limits to an ``(offset, range)`` pair.

        ``offset`` is the integer midpoint and ``range`` the integer
        half-width, never below 1 (the lower bound of the range slider) so
        that a flat image still produces a usable window.
        """
        offset = (min_val + max_val) // 2
        range_val = max(1, (max_val - min_val) // 2)
        return offset, range_val

    def update_min_max_display(self):
        """Refresh the read-only Min/Max labels from the current window."""
        min_val, max_val = self.offset_range_to_min_max()
        self.min_display_label.config(text=str(min_val))
        self.max_display_label.config(text=str(max_val))

    def on_offset_change(self, event=None):
        """React to an offset change: update labels and redraw.

        ``event`` receives the slider value when invoked by the scale; it is
        not used because the value is read from ``offset_var``.
        """
        offset = int(self.offset_var.get())
        self.offset_label.config(text=str(offset))
        self.update_min_max_display()
        if self.current_image_array is not None:
            self.update_display()

    def on_range_change(self, event=None):
        """React to a range change: update labels and redraw.

        ``event`` receives the slider value when invoked by the scale; it is
        not used because the value is read from ``range_var``.
        """
        range_val = int(self.range_var.get())
        self.range_label.config(text=str(range_val))
        self.update_min_max_display()
        if self.current_image_array is not None:
            self.update_display()

    def _set_window(self, offset, range_val):
        """Set offset/range variables and their labels without redrawing."""
        self.offset_var.set(offset)
        self.range_var.set(range_val)
        self.offset_label.config(text=str(offset))
        self.range_label.config(text=str(range_val))
        self.update_min_max_display()

    def _auto_window(self, frame):
        """Fit the window to the 1st..99th percentile of ``frame``."""
        min_val = int(np.percentile(frame, 1))
        max_val = int(np.percentile(frame, 99))
        self._set_window(*self.min_max_to_offset_range(min_val, max_val))

    def auto_range(self):
        """Fit the window to the current image's 1st..99th percentile."""
        if self.current_image_array is None:
            return
        self._auto_window(self.current_image_array)
        self.update_display()

    def reset_range(self):
        """Restore the window derived from the full 0..65535 span."""
        self._set_window(*self.min_max_to_offset_range(0, 65535))
        if self.current_image_array is not None:
            self.update_display()

    # ------------------------------------------------------------ streaming

    def get_url(self):
        """Return the single-frame RAW stream URL for the configured IP."""
        ip = self.ip_var.get()
        return f"http://{ip}:5000/stream?Type=RAW&Source=Raw&Mode=TCP&Heart-beat=No&Frames=1&Snap=No&Channel=1"

    def fetch_data(self):
        """Fetch one frame, decode it, and update the FPS counter.

        Request failures and non-200 responses are written to the info pane;
        nothing is raised to the caller.
        """
        try:
            response = requests.get(self.get_url(), timeout=1.0)
            if response.status_code != 200:
                self.log(f"Fetch error: HTTP {response.status_code}")
                return
            self.raw_data = response.content
            self.process_data()
            self._update_fps_counter()
        except Exception as e:
            self.log(f"Fetch error: {e}")

    def _update_fps_counter(self):
        """Count a frame and publish the measured FPS about once per second."""
        self.frame_count += 1
        now = time.time()
        elapsed = now - self.last_fps_time
        if elapsed >= 1.0:
            text = f"Actual FPS: {self.frame_count / elapsed:.1f}"
            self.root.after(0, lambda: self.fps_label.config(text=text))
            self.frame_count = 0
            self.last_fps_time = now

    def process_data(self):
        """Decode ``self.raw_data`` into a 16-bit image and schedule a redraw.

        The low-byte and high-byte planes are each ``width * height`` bytes
        long and start at the configured offsets. Invalid sizes, negative
        offsets, or a body too short for either plane are reported in the
        info pane and leave the current image untouched.
        """
        raw = self.raw_data
        if raw is None:
            return
        try:
            width = int(self.width_var.get())
            height = int(self.height_var.get())
            if width <= 0 or height <= 0:
                raise ValueError(f"invalid image size {width}x{height}")
            total_pixels = width * height

            low_offset = self.get_offset(self.low_offset_var.get())
            high_offset = self.get_offset(self.high_offset_var.get())
            if min(low_offset, high_offset) < 0:
                raise ValueError("plane offsets must not be negative")
            # Both planes must fit, whichever of the two sits further in.
            needed = max(low_offset, high_offset) + total_pixels
            if len(raw) < needed:
                raise ValueError(
                    f"frame too short: got {len(raw)} bytes, need {needed}")

            low_bytes = np.frombuffer(
                raw, dtype=np.uint8, count=total_pixels, offset=low_offset)
            high_bytes = np.frombuffer(
                raw, dtype=np.uint8, count=total_pixels, offset=high_offset)
            image_data = low_bytes.astype(np.uint16) | (
                high_bytes.astype(np.uint16) << 8)
            self.current_image_array = image_data.reshape((height, width))

            # Always auto-range the first frame of a stream.
            if self.first or self.auto_range_var.get():
                self._auto_window(self.current_image_array)
                self.first = False

            self.root.after(0, self.update_display)
        except Exception as e:
            self.log(f"Process error: {e}")

    def update_display(self):
        """Render the current image with the active window and palette.

        The image is normalised to 0..1 between the window limits, converted
        to 8-bit grayscale or RGB, enlarged by at most 3x to fit the canvas
        (never shrunk), and drawn centred. The drawn image's scale and
        top-left corner are recorded for the pixel readout.
        """
        frame = self.current_image_array
        if frame is None:
            return
        try:
            min_val, max_val = self.offset_range_to_min_max()
            if max_val > min_val:
                normalized = np.clip(
                    (frame.astype(np.float32) - min_val) / (max_val - min_val),
                    0, 1)
            else:
                normalized = np.zeros_like(frame, dtype=np.float32)

            palette = self.palette_var.get()
            if palette == "Grayscale":
                image_8bit = (normalized * 255).astype(np.uint8)
                pil_image = Image.fromarray(image_8bit, mode='L')
            else:
                colored = self.palettes[palette](normalized)
                # Drop the alpha channel returned by the colormap.
                rgb_array = (colored[:, :, :3] * 255).astype(np.uint8)
                pil_image = Image.fromarray(rgb_array, mode='RGB')
            self.current_pil_image = pil_image

            canvas_w = self.canvas.winfo_width()
            canvas_h = self.canvas.winfo_height()
            if canvas_w <= 1 or canvas_h <= 1:
                return  # canvas has not been laid out yet

            img_w, img_h = pil_image.size
            scale = min(canvas_w / img_w, canvas_h / img_h, 3.0)  # Max 3x scale
            if scale > 1.0:
                display_image = pil_image.resize(
                    (int(img_w * scale), int(img_h * scale)),
                    Image.Resampling.NEAREST)
                self.scale_factor = scale
            else:
                display_image = pil_image
                self.scale_factor = 1.0

            disp_w, disp_h = display_image.size
            left = canvas_w // 2 - disp_w // 2
            top = canvas_h // 2 - disp_h // 2

            # Keep a reference on self so the PhotoImage is not collected.
            self.photo = ImageTk.PhotoImage(display_image)
            self.canvas.delete("all")
            self.canvas.create_image(left, top, anchor=tk.NW, image=self.photo)
            self.image_origin = (left, top)
        except Exception as e:
            self.log(f"Display error: {e}")

    def _frame_interval(self):
        """Return the target seconds per frame from the FPS entry.

        Falls back to ``_FALLBACK_FRAME_INTERVAL`` when the entry is not a
        positive integer or cannot be read.
        """
        try:
            target_fps = int(self.fps_var.get())
        except (ValueError, RuntimeError, tk.TclError):
            return self._FALLBACK_FRAME_INTERVAL
        if target_fps <= 0:
            return self._FALLBACK_FRAME_INTERVAL
        return 1.0 / target_fps

    def stream_worker(self):
        """Fetch frames in a loop until streaming stops or is restarted.

        The loop also exits when a newer stream has been started, so a quick
        stop/start cannot leave two workers fetching concurrently.
        """
        generation = self._stream_generation
        while self.streaming and self._stream_generation == generation:
            start_time = time.time()
            self.fetch_data()
            # Frame rate control: sleep for whatever is left of the frame slot.
            elapsed = time.time() - start_time
            time.sleep(max(0.0, self._frame_interval() - elapsed))

    def toggle_stream(self):
        """Start the streaming worker, or stop it if it is running."""
        if self.streaming:
            self.streaming = False
            self.start_button.config(text="Start Stream")
            self.status_label.config(text="Status: Stopped")
            return

        self.first = True
        # Restart FPS accounting so the first reading covers this run only.
        self.frame_count = 0
        self.last_fps_time = time.time()
        self._stream_generation += 1
        self.streaming = True
        self.start_button.config(text="Stop")
        self.status_label.config(text="Status: Streaming")
        self.stream_thread = threading.Thread(
            target=self.stream_worker, daemon=True)
        self.stream_thread.start()

    # ----------------------------------------------------------------- save

    def save_image(self):
        """Ask for a file name and save the currently rendered image.

        Shows a warning when there is no image, and an error dialog when
        saving fails.
        """
        if self.current_pil_image is None:
            messagebox.showwarning("No Image", "No image to save.")
            return
        file_path = filedialog.asksaveasfilename(
            defaultextension=".png",
            filetypes=[("PNG files", "*.png"), ("JPEG files", "*.jpg")],
        )
        if not file_path:
            return
        try:
            self.current_pil_image.save(file_path)
        except Exception as e:
            messagebox.showerror("Error", f"Save failed: {e}")
        else:
            messagebox.showinfo("Success", f"Image saved: {file_path}")
def main():
    """Create the Tk root window, attach the viewer, and run the event loop."""
    window = tk.Tk()
    viewer = ImageStreamViewer(window)  # noqa: F841 - kept referenced while the loop runs
    window.mainloop()


if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment