"""Poll the nyanpass counter API and show the live count with projected rates.

The script repeatedly fetches the current counter value, keeps three sliding
windows of (timestamp, count) samples, and rewrites a single terminal line
with the latest count plus per-minute / per-hour / per-day projections.
"""

import asyncio
import sys
from collections import deque
from datetime import datetime, timezone

import aiohttp

API_URL = "https://nyanpass.com/api/get_count"
INTERVAL = 0.1  # seconds to sleep between polls

# Upper bound for one request so a stalled connection cannot freeze the
# display loop for as long as aiohttp's much larger default timeout.
REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=10)

# Sliding windows of (timestamp, count) samples. Each maxlen is the nominal
# window length divided by INTERVAL; because every iteration also spends time
# in the HTTP request, a full deque spans somewhat more than its nominal
# window. calculate_rate() divides by the real elapsed time, so the rate
# itself stays correct.
short_history = deque(maxlen=int(60 / INTERVAL))
medium_history = deque(maxlen=int(3600 / INTERVAL))
long_history = deque(maxlen=int(86400 / INTERVAL))


async def fetch_count(session):
    """Fetch the current counter value from API_URL.

    Args:
        session: An open ``aiohttp.ClientSession`` used for the request.

    Returns:
        The counter as an ``int``, or ``None`` if the request failed, timed
        out, or the response did not contain a usable ``count`` field.
    """
    try:
        async with session.get(API_URL, timeout=REQUEST_TIMEOUT) as resp:
            data = await resp.json()
        # Index instead of .get("count", 0): a payload without a count must
        # be treated as a failed poll, not recorded as a genuine sample of 0
        # (which would produce a huge bogus rate).
        return int(data["count"])
    except Exception:
        # Deliberately best-effort: this runs in an endless polling loop, so
        # any network, decoding, or payload-shape error just skips the sample.
        return None


def calculate_rate(history) -> float:
    """Return the average count increase per second across *history*.

    Args:
        history: A sequence of ``(timestamp, count)`` pairs ordered oldest
            first, where subtracting two timestamps yields a ``timedelta``.

    Returns:
        ``(newest count - oldest count) / elapsed seconds``, or ``0.0`` when
        there are fewer than two samples or no positive time has elapsed.
    """
    if len(history) < 2:
        return 0.0

    oldest_time, oldest_count = history[0]
    newest_time, newest_count = history[-1]
    elapsed_sec = (newest_time - oldest_time).total_seconds()
    if elapsed_sec <= 0:
        return 0.0
    return (newest_count - oldest_count) / elapsed_sec


def _record_sample(count):
    """Append the current (UTC timestamp, count) sample to every window."""
    # Timezone-aware UTC avoids phantom one-hour jumps in the elapsed time
    # when the local UTC offset changes (e.g. a DST transition).
    sample = (datetime.now(timezone.utc), count)
    for history in (short_history, medium_history, long_history):
        history.append(sample)


def _render_status(count):
    """Rewrite the terminal status line with the count and projections."""
    per_minute = calculate_rate(short_history) * 60
    per_hour = calculate_rate(medium_history) * 3600
    per_day = calculate_rate(long_history) * 86400

    # The leading "\r" returns to the start of the line so each update
    # overwrites the previous one instead of scrolling.
    sys.stdout.write(
        f"\rにゃんぱすーされた回数: {count:,}回 | "
        f"予測: 1分 {per_minute:,.1f}回 / "
        f"1時間 {per_hour:,.0f}回 / "
        f"1日 {per_day:,.0f}回"
    )
    sys.stdout.flush()


async def display_loop():
    """Poll the API forever, updating the histories and the status line.

    A failed poll (``fetch_count`` returning ``None``) is skipped entirely:
    nothing is recorded and the previously printed line is left in place.
    """
    async with aiohttp.ClientSession() as session:
        while True:
            count = await fetch_count(session)
            if count is not None:
                _record_sample(count)
                _render_status(count)
            await asyncio.sleep(INTERVAL)


if __name__ == "__main__":
    try:
        asyncio.run(display_loop())
    except KeyboardInterrupt:
        print("\n終了しました。")