Guest User

prisjakt_market_index.py

a guest
Jan 25th, 2026
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 16.20 KB | Software | 0 0
  1. #!/usr/bin/env python3
  2. """
  3. prisjakt_market_index.py
  4.  
  5. Interactive Prisjakt index generator:
  6. - Accepts category or product URL
  7. - Outputs into output/<page_title>/
  8. - Prints progress per product (no silent waiting)
  9. - Per-product hard timeout so it can't "hang" forever
  10.  
  11. Outputs:
  12. output/<run_name>/
  13. market_price_index.png
  14. data_coverage.png
  15. price_history_daily_last_1y.csv
  16. run_log.txt
  17. debug/
  18. *_history_request.json
  19. *_history_response.json
  20. """
  21.  
  22. import json
  23. import os
  24. import random
  25. import re
  26. import time
  27. from dataclasses import dataclass
  28. from datetime import datetime, timedelta, timezone
  29. from typing import Any, Dict, List, Optional
  30.  
  31. import numpy as np
  32. import pandas as pd
  33. import matplotlib.pyplot as plt
  34. from playwright.sync_api import sync_playwright
  35.  
  36.  
# ---------------- config ----------------

HEADLESS = True        # run Chromium without a visible window
# NOTE(review): history window is 90 days, but the docstring and the CSV
# filename say "last_1y" — confirm which is intended.
DAYS = 90
DEFAULT_TOP_N = 50     # default number of category products to index

# Cloudflare safety knobs: random inter-product delay range (seconds)
DELAY_MIN_S = 4.0
DELAY_MAX_S = 9.0

# Per-product hard timeout (seconds)
PER_PRODUCT_TIMEOUT_S = 75

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_OUT_DIR = os.path.join(SCRIPT_DIR, "output")

# ----------------------------------------
  54.  
  55.  
@dataclass
class Product:
    """A product discovered on a Prisjakt listing or product page."""

    name: str  # display name (whitespace-normalized link text)
    url: str   # absolute product-page URL
    pid: str   # numeric Prisjakt product id (the `p=` query parameter)
  61.  
  62.  
  63. def now_utc() -> datetime:
  64. return datetime.now(timezone.utc)
  65.  
  66.  
  67. def slug(s: str, max_len: int = 80) -> str:
  68. s = re.sub(r"[^\wæøåÆØÅ\-]+", "_", (s or "").strip(), flags=re.UNICODE)
  69. s = re.sub(r"_+", "_", s).strip("_")
  70. return s[:max_len] if s else "prisjakt_run"
  71.  
  72.  
  73. def parse_pid(url: str) -> Optional[str]:
  74. m = re.search(r"[?&]p=(\d+)", url)
  75. return m.group(1) if m else None
  76.  
  77.  
  78. def is_category_url(url: str) -> bool:
  79. return "/c/" in url
  80.  
  81.  
  82. def is_product_url(url: str) -> bool:
  83. return "/product.php" in url and "p=" in url
  84.  
  85.  
  86. def ensure_sort_popularity(url: str) -> str:
  87. # Do NOT override if you already provided sort=...
  88. if is_category_url(url) and "sort=" not in url:
  89. joiner = "&" if "?" in url else "?"
  90. return f"{url}{joiner}sort=popularity"
  91. return url
  92.  
  93.  
  94. def human_delay() -> None:
  95. time.sleep(random.uniform(DELAY_MIN_S, DELAY_MAX_S))
  96.  
  97.  
  98. def safe_click(locator, timeout_ms: int = 2000) -> bool:
  99. try:
  100. locator.scroll_into_view_if_needed(timeout=timeout_ms)
  101. except Exception:
  102. pass
  103. try:
  104. locator.click(timeout=timeout_ms)
  105. return True
  106. except Exception:
  107. pass
  108. try:
  109. locator.click(timeout=timeout_ms, force=True)
  110. return True
  111. except Exception:
  112. return False
  113.  
  114.  
