Advertisement
athaena

Simulated redis data for publishing to a local redis

May 14th, 2025
334
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.89 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. """
  3. Simulation script for activity prediction data.
  4. Generates random tracking data and publishes it to a Redis channel.
  5. """
  6.  
  7. import asyncio
  8. import json
  9. import random
  10. import aioredis
  11. import argparse
  12. from typing import Dict, List, Any, Tuple
  13.  
  14. # Constants from your configuration
  15. CLASS_NAMES = [
  16.     "standing",
  17.     "sitting_down",
  18.     "sitting",
  19.     "lying",
  20.     "standing_up",
  21.     "falling",
  22.     "bending",
  23. ]
  24.  
  25. # Default Redis connection settings
  26. REDIS_HOST = "localhost"
  27. REDIS_PORT = 6379
  28. REDIS_CHANNEL = "activity_stream"
  29.  
  30. # Default simulation settings
  31. MAX_TRACK_ID = 4
  32. PUBLISH_DELAY = 1.0  # seconds between publications
  33.  
  34.  
  35. def generate_random_bbox() -> List[float]:
  36.     """Generate a random bounding box with normalized coordinates [x1, y1, x2, y2]."""
  37.     # Generate x1, y1 coordinates (top-left)
  38.     x1 = random.uniform(0.05, 0.8)
  39.     y1 = random.uniform(0.05, 0.8)
  40.    
  41.     # Generate width and height (ensuring box stays within frame)
  42.     width = random.uniform(0.1, min(0.3, 1.0 - x1))
  43.     height = random.uniform(0.2, min(0.5, 1.0 - y1))
  44.    
  45.     # Calculate x2, y2 coordinates (bottom-right)
  46.     x2 = x1 + width
  47.     y2 = y1 + height
  48.    
  49.     return [x1, y1, x2, y2]
  50.  
  51.  
  52. def generate_random_action() -> Tuple[str, float]:
  53.     """Generate a random action with confidence score."""
  54.     action = random.choice(CLASS_NAMES)
  55.     confidence = random.uniform(0.6, 0.99)
  56.     return action, confidence
  57.  
  58.  
  59. def generate_random_tracking_data(num_tracks: int) -> Dict[str, Any]:
  60.     """Generate random tracking data for a specified number of tracks."""
  61.     tracking_data = {}
  62.    
  63.     # Randomly decide how many tracks to include in this frame (1 to num_tracks)
  64.     active_tracks = random.randint(1, num_tracks)
  65.     track_ids = random.sample(range(num_tracks), active_tracks)
  66.    
  67.     for track_id in track_ids:
  68.         track_id_str = str(track_id)
  69.         action, confidence = generate_random_action()
  70.        
  71.         tracking_data[track_id_str] = {
  72.             "track_id": track_id_str,
  73.             "current_actions": [[action, confidence]],
  74.             "last_bbox": generate_random_bbox()
  75.         }
  76.    
  77.     return {"tracking_data": tracking_data}
  78.  
  79.  
  80. async def publish_simulated_data(
  81.     redis_host: str,
  82.     redis_port: int,
  83.     channel: str,
  84.     max_track_id: int,
  85.     delay: float,
  86.     iterations: int = None
  87. ) -> None:
  88.     """Publish simulated tracking data to a Redis channel."""
  89.     # Connect to Redis
  90.     redis = await aioredis.from_url(f"redis://{redis_host}:{redis_port}")
  91.    
  92.     try:
  93.         print(f"Connected to Redis at {redis_host}:{redis_port}")
  94.         print(f"Publishing to channel: {channel}")
  95.         print(f"Press Ctrl+C to stop")
  96.        
  97.         count = 0
  98.         while iterations is None or count < iterations:
  99.             # Generate random tracking data
  100.             data = generate_random_tracking_data(max_track_id + 1)
  101.            
  102.             # Convert to JSON and publish
  103.             data_json = json.dumps(data)
  104.             await redis.publish(channel, data_json)
  105.            
  106.             # Print info about what was published
  107.             num_tracks = len(data["tracking_data"])
  108.             track_ids = ", ".join(data["tracking_data"].keys())
  109.             print(f"Published data with {num_tracks} tracks (IDs: {track_ids})")
  110.            
  111.             count += 1
  112.             await asyncio.sleep(delay)
  113.            
  114.     except KeyboardInterrupt:
  115.         print("\nStopping simulation...")
  116.     finally:
  117.         # Close Redis connection
  118.         await redis.close()
  119.         print("Redis connection closed")
  120.  
  121.  
  122. def main():
  123.     """Main entry point for the script."""
  124.     parser = argparse.ArgumentParser(description="Simulate activity tracking data and publish to Redis")
  125.     parser.add_argument("--host", default=REDIS_HOST, help=f"Redis host (default: {REDIS_HOST})")
  126.     parser.add_argument("--port", type=int, default=REDIS_PORT, help=f"Redis port (default: {REDIS_PORT})")
  127.     parser.add_argument("--channel", default=REDIS_CHANNEL, help=f"Redis channel (default: {REDIS_CHANNEL})")
  128.     parser.add_argument("--delay", type=float, default=PUBLISH_DELAY, help=f"Delay between publications in seconds (default: {PUBLISH_DELAY})")
  129.     parser.add_argument("--max-tracks", type=int, default=MAX_TRACK_ID, help=f"Maximum number of tracks (default: {MAX_TRACK_ID})")
  130.     parser.add_argument("--iterations", type=int, help="Number of publications (default: unlimited)")
  131.    
  132.     args = parser.parse_args()
  133.    
  134.     try:
  135.         asyncio.run(publish_simulated_data(
  136.             redis_host=args.host,
  137.             redis_port=args.port,
  138.             channel=args.channel,
  139.             max_track_id=args.max_tracks,
  140.             delay=args.delay,
  141.             iterations=args.iterations
  142.         ))
  143.     except Exception as e:
  144.         print(f"Error: {e}")
  145.         return 1
  146.    
  147.     return 0
  148.  
  149.  
  150. if __name__ == "__main__":
  151.     exit(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement