Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Grand‑Tour‑LLM
- ==============
- A *stand‑alone* Python script that automates a grand tour of an n‑dimensional
- dataset and lets an OpenAI *vision* model pick the k most interesting 2‑D
- projections.
- Usage (after `pip install -r requirements.txt` and setting $OPENAI_API_KEY):
- python grand_tour_llm.py data.csv --steps 5000 --out ./results \
- --model gpt-4o-vision-preview \
- --prompt "You are looking for small isolated clusters. Rate..."
- Arguments
- ---------
- Positional
- csv_path Path to a **numeric** CSV file (rows=samples, columns=features).
- Optional
- -o, --out Output directory (default ./llm_tour_results)
- -s, --steps Frames to explore (default 100)
- -m, --model ChatCompletion model name (default gpt-4o-mini)
- -k, --keep Override k (top views kept). Default = max(1, round(3*sqrt(d))).
- -p, --prompt Override the system prompt that rates plots.
- --min-div Minimum Grassmann distance between kept planes (default 0.15)
- --seed Random seed.
- Outputs
- -------
- *out_dir*/
- view_01.png, view_02.png, ... – PNGs of top‑k scatterplots
- views.json – JSON list with rank, score, summary, png file name and flattened basis (basis_flat) per view
- Requirements
- ------------
- openai>=1.14.0, numpy, pandas, scipy, scikit‑learn, matplotlib, pillow, tqdm
- """
- from __future__ import annotations
- import os, io, json, base64, argparse, heapq, math, time, random
- from pathlib import Path
- import numpy as np
- import pandas as pd
- from tqdm import trange
- from scipy.linalg import subspace_angles
- from sklearn.preprocessing import RobustScaler
- from sklearn.decomposition import FastICA
- import matplotlib
- matplotlib.use("Agg") # headless
- import matplotlib.pyplot as plt
- import openai
- # ---------------------------------------------------------------------------
- # ------------------------------ HELPERS -----------------------------------
- # ---------------------------------------------------------------------------
def robust_scale(X: np.ndarray) -> np.ndarray:
    """Return *X* rescaled column-wise with scikit-learn's ``RobustScaler``.

    A fresh scaler with default settings is fitted on *X* and immediately
    applied to the same data.
    """
    return RobustScaler().fit_transform(X)
def initial_plane_ica(X: np.ndarray) -> np.ndarray:
    """Build the starting projection plane from a two-component FastICA fit.

    The fitted ``components_`` matrix is transposed so that features run
    along the rows, then orthonormalised with a QR factorisation; the first
    two columns of Q are returned as the basis.
    """
    fitted = FastICA(n_components=2, whiten="unit-variance", random_state=0).fit(X)
    directions = fitted.components_.T  # one column per independent component
    # QR gives orthonormal columns, so later Grassmann distances behave nicely
    orthonormal = np.linalg.qr(directions)[0]
    return orthonormal[:, :2]
def random_plane(p: int) -> np.ndarray:
    """Draw a random orthonormal 2-frame in R^p.

    A p×2 Gaussian matrix is sampled from NumPy's global random state and
    orthonormalised with a QR factorisation; the first two columns of Q are
    returned.
    """
    gaussian = np.random.normal(size=(p, 2))
    orthonormal = np.linalg.qr(gaussian)[0]
    return orthonormal[:, :2]
def render_scatter(points: np.ndarray, labels=None, size=(512, 512)) -> bytes:
    """Render 2‑D points as an unlabelled scatterplot and return PNG bytes.

    Parameters
    ----------
    points
        Array whose first two columns are used as x and y coordinates.
    labels
        Optional per-point values passed to ``scatter`` as ``c`` and coloured
        with the ``tab10`` colormap. When omitted, matplotlib's default
        colour is used.
    size
        Nominal (width, height) in pixels; the figure is created at 100 dpi
        with ``size / 100`` inches per side.

    Returns
    -------
    bytes
        The PNG-encoded figure.
    """
    fig = plt.figure(figsize=(size[0] / 100, size[1] / 100), dpi=100)
    try:
        ax = fig.add_subplot(111)
        # Only ask for colormapping when there is data to map: supplying
        # `cmap` without `c` makes matplotlib emit a warning on every frame.
        color_kwargs = {} if labels is None else {"c": labels, "cmap": "tab10"}
        ax.scatter(points[:, 0], points[:, 1], s=6, alpha=0.8, **color_kwargs)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_title("")
        buf = io.BytesIO()
        fig.savefig(buf, format="png", bbox_inches="tight")
    finally:
        # Always release the figure, even if drawing or saving fails, so a
        # long tour does not accumulate open figures.
        plt.close(fig)
    return buf.getvalue()
def grassmann_dist(B1: np.ndarray, B2: np.ndarray) -> float:
    """Distance between the column spaces of *B1* and *B2*.

    Computed as the Euclidean norm of the vector of principal angles
    returned by ``scipy.linalg.subspace_angles``.
    """
    principal_angles = subspace_angles(B1, B2)
    distance = np.linalg.norm(principal_angles)
    return float(distance)
- # ---------------------------------------------------------------------------
- # --------------------------- LLM INTERACTION ------------------------------
- # ---------------------------------------------------------------------------
# System prompt used when the user does not supply --prompt. It asks the model
# for a JSON object with the two keys that rate_with_llm parses: 'rating' and
# 'summary'.
DEFAULT_PROMPT = (
    "You are a data scientist visualizing scatter plots to get insight. "
    "Rate the scatterplot's interestingness from 1 (dull) to 10 (very revealing) "
    "*numerically* under the key 'rating' and explain why in <100 words under the key 'summary'. "
    "Look for clustering of data points, both major clustering and smallish isolated clusters, "
    "spot linear or non-linear dependencies, both global and applying only to a cluster. "
    "Return a *valid JSON object* with keys 'rating' (int) and 'summary' (str)."
)
def _parse_rating_reply(content: str | None) -> tuple[int, str]:
    """Extract ``(rating, summary)`` from the model's reply text.

    Raises ValueError, KeyError, TypeError or OverflowError when the reply
    does not contain a usable JSON object with both keys.
    """
    text = (content or "").strip()
    # Replies are sometimes wrapped in a Markdown fence or surrounded by
    # chatter; keep only the outermost {...} span before parsing.
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        text = text[start:end + 1]
    js = json.loads(text)
    # Accept 7, 7.0 or "7.4"; round, then clamp to the 1–10 scale the prompt
    # asks for so an out-of-range reply cannot dominate the ranking.
    rating = int(round(float(js["rating"])))
    return min(10, max(1, rating)), str(js["summary"])


def rate_with_llm(img_bytes: bytes, model: str, system_prompt: str) -> tuple[int, str]:
    """Ask the chat model to rate one scatterplot image.

    Parameters
    ----------
    img_bytes
        PNG bytes; sent inline as a base64 ``data:`` URL at low detail.
    model
        Model name passed to ``chat.completions.create``.
    system_prompt
        System message instructing the model how to rate the plot.

    Returns
    -------
    tuple[int, str]
        ``(rating, summary)``. If the reply cannot be parsed, the rating is 1
        and the summary starts with ``(parse‑error)`` followed by the error
        and the first 60 characters of the reply.

    Errors raised by the OpenAI client itself are not caught here.
    """
    b64 = base64.b64encode(img_bytes).decode()
    client = openai.OpenAI()
    messages = [
        {"role": "system", "content": system_prompt},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Here is the scatterplot:"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{b64}", "detail": "low"},
                },
            ],
        },
    ]
    resp = client.chat.completions.create(model=model, messages=messages)
    # Content can be None (or choices empty); read it once, outside the try,
    # so the fallback below can never fail while building its message.
    content = resp.choices[0].message.content if resp.choices else None
    try:
        return _parse_rating_reply(content)
    except (ValueError, KeyError, TypeError, OverflowError) as e:
        # Fallback: treat any parse failure as '1 – dull'
        return 1, f"(parse‑error) {e}: {(content or '')[:60]}..."
- # ---------------------------------------------------------------------------
- # ---------------------------- TOP‑K HEAP ----------------------------------
- # ---------------------------------------------------------------------------
class TopKViews:
    """Bounded min-heap that keeps the k best-scoring, mutually diverse views.

    Each heap entry is ``(score, seq, basis, record)``. ``seq`` is an
    insertion counter used purely as a tie-breaker so that two entries with
    equal scores are never compared by their basis or record, which may not
    be orderable.
    """

    def __init__(self, k: int, min_div: float):
        """Create an empty collection.

        k       -- maximum number of views to keep; with k <= 0 nothing is kept.
        min_div -- minimum ``grassmann_dist`` a candidate must have to every
                   view already kept.
        """
        self.k = k
        self.min_div = min_div
        self.heap: list[tuple[int, int, object, dict]] = []  # (score, seq, basis, record)
        self._seq = 0  # monotonically increasing tie-breaker

    def maybe_add(self, record: dict):
        """Offer *record* (needs keys ``"score"`` and ``"basis"``) to the heap.

        The record is dropped if it is closer than ``min_div`` to any kept
        view, regardless of score. Otherwise it is inserted while there is
        room, or replaces the current lowest-scoring view if it scores
        strictly higher.
        """
        if self.k <= 0:
            # Nothing can be kept; also avoids indexing an empty heap below.
            return
        score, basis = record["score"], record["basis"]
        # Diversity check
        if any(grassmann_dist(basis, entry[2]) < self.min_div for entry in self.heap):
            return
        entry = (score, self._seq, basis, record)
        self._seq += 1
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, entry)
        elif score > self.heap[0][0]:
            heapq.heapreplace(self.heap, entry)

    def results(self):
        """Return the kept records ordered from highest to lowest score."""
        return sorted((entry[3] for entry in self.heap), key=lambda r: -r["score"])
- # ---------------------------------------------------------------------------
- # ------------------------------ MAIN LOOP ---------------------------------
- # ---------------------------------------------------------------------------
def run_tour(
    X: np.ndarray,
    steps: int,
    model: str,
    prompt: str,
    k: int,
    min_div: float = 0.15,
    seed: int | None = None,
) -> list[dict]:
    """Sample projection planes, have the LLM rate each one, keep the best.

    Parameters
    ----------
    X
        2‑D data matrix (rows = samples, columns = features) with at least
        two columns.
    steps
        Number of planes to render and rate. The first plane comes from
        ``initial_plane_ica``; every later one from ``random_plane``.
    model, prompt
        Forwarded to ``rate_with_llm``.
    k, min_div
        Forwarded to ``TopKViews``.
    seed
        If given, seeds both NumPy's and Python's global random generators.

    Returns
    -------
    list[dict]
        Kept records, best first. Each has keys ``score``, ``summary``,
        ``basis`` (nested list) and ``image_bytes`` (PNG).

    Raises
    ------
    ValueError
        If *X* is not two-dimensional or has fewer than two columns.
    """
    X = np.asarray(X)
    # A 2-D scatterplot needs two projection axes; fail here with a clear
    # message rather than deep inside ICA or the plotting code.
    if X.ndim != 2 or X.shape[1] < 2:
        raise ValueError("run_tour needs a 2-D array with at least two feature columns.")
    if seed is not None:
        np.random.seed(seed)
        random.seed(seed)
    p = X.shape[1]
    topk = TopKViews(k, min_div)
    for t in trange(steps, desc="Tour", unit="view"):
        # Initial plane seeded by ICA (only computed if the tour actually runs)
        B = initial_plane_ica(X) if t == 0 else random_plane(p)
        proj = X @ B  # shape (n, 2)
        png = render_scatter(proj)
        score, summary = rate_with_llm(png, model, prompt)
        rec = {
            "score": score,
            "summary": summary,
            "basis": B.tolist(),  # easier to json‑dump later
            "image_bytes": png,
        }
        topk.maybe_add(rec)
    return topk.results()
- # ---------------------------------------------------------------------------
- # ------------------------------- I/O --------------------------------------
- # ---------------------------------------------------------------------------
def save_results(results: list[dict], out_dir: Path):
    """Write each view's PNG plus a ``views.json`` manifest into *out_dir*.

    The directory (and any missing parents) is created first. Views are
    numbered from 1 in the order given and saved as ``view_NN.png``; the
    manifest lists rank, score, summary, PNG file name and the flattened
    basis for every view.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    manifest = []
    for position, view in enumerate(results, start=1):
        png_path = out_dir / f"view_{position:02d}.png"
        png_path.write_bytes(view["image_bytes"])
        entry = {
            "rank": position,
            "score": view["score"],
            "summary": view["summary"],
            "png": png_path.name,
            "basis_flat": np.array(view["basis"]).flatten().tolist(),
        }
        manifest.append(entry)
    manifest_text = json.dumps(manifest, indent=2)
    (out_dir / "views.json").write_text(manifest_text, encoding="utf-8")
- # ---------------------------------------------------------------------------
- # ------------------------------ SCRIPT ------------------------------------
- # ---------------------------------------------------------------------------
def parse_args():
    """Parse the command line and return the resulting namespace.

    One positional argument (``csv_path``) plus the options ``--out``,
    ``--steps``, ``--model``, ``--keep``, ``--prompt``, ``--min-div`` and
    ``--seed``. ``--keep`` and ``--seed`` have no default and come back as
    ``None`` when omitted.
    """
    parser = argparse.ArgumentParser(description="LLM‑guided grand tour")
    parser.add_argument("csv_path", type=str, help="Numeric CSV file to explore")
    # (flags, keyword arguments) for every optional argument, in display order
    option_table = (
        (("-o", "--out"), {"default": "./llm_tour_results", "type": str}),
        (("-s", "--steps"), {"default": 100, "type": int, "help": "Frames to sample"}),
        (("-m", "--model"), {"default": "gpt-4o-mini", "type": str}),
        (("-k", "--keep"), {"type": int, "help": "Top‑k views to keep (default 3*sqrt(d))"}),
        (("-p", "--prompt"), {"type": str, "default": DEFAULT_PROMPT}),
        (("--min-div",), {"type": float, "default": 0.15}),
        (("--seed",), {"type": int}),
    )
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    return parser.parse_args()
def main():
    """Script entry point: load the CSV, run the tour and save the top views.

    Raises
    ------
    RuntimeError
        If no OpenAI API key is available.
    ValueError
        If the CSV has no numeric data, no row without missing values, or
        fewer than two numeric columns.
    """
    args = parse_args()
    # OpenAI key – use openai.api_key if something already set it, otherwise
    # fall back to the OPENAI_API_KEY environment variable.
    if not openai.api_key:  # None or empty string evaluates False
        openai.api_key = os.getenv("OPENAI_API_KEY")
    if not openai.api_key:
        raise RuntimeError("Set OPENAI_API_KEY environment variable or openai.api_key")
    df = pd.read_csv(args.csv_path)
    numeric = df.select_dtypes(include=["number"])
    if numeric.size == 0:
        raise ValueError("CSV does not contain numeric columns.")
    # Rows with missing values would make the ICA fit fail on non-finite
    # input, so drop them up front and tell the user how many were removed.
    complete = numeric.dropna()
    dropped = len(numeric) - len(complete)
    if dropped:
        print(f"Dropping {dropped} row(s) with missing values")
    if complete.empty:
        raise ValueError("No complete numeric rows left after dropping missing values.")
    X = complete.to_numpy(dtype=float)
    if X.shape[1] < 2:
        raise ValueError("Need at least two numeric columns to form a 2-D projection.")
    X = robust_scale(X)
    d = X.shape[1]
    k = args.keep if args.keep is not None else max(1, round(3 * math.sqrt(d)))
    print(f"Data shape: n={X.shape[0]}, d={d}; keeping top‑k={k} views")
    results = run_tour(
        X,
        steps=args.steps,
        model=args.model,
        prompt=args.prompt,
        k=k,
        min_div=args.min_div,
        seed=args.seed,
    )
    out_dir = Path(args.out)
    save_results(results, out_dir)
    print(f"Saved {len(results)} views → {out_dir.resolve()}")


if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment