"""Tkinter viewer for a 16-bit raw image stream fetched over HTTP.

NOTE(review): the copy of this file that was reviewed had been damaged by an
angle-bracket stripper.  Tk event sequences were emptied and a span of the
class (the end of ``parse_header`` through the start of ``auto_range``) was
missing.  Everything rebuilt from call sites is marked ``NOTE(review)`` below
and must be checked against the original file; ``parse_header``,
``send_command`` and ``send_ffc_command`` could not be recovered at all.
"""

import json
import os
import struct
import threading
import time
import tkinter as tk
import urllib.parse
from tkinter import ttk, filedialog, messagebox, scrolledtext

import matplotlib.cm as cm
import numpy as np
import requests
from matplotlib.colors import LinearSegmentedColormap, hsv_to_rgb
from PIL import Image, ImageTk

# Upper bound that reset_range() uses for the full display window.
_FULL_RANGE_MAX = 16383
# Sleep used by the stream worker when the FPS entry is not a usable number.
_FALLBACK_FRAME_INTERVAL = 0.05
_STICKY_ALL = (tk.W, tk.E, tk.N, tk.S)
_STICKY_WE = (tk.W, tk.E)


class ImageStreamViewer:
    """Main window: connection controls, display controls and image canvas."""

    SETTINGS_FILE = "viewer_settings.json"
    NO_TEMPERATURE_TEXT = "Temperature: -- °C"

    def __init__(self, root):
        """Build the GUI on *root*, load saved settings and hook window close."""
        self.root = root
        self.root.title("Image Stream Viewer - Fast")
        self.root.geometry("1800x1000")

        self.current_image_array = None
        self.current_pil_image = None
        self.resolution = None
        self.raw_data = None
        self.streaming = False
        self.stream_thread = None
        self.locked_pixel = None

        # Display geometry, filled in by update_display().  scale_factor stays
        # None until the first frame has been drawn.
        self.scale_factor = None
        self._image_origin = (0, 0)
        self.photo = None

        # Normalisation cache (see _normalized_frame).
        self._normalized_cache = None
        self._cache_source = None
        self._cache_min = None
        self._cache_max = None

        # Per-run stop signal for the stream worker.
        self._stop_event = None

        # Palette name -> matplotlib colormap (None means plain grayscale).
        self.palettes = {
            'Grayscale': None,
            'Hot': cm.hot,
            'Jet': cm.jet,
            'Cool': cm.cool,
            'Viridis': cm.viridis,
            'Plasma': cm.plasma,
            'Inferno': cm.inferno,
            'Magma': cm.magma,
            'Cividis': cm.cividis,
            'Rainbow': cm.rainbow,
            'Bone': cm.bone,
            'Copper': cm.copper,
            'Spring': cm.spring,
            'Summer': cm.summer,
            'Autumn': cm.autumn,
            'Winter': cm.winter,
            'Twilight': cm.twilight,
            '!Twilight': cm.twilight_shifted,
            'Custom1': self._build_custom1_cmap(),
        }

        self.first = True

        # Performance tracking
        self.frame_count = 0
        self.last_fps_time = time.time()

        self.setup_gui()
        self.load_settings()
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)

    @staticmethod
    def _build_custom1_cmap(steps=256):
        """Return the 'custom1' colormap: a full HSV hue sweep at S=V=1."""
        hex_colors = []
        for i in range(steps):
            rgb = hsv_to_rgb((i / (steps - 1), 1.0, 1.0))
            hex_colors.append('#%02x%02x%02x' % tuple(int(c * 255) for c in rgb))
        return LinearSegmentedColormap.from_list("custom1", hex_colors)

    # ------------------------------------------------------------------ GUI

    def setup_gui(self):
        """Create all widgets, lay them out and wire up the event bindings."""
        main_frame = ttk.Frame(self.root, padding="1")
        main_frame.grid(row=0, column=0, sticky=_STICKY_ALL)

        # Left panel for controls
        left_frame = ttk.Frame(main_frame, width=700)
        left_frame.grid(row=0, column=0, sticky=_STICKY_ALL, padx=(0, 10))
        left_frame.grid_propagate(False)

        # Right panel for image
        right_frame = ttk.Frame(main_frame)
        right_frame.grid(row=0, column=1, sticky=_STICKY_ALL)

        self._build_connection_panel(left_frame)
        self._build_format_panel(left_frame)
        self._build_display_panel(left_frame)
        self._build_command_panel(left_frame)
        self._build_info_panel(left_frame)
        image_frame = self._build_image_panel(right_frame)

        # Grid weights
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(0, weight=0)
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(0, weight=1)
        left_frame.columnconfigure(0, weight=1)
        right_frame.columnconfigure(0, weight=1)
        right_frame.rowconfigure(0, weight=1)
        image_frame.columnconfigure(0, weight=1)
        image_frame.rowconfigure(0, weight=1)

        # Bind Enter keys.  The sequence text was stripped from the reviewed
        # source; "<Return>" is restored from the original comment.
        self.cmd_entry.bind("<Return>", lambda e: self.send_command())
        self.value_entry.bind("<Return>", lambda e: self.send_command())

        # Initialize min/max display
        self.update_min_max_display()

    def _build_connection_panel(self, parent):
        """Create the 'Connection' group: IP, start/stop, save, FPS, status."""
        conn_frame = ttk.LabelFrame(parent, text="Connection", padding="5")
        conn_frame.grid(row=0, column=0, sticky=_STICKY_WE, pady=(0, 5))

        ttk.Label(conn_frame, text="IP:").grid(row=0, column=0, sticky=tk.W)
        self.ip_var = tk.StringVar(value="192.168.178.92")
        ttk.Entry(conn_frame, textvariable=self.ip_var, width=15).grid(
            row=0, column=1, padx=(5, 10))

        self.start_button = ttk.Button(
            conn_frame, text="Start Stream", command=self.toggle_stream)
        self.start_button.grid(row=0, column=2, padx=(0, 5))

        self.save_button = ttk.Button(
            conn_frame, text="Save", command=self.save_image)
        self.save_button.grid(row=0, column=3, padx=(0, 5))

        ttk.Label(conn_frame, text="FPS:").grid(row=0, column=4, padx=(10, 5))
        self.fps_var = tk.StringVar(value="20")
        ttk.Entry(conn_frame, textvariable=self.fps_var, width=4).grid(
            row=0, column=5)

        self.status_label = ttk.Label(conn_frame, text="Status: Stopped")
        self.status_label.grid(row=1, column=0, columnspan=3, sticky=tk.W)

        self.fps_label = ttk.Label(conn_frame, text="Actual FPS: 0")
        self.fps_label.grid(row=1, column=3, columnspan=3, sticky=tk.W)

    def _build_format_panel(self, parent):
        """Create the 'Format' group: frame size and byte-plane offsets."""
        format_frame = ttk.LabelFrame(parent, text="Format", padding="5")
        format_frame.grid(row=1, column=0, sticky=_STICKY_WE, pady=(0, 5))

        ttk.Label(format_frame, text="Width:").grid(row=0, column=0, sticky=tk.W)
        self.width_var = tk.StringVar(value="640")
        ttk.Entry(format_frame, textvariable=self.width_var, width=6).grid(
            row=0, column=1, padx=(5, 10))

        ttk.Label(format_frame, text="Height:").grid(row=0, column=2, sticky=tk.W)
        self.height_var = tk.StringVar(value="480")
        ttk.Entry(format_frame, textvariable=self.height_var, width=6).grid(
            row=0, column=3, padx=(5, 10))

        ttk.Label(format_frame, text="Low offset:").grid(
            row=1, column=0, sticky=tk.W)
        self.low_offset_var = tk.StringVar(value="0x238")
        ttk.Entry(format_frame, textvariable=self.low_offset_var, width=8).grid(
            row=1, column=1, padx=(5, 10))

        ttk.Label(format_frame, text="High offset:").grid(
            row=1, column=2, sticky=tk.W)
        self.high_offset_var = tk.StringVar(value="0x4B238")
        ttk.Entry(format_frame, textvariable=self.high_offset_var, width=8).grid(
            row=1, column=3, padx=(5, 10))

        ttk.Button(format_frame, text="Parse Header",
                   command=self.parse_header).grid(row=0, column=4, padx=(10, 0))

    def _build_display_panel(self, parent):
        """Create the 'Display' group: palette, offset/range and statistics."""
        display_frame = ttk.LabelFrame(parent, text="Display", padding="5")
        display_frame.grid(row=2, column=0, sticky=_STICKY_WE, pady=(0, 5))

        ttk.Label(display_frame, text="Palette:").grid(
            row=0, column=0, sticky=tk.W)
        self.palette_var = tk.StringVar(value="Grayscale")
        palette_combo = ttk.Combobox(
            display_frame, textvariable=self.palette_var,
            values=list(self.palettes.keys()), state="readonly", width=10)
        palette_combo.grid(row=0, column=1, padx=(5, 10))
        # Sequence text was stripped to "<>" in the reviewed source.
        palette_combo.bind("<<ComboboxSelected>>", self.on_palette_change)

        self.auto_range_var = tk.BooleanVar(value=False)
        ttk.Checkbutton(display_frame, text="Auto Range",
                        variable=self.auto_range_var).grid(row=0, column=2)

        # Range controls
        ttk.Button(display_frame, text="Auto Now",
                   command=self.auto_range).grid(row=1, column=0, padx=(10, 0))
        ttk.Button(display_frame, text="Reset",
                   command=self.reset_range).grid(row=2, column=0, padx=(10, 0))

        # Offset controls
        ttk.Label(display_frame, text="Offset:").grid(
            row=1, column=1, sticky=tk.W)
        self.offset_var = tk.IntVar(value=8192)
        offset_frame = ttk.Frame(display_frame)
        offset_frame.grid(row=1, column=2, columnspan=2, sticky=_STICKY_WE)

        ttk.Button(offset_frame, text="-", width=3,
                   command=lambda: self.adjust_offset(-5)).grid(row=0, column=0)
        self.offset_scale = ttk.Scale(
            offset_frame, from_=7000, to=8000, variable=self.offset_var,
            orient=tk.HORIZONTAL, length=360, command=self.on_offset_change)
        self.offset_scale.grid(row=0, column=1, padx=(2, 2))
        ttk.Button(offset_frame, text="+", width=3,
                   command=lambda: self.adjust_offset(5)).grid(row=0, column=2)
        self.offset_label = ttk.Label(offset_frame, text="8192", width=6)
        self.offset_label.grid(row=0, column=3, padx=(5, 0))

        # Range controls
        ttk.Label(display_frame, text="Range:").grid(
            row=2, column=1, sticky=tk.W)
        self.range_var = tk.IntVar(value=1024)
        range_frame = ttk.Frame(display_frame)
        range_frame.grid(row=2, column=2, columnspan=2, sticky=_STICKY_WE)

        ttk.Button(range_frame, text="-", width=3,
                   command=lambda: self.adjust_range(-5)).grid(row=0, column=0)
        self.range_scale = ttk.Scale(
            range_frame, from_=1, to=1024, variable=self.range_var,
            orient=tk.HORIZONTAL, length=360, command=self.on_range_change)
        self.range_scale.grid(row=0, column=1, padx=(2, 2))
        ttk.Button(range_frame, text="+", width=3,
                   command=lambda: self.adjust_range(5)).grid(row=0, column=2)
        self.range_label = ttk.Label(range_frame, text="1024", width=6)
        self.range_label.grid(row=0, column=3, padx=(5, 0))

        ttk.Label(display_frame, text="Min:").grid(row=3, column=1, sticky=tk.W)
        self.min_display_label = ttk.Label(
            display_frame, text="0", width=6, relief="sunken")
        self.min_display_label.grid(row=3, column=2, sticky=tk.W)

        ttk.Label(display_frame, text="Max:").grid(row=3, column=3, sticky=tk.W)
        self.max_display_label = ttk.Label(
            display_frame, text="16383", width=6, relief="sunken")
        self.max_display_label.grid(row=3, column=4, sticky=tk.W)

        # Raw data min/avg/max info
        self.raw_min_label = ttk.Label(
            display_frame, text="0", width=6, relief="sunken")
        self.raw_min_label.grid(row=4, column=0, sticky=tk.W)
        self.raw_avg_label = ttk.Label(
            display_frame, text="0", width=6, relief="sunken")
        self.raw_avg_label.grid(row=4, column=1, sticky=tk.W)
        self.raw_max_label = ttk.Label(
            display_frame, text="0", width=6, relief="sunken")
        self.raw_max_label.grid(row=4, column=2, sticky=tk.W)

    def _build_command_panel(self, parent):
        """Create the 'Commands' group: command/value entries and response box."""
        cmd_frame = ttk.LabelFrame(parent, text="Commands", padding="5")
        cmd_frame.grid(row=3, column=0, sticky=_STICKY_WE, pady=(0, 5))

        ttk.Label(cmd_frame, text="Cmd:").grid(row=0, column=0, sticky=tk.W)
        self.cmd_entry = ttk.Entry(cmd_frame, width=15)
        self.cmd_entry.grid(row=0, column=1, padx=(5, 10))

        ttk.Label(cmd_frame, text="Value:").grid(row=0, column=2, sticky=tk.W)
        self.value_entry = ttk.Entry(cmd_frame, width=15)
        self.value_entry.grid(row=0, column=3, padx=(5, 10))

        ttk.Button(cmd_frame, text="Send",
                   command=self.send_command).grid(row=0, column=4)

        # FFC Button
        ttk.Button(cmd_frame, text="FFC",
                   command=self.send_ffc_command).grid(
                       row=0, column=5, padx=(10, 0))

        self.cmd_response = scrolledtext.ScrolledText(
            cmd_frame, height=3, width=60, font=("Courier", 8))
        self.cmd_response.grid(row=1, column=0, columnspan=6, pady=(5, 0),
                               sticky=_STICKY_WE)

    def _build_info_panel(self, parent):
        """Create the 'Info' group holding the scrolling log text box."""
        info_frame = ttk.LabelFrame(parent, text="Info", padding="5")
        info_frame.grid(row=4, column=0, sticky=_STICKY_WE, pady=(0, 5))

        self.info_text = tk.Text(
            info_frame, height=20, width=90, font=("Courier", 8))
        info_scroll = ttk.Scrollbar(
            info_frame, orient=tk.VERTICAL, command=self.info_text.yview)
        self.info_text.configure(yscrollcommand=info_scroll.set)
        self.info_text.grid(row=0, column=0, sticky=_STICKY_WE)
        info_scroll.grid(row=0, column=1, sticky=(tk.N, tk.S))

    def _build_image_panel(self, parent):
        """Create the image canvas, pixel/temperature labels and reference inputs.

        Returns the enclosing frame so the caller can set its grid weights.
        """
        image_frame = ttk.LabelFrame(parent, text="Image", padding="5")
        image_frame.grid(row=0, column=0, sticky=_STICKY_ALL)

        self.canvas = tk.Canvas(image_frame, bg="black", width=1000, height=800)
        self.canvas.grid(row=0, column=0, sticky=_STICKY_ALL)
        # The three sequences below were stripped to "" in the reviewed source;
        # they are restored from the handler names.
        self.canvas.bind("<Button-1>", self.on_canvas_click)
        self.canvas.bind("<Button-3>", self.on_canvas_right_click)
        self.canvas.bind("<Motion>", self.on_mouse_move)

        self.temp_label = ttk.Label(
            image_frame, text=self.NO_TEMPERATURE_TEXT,
            font=("Arial", 12, "bold"))
        self.temp_label.grid(row=2, column=0, sticky=tk.W, pady=(10, 0))

        # Pixel info.  The source created this label twice (rows 1 and 3) and
        # only the second stayed reachable, so only that one is kept.
        self.pixel_label = ttk.Label(
            image_frame, text="Pixel: Move mouse over image")
        self.pixel_label.grid(row=3, column=0, sticky=tk.W)

        # Reference input fields
        ref_frame = ttk.Frame(image_frame)
        ref_frame.grid(row=4, column=0, sticky=tk.W, pady=(5, 0))

        ttk.Label(ref_frame, text="Ref1 °C:").grid(row=0, column=0, sticky=tk.W)
        self.ref_temp_var = tk.DoubleVar(value=25.0)
        ttk.Entry(ref_frame, textvariable=self.ref_temp_var, width=6).grid(
            row=0, column=1, padx=(5, 15))

        ttk.Label(ref_frame, text="Ref1 Pixel:").grid(
            row=0, column=2, sticky=tk.W)
        self.ref_pixel_var = tk.IntVar(value=7750)
        self.ref_pixel_entry = ttk.Entry(
            ref_frame, textvariable=self.ref_pixel_var, width=8)
        self.ref_pixel_entry.grid(row=0, column=3, padx=(5, 15))

        ttk.Label(ref_frame, text="Ref2 °C:").grid(row=0, column=4, sticky=tk.W)
        self.ref2_temp_var = tk.DoubleVar(value=36.0)
        ttk.Entry(ref_frame, textvariable=self.ref2_temp_var, width=6).grid(
            row=0, column=5, padx=(5, 15))

        ttk.Label(ref_frame, text="Ref2 Pixel:").grid(
            row=0, column=6, sticky=tk.W)
        self.ref2_pixel_var = tk.IntVar(value=7880)
        self.ref2_pixel_entry = ttk.Entry(
            ref_frame, textvariable=self.ref2_pixel_var, width=8)
        self.ref2_pixel_entry.grid(row=0, column=7, padx=(5, 15))

        return image_frame

    # ------------------------------------------------------------ utilities

    def _run_on_ui(self, func, *args):
        """Queue ``func(*args)`` on the Tk event loop; ignore a dead window."""
        try:
            self.root.after(0, func, *args)
        except (RuntimeError, tk.TclError):
            # Deliberate best-effort: the window is being torn down, so there
            # is nothing left to update.
            pass

    def log(self, text):
        """Append *text* as a new line to the Info box and scroll to it.

        Calls made from the stream worker are re-queued onto the Tk event
        loop rather than touching the widget from that thread (this assumes
        the GUI runs in the main thread, as ``main()`` arranges).
        """
        if threading.current_thread() is not threading.main_thread():
            self._run_on_ui(self.log, text)
            return
        self.info_text.insert(tk.END, f"{text}\n")
        self.info_text.see(tk.END)

    def _report_unrecoverable(self, feature):
        """Tell the user that *feature* could not be restored from the source."""
        message = (f"{feature}: implementation was lost from the recovered "
                   "source file; restore it from the original.")
        self.cmd_response.insert(tk.END, f"{message}\n")
        self.cmd_response.see(tk.END)
        self.log(message)

    @staticmethod
    def _trimmed_min_max(values, trim):
        """Return (low, high) of a 1-D array, ignoring *trim* outliers per end.

        Arrays too small to trim fall back to the plain minimum and maximum.
        """
        if values.size >= 2 * trim:
            part = np.partition(values, (trim, values.size - trim))
            return int(part[trim]), int(part[-trim])
        return int(values.min()), int(values.max())

    def _temperature_for(self, raw_value):
        """Map a raw pixel value to °C by linear interpolation of the two refs.

        Returns None when the reference fields are not valid numbers or both
        reference pixel values are equal (the slope would be undefined).
        """
        try:
            ref1_temp = self.ref_temp_var.get()
            ref1_pixel = self.ref_pixel_var.get()
            ref2_temp = self.ref2_temp_var.get()
            ref2_pixel = self.ref2_pixel_var.get()
        except (tk.TclError, ValueError):
            return None
        if ref2_pixel == ref1_pixel:
            return None
        # int() matters: a numpy.uint16 scalar minus a larger Python int wraps
        # around under NumPy 2 promotion rules.
        delta = int(raw_value) - ref1_pixel
        return ref1_temp + delta * (ref2_temp - ref1_temp) / (ref2_pixel - ref1_pixel)

    def _canvas_to_pixel(self, canvas_x, canvas_y):
        """Translate canvas coordinates to an image (x, y), or None if outside."""
        frame = self.current_image_array
        if frame is None or not self.scale_factor:
            return None
        origin_x, origin_y = self._image_origin
        x = int((canvas_x - origin_x) // self.scale_factor)
        y = int((canvas_y - origin_y) // self.scale_factor)
        rows, cols = frame.shape
        if 0 <= x < cols and 0 <= y < rows:
            return x, y
        return None

    # ------------------------------------------------------- canvas events

    def on_canvas_click(self, event):
        """Lock the temperature read-out onto the clicked image pixel."""
        pixel = self._canvas_to_pixel(event.x, event.y)
        if pixel is not None:
            self.locked_pixel = pixel
            self.update_display()

    def on_canvas_right_click(self, event):
        """Release the locked pixel and redraw without the marker."""
        self.locked_pixel = None
        self.temp_label.config(text=self.NO_TEMPERATURE_TEXT)
        self.update_display()

    def on_mouse_move(self, event):
        """Show the pixel under the cursor (and its temperature if unlocked).

        NOTE(review): body reconstructed; the original was in the lost span.
        The label wording is an assumption modelled on the visible
        "Temperature (locked): ..." text - confirm against the original.
        """
        pixel = self._canvas_to_pixel(event.x, event.y)
        if pixel is None:
            return
        x, y = pixel
        value = int(self.current_image_array[y, x])
        self.pixel_label.config(text=f"Pixel: ({x}, {y}) = {value}")
        if self.locked_pixel is None:
            temperature = self._temperature_for(value)
            if temperature is None:
                self.temp_label.config(text=self.NO_TEMPERATURE_TEXT)
            else:
                self.temp_label.config(text=f"Temperature: {temperature:.1f} °C")

    # ------------------------------------------- header / device commands

    def parse_header(self):
        """Parse Header button handler.

        NOTE(review): only the first statements survived in the recovered
        source (a little-endian ``struct.unpack`` of the width); the header
        layout is unknown, so this only reports the gap.
        """
        if self.raw_data is None:
            return
        self._report_unrecoverable("Parse Header")

    def send_command(self):
        """Send button / Enter handler.

        NOTE(review): the request format was in the lost span and cannot be
        inferred from the rest of the file.
        """
        self._report_unrecoverable("Send command")

    def send_ffc_command(self):
        """FFC button handler.

        NOTE(review): the request format was in the lost span and cannot be
        inferred from the rest of the file.
        """
        self._report_unrecoverable("FFC command")

    # ------------------------------------------------ offset / range model

    def get_offset(self, text):
        """Parse an offset entry such as ``"0x238"`` or ``"568"`` to an int.

        NOTE(review): reconstructed from its call sites and the hexadecimal
        defaults.  Raises ValueError for text that is not an integer literal.
        """
        return int(str(text).strip(), 0)

    def min_max_to_offset_range(self, min_val, max_val):
        """Convert a (min, max) display window to ``(offset, range)``.

        NOTE(review): reconstructed.  Offset is assumed to be the window
        centre and range its width - confirm against the original.
        """
        span = max(1, int(max_val) - int(min_val))
        return int(min_val) + span // 2, span

    def offset_range_to_min_max(self):
        """Return the (min, max) display window for the current sliders.

        NOTE(review): reconstructed as the inverse of
        ``min_max_to_offset_range``.
        """
        offset = int(self.offset_var.get())
        span = max(1, int(self.range_var.get()))
        low = offset - span // 2
        return low, low + span

    def update_min_max_display(self):
        """Refresh the Min/Max labels from the current offset and range.

        NOTE(review): reconstructed from the label names and call sites.
        """
        low, high = self.offset_range_to_min_max()
        self.min_display_label.config(text=str(low))
        self.max_display_label.config(text=str(high))

    def _refresh_range_widgets(self):
        """Sync the offset/range value labels and the Min/Max labels."""
        self.offset_label.config(text=str(int(self.offset_var.get())))
        self.range_label.config(text=str(int(self.range_var.get())))
        self.update_min_max_display()

    def on_offset_change(self, value=None):
        """Offset slider callback; *value* (the Scale's string) is unused.

        NOTE(review): reconstructed from call sites.
        """
        self._refresh_range_widgets()
        self.update_display()

    def on_range_change(self, value=None):
        """Range slider callback; *value* (the Scale's string) is unused.

        NOTE(review): reconstructed from call sites.
        """
        self._refresh_range_widgets()
        self.update_display()

    def adjust_offset(self, delta):
        """Shift the offset by *delta* (the -/+ buttons).

        NOTE(review): reconstructed from call sites.
        """
        self.offset_var.set(int(self.offset_var.get()) + delta)
        self.on_offset_change()

    def adjust_range(self, delta):
        """Change the range by *delta*, never going below 1.

        NOTE(review): reconstructed from call sites.
        """
        self.range_var.set(max(1, int(self.range_var.get()) + delta))
        self.on_range_change()

    def on_palette_change(self, event=None):
        """Redraw with the newly selected palette.

        NOTE(review): reconstructed from its binding.
        """
        self.update_display()

    def auto_range(self):
        """Fit the display window to the current frame (Auto Now button).

        Ignores the 100 lowest and 100 highest values when the frame has at
        least 200 pixels.  NOTE(review): the opening lines (the None guard
        and the flatten) were in the lost span and are rebuilt by analogy
        with process_data().
        """
        if self.current_image_array is None:
            return
        min_val, max_val = self._trimmed_min_max(
            self.current_image_array.ravel(), 100)
        offset, range_val = self.min_max_to_offset_range(min_val, max_val)
        self.offset_var.set(offset)
        self.range_var.set(range_val)
        self.on_offset_change()
        self.on_range_change()

    def reset_range(self):
        """Reset the display window to the full 0..16383 range."""
        offset, range_val = self.min_max_to_offset_range(0, _FULL_RANGE_MAX)
        self.offset_var.set(offset)
        self.range_var.set(range_val)
        self.on_offset_change()
        self.on_range_change()

    # ------------------------------------------------------------ fetching

    def get_url(self):
        """Return the stream URL for the IP currently entered."""
        ip = self.ip_var.get()
        return f"http://{ip}:5000/stream?Type=RAW&Source=Raw&Mode=TCP&Heart-beat=No&Frames=1&Snap=No&Channel=1"

    def _show_fps(self, fps):
        """Update the 'Actual FPS' label."""
        self.fps_label.config(text=f"Actual FPS: {fps:.1f}")

    def fetch_data(self):
        """Fetch one frame over HTTP and hand it to the UI thread.

        Called from the stream worker.  Decoding and all widget updates are
        queued onto the Tk event loop; only the request and the FPS counter
        run on the calling thread.
        """
        try:
            response = requests.get(self.get_url(), timeout=1.0)
        except requests.RequestException as e:
            self.log(f"Fetch error: {e}")
            return
        if response.status_code != 200:
            return

        self.raw_data = response.content
        self._run_on_ui(self.process_data)

        # FPS calculation
        self.frame_count += 1
        current_time = time.time()
        elapsed = current_time - self.last_fps_time
        if elapsed >= 1.0:
            self._run_on_ui(self._show_fps, self.frame_count / elapsed)
            self.frame_count = 0
            self.last_fps_time = current_time

    def _decode_frame(self, raw, width, height, low_offset, high_offset):
        """Combine the low- and high-byte planes of *raw* into a uint16 frame.

        Returns a (height, width) array, or None when *raw* is too short to
        hold both planes.  Raises ValueError for non-positive dimensions or
        negative offsets.
        """
        if width <= 0 or height <= 0 or low_offset < 0 or high_offset < 0:
            raise ValueError("width/height must be positive and offsets non-negative")
        total_pixels = width * height
        if len(raw) < max(low_offset, high_offset) + total_pixels:
            return None
        low_bytes = np.frombuffer(
            raw, dtype=np.uint8, count=total_pixels, offset=low_offset)
        high_bytes = np.frombuffer(
            raw, dtype=np.uint8, count=total_pixels, offset=high_offset)
        combined = low_bytes.astype(np.uint16) | (high_bytes.astype(np.uint16) << 8)
        return combined.reshape((height, width))

    def process_data(self):
        """Decode ``self.raw_data`` into the current frame and redraw.

        Updates the raw min/avg/max labels and, on the first frame of a
        stream or when Auto Range is ticked, refits the display window.
        Frames too short for the configured format are skipped silently.
        """
        if self.raw_data is None:
            return
        try:
            frame = self._decode_frame(
                self.raw_data,
                int(self.width_var.get()),
                int(self.height_var.get()),
                self.get_offset(self.low_offset_var.get()),
                self.get_offset(self.high_offset_var.get()),
            )
        except (ValueError, TypeError) as e:
            self.log(f"Process error: {e}")
            return
        if frame is None:
            return

        self.current_image_array = frame

        # Update raw min/avg/max info (10 outliers trimmed at each end).
        flat = frame.ravel()
        raw_min, raw_max = self._trimmed_min_max(flat, 10)
        self.raw_min_label.config(text=str(raw_min))
        self.raw_max_label.config(text=str(raw_max))
        self.raw_avg_label.config(text=str(int(np.mean(frame))))

        # Auto range if enabled
        if self.first or self.auto_range_var.get():
            offset, range_val = self.min_max_to_offset_range(raw_min, raw_max)
            self.offset_var.set(offset)
            self.range_var.set(range_val)
            self._refresh_range_widgets()
        self.first = False

        self.update_display()

    # ------------------------------------------------------------ settings

    def get_settings(self):
        """Return the persistable GUI state as a JSON-serialisable dict."""
        return {
            "ip": self.ip_var.get(),
            "fps": self.fps_var.get(),
            "width": self.width_var.get(),
            "height": self.height_var.get(),
            "low_offset": self.low_offset_var.get(),
            "high_offset": self.high_offset_var.get(),
            "palette": self.palette_var.get(),
            "auto_range": self.auto_range_var.get(),
            "offset": self.offset_var.get(),
            "range": self.range_var.get(),
            "ref_temp": self.ref_temp_var.get(),
            "ref_pixel": self.ref_pixel_var.get(),
            "ref2_temp": self.ref2_temp_var.get(),
            "ref2_pixel": self.ref2_pixel_var.get(),
        }

    def set_settings(self, settings):
        """Apply a settings dict; missing keys fall back to the GUI defaults."""
        self.ip_var.set(settings.get("ip", "192.168.178.92"))
        self.fps_var.set(settings.get("fps", "20"))
        self.width_var.set(settings.get("width", "640"))
        self.height_var.set(settings.get("height", "480"))
        self.low_offset_var.set(settings.get("low_offset", "0x238"))
        self.high_offset_var.set(settings.get("high_offset", "0x4B238"))
        self.palette_var.set(settings.get("palette", "Grayscale"))
        self.auto_range_var.set(settings.get("auto_range", False))
        self.offset_var.set(settings.get("offset", 8192))
        # Fallbacks below match the widget defaults.  The previous ones put
        # the range outside its slider and made both references identical,
        # which disables the temperature calculation.
        self.range_var.set(settings.get("range", 1024))
        self.ref_temp_var.set(settings.get("ref_temp", 25.0))
        self.ref_pixel_var.set(settings.get("ref_pixel", 7750))
        self.ref2_temp_var.set(settings.get("ref2_temp", 36.0))
        self.ref2_pixel_var.set(settings.get("ref2_pixel", 7880))

    def save_settings(self):
        """Write the current settings to SETTINGS_FILE; log any failure."""
        try:
            settings = self.get_settings()
            with open(self.SETTINGS_FILE, "w", encoding="utf-8") as f:
                json.dump(settings, f)
        except (OSError, TypeError, ValueError, tk.TclError) as e:
            self.log(f"Settings save error: {e}")

    def load_settings(self):
        """Load SETTINGS_FILE if present and refresh the dependent labels."""
        if not os.path.exists(self.SETTINGS_FILE):
            return
        try:
            with open(self.SETTINGS_FILE, "r", encoding="utf-8") as f:
                settings = json.load(f)
            if not isinstance(settings, dict):
                raise ValueError("settings file does not contain a JSON object")
            self.set_settings(settings)
            # The value labels are not bound to the variables, so sync them.
            self._refresh_range_widgets()
        except (OSError, ValueError, tk.TclError) as e:
            self.log(f"Settings load error: {e}")

    def on_close(self):
        """Stop streaming, persist settings and destroy the window."""
        self._stop_stream()
        self.save_settings()
        self.root.destroy()

    # ------------------------------------------------------------- drawing

    def _normalized_frame(self):
        """Return the current frame scaled to 0..1 for the display window.

        The result is cached until the frame object or the window changes.
        The cache keeps a reference to its source frame and compares by
        identity, because an ``id()`` can be reused once the old array has
        been freed.
        """
        min_val, max_val = self.offset_range_to_min_max()
        frame = self.current_image_array
        if (self._normalized_cache is not None
                and self._cache_source is frame
                and self._cache_min == min_val
                and self._cache_max == max_val):
            return self._normalized_cache

        if max_val > min_val:
            normalized = np.clip(
                (frame.astype(np.float32) - min_val) / (max_val - min_val), 0, 1)
        else:
            normalized = np.zeros_like(frame, dtype=np.float32)

        self._normalized_cache = normalized
        self._cache_source = frame
        self._cache_min = min_val
        self._cache_max = max_val
        return normalized

    def _colorize(self, normalized):
        """Turn a 0..1 frame into a PIL image using the selected palette.

        An unknown palette name (e.g. from an old settings file) falls back
        to grayscale.
        """
        cmap = self.palettes.get(self.palette_var.get())
        if cmap is None:
            # 2-D uint8 input: PIL infers an 8-bit grayscale ("L") image.
            return Image.fromarray((normalized * 255).astype(np.uint8))
        colored = cmap(normalized)
        # H x W x 3 uint8 input: PIL infers an "RGB" image.
        return Image.fromarray((colored[:, :, :3] * 255).astype(np.uint8))

    def _draw_locked_marker(self):
        """Draw the red X on the locked pixel and show its temperature."""
        lx, ly = self.locked_pixel
        rows, cols = self.current_image_array.shape
        if not (0 <= lx < cols and 0 <= ly < rows):
            # The frame size changed and the lock no longer points inside it.
            self.locked_pixel = None
            self.temp_label.config(text=self.NO_TEMPERATURE_TEXT)
            return

        origin_x, origin_y = self._image_origin
        marker_x = origin_x + int((lx + 0.5) * self.scale_factor)
        marker_y = origin_y + int((ly + 0.5) * self.scale_factor)
        self.canvas.create_text(
            marker_x, marker_y, text="X", fill="red", font=("Arial", 8))

        # Show temperature for locked pixel
        temperature = self._temperature_for(self.current_image_array[ly, lx])
        if temperature is None:
            self.temp_label.config(text=self.NO_TEMPERATURE_TEXT)
        else:
            self.temp_label.config(
                text=f"Temperature (locked): {temperature:.1f} °C")

    def update_display(self):
        """Render the current frame onto the canvas.

        The image is enlarged up to 3x (nearest neighbour) to fit the canvas
        and is never shrunk.  ``scale_factor`` and the image's top-left
        corner are recorded so canvas coordinates can be mapped back to
        pixels.
        """
        if self.current_image_array is None:
            return
        try:
            pil_image = self._colorize(self._normalized_frame())
            self.current_pil_image = pil_image

            # Scale for display
            canvas_w = self.canvas.winfo_width()
            canvas_h = self.canvas.winfo_height()
            if canvas_w <= 1 or canvas_h <= 1:
                return

            img_w, img_h = pil_image.size
            scale = min(canvas_w / img_w, canvas_h / img_h, 3.0)  # Max 3x scale
            if scale > 1.0:
                display_image = pil_image.resize(
                    (int(img_w * scale), int(img_h * scale)),
                    Image.Resampling.NEAREST)
            else:
                # Drawn 1:1, so the stored factor must be 1.0 as well.
                display_image = pil_image
                scale = 1.0
            self.scale_factor = scale

            shown_w, shown_h = display_image.size
            self._image_origin = (
                (canvas_w - shown_w) // 2, (canvas_h - shown_h) // 2)

            # Keep a reference on self so Tk does not lose the image.
            self.photo = ImageTk.PhotoImage(display_image)
            self.canvas.delete("all")
            self.canvas.create_image(
                self._image_origin[0], self._image_origin[1],
                image=self.photo, anchor=tk.NW)

            if self.locked_pixel is not None:
                self._draw_locked_marker()
        except Exception as e:
            # UI boundary: report the problem and blank the canvas instead of
            # letting the exception escape into the Tk callback machinery.
            self.log(f"Display error: {e}")
            self.canvas.delete("all")
            self.current_pil_image = None

    # ----------------------------------------------------------- streaming

    def _frame_interval(self):
        """Return seconds per frame for the FPS entry (20 FPS fallback)."""
        try:
            target_fps = float(self.fps_var.get())
        except (ValueError, RuntimeError, tk.TclError):
            return _FALLBACK_FRAME_INTERVAL
        if target_fps <= 0:
            return _FALLBACK_FRAME_INTERVAL
        return 1.0 / target_fps

    def stream_worker(self, stop_event=None):
        """Fetch frames at the requested rate until streaming is stopped.

        *stop_event*, when given, is a ``threading.Event`` owned by this
        worker alone.  It lets a stop/start in quick succession end the old
        worker even though ``self.streaming`` is already True again.
        """
        while self.streaming and not (stop_event is not None and stop_event.is_set()):
            start_time = time.time()
            self.fetch_data()

            # Frame rate control
            remaining = max(0.0, self._frame_interval() - (time.time() - start_time))
            if stop_event is not None:
                stop_event.wait(remaining)
            else:
                time.sleep(remaining)

    def _stop_stream(self):
        """Signal the running worker (if any) to finish."""
        self.streaming = False
        if self._stop_event is not None:
            self._stop_event.set()

    def toggle_stream(self):
        """Start the stream worker, or stop it if it is running."""
        if not self.streaming:
            self.first = True
            self.streaming = True
            self._stop_event = threading.Event()
            self.start_button.config(text="Stop")
            self.status_label.config(text="Status: Streaming")
            self.stream_thread = threading.Thread(
                target=self.stream_worker, args=(self._stop_event,), daemon=True)
            self.stream_thread.start()

            # Set reference pixel to avg on stream start
            if self.current_image_array is not None:
                avg_pixel = int(np.mean(self.current_image_array))
                self.ref_pixel_var.set(avg_pixel)
        else:
            self._stop_stream()
            self.start_button.config(text="Start Stream")
            self.status_label.config(text="Status: Stopped")

    def save_image(self):
        """Ask for a file name and save the currently displayed image."""
        if self.current_pil_image is None:
            messagebox.showwarning("No Image", "No image to save.")
            return

        file_path = filedialog.asksaveasfilename(
            defaultextension=".png",
            filetypes=[("PNG files", "*.png"), ("JPEG files", "*.jpg")]
        )
        if not file_path:
            return
        try:
            self.current_pil_image.save(file_path)
        except (OSError, ValueError) as e:
            messagebox.showerror("Error", f"Save failed: {e}")
        else:
            messagebox.showinfo("Success", f"Image saved: {file_path}")


def main():
    """Create the Tk root, build the viewer and run the event loop."""
    root = tk.Tk()
    app = ImageStreamViewer(root)
    root.mainloop()


if __name__ == "__main__":
    main()