Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- #!/usr/bin/env python3
- import os
- import sys
- import json
- import yaml
- import time
- import hashlib
- import requests
- import tempfile
- import subprocess
- import argparse
- import threading
- import socket
- from pathlib import Path
- from urllib.parse import urlparse
class AIVMManager:
    """Create, boot, inspect and stop one QEMU virtual machine for an AI agent.

    The VM disk is a qcow2 overlay on a cached Debian 12 cloud image. It is
    configured on first boot by a cloud-init ISO (user, password, packages)
    and reached over SSH through QEMU user-mode port forwarding on
    ``localhost:<ssh_port>``.

    Per-instance files (overlay disk, cloud-init ISO, PID file, serial
    socket, deploy.log, SSH-ready marker) live in
    ``<work_dir>/<instance_name>``. Downloaded base images are shared in
    ``<work_dir>/images``.
    """

    # Anything smaller than this is treated as a failed/truncated download.
    MIN_BASE_IMAGE_BYTES = 100 * 1024 * 1024

    def __init__(self, config):
        """Read settings from *config* and create the working directories.

        Args:
            config: dict with optional keys ``work_dir``, ``instance_name``,
                ``memory``, ``cpus``, ``disk_size``, ``ssh_port``,
                ``ssh_user``, ``ssh_password`` and ``ssh_keys``.
        """
        self.config = config
        self.work_dir = Path(config.get('work_dir', './vm_instances'))
        # parents=True so nested work_dir values such as "a/b/vms" also work.
        self.work_dir.mkdir(parents=True, exist_ok=True)
        self.instance_name = config.get('instance_name', 'ai-agent')
        self.vm_memory = config.get('memory', '2G')
        self.vm_cpus = config.get('cpus', 2)
        self.vm_disk_size = config.get('disk_size', '20G')
        self.ssh_port = config.get('ssh_port', 2222)
        self.ssh_user = config.get('ssh_user', 'aiagent')
        self.ssh_password = config.get('ssh_password', 'aiagent123')
        self.instance_dir = self.work_dir / self.instance_name
        self.instance_dir.mkdir(exist_ok=True)
        self.pid_file = self.instance_dir / 'qemu.pid'
        self.serial_socket = self.instance_dir / 'serial.sock'
        self.ssh_ready_flag = self.instance_dir / 'ssh_ready'
        self.images_dir = self.work_dir / 'images'
        self.images_dir.mkdir(exist_ok=True)

    # ------------------------------------------------------------------ utils

    def log(self, message, level="INFO"):
        """Print a timestamped *message* and append it to ``deploy.log``."""
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        line = f"[{timestamp}] {level}: {message}"
        print(line)
        with open(self.instance_dir / 'deploy.log', 'a', encoding='utf-8') as f:
            f.write(line + "\n")

    @staticmethod
    def _remove_if_exists(path):
        """Delete *path*, ignoring the case where it is already gone."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    def _command_exists(self, command):
        """Return True if *command* is an executable found on PATH."""
        import shutil  # local import keeps this block self-contained
        # shutil.which avoids depending on an external `which` binary.
        return shutil.which(command) is not None

    def _read_pid(self):
        """Return the positive PID stored in the PID file, or None if absent/malformed."""
        try:
            pid = int(self.pid_file.read_text().strip())
        except (FileNotFoundError, ValueError):
            return None
        # pid <= 0 would make os.kill target a process group, never our VM.
        return pid if pid > 0 else None

    def _ssh_env(self):
        """Return a copy of the environment carrying the SSH password for ``sshpass -e``."""
        env = os.environ.copy()
        env['SSHPASS'] = str(self.ssh_password)
        return env

    def _ssh_command(self, remote_command, connect_timeout):
        """Build the argv that runs *remote_command* on the guest over SSH.

        The password is supplied through the SSHPASS environment variable
        (see ``_ssh_env``) rather than ``sshpass -p`` so it does not appear in
        the host's process list.
        """
        return [
            'sshpass', '-e',
            'ssh', '-p', str(self.ssh_port),
            '-o', f'ConnectTimeout={connect_timeout}',
            '-o', 'StrictHostKeyChecking=no',
            '-o', 'UserKnownHostsFile=/dev/null',
            '-o', 'LogLevel=ERROR',
            f'{self.ssh_user}@localhost',
            remote_command,
        ]

    def _cleanup_runtime_files(self):
        """Remove the PID file, serial socket and SSH-ready marker of a stopped VM."""
        for path in (self.pid_file, self.serial_socket, self.ssh_ready_flag):
            self._remove_if_exists(path)

    @staticmethod
    def _wait_for_exit(pid, seconds):
        """Poll once per second for up to *seconds*; return True once *pid* is gone."""
        for _ in range(seconds):
            try:
                os.kill(pid, 0)
            except ProcessLookupError:
                return True
            time.sleep(1)
        return False

    # ----------------------------------------------------------- preparation

    def check_dependencies(self):
        """Verify required host tools are installed and the work dirs are writable.

        Returns:
            True if every tool is on PATH and a probe file can be written to
            the work directory; False otherwise (the reason is logged).
        """
        required_tools = ['qemu-system-x86_64', 'qemu-img', 'genisoimage', 'sshpass']
        missing_tools = [tool for tool in required_tools if not self._command_exists(tool)]
        if missing_tools:
            self.log(f"Missing required tools: {', '.join(missing_tools)}", "ERROR")
            self.log("Install with: apt-get install qemu-system-x86 qemu-utils genisoimage sshpass")
            return False
        try:
            self.work_dir.mkdir(exist_ok=True)
            self.images_dir.mkdir(exist_ok=True)
            self.instance_dir.mkdir(exist_ok=True)
            test_file = self.work_dir / 'test_write'
            test_file.write_text('test')
            test_file.unlink()
            self.log(f"Work directory: {self.work_dir.resolve()}")
            self.log(f"Images directory: {self.images_dir.resolve()}")
            self.log(f"Instance directory: {self.instance_dir.resolve()}")
        except Exception as e:
            self.log(f"Directory setup failed: {e}", "ERROR")
            return False
        return True

    def is_running(self):
        """Return True if the QEMU process recorded in the PID file is alive.

        A malformed PID file, or one naming a process that no longer exists,
        is considered stale and removed.
        """
        if not self.pid_file.exists():
            return False
        pid = self._read_pid()
        if pid is None:
            self._remove_if_exists(self.pid_file)
            return False
        try:
            os.kill(pid, 0)  # signal 0: existence check only, nothing is sent
        except ProcessLookupError:
            self._remove_if_exists(self.pid_file)
            return False
        except PermissionError:
            # EPERM: the process exists but belongs to another user
            # (e.g. QEMU started earlier via sudo) -- it is still running.
            return True
        return True

    def download_base_image(self, timeout=60):
        """Return the path of the Debian 12 base image, downloading it if not cached.

        The download goes to a sibling ``.part`` file that is renamed into
        place only after validation, so an interrupted download is never
        mistaken for a cached image on the next run.

        Args:
            timeout: per-socket-operation timeout in seconds for the HTTP request.

        Raises:
            Exception: on HTTP errors or a suspiciously small download
                (the partial file is removed).
        """
        image_url = 'https://cloud.debian.org/images/cloud/bookworm/latest/debian-12-generic-amd64.qcow2'
        image_path = self.images_dir / 'debian-12-base.qcow2'
        if image_path.exists():
            self.log(f"Using cached base image: {image_path.resolve()}")
            return image_path
        self.log(f"Downloading base image from {image_url}")
        self.log(f"Will save to: {image_path.resolve()}")
        partial_path = image_path.with_name(image_path.name + '.part')
        try:
            image_path.parent.mkdir(parents=True, exist_ok=True)
            with requests.get(image_url, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                downloaded = 0
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        if not chunk:
                            continue
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_size > 0:
                            progress = (downloaded / total_size) * 100
                            print(f"\rDownload progress: {progress:.1f}%", end='', flush=True)
            print()
            file_size = partial_path.stat().st_size
            if file_size < self.MIN_BASE_IMAGE_BYTES:
                raise Exception(f"Downloaded file seems too small: {file_size} bytes")
            os.replace(partial_path, image_path)
        except Exception as e:
            self.log(f"Failed to download base image: {e}", "ERROR")
            raise
        finally:
            # No-op after a successful rename; removes leftovers on any failure,
            # including KeyboardInterrupt.
            self._remove_if_exists(partial_path)
        self.log(f"Base image downloaded successfully to {image_path.resolve()}")
        self.log(f"Base image size: {file_size / (1024*1024):.1f} MB")
        return image_path

    def create_instance_image(self, base_image):
        """Create a fresh qcow2 overlay backed by *base_image*, grown to ``vm_disk_size``.

        Any existing overlay for this instance is deleted first.

        Returns:
            Path of the instance image.

        Raises:
            Exception: if ``qemu-img create`` or ``qemu-img resize`` fails.
        """
        instance_image = self.instance_dir / f'{self.instance_name}.qcow2'
        if instance_image.exists():
            self.log("Removing existing instance image")
            instance_image.unlink()
        self.log(f"Creating instance image with {self.vm_disk_size} disk")
        # Absolute backing path so the overlay keeps working from any cwd.
        base_image_abs = base_image.resolve()
        instance_image_abs = instance_image.resolve()
        cmd = [
            'qemu-img', 'create', '-f', 'qcow2',
            '-F', 'qcow2', '-b', str(base_image_abs),
            str(instance_image_abs)
        ]
        self.log(f"Base image path: {base_image_abs}")
        self.log(f"Instance image path: {instance_image_abs}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Failed to create instance image: {result.stderr}")
        cmd = ['qemu-img', 'resize', str(instance_image_abs), str(self.vm_disk_size)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Failed to resize instance image: {result.stderr}")
        return instance_image

    def create_cloud_init_config(self):
        """Build the cloud-init documents for this instance.

        Returns:
            ``(user_data, meta_data, network_config)`` dicts, ready to be
            serialised as YAML.
        """
        user_data = {
            'ssh_pwauth': True,
            'disable_root': False,
            'chpasswd': {
                'expire': False,
                'users': [
                    {'name': 'root', 'password': self.ssh_password, 'type': 'text'},
                    {'name': self.ssh_user, 'password': self.ssh_password, 'type': 'text'}
                ]
            },
            'users': [
                {
                    'name': self.ssh_user,
                    'sudo': 'ALL=(ALL) NOPASSWD:ALL',
                    'shell': '/bin/bash',
                    'lock_passwd': False,
                    'ssh_authorized_keys': self.config.get('ssh_keys', [])
                }
            ],
            'package_update': True,
            'packages': [
                'python3',
                'python3-pip',
                'curl',
                'wget',
                'htop',
                'tmux',
                'vim',
                'net-tools'
            ],
            'runcmd': [
                'systemctl enable ssh',
                'systemctl start ssh',
                'systemctl status ssh',
                'sed -i "s/#PasswordAuthentication yes/PasswordAuthentication yes/" /etc/ssh/sshd_config',
                'sed -i "s/PasswordAuthentication no/PasswordAuthentication yes/" /etc/ssh/sshd_config',
                'sed -i "s/#PermitRootLogin prohibit-password/PermitRootLogin yes/" /etc/ssh/sshd_config',
                'systemctl restart ssh',
                'touch /tmp/ssh_ready',
                'echo "SSH_READY_$(date +%s)" > /tmp/ssh_ready',
                'mkdir -p /opt/ai-agent',
                f'chown {self.ssh_user}:{self.ssh_user} /opt/ai-agent',
                f'echo "export PYTHONUNBUFFERED=1" >> /home/{self.ssh_user}/.bashrc',
                f'echo "export DEBIAN_FRONTEND=noninteractive" >> /home/{self.ssh_user}/.bashrc',
                f'echo "cd /opt/ai-agent" >> /home/{self.ssh_user}/.bashrc',
                # Debian 12 marks the system Python as externally managed
                # (PEP 668), so a plain `pip3 install` refuses to run. asyncio
                # is in the standard library; the PyPI "asyncio" package is an
                # obsolete backport and is deliberately not installed.
                'nohup bash -c "sleep 60 && pip3 install --break-system-packages requests aiohttp numpy pandas" > /tmp/pip_install.log 2>&1 &',
                'echo "SYSTEM_READY_$(date +%s)" >> /tmp/ssh_ready',
            ],
            'final_message': 'AI Agent VM is SSH-ready!'
        }
        meta_data = {
            # Time-based id: cloud-init treats every generated ISO as a new instance.
            'instance-id': f'{self.instance_name}-{int(time.time())}',
            'local-hostname': self.instance_name
        }
        # NOTE(review): the guest NIC name depends on the image's naming
        # scheme and the PCI slot QEMU assigns; confirm 'enp0s3' matches the
        # virtio-net device inside the Debian 12 image.
        network_config = {
            'version': 2,
            'ethernets': {
                'enp0s3': {
                    'dhcp4': True
                }
            }
        }
        return user_data, meta_data, network_config

    def create_cloud_init_iso(self):
        """Write the cloud-init NoCloud seed ISO (volume id ``cidata``) and return its path.

        Raises:
            Exception: if ``genisoimage`` fails.
        """
        user_data, meta_data, network_config = self.create_cloud_init_config()
        iso_path = self.instance_dir / 'cloudinit.iso'
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            with open(temp_path / 'user-data', 'w', encoding='utf-8') as f:
                f.write('#cloud-config\n')
                yaml.dump(user_data, f, default_flow_style=False)
            with open(temp_path / 'meta-data', 'w', encoding='utf-8') as f:
                yaml.dump(meta_data, f, default_flow_style=False)
            with open(temp_path / 'network-config', 'w', encoding='utf-8') as f:
                yaml.dump(network_config, f, default_flow_style=False)
            cmd = [
                'genisoimage', '-output', str(iso_path),
                '-volid', 'cidata', '-joliet', '-rock',
                str(temp_path / 'user-data'),
                str(temp_path / 'meta-data'),
                str(temp_path / 'network-config')
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                raise Exception(f"Failed to create cloud-init ISO: {result.stderr}")
        return iso_path

    # -------------------------------------------------------------- lifecycle

    def start_instance(self, instance_image, cloudinit_iso):
        """Boot the VM as a daemonized, headless QEMU process.

        Returns:
            True once QEMU has daemonized (or if the VM was already running).

        Raises:
            Exception: if QEMU exits with an error.
        """
        if self.is_running():
            self.log("Instance is already running")
            return True
        # A stale serial socket would block QEMU's bind. A stale SSH-ready
        # marker would report SSH as ready before the new guest has booted.
        self._remove_if_exists(self.serial_socket)
        self._remove_if_exists(self.ssh_ready_flag)
        # '-cpu host' is only valid under KVM. Without /dev/kvm, QEMU falls
        # back to TCG (accel=kvm:tcg) and would then reject '-cpu host';
        # 'max' enables all features the active accelerator supports.
        kvm_usable = os.access('/dev/kvm', os.R_OK | os.W_OK)
        if not kvm_usable:
            self.log("KVM not available; falling back to software emulation (slow)", "WARNING")
        cpu_model = 'host' if kvm_usable else 'max'

        def qemu_path(path):
            # Commas separate QEMU sub-options; a literal comma is written ",,".
            return str(path).replace(',', ',,')

        cmd = [
            'qemu-system-x86_64',
            '-machine', 'accel=kvm:tcg',
            '-cpu', cpu_model,
            '-m', str(self.vm_memory),
            '-smp', str(self.vm_cpus),
            '-drive', f'file={qemu_path(instance_image)},format=qcow2,if=virtio',
            '-drive', f'file={qemu_path(cloudinit_iso)},format=raw,if=virtio,readonly=on',
            '-netdev', f'user,id=net0,hostfwd=tcp::{self.ssh_port}-:22',
            '-device', 'virtio-net-pci,netdev=net0',
            '-serial', f'unix:{qemu_path(self.serial_socket)},server,nowait',
            '-display', 'none',
            '-daemonize',
            '-pidfile', str(self.pid_file),
            '-rtc', 'base=utc',
            '-device', 'virtio-rng-pci',
        ]
        self.log(f"Starting instance: {self.instance_name}")
        self.log(f"SSH will be available on port {self.ssh_port}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Failed to start instance: {result.stderr}")
        self.log("Instance started successfully")
        return True

    def wait_for_ssh(self, timeout=180):
        """Poll the guest over SSH until a login succeeds or *timeout* seconds pass.

        On success, writes the SSH-ready marker file and returns True.
        Returns False on timeout, or immediately if sshpass/ssh is missing.
        """
        self.log("Waiting for SSH to become available...")
        start_time = time.time()
        last_status_time = start_time
        last_error = None
        cmd = self._ssh_command('echo "SSH_TEST_SUCCESS"', connect_timeout=3)
        env = self._ssh_env()
        while time.time() - start_time < timeout:
            try:
                result = subprocess.run(cmd, capture_output=True, text=True,
                                        timeout=5, env=env)
            except subprocess.TimeoutExpired:
                last_error = "SSH probe timed out"
            except FileNotFoundError as e:
                # sshpass/ssh is not installed: retrying cannot succeed.
                self.log(f"Cannot probe SSH: {e}", "ERROR")
                return False
            except Exception as e:
                # Best-effort polling: remember the error and keep retrying.
                last_error = str(e)
            else:
                if result.returncode == 0 and "SSH_TEST_SUCCESS" in result.stdout:
                    elapsed = time.time() - start_time
                    self.log(f"SSH is ready! (took {elapsed:.1f} seconds)")
                    with open(self.ssh_ready_flag, 'w', encoding='utf-8') as f:
                        f.write(f"ready:{int(time.time())}")
                    return True
                last_error = result.stderr.strip() or f"exit code {result.returncode}"
            if time.time() - last_status_time > 15:
                elapsed = time.time() - start_time
                self.log(f"Still waiting for SSH... ({elapsed:.0f}s elapsed)")
                last_status_time = time.time()
            time.sleep(2)
        self.log(f"SSH connection timeout after {timeout} seconds", "ERROR")
        if last_error:
            self.log(f"Last SSH error: {last_error}", "ERROR")
        return False

    def test_ssh_connection(self):
        """Run a short system summary (uname, free, df) on the guest.

        Returns:
            ``(True, stdout)`` on success, ``(False, error_text)`` otherwise.
        """
        try:
            cmd = self._ssh_command(
                'uname -a && echo "---" && free -h && echo "---" && df -h /',
                connect_timeout=5,
            )
            result = subprocess.run(cmd, capture_output=True, text=True,
                                    timeout=10, env=self._ssh_env())
        except Exception as e:
            return False, str(e)
        if result.returncode == 0:
            return True, result.stdout.strip()
        return False, result.stderr.strip()

    def execute_ssh_command(self, command, timeout=30):
        """Run the shell *command* on the guest as ``ssh_user``.

        Args:
            command: command line passed to the remote shell.
            timeout: seconds to wait for the whole SSH invocation.

        Returns:
            ``(success, stdout, stderr)``; on a local error (timeout, missing
            binary) ``(False, "", error_text)``.
        """
        try:
            cmd = self._ssh_command(command, connect_timeout=5)
            result = subprocess.run(cmd, capture_output=True, text=True,
                                    timeout=timeout, env=self._ssh_env())
        except Exception as e:
            return False, "", str(e)
        return result.returncode == 0, result.stdout, result.stderr

    def stop_instance(self):
        """Stop the VM with SIGTERM, escalating to SIGKILL after ~10 seconds.

        Returns:
            True if the VM is stopped (or was not running), False on error.
        """
        if not self.is_running():
            self.log("Instance is not running")
            return True
        try:
            pid = int(self.pid_file.read_text().strip())
            self.log(f"Stopping instance (PID: {pid})")
            try:
                os.kill(pid, 15)  # SIGTERM: ask QEMU to exit cleanly
            except ProcessLookupError:
                pass  # exited between the liveness check and now
            else:
                if not self._wait_for_exit(pid, 10):
                    self.log("Force killing instance")
                    try:
                        os.kill(pid, 9)  # SIGKILL
                    except ProcessLookupError:
                        pass
            self._cleanup_runtime_files()
            self.log("Instance stopped successfully")
            return True
        except Exception as e:
            self.log(f"Error stopping instance: {e}", "ERROR")
            return False

    def get_status(self):
        """Return a dict describing the instance.

        Always present: ``running``, ``ssh_ready``, ``ssh_port``, ``ssh_user``,
        ``instance_dir``. Added when applicable: ``pid`` (running),
        ``ssh_working`` (SSH marked ready) and ``system_info`` (SSH probe ok).
        """
        running = self.is_running()
        status = {
            'running': running,
            # The marker may be left behind by a crashed VM, so it only
            # counts while the VM is actually running.
            'ssh_ready': running and self.ssh_ready_flag.exists(),
            'ssh_port': self.ssh_port,
            'ssh_user': self.ssh_user,
            'instance_dir': str(self.instance_dir)
        }
        if running:
            pid = self._read_pid()
            if pid is not None:
                status['pid'] = pid
        if status['ssh_ready']:
            ssh_success, ssh_info = self.test_ssh_connection()
            status['ssh_working'] = ssh_success
            if ssh_success:
                status['system_info'] = ssh_info
        return status

    def run(self, force_recreate=False):
        """Ensure the VM is running and reachable over SSH.

        If the VM is already running (and *force_recreate* is False), only
        waits for SSH. Otherwise, builds and boots a VM: checks dependencies,
        fetches the base image, creates a fresh overlay and cloud-init ISO,
        starts QEMU and waits for SSH.

        Returns:
            True when SSH is ready, False on any failure (logged).
        """
        try:
            if self.is_running() and not force_recreate:
                self.log("Instance is already running")
                if self.ssh_ready_flag.exists():
                    self.log(f"SSH ready on port {self.ssh_port}")
                    return True
                self.log("Waiting for SSH to become ready...")
                return self.wait_for_ssh()
            if force_recreate:
                self.stop_instance()
            if not self.check_dependencies():
                return False
            base_image = self.download_base_image()
            # NOTE(review): create_instance_image deletes any existing overlay,
            # so every boot of a stopped VM starts from a clean disk -- confirm
            # this is intended and not only for force_recreate.
            instance_image = self.create_instance_image(base_image)
            cloudinit_iso = self.create_cloud_init_iso()
            self.start_instance(instance_image, cloudinit_iso)
            if self.wait_for_ssh():
                self.log("Instance is ready for AI agent connection!")
                return True
            self.log("Instance started but SSH is not ready", "ERROR")
            return False
        except Exception as e:
            self.log(f"Failed to run instance: {e}", "ERROR")
            return False
def load_config(config_file=None):
    """Return the VM configuration, optionally overlaid with settings from a file.

    Args:
        config_file: path to a ``.json`` file (parsed as JSON) or any other
            file (parsed as YAML). ``None``/empty means "defaults only". A path
            that does not exist triggers a warning on stderr and the defaults
            are returned.

    Returns:
        dict: the defaults below, updated with the keys from the file.

    Raises:
        ValueError: if the file's top-level value is not a mapping.
    """
    default_config = {
        'instance_name': 'ai-agent',
        'memory': '4G',
        'cpus': 2,
        'disk_size': '30G',
        'ssh_user': 'aiagent',
        'ssh_password': 'aiagent123',
        'ssh_port': 2222,
        'ssh_keys': [],
        'work_dir': './vm_instances'
    }
    if not config_file:
        return default_config
    if not os.path.exists(config_file):
        print(f"Warning: config file {config_file} not found; using defaults",
              file=sys.stderr)
        return default_config
    with open(config_file, 'r', encoding='utf-8') as f:
        if str(config_file).lower().endswith('.json'):
            user_config = json.load(f)
        else:
            user_config = yaml.safe_load(f)
    # An empty YAML file (or a JSON `null`) parses to None: no overrides.
    if user_config is None:
        return default_config
    if not isinstance(user_config, dict):
        raise ValueError(
            f"Config file {config_file} must contain a mapping at the top level, "
            f"got {type(user_config).__name__}"
        )
    default_config.update(user_config)
    return default_config
def main():
    """Command-line entry point: parse arguments and dispatch to AIVMManager.

    Commands:
        run      create (if needed) and boot the VM, then wait for SSH
        stop     stop the running VM
        status   print the instance status
        connect  open an interactive SSH session to the VM
        exec     run a command on the VM over SSH; put ``--`` before a
                 command that has options, e.g. ``exec -- ls -la``

    Exits with status 0 on success and 1 on failure (``connect`` exits with
    the ssh client's own status).
    """
    parser = argparse.ArgumentParser(description='AI Agent VM Manager')
    parser.add_argument('command', nargs='?', choices=['run', 'stop', 'status', 'connect', 'exec'],
                        default='run', help='Command to execute')
    # default=None so an instance_name from the config file is honoured unless
    # --name is given explicitly (load_config already defaults to 'ai-agent').
    parser.add_argument('--name', '-n', help='Instance name', default=None)
    parser.add_argument('--config', '-c', help='Configuration file')
    parser.add_argument('--force', '-f', action='store_true', help='Force recreate instance')
    parser.add_argument('--port', '-p', type=int, help='SSH port override')
    parser.add_argument('command_args', nargs='*', help='Additional command arguments')
    args = parser.parse_args()

    config = load_config(args.config)
    if args.name is not None:
        config['instance_name'] = args.name
    if args.port:
        config['ssh_port'] = args.port
    manager = AIVMManager(config)

    if args.command == 'run':
        success = manager.run(force_recreate=args.force)
        if success:
            print(f"\n=== INSTANCE READY ===")
            print(f"SSH: ssh -p {config['ssh_port']} {config['ssh_user']}@localhost")
            print(f"Password: {config['ssh_password']}")
            print(f"Instance directory: {manager.instance_dir}")
            print("======================")
        sys.exit(0 if success else 1)
    elif args.command == 'stop':
        success = manager.stop_instance()
        sys.exit(0 if success else 1)
    elif args.command == 'status':
        status = manager.get_status()
        print("=== INSTANCE STATUS ===")
        for key, value in status.items():
            print(f"{key}: {value}")
        print("=======================")
    elif args.command == 'connect':
        if not manager.is_running():
            print("Instance is not running. Use 'run' command first.")
            sys.exit(1)
        # Pass the password via SSHPASS (sshpass -e) instead of -p so it is
        # not visible in the host's process list.
        ssh_env = dict(os.environ, SSHPASS=str(config['ssh_password']))
        ssh_cmd = [
            'sshpass', '-e',
            'ssh', '-p', str(config['ssh_port']),
            '-o', 'StrictHostKeyChecking=no',
            '-o', 'UserKnownHostsFile=/dev/null',
            f"{config['ssh_user']}@localhost"
        ]
        print(f"Connecting to {config['instance_name']}...")
        sys.exit(subprocess.run(ssh_cmd, env=ssh_env).returncode)
    elif args.command == 'exec':
        if not args.command_args:
            print("No command specified for exec")
            sys.exit(1)
        command = ' '.join(args.command_args)
        success, stdout, stderr = manager.execute_ssh_command(command)
        if stdout:
            print(stdout)
        if stderr:
            print(stderr, file=sys.stderr)
        sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please sign in to add a comment.