def dismiss_cookie_banners(page) -> None:
    """Best-effort dismissal of cookie/consent banners on *page*.

    Tries a list of known Norwegian/English consent-button selectors several
    times in the main frame, then once per iframe (consent dialogs are often
    embedded), and finally presses Escape. All failures are swallowed —
    this must never abort a scrape.
    """
    selectors = [
        "button:has-text('Godta alle')",
        "button:has-text('Godta')",
        "button:has-text('Aksepter alle')",
        "button:has-text('Aksepter')",
        "button:has-text('Accept all')",
        "button:has-text('Accept')",
        "#onetrust-accept-btn-handler",
        "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
        "#CybotCookiebotDialogBodyButtonAccept",
    ]

    # Several passes: some banners re-render or reveal a second layer
    # after the first click.
    for _ in range(4):
        for sel in selectors:
            try:
                loc = page.locator(sel).first
                if loc.count() and loc.is_visible():
                    safe_click(loc, 2500)
                    page.wait_for_timeout(250)
            except Exception:
                pass

    # try iframes
    try:
        for f in page.frames:
            for sel in selectors:
                try:
                    loc = f.locator(sel).first
                    if loc.count() and loc.is_visible():
                        safe_click(loc, 2500)
                        page.wait_for_timeout(250)
                except Exception:
                    pass
    except Exception:
        pass

    # Last resort: many overlays close on Escape.
    try:
        page.keyboard.press("Escape")
    except Exception:
        pass
  156.  
  157.  
  158. def extract_products_from_category(page, top_n: int) -> List[Product]:
  159. products: List[Product] = []
  160. seen = set()
  161.  
  162. links = page.locator("a[href*='/product.php?p=']")
  163. n = min(links.count(), 800)
  164.  
  165. for i in range(n):
  166. try:
  167. href = links.nth(i).get_attribute("href") or ""
  168. if not href:
  169. continue
  170. pid = parse_pid(href)
  171. if not pid or pid in seen:
  172. continue
  173.  
  174. url = "https://www.prisjakt.no" + href if href.startswith("/") else href
  175. name = re.sub(r"\s+", " ", (links.nth(i).inner_text() or "").strip())
  176. if len(name) < 6:
  177. continue
  178.  
  179. products.append(Product(name=name, url=url, pid=pid))
  180. seen.add(pid)
  181.  
  182. if len(products) >= top_n:
  183. break
  184. except Exception:
  185. continue
  186.  
  187. return products
  188.  
  189.  
  190. def goto_statistics(page, product_url: str) -> None:
  191. url = product_url if "#statistics" in product_url else product_url + "#statistics"
  192.  
  193. page.goto(url, wait_until="domcontentloaded", timeout=60000)
  194. dismiss_cookie_banners(page)
  195.  
  196. # Try to activate the statistics tab/section
  197. for sel in [
  198. "a[data-test='PriceHistorySummary']",
  199. "a[aria-label='Prishistorikk']",
  200. "a:has-text('Prishistorikk')",
  201. "a[href*='#statistics']",
  202. "text=Se prishistorikk",
  203. ]:
  204. try:
  205. loc = page.locator(sel).first
  206. if loc.count() and loc.is_visible():
  207. safe_click(loc, 3000)
  208. page.wait_for_timeout(500)
  209. break
  210. except Exception:
  211. pass
  212.  
  213. page.wait_for_timeout(1500)
  214.  
  215.  
  216. def has_historyv2(payload: Any) -> bool:
  217. try:
  218. return isinstance(payload["data"]["product"]["historyV2"]["historyItems"], list)
  219. except Exception:
  220. return False
  221.  
  222.  
  223. def patch_variables_for_1y(vars_in: Any) -> Any:
  224. if not isinstance(vars_in, dict):
  225. return vars_in
  226.  
  227. v = dict(vars_in)
  228. now = now_utc()
  229. start = now - timedelta(days=DAYS)
  230.  
  231. start_iso = start.strftime("%Y-%m-%dT%H:%M:%S.000Z")
  232. now_iso = now.strftime("%Y-%m-%dT%H:%M:%S.000Z")
  233.  
  234. v["from"] = start_iso
  235. v["to"] = now_iso
  236.  
  237. for k in list(v.keys()):
  238. if k.lower() in ("limit", "first", "last", "take", "size", "count", "pagesize"):
  239. v[k] = 5000
  240.  
  241. return v
  242.  
  243.  
  244. def events_to_daily(items: List[Dict[str, Any]]) -> pd.DataFrame:
  245. rows = []
  246. for it in items:
  247. d = it.get("date")
  248. p = it.get("price")
  249. if not d or p is None:
  250. continue
  251. dt = pd.to_datetime(d, utc=True, errors="coerce")
  252. if pd.isna(dt):
  253. continue
  254. try:
  255. price = float(p)
  256. except Exception:
  257. continue
  258. rows.append((dt, price))
  259.  
  260. if not rows:
  261. return pd.DataFrame(columns=["price"])
  262.  
  263. df = pd.DataFrame(rows, columns=["dt", "price"]).sort_values("dt")
  264. df = df.drop_duplicates(subset=["dt"], keep="last").set_index("dt")
  265.  
  266. end = pd.Timestamp(now_utc()).normalize()
  267. start = end - pd.Timedelta(days=DAYS)
  268. idx = pd.date_range(start=start, end=end, freq="D", tz="UTC")
  269.  
  270. daily = df["price"].resample("D").last().reindex(idx).ffill()
  271. return pd.DataFrame({"price": daily})
  272.  
  273.  
  274. def build_market_index(df_prices: pd.DataFrame, coverage_fraction: float = 0.70, min_count: int = 5):
  275. norm = pd.DataFrame(index=df_prices.index)
  276.  
  277. for c in df_prices.columns:
  278. s = df_prices[c]
  279. fv = s.first_valid_index()
  280. if fv is None:
  281. continue
  282. base = s.loc[fv]
  283. if pd.isna(base) or base <= 0:
  284. continue
  285. norm[c] = (s / base) * 100.0
  286.  
  287. coverage = norm.notna().sum(axis=1)
  288. threshold = max(min_count, int(np.ceil(coverage_fraction * norm.shape[1]))) if norm.shape[1] else min_count
  289. valid = coverage[coverage >= threshold]
  290. start_date = valid.index.min() if len(valid) else norm.index.min()
  291.  
  292. norm_trim = norm.loc[start_date:].copy()
  293. coverage_trim = coverage.loc[start_date:].copy()
  294.  
  295. median = norm_trim.median(axis=1, skipna=True)
  296. q25 = norm_trim.quantile(0.25, axis=1)
  297. q75 = norm_trim.quantile(0.75, axis=1)
  298.  
  299. return norm_trim, coverage_trim, threshold, start_date, median, q25, q75
  300.  
  301.  
def plot_index(median, q25, q75, out_png: str) -> None:
    """Plot the median market index with its interquartile band; save to *out_png*."""
    plt.figure(figsize=(16, 7))
    # Shaded band spans the 25th–75th percentile across items.
    plt.fill_between(median.index, q25.values, q75.values, alpha=0.2)
    plt.plot(median.index, median.values, linewidth=2)
    plt.title("Market price index (normalized, trimmed for sparse data)")
    plt.ylabel("Index (100 = first observed price per item)")
    plt.grid(True, alpha=0.25)
    plt.tight_layout()
    plt.savefig(out_png, dpi=200, bbox_inches="tight")
    plt.close()
  312.  
  313.  
def plot_coverage(coverage, threshold: int, total: int, out_png: str) -> None:
    """Plot how many items have price data per day; save to *out_png*.

    *threshold*/*total* are only shown in the title so the reader can see
    the trim criterion used by build_market_index.
    """
    plt.figure(figsize=(16, 4.8))
    plt.plot(coverage.index, coverage.values, linewidth=2)
    plt.title(f"Data coverage over time (threshold {threshold}/{total})")
    plt.ylabel("Items with price data")
    plt.grid(True, alpha=0.25)
    plt.tight_layout()
    plt.savefig(out_png, dpi=200, bbox_inches="tight")
    plt.close()
  323.  
  324.  
