- """
- Top AEW Feed Generator with Caching
- Same ranking algorithm as before, but with added caching layer to improve performance.
- Cache invalidates after 5 minutes or when blocked DIDs change.
- """
from datetime import datetime, timedelta, UTC
from typing import Optional, List, Dict, Tuple
import math
from functools import lru_cache

from server.database import Post, AEWNetwork
from server.logger import logger
from server.admin_blocks import get_blocked_dids
# Feed identifier
uri = "at://did:plc:5kk6tru4mdazgfmgyflvz3k2/app.bsky.feed.generator/top-aew"

CURSOR_EOF = 'eof'
CACHE_DURATION = 300  # Cache duration in seconds (5 minutes)
# Enhanced cache structure
_feed_cache = {
    'timestamp': None,     # When the cache was last updated
    'feeds': {},           # Dict of cursor -> feed data
    'blocked_hash': None,  # Hash of blocked DIDs for cache invalidation
}
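# For illustration, a populated cache might look like the sketch below (the
# URI and hash values are made up). Note that as a module-level dict this
# cache is per-process: separate server workers each keep their own copy.
#
#   _feed_cache = {
#       'timestamp': datetime(2024, 1, 1, 12, 0, tzinfo=UTC),
#       'feeds': {'None:30': {'cursor': 'eof', 'feed': [{'post': 'at://...'}]}},
#       'blocked_hash': -1234567890,
#   }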
def get_blocked_hash(blocked_dids: set) -> int:
    """Create a hash of the blocked DIDs for cache invalidation."""
    return hash(frozenset(blocked_dids))
def is_cache_valid() -> bool:
    """Check if the cache is still valid."""
    if not _feed_cache['timestamp']:
        return False

    # Check cache age
    age = (datetime.now(UTC) - _feed_cache['timestamp']).total_seconds()
    if age > CACHE_DURATION:
        logger.info("Cache expired due to age")
        return False

    # Check if blocked DIDs have changed
    current_blocked = get_blocked_dids()
    current_hash = get_blocked_hash(current_blocked)
    if current_hash != _feed_cache['blocked_hash']:
        logger.info("Cache invalidated due to blocks change")
        return False

    return True
def get_cached_feed(cursor: Optional[str], limit: int) -> Optional[dict]:
    """Get a feed page from the cache if available."""
    if not is_cache_valid():
        return None

    cache_key = f"{cursor}:{limit}"
    if cache_key in _feed_cache['feeds']:
        logger.info("Serving response from cache")
        return _feed_cache['feeds'][cache_key]
    return None
def update_cache(cursor: Optional[str], limit: int, feed_data: dict) -> None:
    """Update the cache with new feed data."""
    try:
        # Initialize cache if needed
        if not _feed_cache['timestamp']:
            _feed_cache['feeds'] = {}

        # Update cache metadata
        _feed_cache['timestamp'] = datetime.now(UTC)
        _feed_cache['blocked_hash'] = get_blocked_hash(get_blocked_dids())

        # Store feed data
        cache_key = f"{cursor}:{limit}"
        _feed_cache['feeds'][cache_key] = feed_data

        # Evict old entries if the cache grows too large. Dicts preserve
        # insertion order, so the first keys are the oldest entries.
        if len(_feed_cache['feeds']) > 100:  # Arbitrary limit
            oldest_keys = list(_feed_cache['feeds'])[:-50]  # Keep newest 50
            for key in oldest_keys:
                del _feed_cache['feeds'][key]

        logger.info(f"Updated cache with new feed data for cursor: {cursor}")
    except Exception as e:
        logger.error(f"Error updating cache: {e}")
def ensure_tz_aware(dt: datetime | str) -> datetime:
    """
    Ensure a datetime is timezone-aware, handling both datetime and string inputs.
    """
    if isinstance(dt, str):
        try:
            # Try parsing ISO format first (preferred)
            dt = datetime.fromisoformat(dt.replace('Z', '+00:00'))
        except ValueError:
            try:
                # Fall back to a basic format, dropping fractional seconds
                dt = datetime.strptime(dt.split('.')[0], "%Y-%m-%d %H:%M:%S")
            except Exception as e:
                logger.error(f"Error parsing datetime '{dt}': {e}")
                raise

    if dt and dt.tzinfo is None:
        return dt.replace(tzinfo=UTC)
    return dt
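# Examples (illustrative): both input forms normalize to an aware UTC datetime.
#
#   ensure_tz_aware("2024-01-01T12:00:00Z")      -> datetime(2024, 1, 1, 12, 0, tzinfo=UTC)
#   ensure_tz_aware(datetime(2024, 1, 1, 12, 0)) -> datetime(2024, 1, 1, 12, 0, tzinfo=UTC)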
@lru_cache(maxsize=1000)
def is_network_member(author_did: str) -> bool:
    """Check if an author is part of the AEW network."""
    try:
        return AEWNetwork.select().where(
            (AEWNetwork.did == author_did) &
            (AEWNetwork.is_followed_by_aew == True)  # '==' builds a peewee SQL expression; 'is True' would not work here
        ).exists()
    except Exception:
        return False
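# Note: lru_cache memoizes membership results for the life of the process, so
# network changes are not picked up until a restart. If fresher results are
# ever needed, the memo can be dropped explicitly:
#
#   is_network_member.cache_clear()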
def calculate_engagement_score(post: Post, age_hours: float) -> Tuple[float, Dict[str, float]]:
    """Calculate an engagement score using an HN-inspired algorithm with AEW customizations."""
    try:
        # Get engagement metrics
        likes = getattr(post, 'likeCount', 0) or 0
        reposts = getattr(post, 'repostCount', 0) or 0
        replies = getattr(post, 'replyCount', 0) or 0

        # Base engagement with HN-style power dampening
        base_score = (likes * 1.0) + (reposts * 1.5) + (replies * 1.0)
        if base_score > 1:
            base_score = math.pow(base_score - 1, 0.8)  # HN's power dampening

        # Calculate time decay (HN-style)
        time_base = 2.0  # 120 minutes, in hours
        decay_factor = age_hours / time_base
        time_multiplier = 1.0 / ((decay_factor + 1) ** 1.8)  # HN's gravity

        # Network boost (reward AEW talent/official posts)
        network_multiplier = 1.1 if is_network_member(post.author) else 1.0

        # Engagement balance boost (reward diverse engagement)
        has_balance = (likes > 0 and reposts > 0 and replies > 0)
        balance_multiplier = 1.1 if has_balance else 1.0

        # Viral boost for highly engaged content
        viral_multiplier = 1.2 if (reposts > 10 or likes > 50) else 1.0

        # Calculate final score
        score = base_score * time_multiplier * network_multiplier * balance_multiplier * viral_multiplier

        # Create detailed breakdown for logging
        details = {
            'base_score': base_score,
            'age_hours': age_hours,
            'time_multiplier': time_multiplier,
            'network_multiplier': network_multiplier,
            'balance_multiplier': balance_multiplier,
            'viral_multiplier': viral_multiplier,
            'final_score': score,
            'likes': likes,
            'reposts': reposts,
            'replies': replies,
            'is_network': is_network_member(post.author),
        }
        return score, details
    except Exception as e:
        logger.error(f"Error calculating score for post {post.cid}: {e}")
        return 0.0, {}
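# Worked example (approximate, assuming the author is not a network member):
# a 2-hour-old post with 50 likes, 10 reposts and 5 replies.
#
#   raw       = 50*1.0 + 10*1.5 + 5*1.0   = 70.0
#   dampened  = (70 - 1) ** 0.8           ~ 29.6
#   time_mult = 1 / ((2/2 + 1) ** 1.8)    ~ 0.287
#   balance   = 1.1  (likes, reposts and replies are all > 0)
#   viral     = 1.0  (needs reposts > 10 or likes > 50, strictly)
#   score     ~ 29.6 * 0.287 * 1.1        ~ 9.3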
def get_filtered_posts(blocked_dids: set,
                       cursor_time: Optional[datetime] = None,
                       cursor_cid: Optional[str] = None) -> List[Post]:
    """Get posts filtered by time window and blocked users."""
    try:
        # Base query: root posts from the last 24 hours
        cutoff_time = datetime.now(UTC) - timedelta(hours=24)
        query = (Post
                 .select()
                 .where(
                     (Post.indexed_at >= cutoff_time) &
                     (Post.reply_parent.is_null(True))
                 )
                 .order_by(Post.indexed_at.desc()))

        # Exclude blocked authors (skip the clause when there are none,
        # since an empty IN () list is invalid SQL on some backends)
        if blocked_dids:
            query = query.where(~(Post.author << list(blocked_dids)))

        # Apply keyset cursor for pagination
        if cursor_time and cursor_cid:
            query = query.where(
                (Post.indexed_at < cursor_time) |
                ((Post.indexed_at == cursor_time) & (Post.cid < cursor_cid))
            )

        return list(query)
    except Exception as e:
        logger.error(f"Error in get_filtered_posts: {e}", exc_info=True)
        return []
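# For reference, the query above compiles to roughly this SQL (peewee's
# '<<' is IN, '~' negates, and chained where() calls are ANDed together):
#
#   SELECT * FROM post
#   WHERE indexed_at >= ?                  -- 24-hour window
#     AND reply_parent IS NULL             -- root posts only
#     AND NOT (author IN (?, ...))         -- blocked DIDs, when present
#     AND (indexed_at < ?                  -- keyset cursor, when present
#          OR (indexed_at = ? AND cid < ?))
#   ORDER BY indexed_at DESC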
def handler(cursor: Optional[str], limit: int) -> dict:
    """Main feed handler with caching."""
    try:
        # Check cache first
        cached_feed = get_cached_feed(cursor, limit)
        if cached_feed:
            return cached_feed

        logger.info(f"Feed request - cursor: {cursor}, limit: {limit}")
        blocked_dids = get_blocked_dids()
        logger.info(f"Using {len(blocked_dids)} blocked DIDs")

        # Handle EOF cursor
        if cursor == CURSOR_EOF:
            logger.info("Received EOF cursor")
            return {'cursor': CURSOR_EOF, 'feed': []}

        # Parse cursor for pagination (format: "<ms_timestamp>::<cid>")
        cursor_time = None
        cursor_cid = None
        if cursor:
            try:
                cursor_parts = cursor.split('::')
                if len(cursor_parts) != 2:
                    raise ValueError('Malformed cursor')
                timestamp, cursor_cid = cursor_parts
                cursor_time = datetime.fromtimestamp(int(timestamp) / 1000, UTC)
                logger.info(f"Parsed cursor - time: {cursor_time}, cid: {cursor_cid}")
            except Exception as e:
                logger.error(f"Error parsing cursor: {e}")
                raise ValueError('Invalid cursor format')

        # Get posts with pagination
        posts = get_filtered_posts(blocked_dids, cursor_time, cursor_cid)
        logger.info(f"Retrieved {len(posts)} posts after filtering")

        if not posts:
            logger.info("No posts found")
            return {'cursor': CURSOR_EOF, 'feed': []}

        # Calculate scores and sort
        scored_posts = []
        current_time = datetime.now(UTC)

        # Log detailed scoring information
        logger.info("\nScoring breakdown for posts:")
        for post in posts:
            try:
                age_hours = (current_time - ensure_tz_aware(post.indexed_at)).total_seconds() / 3600.0
                score, details = calculate_engagement_score(post, age_hours)
                scored_posts.append((post, score))

                # Detailed logging for debugging
                logger.info(
                    f"\nPost {post.cid}:\n"
                    f"- Age: {details['age_hours']:.1f} hours\n"
                    f"- Engagement: {details['likes']} likes, {details['reposts']} reposts, {details['replies']} replies\n"
                    f"- Base Score: {details['base_score']:.1f}\n"
                    f"- Time Mult: {details['time_multiplier']:.3f}\n"
                    f"- Network Mult: {details['network_multiplier']:.1f}\n"
                    f"- Balance Mult: {details['balance_multiplier']:.1f}\n"
                    f"- Viral Mult: {details['viral_multiplier']:.1f}\n"
                    f"- Final Score: {details['final_score']:.1f}\n"
                    f"- Network Member: {'Yes' if details['is_network'] else 'No'}"
                )
            except Exception as e:
                logger.error(f"Error scoring post {post.cid}: {e}")
                continue

        # Sort by score (descending, ties broken by indexed time) and limit results
        sorted_posts = sorted(scored_posts, key=lambda x: (-x[1], ensure_tz_aware(x[0].indexed_at)))
        result_posts = [post for post, _ in sorted_posts[:limit]]

        # Log selected posts
        logger.info("\nFinal post selection:")
        for i, post in enumerate(result_posts, 1):
            logger.info(
                f"{i}. Selected post {post.cid}: "
                f"engagement={post.likeCount} likes, {post.repostCount} reposts, {post.replyCount} replies "
                f"time={post.indexed_at} "
                f"network={'Yes' if is_network_member(post.author) else 'No'}"
            )

        # Generate feed entries
        feed = [{'post': post.uri} for post in result_posts]

        # Generate cursor for next page
        next_cursor = CURSOR_EOF
        if result_posts:
            last_post = result_posts[-1]
            next_cursor = f"{int(ensure_tz_aware(last_post.indexed_at).timestamp() * 1000)}::{last_post.cid}"
            logger.info(f"Generated next cursor: {next_cursor}")

        # Store result in cache before returning
        result = {
            'cursor': next_cursor,
            'feed': feed,
        }
        update_cache(cursor, limit, result)
        return result
    except Exception as e:
        logger.error(f"Error handling feed request: {e}", exc_info=True)
        return {'cursor': CURSOR_EOF, 'feed': []}
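# A minimal smoke test, assuming the server.* modules and their database are
# reachable in this environment (the limit of 30 is an arbitrary choice):
#
#   if __name__ == '__main__':
#       page = handler(None, 30)
#       print(page['cursor'], len(page['feed']))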