import csv
import json
import os
import requests
import time
import threading
import logging
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess
from pathlib import Path
from typing import Dict, List
from dataclasses import dataclass
import signal
import tempfile
import argparse

# Configure logging to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('onedrive_sync.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

@dataclass
class Config:
    tenant_id: str
    client_id: str
    client_secret: str
    download_path: str
    batch_size: int = 1000
    max_workers: int = 15
    max_retries: int = 5
    requests_per_minute: int = 4000
    token_refresh_minutes: int = 30

class TokenManager:
    def __init__(self, config: Config):
        self.config = config
        self.tokens: Dict[str, Dict] = {}
        self.lock = threading.Lock()

    def generate_token(self) -> Dict:
        token_url = f"https://login.microsoftonline.com/{self.config.tenant_id}/oauth2/v2.0/token"
        token_body = {
            "grant_type": "client_credentials",
            "scope": "https://graph.microsoft.com/.default",
            "client_id": self.config.client_id,
            "client_secret": self.config.client_secret
        }
        response = requests.post(token_url, data=token_body, timeout=30)
        token_response = response.json()
        if 'error' in token_response:
            raise Exception(f"Token generation failed: {token_response.get('error_description', token_response['error'])}")
        access_token = token_response["access_token"]
        expires_in = token_response["expires_in"]
        expiry_time = (datetime.utcnow() + timedelta(seconds=expires_in)).isoformat() + "Z"
        return {"access_token": access_token, "expiry": expiry_time}

    def is_token_expired(self, token_data: Dict) -> bool:
        # Treat the token as expired once it is within the refresh window
        expiry_time = datetime.fromisoformat(token_data['expiry'].rstrip('Z'))
        return datetime.utcnow() + timedelta(minutes=self.config.token_refresh_minutes) >= expiry_time

    def get_token(self, email: str) -> Dict:
        with self.lock:
            if email not in self.tokens or self.is_token_expired(self.tokens[email]):
                self.tokens[email] = self.generate_token()
            return self.tokens[email]

class ProgressTracker:
    def __init__(self, total_accounts: int):
        self.start_time = datetime.now()
        self.completed = 0
        self.failed = 0
        self.total = total_accounts
        self.lock = threading.Lock()

    def update_progress(self, success: bool):
        with self.lock:
            if success:
                self.completed += 1
            else:
                self.failed += 1
            elapsed = (datetime.now() - self.start_time).total_seconds() / 3600
            # Guard against division by zero early in the run
            completion_rate = self.completed / elapsed if elapsed > 0 else 0
            estimated_remaining = (
                (self.total - self.completed) / completion_rate
                if completion_rate > 0 else float('inf')
            )
            self.log_progress(elapsed, estimated_remaining)

    def log_progress(self, elapsed: float, estimated_remaining: float):
        logger.info(
            f"Progress: {self.completed}/{self.total} ({self.completed / max(1, self.total) * 100:.2f}%)\n"
            f"Failed: {self.failed}\n"
            f"Elapsed Hours: {elapsed:.2f}\n"
            f"Estimated Remaining Hours: {estimated_remaining:.2f}"
        )

class RcloneManager:
    def __init__(self, config: Config):
        self.config = config
        # rclone keeps its config under %APPDATA%\rclone on Windows
        self.config_path = Path(os.getenv('APPDATA')) / 'rclone' / 'rclone.conf'
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        if not self.config_path.exists():
            self.config_path.touch()

    def update_config(self, email: str, one_drive_id: str, token: Dict) -> None:
        try:
            config_content = self.config_path.read_text() if self.config_path.exists() else ""
            # Normalize the email into a safe rclone remote name
            safe_email = email.replace('@', '_').replace('.', '_').lower()
            # Section lines must be flush-left in the rclone config file
            new_config = (
                f"\n[onedrive_auto_{safe_email}]\n"
                f"type = onedrive\n"
                f"client_id = {self.config.client_id}\n"
                f"client_secret = {self.config.client_secret}\n"
                f"auth_url = https://login.microsoftonline.com/{self.config.tenant_id}/oauth2/v2.0/authorize\n"
                f"token_url = https://login.microsoftonline.com/{self.config.tenant_id}/oauth2/v2.0/token\n"
                f"drive_id = {one_drive_id}\n"
                f"drive_type = business\n"
                f"token = {json.dumps(token)}\n"
            )
            # Remove any existing section for this email (match through the
            # closing bracket so a remote whose name is a prefix of another
            # is not filtered out by mistake)
            sections = config_content.split('[')
            filtered_sections = [s for s in sections if not s.startswith(f'onedrive_auto_{safe_email}]')]
            config_content = '['.join(filtered_sections)
            # Append the new section and write the updated config
            config_content += new_config
            self.config_path.write_text(config_content.strip() + '\n')
            logger.info(f"Updated rclone config for {email}")
            # Verify the config was written
            if not self.config_path.exists():
                raise Exception("Config file was not created")
            logger.debug(f"Config file contents for {email}:\n{self.config_path.read_text()}")
        except Exception as e:
            logger.error(f"Error updating rclone config for {email}: {str(e)}")
            raise

class ThrottleManager:
    def __init__(self):
        self.last_check_time = time.time()
        self.threshold_speed = 1 * 1024 * 1024  # 1 MB/s in bytes
        self.check_interval = 10  # Check every 10 seconds
        self.speed_history = []
        self.history_size = 5
        self.throttle_threshold = 0.8
        self.cooloff_duration = 100
        self.is_throttled = False
        self.active_processes = []
        self.lock = threading.Lock()

    def parse_rclone_progress(self, line: str) -> float:
        """Parse an rclone stats line and return the transfer speed in bytes/s."""
        try:
            logger.debug(f"Raw rclone output: {line}")
            if "Transferred:" in line:
                # Split the line and look for the speed component
                parts = line.strip().split(',')
                for part in parts:
                    part = part.strip()
                    logger.debug(f"Parsing part: {part}")
                    # Match the various speed formats rclone emits
                    if any(unit in part for unit in ['/s', 'B/s', 'iB/s']):
                        # Extract the numeric value and unit
                        speed_parts = part.split()
                        if len(speed_parts) >= 2:
                            try:
                                value = float(speed_parts[0])
                                unit = speed_parts[1]
                                # Convert to bytes based on the unit
                                if 'GB' in unit or 'GiB' in unit:
                                    value *= 1024 * 1024 * 1024
                                elif 'MB' in unit or 'MiB' in unit:
                                    value *= 1024 * 1024
                                elif 'KB' in unit or 'KiB' in unit:
                                    value *= 1024
                                logger.debug(f"Parsed speed: {value / 1024 / 1024:.2f} MB/s")
                                return value
                            except ValueError:
                                logger.debug(f"Failed to parse speed value from: {speed_parts}")
                                continue
            return 0
        except Exception as e:
            logger.debug(f"Error parsing rclone progress: {str(e)}")
            return 0
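
    # Example of the "Transferred:" stats line this parser targets. The line
    # below is illustrative only; the exact layout varies across rclone
    # versions, so the parser scans comma-separated parts for a unit ending
    # in "/s" rather than assuming fixed positions:
    #
    #   Transferred:   1.234 GiB / 5.678 GiB, 21%, 4.321 MiB/s, ETA 17m3s
    #
    # The speed component ("4.321 MiB/s") is what the unit conversion above
    # normalizes to bytes per second.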
    def check_throttling(self, process) -> bool:
        """Check if we're being throttled based on transfer speed"""
        current_time = time.time()
        if current_time - self.last_check_time >= self.check_interval:
            try:
                # Read a bounded number of lines so this check cannot block
                # until the process exits (readline only returns '' at EOF)
                output_lines = []
                for _ in range(100):
                    line = process.stdout.readline()
                    if not line:
                        break
                    if isinstance(line, bytes):
                        line = line.decode('utf-8', errors='ignore')
                    output_lines.append(line)
                # Process all lines and keep the most recent speed
                current_speed = 0
                for line in output_lines:
                    speed = self.parse_rclone_progress(line)
                    if speed > 0:
                        current_speed = speed
                # Only record non-zero speeds in the history
                if current_speed > 0:
                    self.speed_history.append(current_speed)
                    if len(self.speed_history) > self.history_size:
                        self.speed_history.pop(0)
                logger.debug(f"Current transfer speed: {current_speed / 1024 / 1024:.2f} MB/s")
                logger.debug(f"Speed history: {[speed / 1024 / 1024 for speed in self.speed_history]}")
                # Only evaluate throttling once we have enough history
                if len(self.speed_history) >= self.history_size:
                    avg_speed = sum(self.speed_history) / len(self.speed_history)
                    if avg_speed < self.threshold_speed:
                        with self.lock:
                            self.is_throttled = True
                        logger.warning(f"Throttling detected! Average speed: {avg_speed / 1024 / 1024:.2f} MB/s")
                        return True
                self.last_check_time = current_time
            except Exception as e:
                logger.debug(f"Error in throttle checking: {str(e)}")
                return False
        return False

    def handle_throttling(self):
        """Handle throttling by stopping all processes and waiting out a cooloff"""
        logger.warning("Starting cooloff period...")
        # Stop all active processes
        with self.lock:
            for process in self.active_processes:
                try:
                    process.terminate()
                    process.wait(timeout=10)
                except Exception as e:
                    logger.error(f"Error terminating process: {e}")
            self.active_processes.clear()
        # Wait out the cooloff period
        time.sleep(self.cooloff_duration)
        with self.lock:
            self.is_throttled = False
            self.speed_history.clear()  # Restart throttle detection fresh after the cooloff
        logger.info("Cooloff period completed, resuming operations")

    def register_process(self, process):
        """Register a new rclone process"""
        with self.lock:
            self.active_processes.append(process)

    def unregister_process(self, process):
        """Unregister a completed rclone process"""
        with self.lock:
            if process in self.active_processes:
                self.active_processes.remove(process)

class DownloadManager:
    def __init__(self, config: Config, progress_tracker: ProgressTracker):
        self.config = config
        self.progress_tracker = progress_tracker
        self.token_manager = TokenManager(config)
        self.rclone_manager = RcloneManager(config)
        self.throttle_manager = ThrottleManager()

    def download_with_retry(self, email: str, one_drive_id: str) -> bool:
        normalized_email = email.replace('@', '_').replace('.', '_').lower()
        retry_count = 0
        while retry_count < self.config.max_retries:
            try:
                # Wait out any cooloff currently in progress
                if self.throttle_manager.is_throttled:
                    self.throttle_manager.handle_throttling()
                token = self.token_manager.get_token(email)
                self.rclone_manager.update_config(email, one_drive_id, token)
                download_path = Path(self.config.download_path) / normalized_email
                download_path.mkdir(parents=True, exist_ok=True)  # Ensure directory exists
                # Start rclone with piped output so transfer speed can be monitored
                process = subprocess.Popen([
                    'rclone', 'copy',
                    f'onedrive_auto_{normalized_email}:',
                    str(download_path),
                    '--user-agent', 'ISV|rclone.org|rclone/1.68.2',
                    '--transfers', '20',
                    '--checkers', '20',
                    '--tpslimit', '2000',
                    '--retries', '3',
                    '--low-level-retries', '10',
                    '--ignore-size',
                    '--ignore-checksum',
                    '--update',
                    '--stats', '1s',  # Update stats every second
                    '--progress',
                    '-v'  # Verbose output
                ], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                   text=True, bufsize=1, errors='ignore')
                self.throttle_manager.register_process(process)
                while process.poll() is None:
                    if self.throttle_manager.check_throttling(process):
                        process.terminate()
                        self.throttle_manager.handle_throttling()
                        break
                    time.sleep(1)  # Avoid busy-waiting between checks
                process.wait()  # Ensure returncode is set before inspecting it
                self.throttle_manager.unregister_process(process)
                if process.returncode == 0:
                    logger.info(f"Download completed for {email}")
                    self.progress_tracker.update_progress(True)
                    return True
                else:
                    stderr_output = process.stderr.read()
                    logger.error(f"Download failed for {email}: {stderr_output}")
                    raise Exception(f"Download failed: {stderr_output}")
            except Exception as e:
                retry_count += 1
                delay = 60
                logger.error(f"Attempt {retry_count} failed for {email}. Retrying in {delay} seconds. Error: {e}")
                time.sleep(delay)
        logger.error(f"Failed to download for {email} after {self.config.max_retries} attempts")
        self.progress_tracker.update_progress(False)
        return False

class OneDriveSyncer:
    def __init__(self, config: Config, csv_path: str):
        self.config = config
        self.csv_path = csv_path
        self.status_file = Path('onedrive_status.json')
        self.checkpoint_file = Path('sync_checkpoint.json')
        self.download_manager = None
        self.should_stop = False
        self.lock = threading.Lock()  # Guards status-file updates across threads

    def load_status_data(self) -> Dict:
        if self.status_file.exists():
            return {item['onedriveid']: item for item in json.loads(self.status_file.read_text())}
        return {}

    def save_status_data(self, status_data: Dict):
        # Write to a temporary file first, then rename it atomically
        with tempfile.NamedTemporaryFile(mode='w', dir=self.status_file.parent,
                                         delete=False) as tmp_file:
            tmp_file.write(json.dumps(list(status_data.values()), indent=4))
            tmp_file_path = tmp_file.name
        os.replace(tmp_file_path, self.status_file)
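
    # Example onedrive_status.json contents (illustrative values; the file is
    # a JSON list of the entries created by process_csv below):
    #
    # [
    #     {
    #         "onedriveid": "b!AbC123...",
    #         "email": "user@example.com",
    #         "status": "pending"
    #     }
    # ]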
    def process_csv(self):
        status_data = self.load_status_data()
        new_entries = []
        with open(self.csv_path, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                onedriveid = row['onedriveid']
                email = row['email']
                if onedriveid not in status_data:
                    entry = {'onedriveid': onedriveid, 'email': email, 'status': 'pending'}
                    status_data[onedriveid] = entry
                    new_entries.append(entry)
        # Persist any new entries, merging with whatever is already on disk
        if new_entries:
            current_status = self.load_status_data()
            current_status.update({item['onedriveid']: item for item in new_entries})
            self.save_status_data(current_status)
        return status_data
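
    # Example input CSV (illustrative values; only the 'onedriveid' and
    # 'email' columns are read above):
    #
    # onedriveid,email
    # b!AbC123...,user1@example.com
    # b!DeF456...,user2@example.com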
    def process_batch(self, batch: List[Dict]):
        with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
            futures_to_items = {}
            # Submit all pending items in the batch at once
            for item in batch:
                if self.should_stop:
                    break
                if item['status'] == 'pending':
                    future = executor.submit(
                        self.download_manager.download_with_retry,
                        item['email'],
                        item['onedriveid']
                    )
                    futures_to_items[future] = item
            # Handle futures as they complete
            for future in as_completed(futures_to_items):
                item = futures_to_items[future]
                try:
                    success = future.result()
                    item['status'] = 'completed' if success else 'failed'
                except Exception as e:
                    logger.error(f"Unexpected error in download thread for {item['email']}: {e}")
                    item['status'] = 'failed'
                # Persist status after each item
                with self.lock:
                    self.save_status_data(self.status_data)

    def signal_handler(self, signum, frame):
        logger.info("Received shutdown signal. Completing current batch...")
        self.should_stop = True
    def run(self):
        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        # Initialize status
        self.status_data = self.process_csv()
        pending_items = [item for item in self.status_data.values() if item['status'] == 'pending']
        # Initialize progress tracker and download manager
        self.progress_tracker = ProgressTracker(len(pending_items))
        self.download_manager = DownloadManager(self.config, self.progress_tracker)
        # Load checkpoint, if one exists from a previous run
        start_index = 0
        if self.checkpoint_file.exists():
            checkpoint_data = json.loads(self.checkpoint_file.read_text())
            start_index = checkpoint_data.get('last_processed_index', 0)
            logger.info(f"Resuming from index {start_index}")
        # Process in batches
        for i in range(start_index, len(pending_items), self.config.batch_size):
            if self.should_stop:
                break
            batch = pending_items[i:i + self.config.batch_size]
            logger.info(f"Processing batch {i // self.config.batch_size + 1}")
            # Process the entire batch at once
            self.process_batch(batch)
            # Update the checkpoint after each batch
            with self.lock:
                self.checkpoint_file.write_text(json.dumps({'last_processed_index': i + len(batch)}))
        logger.info("Sync completed or stopped")

def load_secrets(secrets_path: str) -> Dict[str, str]:
    """Load secrets from a JSON file."""
    with open(secrets_path, 'r') as f:
        return json.load(f)
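
# Example secrets.json (illustrative placeholder values; only these three
# keys are read in main below):
#
# {
#     "tenant_id": "00000000-0000-0000-0000-000000000000",
#     "client_id": "00000000-0000-0000-0000-000000000000",
#     "client_secret": "your-app-registration-secret"
# }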

def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Sync OneDrive accounts.")
    parser.add_argument("--folder", type=str, required=True, help="Name of the folder (e.g., 'batch1').")
    parser.add_argument("--csv", type=str, required=True, help="Path to the CSV file containing OneDrive ID details.")
    args = parser.parse_args()

    # Load secrets from secrets.json
    secrets = load_secrets("secrets.json")
    tenant_id = secrets.get("tenant_id")
    client_id = secrets.get("client_id")
    client_secret = secrets.get("client_secret")
    if not all([tenant_id, client_id, client_secret]):
        raise ValueError("Missing required secrets in secrets.json.")

    # Create the downloads directory
    static_download_path = "Z:/data/"
    dynamic_download_path = args.folder
    download_path = Path(static_download_path) / dynamic_download_path
    download_path.mkdir(parents=True, exist_ok=True)

    config = Config(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret,
        download_path=str(download_path)
    )
    csv_path = args.csv  # CSV file path provided via the --csv argument
    syncer = OneDriveSyncer(config, csv_path)
    syncer.run()


if __name__ == "__main__":
    main()
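
# Example invocation (assumes this file is saved as onedrive_sync.py with a
# secrets.json alongside it; filenames here are illustrative):
#
#   python onedrive_sync.py --folder batch1 --csv accounts.csv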