def main() -> None:
    """Interactive entry point: prompt for a URL, scrape, build and plot the index.

    Flow: ask for a category or product URL, open it in Chromium, derive the
    run folder from the page title, collect products, then for each product
    capture its price-history BFF request, replay it with a widened date
    window, convert to a daily series, and finally write the CSV + plots.
    """
    print("Starting Prisjakt indexer...", flush=True)

    url = input("Enter Prisjakt URL: ").strip()
    if not url:
        raise SystemExit("No URL provided.")
    url = ensure_sort_popularity(url)

    # Only category runs need a product count; single-product runs use 1 URL.
    top_n = DEFAULT_TOP_N
    if is_category_url(url):
        raw = input(f"Top N products [default {DEFAULT_TOP_N}]: ").strip()
        if raw:
            top_n = max(1, min(50, int(raw)))

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=HEADLESS)
        # Desktop Chrome UA + Norwegian locale to look like a regular visitor.
        ctx = browser.new_context(
            locale="nb-NO",
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/122.0.0.0 Safari/537.36"
            ),
        )

        page = ctx.new_page()
        print("Loading URL...", flush=True)
        page.goto(url, wait_until="domcontentloaded", timeout=60000)
        dismiss_cookie_banners(page)
        page.wait_for_timeout(800)

        # Output folder is named after the page title (slugified).
        page_title = page.title() or f"prisjakt_{int(time.time())}"
        run_name = slug(page_title)
        out_dir = os.path.join(BASE_OUT_DIR, run_name)
        debug_dir = os.path.join(out_dir, "debug")
        os.makedirs(debug_dir, exist_ok=True)

        log_path = os.path.join(out_dir, "run_log.txt")

        def log(msg: str) -> None:
            # Echo to console and append to the run log.
            print(msg, flush=True)
            with open(log_path, "a", encoding="utf-8") as f:
                f.write(msg + "\n")

        log(f"Output folder: {out_dir}")
        log(f"Title: {page_title}")
        log(f"URL: {url}")

        # Build the product list: a single product URL, or scrape the category.
        products: List[Product] = []
        if is_product_url(url):
            pid = parse_pid(url) or "unknown"
            products = [Product(name=page_title, url=url, pid=pid)]
        else:
            try:
                page.wait_for_selector("a[href*='/product.php?p=']", timeout=60000)
            except Exception:
                pass  # fall through; extraction below may still find anchors
            products = extract_products_from_category(page, top_n=top_n)

        if not products:
            browser.close()
            raise SystemExit("No products found. Possibly consent wall or blocked.")

        log(f"Products selected: {len(products)}")

        prod_page = ctx.new_page()
        series_map: Dict[str, pd.DataFrame] = {}

        for i, pr in enumerate(products, 1):
            t0 = time.time()
            log(f"\n[{i:02d}/{len(products)}] Visiting {pr.pid} {pr.name}")

            # Filled by the response listener when the history request is seen.
            captured_req: Optional[Dict[str, Any]] = None

            def on_response(resp):
                # Sniff the BFF JSON response that carries historyV2 and keep
                # its originating POST body so we can replay it ourselves.
                nonlocal captured_req
                try:
                    if "/_internal/bff" not in resp.url:
                        return
                    ctype = (resp.headers.get("content-type") or "").lower()
                    if "json" not in ctype and "+json" not in ctype:
                        return
                    data = resp.json()
                    if has_historyv2(data):
                        post = resp.request.post_data
                        if post:
                            captured_req = json.loads(post)
                except Exception:
                    return  # listener must never raise into Playwright

            prod_page.on("response", on_response)

            try:
                goto_statistics(prod_page, pr.url)
            except Exception as e:
                log(f"[WARN] Failed to load product page: {e}")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            # Poll until the listener captured a request or the hard timeout hits.
            while captured_req is None and (time.time() - t0) < PER_PRODUCT_TIMEOUT_S:
                prod_page.wait_for_timeout(300)

            if captured_req is None:
                log("[WARN] No historyV2 request captured (timeout).")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            # Persist the captured request for debugging.
            req_path = os.path.join(debug_dir, f"{pr.pid}_{slug(pr.name)}_history_request.json")
            with open(req_path, "w", encoding="utf-8") as f:
                json.dump(captured_req, f, ensure_ascii=False, indent=2)

            # Replay the same query with a widened date window and raised limits.
            replay_payload = dict(captured_req)
            replay_payload["variables"] = patch_variables_for_1y(replay_payload.get("variables", {}))

            try:
                r = ctx.request.post(
                    "https://www.prisjakt.no/_internal/bff",
                    data=json.dumps(replay_payload),
                    headers={"content-type": "application/json"},
                    timeout=60000,
                )
                expanded = r.json()
            except Exception as e:
                log(f"[WARN] Replay failed: {e}")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            # Persist the replayed response for debugging.
            resp_path = os.path.join(debug_dir, f"{pr.pid}_{slug(pr.name)}_history_response.json")
            with open(resp_path, "w", encoding="utf-8") as f:
                json.dump(expanded, f, ensure_ascii=False, indent=2)

            if not has_historyv2(expanded):
                log("[WARN] Replay response missing historyV2.")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            items = expanded["data"]["product"]["historyV2"]["historyItems"] or []
            df_daily = events_to_daily(items)

            if df_daily.empty or df_daily["price"].isna().all():
                log("[WARN] No usable daily series.")
                prod_page.remove_listener("response", on_response)
                human_delay()
                continue

            series_map[pr.name] = df_daily
            log(f"[OK] events={len(items)} daily_points={len(df_daily)} captured_in={time.time()-t0:.1f}s")

            prod_page.remove_listener("response", on_response)
            human_delay()

        browser.close()

        if not series_map:
            raise SystemExit("No series extracted. Check output/<run>/debug and run_log.txt")

        # One column per product, daily index, for CSV + index construction.
        wide = pd.concat({k: v["price"] for k, v in series_map.items()}, axis=1)

        csv_path = os.path.join(out_dir, "price_history_daily_last_1y.csv")
        wide.to_csv(csv_path, index=True)

        norm_trim, coverage_trim, threshold, start_date, median, q25, q75 = build_market_index(wide)

        idx_png = os.path.join(out_dir, "market_price_index.png")
        cov_png = os.path.join(out_dir, "data_coverage.png")
        plot_index(median, q25, q75, idx_png)
        plot_coverage(coverage_trim, threshold, norm_trim.shape[1], cov_png)

        # Summarize the overall index move when there are at least two points.
        med = median.dropna()
        if len(med) >= 2:
            pct = (med.iloc[-1] / med.iloc[0] - 1.0) * 100.0
            print(f"\nTrim start date: {start_date}", flush=True)
            print(f"Median index change: {med.iloc[0]:.2f} -> {med.iloc[-1]:.2f} ({pct:.2f}%)", flush=True)

        print("\nWrote:", flush=True)
        print(f" {csv_path}", flush=True)
        print(f" {idx_png}", flush=True)
        print(f" {cov_png}", flush=True)
        print(f" {os.path.join(out_dir, 'run_log.txt')}", flush=True)
  508.  
  509.  
# Script entry point: run the interactive indexer.
if __name__ == "__main__":
    main()
  512.  
Advertisement
Add Comment
Please, Sign In to add comment