Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python3
"""
Simulation script for activity prediction data.

Generates random tracking data and publishes it to a Redis channel.
"""
import argparse
import asyncio
import json
import random
import sys
from typing import Any, Dict, List, Optional, Tuple

import aioredis
# Activity labels that the simulator may attach to a track.
CLASS_NAMES: List[str] = [
    "standing",
    "sitting_down",
    "sitting",
    "lying",
    "standing_up",
    "falling",
    "bending",
]

# Redis connection defaults (overridable from the command line).
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
REDIS_CHANNEL: str = "activity_stream"

# Simulation defaults.
MAX_TRACK_ID: int = 4
# Pause between two consecutive publications, in seconds.
PUBLISH_DELAY: float = 1.0
def generate_random_bbox() -> List[float]:
    """Return a random bounding box as ``[x1, y1, x2, y2]``.

    The top-left corner is drawn uniformly from ``[0.05, 0.8]`` on both
    axes. The width is drawn between 0.1 and ``min(0.3, 1.0 - x1)`` and the
    height between 0.2 and ``min(0.5, 1.0 - y1)``, so the extent is capped
    by the space remaining to the right of / below the corner.
    """
    # The draw order (x, y, width, height) is kept fixed so that a seeded
    # ``random`` produces the same boxes.
    left = random.uniform(0.05, 0.8)
    top = random.uniform(0.05, 0.8)
    box_w = random.uniform(0.1, min(0.3, 1.0 - left))
    box_h = random.uniform(0.2, min(0.5, 1.0 - top))
    return [left, top, left + box_w, top + box_h]
def generate_random_action() -> Tuple[str, float]:
    """Pick a random activity label and a confidence score for it.

    Returns:
        A ``(label, confidence)`` pair where ``label`` is one entry of
        ``CLASS_NAMES`` and ``confidence`` is drawn uniformly between
        0.6 and 0.99.
    """
    # Tuple elements are evaluated left to right: label first, then score.
    return random.choice(CLASS_NAMES), random.uniform(0.6, 0.99)
def generate_random_tracking_data(num_tracks: int) -> Dict[str, Any]:
    """Build one simulated frame of tracking data.

    A random, non-empty subset of the track IDs ``0 .. num_tracks - 1`` is
    selected, and each selected track gets one random action and one random
    bounding box.

    Args:
        num_tracks: Size of the pool of track IDs to draw from.

    Returns:
        ``{"tracking_data": {...}}`` where the inner dict maps each chosen
        track ID (as a string) to a dict with the keys ``"track_id"``,
        ``"current_actions"`` (a list holding one ``[action, confidence]``
        pair) and ``"last_bbox"``. The inner dict is empty when
        ``num_tracks`` is 0.

    Raises:
        ValueError: If ``num_tracks`` is negative.
    """
    if num_tracks < 0:
        raise ValueError(f"num_tracks must be non-negative, got {num_tracks}")
    if num_tracks == 0:
        # random.randint(1, 0) would raise an opaque "empty range" error;
        # an empty pool simply yields an empty frame.
        return {"tracking_data": {}}

    # Decide how many tracks appear in this frame (1 to num_tracks), then
    # pick that many distinct IDs.
    active_tracks = random.randint(1, num_tracks)
    tracking_data: Dict[str, Any] = {}
    for track_id in random.sample(range(num_tracks), active_tracks):
        track_key = str(track_id)
        action, confidence = generate_random_action()
        tracking_data[track_key] = {
            "track_id": track_key,
            "current_actions": [[action, confidence]],
            "last_bbox": generate_random_bbox(),
        }
    return {"tracking_data": tracking_data}
