#!/usr/bin/env python3
"""
prisjakt_market_index.py

Interactive Prisjakt index generator:
  - Accepts category or product URL
  - Outputs into output/<page_title>/
  - Prints progress per product (no silent waiting)
  - Per-product hard timeout so it can't "hang" forever

Outputs:
  output/<run_name>/
    market_price_index.png
    data_coverage.png
    price_history_daily_last_1y.csv
    run_log.txt
    debug/
      *_history_request.json
      *_history_response.json
"""
import json
import os
import random
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from playwright.sync_api import sync_playwright

# ---------------- config ----------------
HEADLESS = True
DAYS = 90  # history window length in days (output file names still say "last_1y")
DEFAULT_TOP_N = 50

# Cloudflare safety knobs
DELAY_MIN_S = 4.0
DELAY_MAX_S = 9.0

# Per-product hard timeout (seconds)
PER_PRODUCT_TIMEOUT_S = 75

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_OUT_DIR = os.path.join(SCRIPT_DIR, "output")
# ----------------------------------------

@dataclass
class Product:
    name: str
    url: str
    pid: str

def now_utc() -> datetime:
    return datetime.now(timezone.utc)


def slug(s: str, max_len: int = 80) -> str:
    s = re.sub(r"[^\wæøåÆØÅ\-]+", "_", (s or "").strip(), flags=re.UNICODE)
    s = re.sub(r"_+", "_", s).strip("_")
    return s[:max_len] if s else "prisjakt_run"


def parse_pid(url: str) -> Optional[str]:
    m = re.search(r"[?&]p=(\d+)", url)
    return m.group(1) if m else None


def is_category_url(url: str) -> bool:
    return "/c/" in url


def is_product_url(url: str) -> bool:
    return "/product.php" in url and "p=" in url


def ensure_sort_popularity(url: str) -> str:
    # Do NOT override if you already provided sort=...
    if is_category_url(url) and "sort=" not in url:
        joiner = "&" if "?" in url else "?"
        return f"{url}{joiner}sort=popularity"
    return url

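# Illustrative examples of the URL helpers above (hypothetical URLs, not from a real run):
#   parse_pid("https://www.prisjakt.no/product.php?p=1234567")      -> "1234567"
#   is_category_url("https://www.prisjakt.no/c/mobiltelefoner")     -> True
#   ensure_sort_popularity("https://www.prisjakt.no/c/mobiltelefoner")
#       -> "https://www.prisjakt.no/c/mobiltelefoner?sort=popularity"
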
def human_delay() -> None:
    time.sleep(random.uniform(DELAY_MIN_S, DELAY_MAX_S))


def safe_click(locator, timeout_ms: int = 2000) -> bool:
    try:
        locator.scroll_into_view_if_needed(timeout=timeout_ms)
    except Exception:
        pass
    try:
        locator.click(timeout=timeout_ms)
        return True
    except Exception:
        pass
    try:
        locator.click(timeout=timeout_ms, force=True)
        return True
    except Exception:
        return False

def dismiss_cookie_banners(page) -> None:
    selectors = [
        "button:has-text('Godta alle')",
        "button:has-text('Godta')",
        "button:has-text('Aksepter alle')",
        "button:has-text('Aksepter')",
        "button:has-text('Accept all')",
        "button:has-text('Accept')",
        "#onetrust-accept-btn-handler",
        "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
        "#CybotCookiebotDialogBodyButtonAccept",
    ]
    for _ in range(4):
        for sel in selectors:
            try:
                loc = page.locator(sel).first
                if loc.count() and loc.is_visible():
                    safe_click(loc, 2500)
                    page.wait_for_timeout(250)
            except Exception:
                pass
        # try iframes
        try:
            for f in page.frames:
                for sel in selectors:
                    try:
                        loc = f.locator(sel).first
                        if loc.count() and loc.is_visible():
                            safe_click(loc, 2500)
                            page.wait_for_timeout(250)
                    except Exception:
                        pass
        except Exception:
            pass
        try:
            page.keyboard.press("Escape")
        except Exception:
            pass

def extract_products_from_category(page, top_n: int) -> List[Product]:
    products: List[Product] = []
    seen = set()
    links = page.locator("a[href*='/product.php?p=']")
    n = min(links.count(), 800)
    for i in range(n):
        try:
            href = links.nth(i).get_attribute("href") or ""
            if not href:
                continue
            pid = parse_pid(href)
            if not pid or pid in seen:
                continue
            url = "https://www.prisjakt.no" + href if href.startswith("/") else href
            name = re.sub(r"\s+", " ", (links.nth(i).inner_text() or "").strip())
            if len(name) < 6:
                continue
            products.append(Product(name=name, url=url, pid=pid))
            seen.add(pid)
            if len(products) >= top_n:
                break
        except Exception:
            continue
    return products

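# Illustrative example of one extracted link (hypothetical href and link text):
#   href = "/product.php?p=1234567", link text = "Acme Phone 12 128GB"
#   -> Product(name="Acme Phone 12 128GB",
#              url="https://www.prisjakt.no/product.php?p=1234567",
#              pid="1234567")
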
def goto_statistics(page, product_url: str) -> None:
    url = product_url if "#statistics" in product_url else product_url + "#statistics"
    page.goto(url, wait_until="domcontentloaded", timeout=60000)
    dismiss_cookie_banners(page)
    # Try to activate the statistics tab/section
    for sel in [
        "a[data-test='PriceHistorySummary']",
        "a[aria-label='Prishistorikk']",
        "a:has-text('Prishistorikk')",
        "a[href*='#statistics']",
        "text=Se prishistorikk",
    ]:
        try:
            loc = page.locator(sel).first
            if loc.count() and loc.is_visible():
                safe_click(loc, 3000)
                page.wait_for_timeout(500)
                break
        except Exception:
            pass
    page.wait_for_timeout(1500)

def has_historyv2(payload: Any) -> bool:
    try:
        return isinstance(payload["data"]["product"]["historyV2"]["historyItems"], list)
    except Exception:
        return False


def patch_variables_for_1y(vars_in: Any) -> Any:
    if not isinstance(vars_in, dict):
        return vars_in
    v = dict(vars_in)
    now = now_utc()
    start = now - timedelta(days=DAYS)
    start_iso = start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    now_iso = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    v["from"] = start_iso
    v["to"] = now_iso
    for k in list(v.keys()):
        if k.lower() in ("limit", "first", "last", "take", "size", "count", "pagesize"):
            v[k] = 5000
    return v

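# Illustrative example of the patching above. Apart from "from"/"to", the field names
# depend on whatever the captured request contained; the values here are made up:
#   patch_variables_for_1y({"productId": "1234567", "limit": 100,
#                           "from": "2024-05-01T00:00:00.000Z",
#                           "to": "2024-06-01T00:00:00.000Z"})
#   -> {"productId": "1234567", "limit": 5000,
#       "from": <now - DAYS days>, "to": <now>}   # both formatted as ...T%H:%M:%S.000Z
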
def events_to_daily(items: List[Dict[str, Any]]) -> pd.DataFrame:
    rows = []
    for it in items:
        d = it.get("date")
        p = it.get("price")
        if not d or p is None:
            continue
        dt = pd.to_datetime(d, utc=True, errors="coerce")
        if pd.isna(dt):
            continue
        try:
            price = float(p)
        except Exception:
            continue
        rows.append((dt, price))
    if not rows:
        return pd.DataFrame(columns=["price"])
    df = pd.DataFrame(rows, columns=["dt", "price"]).sort_values("dt")
    df = df.drop_duplicates(subset=["dt"], keep="last").set_index("dt")
    end = pd.Timestamp(now_utc()).normalize()
    start = end - pd.Timedelta(days=DAYS)
    idx = pd.date_range(start=start, end=end, freq="D", tz="UTC")
    daily = df["price"].resample("D").last().reindex(idx).ffill()
    return pd.DataFrame({"price": daily})

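# Illustrative example (made-up events, assumed to fall inside the last DAYS days):
#   events_to_daily([
#       {"date": "2025-01-03T10:00:00Z", "price": 999},
#       {"date": "2025-01-07T08:30:00Z", "price": 899},
#   ])
#   -> daily "price" column: NaN before Jan 3, 999.0 for Jan 3-6,
#      899.0 from Jan 7 forward-filled to the end of the window.
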
def build_market_index(df_prices: pd.DataFrame, coverage_fraction: float = 0.70, min_count: int = 5):
    norm = pd.DataFrame(index=df_prices.index)
    for c in df_prices.columns:
        s = df_prices[c]
        fv = s.first_valid_index()
        if fv is None:
            continue
        base = s.loc[fv]
        if pd.isna(base) or base <= 0:
            continue
        norm[c] = (s / base) * 100.0
    coverage = norm.notna().sum(axis=1)
    threshold = max(min_count, int(np.ceil(coverage_fraction * norm.shape[1]))) if norm.shape[1] else min_count
    valid = coverage[coverage >= threshold]
    start_date = valid.index.min() if len(valid) else norm.index.min()
    norm_trim = norm.loc[start_date:].copy()
    coverage_trim = coverage.loc[start_date:].copy()
    median = norm_trim.median(axis=1, skipna=True)
    q25 = norm_trim.quantile(0.25, axis=1)
    q75 = norm_trim.quantile(0.75, axis=1)
    return norm_trim, coverage_trim, threshold, start_date, median, q25, q75

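# Worked example of the normalization above (hypothetical prices):
#   item A first observed at 1000 kr, later 900 kr -> index 100.0, then 90.0
#   item B first observed at  500 kr, later 550 kr -> index 100.0, then 110.0
#   median index on the later day = (90.0 + 110.0) / 2 = 100.0
# The series is then trimmed to start on the first day where at least
# max(min_count, ceil(coverage_fraction * n_items)) items have data.
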
def plot_index(median, q25, q75, out_png: str) -> None:
    plt.figure(figsize=(16, 7))
    plt.fill_between(median.index, q25.values, q75.values, alpha=0.2)  # interquartile (25-75%) band
    plt.plot(median.index, median.values, linewidth=2)
    plt.title("Market price index (normalized, trimmed for sparse data)")
    plt.ylabel("Index (100 = first observed price per item)")
    plt.grid(True, alpha=0.25)
    plt.tight_layout()
    plt.savefig(out_png, dpi=200, bbox_inches="tight")
    plt.close()


def plot_coverage(coverage, threshold: int, total: int, out_png: str) -> None:
    plt.figure(figsize=(16, 4.8))
    plt.plot(coverage.index, coverage.values, linewidth=2)
    plt.title(f"Data coverage over time (threshold {threshold}/{total})")
    plt.ylabel("Items with price data")
    plt.grid(True, alpha=0.25)
    plt.tight_layout()
    plt.savefig(out_png, dpi=200, bbox_inches="tight")
    plt.close()

def main() -> None:
    print("Starting Prisjakt indexer...", flush=True)
    url = input("Enter Prisjakt URL: ").strip()
    if not url:
        raise SystemExit("No URL provided.")
    url = ensure_sort_popularity(url)

    top_n = DEFAULT_TOP_N
    if is_category_url(url):
        raw = input(f"Top N products [default {DEFAULT_TOP_N}]: ").strip()
        if raw:
            top_n = max(1, min(50, int(raw)))

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=HEADLESS)
        ctx = browser.new_context(
            locale="nb-NO",
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
        )
        page = ctx.new_page()

        print("Loading URL...", flush=True)
        page.goto(url, wait_until="domcontentloaded", timeout=60000)
        dismiss_cookie_banners(page)
        page.wait_for_timeout(800)

        page_title = page.title() or f"prisjakt_{int(time.time())}"
        run_name = slug(page_title)
        out_dir = os.path.join(BASE_OUT_DIR, run_name)
        debug_dir = os.path.join(out_dir, "debug")
        os.makedirs(debug_dir, exist_ok=True)
        log_path = os.path.join(out_dir, "run_log.txt")

        def log(msg: str) -> None:
            print(msg, flush=True)
            with open(log_path, "a", encoding="utf-8") as f:
                f.write(msg + "\n")

        log(f"Output folder: {out_dir}")
        log(f"Title: {page_title}")
        log(f"URL: {url}")

        products: List[Product] = []
        if is_product_url(url):
            pid = parse_pid(url) or "unknown"
            products = [Product(name=page_title, url=url, pid=pid)]
        else:
            try:
                page.wait_for_selector("a[href*='/product.php?p=']", timeout=60000)
            except Exception:
                pass
            products = extract_products_from_category(page, top_n=top_n)

        if not products:
            browser.close()
            raise SystemExit("No products found. Possibly consent wall or blocked.")

        log(f"Products selected: {len(products)}")
        prod_page = ctx.new_page()
        series_map: Dict[str, pd.DataFrame] = {}

        for i, pr in enumerate(products, 1):
            t0 = time.time()
            log(f"\n[{i:02d}/{len(products)}] Visiting {pr.pid} {pr.name}")

            captured_req: Optional[Dict[str, Any]] = None

            def on_response(resp):
                nonlocal captured_req
                try:
                    if "/_internal/bff" not in resp.url:
                        return
                    ctype = (resp.headers.get("content-type") or "").lower()
                    if "json" not in ctype and "+json" not in ctype:
                        return
                    data = resp.json()
                    if has_historyv2(data):
                        post = resp.request.post_data
                        if post:
                            captured_req = json.loads(post)
                except Exception:
                    return

            prod_page.on("response", on_response)

            try:
                goto_statistics(prod_page, pr.url)
            except Exception as e:
                log(f"[WARN] Failed to load product page: {e}")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            while captured_req is None and (time.time() - t0) < PER_PRODUCT_TIMEOUT_S:
                prod_page.wait_for_timeout(300)

            if captured_req is None:
                log("[WARN] No historyV2 request captured (timeout).")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            req_path = os.path.join(debug_dir, f"{pr.pid}_{slug(pr.name)}_history_request.json")
            with open(req_path, "w", encoding="utf-8") as f:
                json.dump(captured_req, f, ensure_ascii=False, indent=2)

            replay_payload = dict(captured_req)
            replay_payload["variables"] = patch_variables_for_1y(replay_payload.get("variables", {}))

            try:
                r = ctx.request.post(
                    "https://www.prisjakt.no/_internal/bff",
                    data=json.dumps(replay_payload),
                    headers={"content-type": "application/json"},
                    timeout=60000,
                )
                expanded = r.json()
            except Exception as e:
                log(f"[WARN] Replay failed: {e}")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            resp_path = os.path.join(debug_dir, f"{pr.pid}_{slug(pr.name)}_history_response.json")
            with open(resp_path, "w", encoding="utf-8") as f:
                json.dump(expanded, f, ensure_ascii=False, indent=2)

            if not has_historyv2(expanded):
                log("[WARN] Replay response missing historyV2.")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            items = expanded["data"]["product"]["historyV2"]["historyItems"] or []
            df_daily = events_to_daily(items)
            if df_daily.empty or df_daily["price"].isna().all():
                log("[WARN] No usable daily series.")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            series_map[pr.name] = df_daily
            log(f"[OK] events={len(items)} daily_points={len(df_daily)} captured_in={time.time()-t0:.1f}s")
            prod_page.remove_listener("response", on_response)
            human_delay()

        browser.close()
    if not series_map:
        raise SystemExit("No series extracted. Check output/<run>/debug and run_log.txt")

    wide = pd.concat({k: v["price"] for k, v in series_map.items()}, axis=1)
    csv_path = os.path.join(out_dir, "price_history_daily_last_1y.csv")
    wide.to_csv(csv_path, index=True)

    norm_trim, coverage_trim, threshold, start_date, median, q25, q75 = build_market_index(wide)

    idx_png = os.path.join(out_dir, "market_price_index.png")
    cov_png = os.path.join(out_dir, "data_coverage.png")
    plot_index(median, q25, q75, idx_png)
    plot_coverage(coverage_trim, threshold, norm_trim.shape[1], cov_png)

    med = median.dropna()
    if len(med) >= 2:
        pct = (med.iloc[-1] / med.iloc[0] - 1.0) * 100.0
        print(f"\nTrim start date: {start_date}", flush=True)
        print(f"Median index change: {med.iloc[0]:.2f} -> {med.iloc[-1]:.2f} ({pct:.2f}%)", flush=True)

    print("\nWrote:", flush=True)
    print(f"  {csv_path}", flush=True)
    print(f"  {idx_png}", flush=True)
    print(f"  {cov_png}", flush=True)
    print(f"  {os.path.join(out_dir, 'run_log.txt')}", flush=True)


if __name__ == "__main__":
    main()