Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import aiohttp
- import sys
- from collections import deque
- from datetime import datetime
- API_URL = "https://nyanpass.com/api/get_count"
- INTERVAL = 0.1
- short_history = deque(maxlen=int(60 / INTERVAL))
- medium_history = deque(maxlen=int(3600 / INTERVAL))
- long_history = deque(maxlen=int(86400 / INTERVAL))
- async def fetch_count(session):
- try:
- async with session.get(API_URL) as resp:
- data = await resp.json()
- count = int(data.get("count", 0))
- return count
- except Exception:
- return None
- def calculate_rate(history):
- if len(history) < 2:
- return 0
- old_time, old_count = history[0]
- new_time, new_count = history[-1]
- elapsed_sec = (new_time - old_time).total_seconds()
- delta_count = new_count - old_count
- return delta_count / elapsed_sec if elapsed_sec > 0 else 0
- async def display_loop():
- async with aiohttp.ClientSession() as session:
- while True:
- count = await fetch_count(session)
- now = datetime.now()
- if count is not None:
- short_history.append((now, count))
- medium_history.append((now, count))
- long_history.append((now, count))
- rate_short = calculate_rate(short_history)
- rate_medium = calculate_rate(medium_history)
- rate_long = calculate_rate(long_history)
- per_minute = rate_short * 60
- per_hour = rate_medium * 3600
- per_day = rate_long * 86400
- sys.stdout.write(
- f"\rにゃんぱすーされた回数: {count:,}回 | "
- f"予測: 1分 {per_minute:,.1f}回 / "
- f"1時間 {per_hour:,.0f}回 / "
- f"1日 {per_day:,.0f}回"
- )
- sys.stdout.flush()
- await asyncio.sleep(INTERVAL)
- if __name__ == "__main__":
- try:
- asyncio.run(display_loop())
- except KeyboardInterrupt:
- print("\n終了しました。")
Advertisement
Add Comment
Please, Sign In to add comment