async def publish_simulated_data(
    redis_host: str,
    redis_port: int,
    channel: str,
    max_track_id: int,
    delay: float,
    iterations: Optional[int] = None,
) -> None:
    """Publish simulated tracking frames to a Redis channel.

    Args:
        redis_host: Host name of the Redis server.
        redis_port: TCP port of the Redis server.
        channel: Name of the pub/sub channel to publish to.
        max_track_id: Highest track ID to simulate; each frame draws its
            tracks from the IDs ``0 .. max_track_id``.
        delay: Seconds to sleep after each publication.
        iterations: Number of frames to publish, or ``None`` to run until
            interrupted.

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when the enclosing
            task is cancelled (this is how Ctrl+C arrives under
            ``asyncio.run``).
    """
    redis = await aioredis.from_url(f"redis://{redis_host}:{redis_port}")
    try:
        # NOTE(review): the client may connect lazily, in which case this
        # message does not prove the server is reachable - confirm against
        # the aioredis version in use.
        print(f"Connected to Redis at {redis_host}:{redis_port}")
        print(f"Publishing to channel: {channel}")
        print("Press Ctrl+C to stop")

        count = 0
        while iterations is None or count < iterations:
            data = generate_random_tracking_data(max_track_id + 1)
            await redis.publish(channel, json.dumps(data))

            # Report what was just sent.
            num_tracks = len(data["tracking_data"])
            track_ids = ", ".join(data["tracking_data"].keys())
            print(f"Published data with {num_tracks} tracks (IDs: {track_ids})")

            count += 1
            await asyncio.sleep(delay)
    except KeyboardInterrupt:
        print("\nStopping simulation...")
    except asyncio.CancelledError:
        # Under asyncio.run, Ctrl+C cancels the main task rather than
        # raising KeyboardInterrupt inside it. Announce the stop, then let
        # the cancellation propagate so the event loop shuts down properly.
        print("\nStopping simulation...")
        raise
    finally:
        # Always release the Redis connection, whatever ended the loop.
        await redis.close()
        print("Redis connection closed")
def main():
    """Parse command-line options and run the simulation.

    Returns:
        Process exit status: 0 when the run finishes or is stopped with
        Ctrl+C, 1 when it fails with an error.
    """
    parser = argparse.ArgumentParser(description="Simulate activity tracking data and publish to Redis")
    parser.add_argument("--host", default=REDIS_HOST, help=f"Redis host (default: {REDIS_HOST})")
    parser.add_argument("--port", type=int, default=REDIS_PORT, help=f"Redis port (default: {REDIS_PORT})")
    parser.add_argument("--channel", default=REDIS_CHANNEL, help=f"Redis channel (default: {REDIS_CHANNEL})")
    parser.add_argument("--delay", type=float, default=PUBLISH_DELAY, help=f"Delay between publications in seconds (default: {PUBLISH_DELAY})")
    # The value is forwarded as max_track_id, so it is the highest ID, not a
    # count: N yields IDs 0..N, i.e. up to N + 1 tracks per frame.
    parser.add_argument("--max-tracks", type=int, default=MAX_TRACK_ID, help=f"Highest track ID to simulate; frames use IDs 0..N (default: {MAX_TRACK_ID})")
    parser.add_argument("--iterations", type=int, help="Number of publications (default: unlimited)")
    args = parser.parse_args()

    try:
        asyncio.run(publish_simulated_data(
            redis_host=args.host,
            redis_port=args.port,
            channel=args.channel,
            max_track_id=args.max_tracks,
            delay=args.delay,
            iterations=args.iterations,
        ))
    except KeyboardInterrupt:
        # asyncio.run re-raises Ctrl+C here after cancelling the coroutine;
        # KeyboardInterrupt is not an Exception subclass, so without this
        # handler a user-requested stop would end in a traceback.
        return 0
    except Exception as e:
        # Top-level boundary: report on stderr and signal failure.
        print(f"Error: {e}", file=sys.stderr)
        return 1
    return 0
# Script entry point: use main()'s return value as the process exit status.
# sys.exit is used rather than the builtin exit(), which is a helper injected
# by the site module for interactive sessions and is not always available.
if __name__ == "__main__":
    sys.exit(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement