Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """
- All-in-one Visual Weather Engine (Phases 1–3) with terminal progress bar and interactive phase picker
- Phases:
- 1 - Static visualization: temperature heatmap + isotherms, rainfall, wind
- 2 - Animated particles: wind streaks and rain droplets (MP4/GIF or interactive)
- 3 - 3D scene: terrain surface with weather overlays (Matplotlib 3D or Plotly)
- Key features:
- - Ingests heightmap (grayscale or RGB) and texture (RGB).
- - Prompts for sea level threshold (normalized 0..1). Default: 0.22.
- - Interactive phase picker: [1/2/3/all]
- - Cross-references oceans from height and texture.
- - Simulates temperature (land vs ocean), divergence-free wind, humidity, rainfall.
- - Timestamped outputs to avoid overwriting (shared stamp for "all").
- Dependencies:
- pip install numpy matplotlib pillow scipy
- Optional: plotly (for interactive 3D), kaleido (for static plotly export), ffmpeg on system (for MP4 export)
- """
- import argparse
- import os
- import sys
- import shutil
- import time
- import datetime as _dt
- import numpy as np
- import matplotlib as mpl
- import matplotlib.pyplot as plt
- from matplotlib import animation
- from matplotlib import patheffects as pe
- from matplotlib.collections import LineCollection
- from matplotlib.colors import ListedColormap, Normalize
- from PIL import Image
- from scipy.ndimage import (
- gaussian_filter,
- distance_transform_edt,
- maximum_filter,
- binary_opening,
- binary_closing,
- )
- # 3D (registers projection)
- from mpl_toolkits.mplot3d import Axes3D # noqa: F401
- # -----------------------------
- # Terminal progress bar
- # -----------------------------
- class ProgressBar:
- def __init__(self, width=40):
- self.width = width
- self.total = 100
- self.current = 0
- self.desc = ""
- self._last_len = 0
- self._start_time = None
- def _term_width(self):
- try:
- cols = shutil.get_terminal_size((80, 20)).columns
- return max(40, cols)
- except Exception:
- return 80
- def new_task(self, desc, total):
- self.desc = str(desc)
- self.total = max(1, int(total))
- self.current = 0
- self._start_time = time.time()
- self._render()
- def update(self, n):
- self.current = max(0, min(int(n), int(self.total)))
- self._render()
- def advance(self, dn):
- self.update(self.current + dn)
- def done(self, suffix=""):
- self.current = self.total
- self._render(done=True, suffix=suffix)
- sys.stdout.write("\n")
- sys.stdout.flush()
- self.desc = ""
- self._start_time = None
- def _render(self, done=False, suffix=""):
- pct = (float(self.current) / float(self.total)) if self.total else 1.0
- pct = max(0.0, min(1.0, pct))
- term_w = self._term_width()
- bar_w = min(self.width, max(10, term_w - 30))
- filled = int(bar_w * pct)
- bar = "=" * filled + (">" if filled < bar_w and not done else "") + "-" * max(0, bar_w - filled - (0 if filled == bar_w or done else 1))
- percent_txt = f"{int(pct * 100):3d}%"
- elapsed = ""
- if self._start_time is not None:
- dt = time.time() - self._start_time
- if dt >= 0.1:
- elapsed = f" {dt:5.1f}s"
- line = f"\r[{bar}] {percent_txt} {self.desc}{elapsed}"
- if suffix:
- line += f" {suffix}"
- pad = max(0, self._last_len - len(line))
- sys.stdout.write(line + (" " * pad))
- sys.stdout.flush()
- self._last_len = len(line)
- # -----------------------------
- # Prompts / I/O helpers
- # -----------------------------
- def prompt_for_existing_file(prompt_msg, default=None):
- while True:
- prompt = f"{prompt_msg}"
- if default:
- prompt += f" [{default}]"
- prompt += ": "
- path = input(prompt).strip().strip('"').strip("'")
- if path == "" and default:
- path = default
- if os.path.isfile(path):
- return path
- print("File not found. Please provide a valid file path.")
- def prompt_for_float(prompt_msg, default=None, minv=None, maxv=None):
- while True:
- prompt = f"{prompt_msg}"
- if default is not None:
- prompt += f" [{default}]"
- prompt += ": "
- s = input(prompt).strip()
- if s == "" and default is not None:
- val = float(default)
- else:
- try:
- val = float(s)
- except Exception:
- print("Please enter a number.")
- continue
- if minv is not None and val < minv:
- print(f"Value must be >= {minv}")
- continue
- if maxv is not None and val > maxv:
- print(f"Value must be <= {maxv}")
- continue
- return val
- def prompt_for_phase(default_choice=None):
- """
- Interactive picker for phase(s). Returns a list of phases to run.
- """
- while True:
- prompt = "Pick a phase [1/2/3/all]"
- if default_choice is not None:
- prompt += f" [{default_choice}]"
- prompt += ": "
- s = input(prompt).strip().lower()
- if s == "" and default_choice is not None:
- s = str(default_choice).lower()
- if s in ("1", "2", "3"):
- return [int(s)]
- if s in ("all", "a", "123"):
- return [1, 2, 3]
- print("Please enter 1, 2, 3, or 'all'.")
- def read_image_as_array(path, mode=None):
- img = Image.open(path)
- if mode:
- img = img.convert(mode)
- arr = np.array(img)
- return arr
- def ensure_gray2d(a):
- """
- Ensure a is a single-channel 2D array (float32).
- If RGB, convert to luminance (BT.709).
- """
- a = np.asarray(a)
- if a.ndim == 2:
- return a.astype(np.float32)
- if a.ndim == 3:
- if a.shape[2] == 1:
- return a[..., 0].astype(np.float32)
- a = a.astype(np.float32)
- r, g, b = a[..., 0], a[..., 1], a[..., 2]
- gray = 0.2126 * r + 0.7152 * g + 0.0722 * b
- return gray.astype(np.float32)
- return np.squeeze(a).astype(np.float32)
- def to_float01(arr):
- arr = np.asarray(arr)
- if arr.ndim == 2:
- arr = arr.astype(np.float32)
- mn, mx = float(arr.min()), float(arr.max())
- if mx > mn:
- arr = (arr - mn) / (mx - mn)
- else:
- arr = np.zeros_like(arr, dtype=np.float32)
- return arr
- else:
- if arr.dtype == np.uint8:
- return arr.astype(np.float32) / 255.0
- arr = arr.astype(np.float32)
- mn, mx = float(arr.min()), float(arr.max())
- return (arr - mn) / (mx - mn + 1e-9)
- def resize_to_match(arr, target_shape):
- """
- Resize arr (H,W) or (H,W,3) to target_shape (H,W[,C]) using bilinear.
- Only H and W from target_shape are used.
- """
- if isinstance(target_shape, tuple) and len(target_shape) >= 2:
- H, W = int(target_shape[0]), int(target_shape[1])
- else:
- raise ValueError("target_shape must be a tuple like (H, W) or (H, W, C)")
- arr_clipped = np.clip(arr, 0, 1)
- im = Image.fromarray((arr_clipped * 255).astype(np.uint8))
- im = im.resize((W, H), resample=Image.BILINEAR)
- return (np.array(im).astype(np.float32) / 255.0)
- def add_timestamp_to_path(path, fmt="%Y%m%d_%H%M%S", stamp=None):
- """
- Insert a timestamp before the file extension to avoid overwriting.
- If stamp is provided, reuse it across multiple outputs.
- """
- directory, filename = os.path.split(path)
- name, ext = os.path.splitext(filename)
- if stamp is None:
- stamp = _dt.datetime.now().strftime(fmt)
- new_name = f"{name}_{stamp}{ext}"
- return os.path.join(directory if directory else ".", new_name)
- # -----------------------------
- # Math helpers and grids
- # -----------------------------
- def fbm_noise(shape, octaves=5, base_sigma=40.0, persistence=0.5, lacunarity=2.0, seed=42):
- rng = np.random.default_rng(seed)
- H, W = shape
- acc = np.zeros((H, W), dtype=np.float32)
- amp = 1.0
- sigma = base_sigma
- total_amp = 0.0
- for _ in range(octaves):
- n = rng.standard_normal((H, W)).astype(np.float32)
- s = max(0.5, sigma)
- layer = gaussian_filter(n, s, mode='reflect')
- acc += amp * layer
- total_amp += amp
- amp *= persistence
- sigma /= lacunarity
- acc /= (total_amp + 1e-8)
- acc = (acc - acc.min()) / (acc.max() - acc.min() + 1e-9)
- return acc
- def logistic_stable(x):
- """
- Numerically stable logistic sigmoid: sigma(x) without overflow.
- """
- x = x.astype(np.float64)
- out = np.empty_like(x)
- pos = x >= 0
- with np.errstate(over='ignore', under='ignore'):
- out[pos] = 1.0 / (1.0 + np.exp(-x[pos]))
- ex = np.exp(x[~pos])
- out[~pos] = ex / (1.0 + ex)
- return out.astype(np.float32)
- def compute_grids(H, W):
- y = np.linspace(0.0, 1.0, H, dtype=np.float32)
- x = np.linspace(0.0, 1.0, W, dtype=np.float32)
- Y, X = np.meshgrid(y, x, indexing='ij')
- return X, Y
- # -----------------------------
- # Meteorological fields
- # -----------------------------
- def detect_water_mask_combined(texture_rgb, height_norm, sea_level=0.22,
- blue_thresh=0.45, blue_margin=0.05,
- smooth_sigma=1.2, morph=True):
- """
- Combine height-based and color-based water detection.
- - by_height: height <= sea_level
- - by_color: blue dominant over R/G by margin, above threshold
- """
- by_height = height_norm <= float(sea_level)
- red = texture_rgb[..., 0]
- green = texture_rgb[..., 1]
- blue = texture_rgb[..., 2]
- by_color = (blue > blue_thresh) & (blue > green + blue_margin) & (blue > red + blue_margin)
- water = by_height | by_color
- water = gaussian_filter(water.astype(np.float32), sigma=smooth_sigma) > 0.5
- if morph:
- water = binary_opening(water, structure=np.ones((3, 3)))
- water = binary_closing(water, structure=np.ones((3, 3)))
- # Signed distance to coast: negative over water, positive over land
- land = ~water
- dist_to_land = distance_transform_edt(~water) # distance for water pixels to nearest land
- dist_to_water = distance_transform_edt(~land) # distance for land pixels to nearest water
- signed_coast_dist = dist_to_land.astype(np.float32)
- signed_coast_dist[water] = -dist_to_water[water].astype(np.float32)
- return water, signed_coast_dist
- def compute_temperature(height_norm, X, Y, water_mask, coast_dist,
- seed=42, max_elev_m=3000.0, sea_level_temp_C=22.0,
- lapse_rate_K_per_km=6.5, lat_grad_C=10.0, sst_variance_C=1.5):
- """
- Temperature (°C), blending land lapse-rate cooling and sea surface temperature.
- """
- H, W = height_norm.shape
- elev_m = height_norm * max_elev_m
- lapse = lapse_rate_K_per_km * (elev_m / 1000.0)
- lat_component_land = lat_grad_C * (Y - 0.5)
- lat_component_sea = 0.6 * lat_grad_C * (Y - 0.5)
- land_noise = fbm_noise((H, W), octaves=5, base_sigma=min(H, W)/16,
- persistence=0.55, lacunarity=2.1, seed=seed+101)
- land_noise = (land_noise - 0.5) * 4.0 # +/- 2C
- sst_noise = fbm_noise((H, W), octaves=4, base_sigma=min(H, W)/14,
- persistence=0.6, lacunarity=2.0, seed=seed+102)
- sst_noise = (sst_noise - 0.5) * (2 * sst_variance_C)
- T_land = sea_level_temp_C - lapse + lat_component_land + land_noise
- T_sea = sea_level_temp_C + lat_component_sea + sst_noise
- # Stable coastline blend (0 over water, 1 over land)
- k = 5.0 # pixels
- blend = logistic_stable(coast_dist / k)
- T = blend * T_land + (1.0 - blend) * T_sea
- return T
- def compute_streamfunction(height_norm, X, Y, water_mask,
- base_wind_dir_deg=35.0, base_wind_speed=6.0,
- seed=42, k_orog=0.9, k_noise=1.0,
- ocean_speed_boost=0.25, mountain_drag=0.12):
- """
- Divergence-free flow via streamfunction with speed scaling over ocean/mountains.
- """
- H, W = height_norm.shape
- theta = np.deg2rad(base_wind_dir_deg)
- u0 = base_wind_speed * np.cos(theta)
- v0 = base_wind_speed * np.sin(theta)
- elev_smooth = gaussian_filter(height_norm, sigma=min(H, W)/80)
- noise = fbm_noise((H, W), octaves=5, base_sigma=min(H, W)/20, seed=seed+202)
- noise = gaussian_filter(noise, sigma=min(H, W)/60)
- psi = (u0 * Y) - (v0 * X) + k_orog * elev_smooth + k_noise * noise
- dy = 1.0 / max(H - 1, 1)
- dx = 1.0 / max(W - 1, 1)
- dpsidy = np.gradient(psi, dy, axis=0)
- dpsidx = np.gradient(psi, dx, axis=1)
- u = gaussian_filter(dpsidy, sigma=1.0)
- v = gaussian_filter(-dpsidx, sigma=1.0)
- speed_scale = 1.0 + ocean_speed_boost * (water_mask.astype(np.float32)) - mountain_drag * (height_norm ** 1.5)
- u *= speed_scale
- v *= speed_scale
- return u, v
- def compute_humidity(height_norm, water_mask, coast_dist, seed=42):
- """
- Humidity 0..1; high over ocean, decays inland, penalized by elevation.
- """
- H, W = height_norm.shape
- dist = np.abs(coast_dist)
- dist_norm = dist / (dist.max() + 1e-9)
- base_ocean = 0.9
- base_land = 0.35
- baseline = np.where(water_mask, base_ocean, base_land)
- inland_decay = np.exp(-3.0 * dist_norm)
- elev_penalty = np.clip(1.0 - height_norm, 0, 1)
- noise = fbm_noise((H, W), octaves=4, base_sigma=min(H, W)/12, seed=seed+303)
- noise = (noise - 0.5) * 0.18
- q = baseline * (0.5 + 0.5 * inland_decay) * (0.6 + 0.4 * elev_penalty) + noise
- q = np.clip(q, 0, 1)
- return q
- def compute_rainfall(u, v, height_norm, humidity, temperature_C, water_mask, seed=42):
- """
- Rainfall rate (mm/hr): orographic on land + convective everywhere.
- """
- H, W = height_norm.shape
- dy = 1.0 / max(H - 1, 1)
- dx = 1.0 / max(W - 1, 1)
- dzdy = np.gradient(height_norm, dy, axis=0)
- dzdx = np.gradient(height_norm, dx, axis=1)
- upslope = u * dzdx + v * dzdy
- orog = np.clip(upslope, 0, None)
- orog = orog * (~water_mask) # suppress orographic rain over oceans
- t_excess = np.clip(temperature_C - (np.quantile(temperature_C, 0.6)), 0, None)
- conv_noise = fbm_noise((H, W), octaves=3, base_sigma=min(H, W)/18, seed=seed+404)
- conv = t_excess * humidity * conv_noise
- rain = 30.0 * (0.65 * orog + 0.35 * conv)
- rain = gaussian_filter(rain, sigma=0.8)
- return rain
- def wind_speed(u, v):
- return np.sqrt(u*u + v*v)
- def vorticity(u, v):
- H, W = u.shape
- dy = 1.0 / max(H - 1, 1)
- dx = 1.0 / max(W - 1, 1)
- dvdx = np.gradient(v, dx, axis=1)
- dudy = np.gradient(u, dy, axis=0)
- return dvdx - dudy
- def find_local_extrema(field, num_max=3, num_min=3, neighborhood=21):
- f = gaussian_filter(field, sigma=2.0)
- max_mask = f == maximum_filter(f, size=neighborhood, mode='reflect')
- max_coords = np.argwhere(max_mask)
- max_vals = f[max_mask]
- if len(max_vals) > 0:
- idx = np.argsort(max_vals)[::-1][:num_max]
- hot_points = max_coords[idx]
- else:
- hot_points = np.empty((0, 2), dtype=int)
- g = -f
- min_mask = g == maximum_filter(g, size=neighborhood, mode='reflect')
- min_coords = np.argwhere(min_mask)
- min_vals = f[min_mask]
- if len(min_vals) > 0:
- jdx = np.argsort(min_vals)[:num_min]
- cold_points = min_coords[jdx]
- else:
- cold_points = np.empty((0, 2), dtype=int)
- return hot_points, cold_points
- def find_storm_centers(u, v, num_centers=3, neighborhood=31, min_quantile=0.92):
- vort = vorticity(u, v)
- vort_s = gaussian_filter(vort, sigma=2.0)
- thresh = np.quantile(vort_s, min_quantile)
- mask = (vort_s >= thresh) & (vort_s == maximum_filter(vort_s, size=neighborhood))
- coords = np.argwhere(mask)
- vals = vort_s[mask]
- if len(vals) > 0:
- idx = np.argsort(vals)[::-1][:num_centers]
- pts = coords[idx]
- else:
- pts = np.empty((0, 2), dtype=int)
- return pts, vort_s
- # -----------------------------
- # Colormaps and drawing helpers
- # -----------------------------
- def _get_cmap(name: str):
- cmaps = getattr(mpl, "colormaps", None)
- if cmaps is not None:
- return cmaps.get_cmap(name)
- # Fallback for older Matplotlib
- from matplotlib import cm as _cm # local import to avoid Pylance noise
- return _cm.get_cmap(name)
- def build_temperature_cmap():
- return _get_cmap('coolwarm')
- def build_rain_cmap():
- base = _get_cmap('Blues')
- colors = np.array(base(np.linspace(0, 1, 256)), copy=True) # ensure writable ndarray
- colors[:, -1] = np.linspace(0.0, 0.85, 256)
- return ListedColormap(colors)
- def annotate_points(ax, pts, labels, color='k'):
- for (y, x), label in zip(pts, labels):
- ax.text(
- x, y, label,
- color='white',
- ha='center', va='center',
- fontsize=10, weight='bold',
- bbox=dict(boxstyle='round,pad=0.25', fc=color, ec='none', alpha=0.8),
- zorder=10,
- path_effects=[pe.withStroke(linewidth=1.5, foreground='black', alpha=0.3)]
- )
- def draw_fronts(ax, T):
- gy, gx = np.gradient(gaussian_filter(T, sigma=1.2))
- gradmag = np.hypot(gx, gy)
- level = np.quantile(gradmag, 0.93)
- cs = ax.contour(gradmag, levels=[level], colors=['magenta'], linewidths=1.0, linestyles='dashed', alpha=0.7)
- return cs
- # -----------------------------
- # Phase 1: Static visualization
- # -----------------------------
- def render_static_map(texture, T, rain, u, v, out_path, dpi=200, show=False,
- draw_quiver=False, title='Phase 1: Static Weather', pb=None):
- if pb:
- pb.new_task("Rendering static map", total=3)
- H, W, _ = texture.shape
- aspect = W / H
- fig_w = 12
- fig_h = max(6, fig_w / aspect)
- fig, ax = plt.subplots(figsize=(fig_w, fig_h), dpi=dpi)
- ax.set_axis_off()
- ax.imshow(texture, interpolation='bilinear', zorder=0)
- imT = ax.imshow(T, cmap=build_temperature_cmap(), alpha=0.35, zorder=2)
- t_min, t_max = float(np.nanmin(T)), float(np.nanmax(T))
- levels = np.linspace(t_min, t_max, 12)
- cs = ax.contour(T, levels=levels, colors=['k'], linewidths=0.7, alpha=0.7, zorder=3)
- ax.clabel(cs, inline=1, fontsize=8, fmt='%1.0f°C')
- imR = ax.imshow(rain, cmap=build_rain_cmap(), alpha=0.75, zorder=4)
- ds = max(1, int(min(H, W) / 120))
- YY, XX = np.mgrid[0:H:ds, 0:W:ds]
- U = u[::ds, ::ds]
- V = v[::ds, ::ds]
- sp = wind_speed(U, V)
- lw = 1.5 * (sp / (sp.max() + 1e-9)) + 0.5
- ax.streamplot(XX, YY, U, V, color='white', linewidth=lw, density=1.3, arrowsize=0.8, minlength=0.15, zorder=5)
- if draw_quiver:
- step = ds * 3
- ax.quiver(np.arange(0, W, step), np.arange(0, H, step),
- u[::step, ::step], v[::step, ::step],
- color='white', alpha=0.8, zorder=6, pivot='mid', scale=50)
- hot_pts, cold_pts = find_local_extrema(T, num_max=3, num_min=3, neighborhood=31)
- if len(hot_pts) > 0:
- annotate_points(ax, hot_pts, ['HOT'] * len(hot_pts), color='#d62728')
- if len(cold_pts) > 0:
- annotate_points(ax, cold_pts, ['COLD'] * len(cold_pts), color='#1f77b4')
- storm_pts, _ = find_storm_centers(u, v, num_centers=3, neighborhood=31, min_quantile=0.92)
- if len(storm_pts) > 0:
- annotate_points(ax, storm_pts, ['L'] * len(storm_pts), color='#2ca02c')
- draw_fronts(ax, T)
- if pb:
- pb.advance(1)
- cbar_T = fig.colorbar(imT, ax=ax, fraction=0.046, pad=0.02)
- cbar_T.set_label('Temperature (°C)')
- cbar_R = fig.colorbar(imR, ax=ax, fraction=0.046, pad=0.02)
- cbar_R.set_label('Rainfall (mm/hr)')
- ax.set_title(title, fontsize=14, weight='bold')
- fig.tight_layout()
- if pb:
- pb.advance(1)
- fig.savefig(out_path, bbox_inches='tight', dpi=dpi)
- if show:
- plt.show()
- plt.close(fig)
- if pb:
- pb.done("Static map saved")
- # -----------------------------
- # Phase 2: Animation (wind/rain)
- # -----------------------------
- def bilinear_sample(a, x, y):
- H, W = a.shape
- x = np.clip(x, 0.0, W - 1.000001)
- y = np.clip(y, 0.0, H - 1.000001)
- x0 = np.floor(x).astype(np.int32)
- y0 = np.floor(y).astype(np.int32)
- x1 = np.clip(x0 + 1, 0, W - 1)
- y1 = np.clip(y0 + 1, 0, H - 1)
- fx = x - x0
- fy = y - y0
- v00 = a[y0, x0]
- v10 = a[y0, x1]
- v01 = a[y1, x0]
- v11 = a[y1, x1]
- v0 = v00 * (1.0 - fx) + v10 * fx
- v1 = v01 * (1.0 - fx) + v11 * fx
- return v0 * (1.0 - fy) + v1 * fy
- def advect(x, y, u, v, px_per_frame):
- ux = bilinear_sample(u, x, y) * px_per_frame
- vy = bilinear_sample(v, x, y) * px_per_frame
- return x + ux, y + vy, np.sqrt(ux*ux + vy*vy)
- def wrap_or_respawn(x, y, W, H, rng, respawn_mask=None, edge='wrap'):
- if edge == 'wrap':
- x = np.mod(x, W)
- y = np.mod(y, H)
- return x, y
- need = respawn_mask if respawn_mask is not None else (
- (x < 0) | (x >= W) | (y < 0) | (y >= H)
- )
- n = np.count_nonzero(need)
- if n > 0:
- x[need] = rng.uniform(0, W, size=n)
- y[need] = rng.uniform(0, H, size=n)
- return x, y
- def sample_positions_from_weight_map(weight_map, n, rng):
- H, W = weight_map.shape
- w = weight_map.ravel().astype(np.float64)
- s = w.sum()
- if s <= 0:
- x = rng.uniform(0, W, size=n)
- y = rng.uniform(0, H, size=n)
- return x, y
- p = w / s
- idx = rng.choice(W * H, size=n, replace=True, p=p)
- y_idx, x_idx = np.divmod(idx, W)
- x = x_idx + rng.uniform(0, 1, size=n)
- y = y_idx + rng.uniform(0, 1, size=n)
- return x, y
- def animate_weather(texture, T, rain_field, u, v,
- out_path='weather_anim.mp4', mode='both',
- n_wind=2000, n_rain=2000, fps=30, duration=32,
- dpi=160, show=False, seed=42, draw_isotherms=True,
- temp_alpha=0.28, rain_alpha=0.35, streamlines=False,
- blit=True, flow_scale=0.7, base_title='Phase 2: Animated Weather',
- pb=None):
- rng = np.random.default_rng(seed)
- H, W, _ = texture.shape
- frames = int(max(1, duration) * max(1, fps))
- spd = wind_speed(u, v)
- ref = np.percentile(spd, 99.0) + 1e-9
- px_per_frame = (flow_scale * min(H, W)) / (ref * fps)
- aspect = W / H
- fig_w = 12
- fig_h = max(6, fig_w / aspect)
- fig, ax = plt.subplots(figsize=(fig_w, fig_h), dpi=dpi)
- ax.set_axis_off()
- ax.set_xlim(0, W)
- ax.set_ylim(H, 0)
- ax.imshow(texture, interpolation='bilinear', zorder=0)
- ax.imshow(T, cmap=build_temperature_cmap(), alpha=temp_alpha, zorder=1)
- if draw_isotherms:
- levels = np.linspace(float(np.nanmin(T)), float(np.nanmax(T)), 10)
- cs = ax.contour(T, levels=levels, colors=['k'], linewidths=0.6, alpha=0.6, zorder=2)
- ax.clabel(cs, inline=1, fontsize=8, fmt='%1.0f°C')
- r_cmap = build_rain_cmap()
- ax.imshow(rain_field, cmap=r_cmap, alpha=rain_alpha, zorder=3)
- if streamlines:
- ds = max(1, int(min(H, W) / 120))
- YY, XX = np.mgrid[0:H:ds, 0:W:ds]
- U = u[::ds, ::ds]
- V = v[::ds, ::ds]
- sp = wind_speed(U, V)
- lw = 1.2 * (sp / (sp.max() + 1e-9)) + 0.4
- ax.streamplot(XX, YY, U, V, color='white', linewidth=lw, density=1.2, arrowsize=0.7, minlength=0.15, zorder=4)
- artists = []
- # Wind streaks
- lc = None
- wind_x = wind_y = None
- wind_norm = Normalize(vmin=0.0, vmax=np.percentile(spd, 95.0))
- if mode in ('wind', 'both') and n_wind > 0:
- wind_x = rng.uniform(0, W, size=n_wind).astype(np.float32)
- wind_y = rng.uniform(0, H, size=n_wind).astype(np.float32)
- lc = LineCollection(segments=[], cmap=_get_cmap('plasma'), norm=wind_norm,
- linewidths=1.2, alpha=0.9, zorder=6)
- lc.set_array(np.zeros(n_wind, dtype=np.float32))
- if blit:
- lc.set_animated(True)
- ax.add_collection(lc) # type: ignore[arg-type]
- artists.append(lc)
- # Rain scatter (init arrays so Pylance knows they're arrays)
- rain_x = np.empty(0, dtype=np.float32)
- rain_y = np.empty(0, dtype=np.float32)
- rain_alpha_arr = np.empty(0, dtype=np.float32)
- rain_life = np.empty(0, dtype=np.float32)
- sc = None
- rain_map = rain_field - rain_field.min()
- if rain_map.max() > 0:
- rain_map = rain_map / (rain_map.max() + 1e-9)
- max_rain = max(0, n_rain)
- if mode in ('rain', 'both') and max_rain > 0:
- init_n = min(max_rain, max(100, max_rain // 5))
- rx, ry = sample_positions_from_weight_map(rain_map, init_n, rng)
- rain_x = rx.astype(np.float32)
- rain_y = ry.astype(np.float32)
- rain_alpha_arr = np.ones_like(rain_x, dtype=np.float32) * 0.85
- rain_life = rng.uniform(0.8 * fps, 2.0 * fps, size=rain_x.shape[0]).astype(np.float32)
- sc = ax.scatter(rain_x, rain_y, s=6.0, c='white', alpha=rain_alpha_arr, edgecolors='none', zorder=7)
- if blit:
- sc.set_animated(True)
- artists.append(sc)
- ax.set_title(f"{base_title}", fontsize=14, weight='bold')
- fig.canvas.draw()
- def init():
- return artists
- def update(frame_idx):
- nonlocal rain_x, rain_y, rain_alpha_arr, rain_life
- if lc is not None:
- new_x, new_y, _ = advect(wind_x, wind_y, u, v, px_per_frame)
- segs = np.stack([np.stack([wind_x, wind_y], axis=1),
- np.stack([new_x, new_y], axis=1)], axis=1)
- lc.set_segments(segs)
- s_here = bilinear_sample(spd, wind_x, wind_y)
- lc.set_array(s_here.astype(np.float32))
- wind_x[:] = new_x
- wind_y[:] = new_y
- wind_x[:], wind_y[:] = wrap_or_respawn(wind_x, wind_y, W, H, rng, edge='wrap')
- if sc is not None:
- spawn_per_frame = max(1, int(0.03 * max_rain))
- rx, ry = sample_positions_from_weight_map(rain_map, spawn_per_frame, rng)
- if rain_x.size == 0:
- rain_x = rx.astype(np.float32)
- rain_y = ry.astype(np.float32)
- rain_alpha_arr = np.ones_like(rain_x) * 0.85
- rain_life = rng.uniform(0.8 * fps, 2.0 * fps, size=rain_x.shape[0]).astype(np.float32)
- else:
- rain_x = np.concatenate([rain_x, rx.astype(np.float32)])
- rain_y = np.concatenate([rain_y, ry.astype(np.float32)])
- rain_alpha_arr = np.concatenate([rain_alpha_arr, np.ones(rx.shape[0], dtype=np.float32) * 0.85])
- rain_life = np.concatenate([rain_life, rng.uniform(0.8 * fps, 2.0 * fps, size=rx.shape[0]).astype(np.float32)])
- if rain_x.size > max_rain:
- rain_x = rain_x[-max_rain:]
- rain_y = rain_y[-max_rain:]
- rain_alpha_arr = rain_alpha_arr[-max_rain:]
- rain_life = rain_life[-max_rain:]
- rain_x, rain_y, _ = advect(rain_x, rain_y, u, v, px_per_frame)
- rain_life -= 1.0
- rain_alpha_arr *= 0.985
- out = (rain_x < -2) | (rain_x > W + 2) | (rain_y < -2) | (rain_y > H + 2) | (rain_life <= 0) | (rain_alpha_arr < 0.05)
- if np.any(out):
- keep_mask = ~out
- rain_x = rain_x[keep_mask]
- rain_y = rain_y[keep_mask]
- rain_alpha_arr = rain_alpha_arr[keep_mask]
- rain_life = rain_life[keep_mask]
- sc.set_offsets(np.c_[rain_x, rain_y])
- sc.set_alpha(rain_alpha_arr)
- return artists
- anim = animation.FuncAnimation(fig, update, init_func=init, frames=frames, interval=1000.0 / fps, blit=blit)
- # Progress bar for saving frames
- if out_path:
- ext = os.path.splitext(out_path)[1].lower()
- total_frames = frames
- if pb:
- pb.new_task("Exporting animation frames", total=total_frames)
- def progress_callback(i, n):
- if pb:
- pb.update(i + 1)
- try:
- if ext == '.mp4':
- Writer = animation.FFMpegWriter
- writer = Writer(fps=fps, metadata=dict(artist='You'), bitrate=6000)
- anim.save(out_path, writer=writer, dpi=dpi, progress_callback=progress_callback)
- elif ext == '.gif':
- anim.save(out_path, writer='pillow', dpi=dpi, progress_callback=progress_callback)
- else:
- Writer = animation.FFMpegWriter
- writer = Writer(fps=fps, metadata=dict(artist='You'), bitrate=6000)
- anim.save(out_path if ext else out_path + '.mp4', writer=writer, dpi=dpi, progress_callback=progress_callback)
- except Exception as e:
- if pb:
- pb.done("Export failed")
- print("\nFailed to save animation:", e)
- print("Tip: for MP4, ensure ffmpeg is installed and on PATH.")
- else:
- if pb:
- pb.done("Animation saved")
- if show:
- plt.show()
- plt.close(fig)
- # -----------------------------
- # Phase 3: 3D scene
- # -----------------------------
- def render_3d_matplotlib(texture, height_norm, T, rain, u, v, out_path=None, show=False,
- max_elev_m=3000.0, title='Phase 3: 3D Weather (Matplotlib)', pb=None):
- if pb:
- pb.new_task("Preparing 3D data", total=3)
- H, W = height_norm.shape
- target = 220
- stride = max(1, int(min(H, W) / target))
- h_ds = height_norm[::stride, ::stride]
- T_ds = T[::stride, ::stride]
- rain_ds = rain[::stride, ::stride]
- u_ds = u[::stride, ::stride]
- v_ds = v[::stride, ::stride]
- Hd, Wd = h_ds.shape
- if pb:
- pb.advance(1)
- Xg, Yg = np.meshgrid(np.linspace(0, 1, Wd), np.linspace(0, 1, Hd))
- Z = h_ds * max_elev_m
- t_cmap = build_temperature_cmap()
- t_norm = (T_ds - T_ds.min()) / (T_ds.max() - T_ds.min() + 1e-9)
- face_colors = t_cmap(t_norm)
- rain_norm = np.clip(rain_ds / (np.percentile(rain_ds, 98) + 1e-9), 0, 1)
- blue_tint = np.zeros_like(face_colors)
- blue_tint[..., 2] = 1.0
- alpha_rain = 0.35 * rain_norm
- face_colors = (1 - alpha_rain[..., None]) * face_colors + alpha_rain[..., None] * blue_tint
- if pb:
- pb.advance(1)
- fig = plt.figure(figsize=(12, 8))
- ax = fig.add_subplot(111, projection='3d')
- ax.plot_surface(Xg, Yg, Z, facecolors=face_colors, rstride=1, cstride=1, linewidth=0.0, antialiased=False, shade=False)
- ax.set_title(title, fontsize=14, weight='bold')
- ax.set_xlabel('X')
- ax.set_ylabel('Y')
- ax.set_zlabel('Elevation (m)')
- ax.view_init(elev=55, azim=-60)
- step = max(1, int(min(Hd, Wd) / 22))
- Xq = Xg[::step, ::step]
- Yq = Yg[::step, ::step]
- Zq = Z[::step, ::step] + 30.0
- Uq = u_ds[::step, ::step]
- Vq = v_ds[::step, ::step]
- Wq = np.zeros_like(Uq)
- mag = np.sqrt(Uq**2 + Vq**2) + 1e-9
- Uq = Uq / mag
- Vq = Vq / mag
- ax.quiver(Xq, Yq, Zq, Uq, Vq, Wq, length=0.03, color='white', linewidth=0.6, normalize=False)
- fig.tight_layout()
- if pb:
- pb.advance(1)
- if out_path:
- fig.savefig(out_path, dpi=180, bbox_inches='tight')
- if show:
- plt.show()
- plt.close(fig)
- if pb:
- pb.done("3D scene saved" if out_path else "3D scene ready")
- def render_3d_plotly(texture, height_norm, T, rain, out_path=None,
- max_elev_m=3000.0, title='Phase 3: 3D Weather (Plotly)', pb=None):
- try:
- import plotly.graph_objects as go # type: ignore
- except Exception:
- print("Plotly not installed. Falling back to Matplotlib 3D.")
- return False
- if pb:
- pb.new_task("Preparing 3D (Plotly)", total=2)
- H, W = height_norm.shape
- target = 300
- stride = max(1, int(min(H, W) / target))
- h_ds = height_norm[::stride, ::stride]
- T_ds = T[::stride, ::stride]
- Hd, Wd = h_ds.shape
- Z = h_ds * max_elev_m
- t_norm = (T_ds - T_ds.min()) / (T_ds.max() - T_ds.min() + 1e-9)
- if pb:
- pb.advance(1)
- fig = go.Figure(data=[
- go.Surface(
- z=Z,
- x=np.linspace(0, 1, Wd),
- y=np.linspace(0, 1, Hd),
- surfacecolor=t_norm,
- colorscale='RdBu',
- reversescale=True,
- cmin=0.0, cmax=1.0,
- showscale=True,
- contours={"z": {"show": False}},
- )
- ])
- fig.update_layout(
- title=title,
- scene=dict(
- xaxis_title='X', yaxis_title='Y', zaxis_title='Elevation (m)',
- aspectmode='data',
- ),
- width=1000, height=700
- )
- if out_path:
- if out_path.lower().endswith('.html'):
- fig.write_html(out_path)
- else:
- try:
- fig.write_image(out_path) # requires kaleido
- except Exception:
- print("Plotly static image export failed (install 'kaleido'). Writing HTML instead.")
- fig.write_html(os.path.splitext(out_path)[0] + ".html")
- else:
- fig.show()
- if pb:
- pb.done("3D scene saved")
- return True
- # -----------------------------
- # Pipeline orchestration
- # -----------------------------
- def build_fields(heightmap_path, texture_path, sea_level, seed, base_wind_dir, base_wind_speed, max_elev_m, pb=None):
- if pb:
- pb.new_task("Loading inputs", total=8)
- h_raw = read_image_as_array(heightmap_path)
- if pb: pb.advance(1)
- h_gray = ensure_gray2d(h_raw)
- if pb: pb.advance(1)
- height_norm = to_float01(h_gray)
- if pb: pb.advance(1)
- tex_raw = read_image_as_array(texture_path, mode='RGB')
- texture = to_float01(tex_raw)
- if pb: pb.advance(1)
- if texture.shape[:2] != height_norm.shape:
- texture = resize_to_match(texture, height_norm.shape)
- if pb: pb.advance(1)
- H, W = height_norm.shape
- X, Y = compute_grids(H, W)
- if pb: pb.advance(1)
- water_mask, coast_dist = detect_water_mask_combined(texture, height_norm, sea_level=sea_level)
- if pb: pb.advance(1)
- T = compute_temperature(height_norm, X, Y, water_mask, coast_dist,
- seed=seed, max_elev_m=max_elev_m)
- if pb: pb.advance(1)
- if pb: pb.new_task("Building wind/humidity/rain", total=3)
- u, v = compute_streamfunction(height_norm, X, Y, water_mask,
- base_wind_dir_deg=base_wind_dir,
- base_wind_speed=base_wind_speed,
- seed=seed, k_orog=0.9, k_noise=1.0)
- if pb: pb.advance(1)
- q = compute_humidity(height_norm, water_mask, coast_dist, seed=seed)
- if pb: pb.advance(1)
- rain = compute_rainfall(u, v, height_norm, q, T, water_mask, seed=seed)
- if pb: pb.advance(1)
- if pb: pb.done("Fields ready")
- return texture, height_norm, water_mask, T, rain, u, v
- # -----------------------------
- # Main / CLI
- # -----------------------------
- def main():
- ap = argparse.ArgumentParser(description="All-in-one Visual Weather Engine (Phases 1–3)")
- ap.add_argument('--phase', type=int, default=None, choices=[1, 2, 3], help='1=Static, 2=Animation, 3=3D (omit to pick interactively)')
- ap.add_argument('--heightmap', type=str, default=None, help='Path to heightmap (grayscale or RGB).')
- ap.add_argument('--texture', type=str, default=None, help='Path to texture map (RGB).')
- ap.add_argument('--sea-level', type=float, default=None, help='Sea level threshold in normalized height units [0..1], e.g., 0.22')
- ap.add_argument('--out', type=str, default=None, help='Output file (png/mp4/gif/html). For --phase all, used as base name.')
- ap.add_argument('--seed', type=int, default=42, help='Random seed.')
- ap.add_argument('--base-wind-dir', type=float, default=35.0, help='Wind direction deg (0=E, 90=N).')
- ap.add_argument('--base-wind-speed', type=float, default=6.0, help='Base wind speed (arbitrary units).')
- ap.add_argument('--max-elev', type=float, default=3000.0, help='Max elevation represented by heightmap (m).')
- # Phase 1 options
- ap.add_argument('--dpi', type=int, default=200, help='DPI for static or animation frames.')
- ap.add_argument('--show', action='store_true', help='Show window after rendering.')
- ap.add_argument('--draw-quiver', action='store_true', help='Add quiver arrows to static map.')
- # Phase 2 options
- ap.add_argument('--fps', type=int, default=30, help='Frames per second (animation).')
- ap.add_argument('--duration', type=float, default=10.0, help='Duration in seconds (animation).')
- ap.add_argument('--mode', type=str, default='both', choices=['wind', 'rain', 'both'], help='Particle mode (animation).')
- ap.add_argument('--n-wind', type=int, default=2000, help='Number of wind particles.')
- ap.add_argument('--n-rain', type=int, default=2000, help='Max number of rain particles.')
- ap.add_argument('--flow-scale', type=float, default=0.7, help='Relative particle speed scaling (~0.1..1.5).')
- ap.add_argument('--no-blit', action='store_true', help='Disable blitting (slower but more compatible).')
- ap.add_argument('--no-isos', action='store_true', help='Disable isotherm labels in animation.')
- ap.add_argument('--temp-alpha', type=float, default=0.28, help='Temperature overlay alpha (animation).')
- ap.add_argument('--rain-alpha', type=float, default=0.55, help='Rain overlay alpha (animation).')
- ap.add_argument('--streamlines', action='store_true', help='Draw static streamlines in animation.')
- # Phase 3 options
- ap.add_argument('--plotly', action='store_true', help='Use Plotly for 3D (if installed).')
- args = ap.parse_args()
- pb = ProgressBar(width=42)
- # Inputs and sea level
- heightmap_path = prompt_for_existing_file("Enter heightmap path", default=args.heightmap)
- texture_path = prompt_for_existing_file("Enter texture map path", default=args.texture)
- sea_level = prompt_for_float("Enter sea level threshold (normalized 0..1)", default=(args.sea_level if args.sea_level is not None else 0.22), minv=0.0, maxv=1.0)
- # Interactive phase pick if not provided via CLI
- phases_to_run = [args.phase] if args.phase is not None else prompt_for_phase(default_choice="1")
- # Build meteorological fields once
- texture, height_norm, water_mask, T, rain, u, v = build_fields(
- heightmap_path, texture_path, sea_level, seed=args.seed,
- base_wind_dir=args.base_wind_dir, base_wind_speed=args.base_wind_speed,
- max_elev_m=args.max_elev, pb=pb
- )
- # Compute shared timestamp for outputs
- stamp = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
- # Helper to build output names
- def outputs_for_phase(phase, base_out, use_plotly, stamp):
- if base_out:
- base_dir, fn = os.path.split(base_out)
- base_name, _ext = os.path.splitext(fn)
- base_dir = base_dir if base_dir else "."
- else:
- base_dir, base_name = ".", "weather"
- if phase == 1:
- path = os.path.join(base_dir, f"{base_name}_map.png")
- elif phase == 2:
- path = os.path.join(base_dir, f"{base_name}_anim.mp4")
- elif phase == 3:
- path = os.path.join(base_dir, f"{base_name}_3d.html" if use_plotly else f"{base_name}_3d.png")
- else:
- path = os.path.join(base_dir, f"{base_name}.out")
- # Add shared timestamp
- return add_timestamp_to_path(path, stamp=stamp)
- # Run requested phases
- if phases_to_run == [1]:
- out1 = outputs_for_phase(1, args.out, args.plotly, stamp)
- render_static_map(texture, T, rain, u, v, out_path=out1, dpi=args.dpi, show=args.show,
- draw_quiver=args.draw_quiver, title='Phase 1: Static Weather Visualization', pb=pb)
- print(f"Saved: {out1}")
- elif phases_to_run == [2]:
- out2 = outputs_for_phase(2, args.out, args.plotly, stamp)
- animate_weather(
- texture=texture, T=T, rain_field=rain, u=u, v=v,
- out_path=out2, mode=args.mode, n_wind=args.n_wind, n_rain=args.n_rain,
- fps=args.fps, duration=args.duration, dpi=args.dpi, show=args.show,
- seed=args.seed, draw_isotherms=not args.no_isos, temp_alpha=args.temp_alpha,
- rain_alpha=args.rain_alpha, streamlines=args.streamlines, blit=not args.no_blit,
- flow_scale=args.flow_scale, base_title='Phase 2: Animated Weather',
- pb=pb
- )
- print(f"Done. Output: {out2}")
- elif phases_to_run == [3]:
- out3 = outputs_for_phase(3, args.out, args.plotly, stamp)
- used_plotly = False
- if args.plotly:
- used_plotly = render_3d_plotly(texture, height_norm, T, rain, out_path=out3,
- max_elev_m=args.max_elev, title='Phase 3: 3D Weather (Plotly)', pb=pb)
- if not used_plotly:
- # If we were planning HTML but fell back, adjust extension to PNG + timestamp again
- if out3.lower().endswith(".html"):
- out3 = outputs_for_phase(3, args.out, False, stamp)
- render_3d_matplotlib(texture, height_norm, T, rain, u, v, out_path=out3, show=args.show,
- max_elev_m=args.max_elev, title='Phase 3: 3D Weather (Matplotlib)', pb=pb)
- print(f"Saved: {out3}")
- else:
- # Run all phases with one shared stamp
- out1 = outputs_for_phase(1, args.out, args.plotly, stamp)
- out2 = outputs_for_phase(2, args.out, args.plotly, stamp)
- out3 = outputs_for_phase(3, args.out, args.plotly, stamp)
- # Phase 1
- render_static_map(texture, T, rain, u, v, out_path=out1, dpi=args.dpi, show=False,
- draw_quiver=args.draw_quiver, title='Phase 1: Static Weather Visualization', pb=pb)
- print(f"Saved: {out1}")
- # Phase 2
- animate_weather(
- texture=texture, T=T, rain_field=rain, u=u, v=v,
- out_path=out2, mode=args.mode, n_wind=args.n_wind, n_rain=args.n_rain,
- fps=args.fps, duration=args.duration, dpi=args.dpi, show=False,
- seed=args.seed, draw_isotherms=not args.no_isos, temp_alpha=args.temp_alpha,
- rain_alpha=args.rain_alpha, streamlines=args.streamlines, blit=not args.no_blit,
- flow_scale=args.flow_scale, base_title='Phase 2: Animated Weather', pb=pb
- )
- print(f"Saved: {out2}")
- # Phase 3
- used_plotly = False
- if args.plotly:
- used_plotly = render_3d_plotly(texture, height_norm, T, rain, out_path=out3,
- max_elev_m=args.max_elev, title='Phase 3: 3D Weather (Plotly)', pb=pb)
- if not used_plotly:
- if out3.lower().endswith(".html"):
- out3 = outputs_for_phase(3, args.out, False, stamp)
- render_3d_matplotlib(texture, height_norm, T, rain, u, v, out_path=out3, show=args.show,
- max_elev_m=args.max_elev, title='Phase 3: 3D Weather (Matplotlib)', pb=pb)
- print(f"Saved: {out3}")
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment