Download OneDrive site using Python and rclone

a guest · Mar 9th, 2025
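# Expected input formats, inferred from load_secrets() and process_csv() below
# (all values shown are placeholders, not real credentials or drive IDs):
#
#   secrets.json:
#       {"tenant_id": "...", "client_id": "...", "client_secret": "..."}
#
#   CSV passed via --csv (header row required):
#       onedriveid,email
#       <drive-id>,user@contoso.com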
import csv
import json
import os
import requests
import time
import threading
import logging
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess
from pathlib import Path
from typing import Dict, List
from dataclasses import dataclass
import signal
import tempfile
import argparse

# Configure logging to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('onedrive_sync.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

@dataclass
class Config:
    tenant_id: str
    client_id: str
    client_secret: str
    download_path: str
    batch_size: int = 1000
    max_workers: int = 15
    max_retries: int = 5
    requests_per_minute: int = 4000
    token_refresh_minutes: int = 30

class TokenManager:
    def __init__(self, config: Config):
        self.config = config
        self.tokens: Dict[str, Dict] = {}
        self.lock = threading.Lock()

    def generate_token(self) -> Dict:
        token_url = f"https://login.microsoftonline.com/{self.config.tenant_id}/oauth2/v2.0/token"
        token_body = {
            "grant_type": "client_credentials",
            "scope": "https://graph.microsoft.com/.default",
            "client_id": self.config.client_id,
            "client_secret": self.config.client_secret
        }
        response = requests.post(token_url, data=token_body)
        token_response = response.json()

        if 'error' in token_response:
            raise Exception(f"Token generation failed: {token_response.get('error_description', token_response['error'])}")

        access_token = token_response["access_token"]
        expires_in = int(token_response["expires_in"])
        expiry_time = (datetime.utcnow() + timedelta(seconds=expires_in)).isoformat() + "Z"
        return {"access_token": access_token, "expiry": expiry_time}

    def is_token_expired(self, token_data: Dict) -> bool:
        # Treat the token as expired once it enters the refresh window
        expiry_time = datetime.fromisoformat(token_data['expiry'].rstrip('Z'))
        return datetime.utcnow() + timedelta(minutes=self.config.token_refresh_minutes) >= expiry_time

    def get_token(self, email: str) -> Dict:
        with self.lock:
            if email not in self.tokens or self.is_token_expired(self.tokens[email]):
                self.tokens[email] = self.generate_token()
            return self.tokens[email]

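# A minimal smoke test for an app-only token (hypothetical helper, not called
# anywhere in this script; listing users this way requires an application
# permission such as User.Read.All, in addition to the Files permissions the
# sync itself needs):
def verify_token(access_token: str) -> bool:
    """Return True if Microsoft Graph accepts the token."""
    response = requests.get(
        "https://graph.microsoft.com/v1.0/users?$top=1",
        headers={"Authorization": f"Bearer {access_token}"},
    )
    return response.status_code == 200
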
class ProgressTracker:
    def __init__(self, total_accounts: int):
        self.start_time = datetime.now()
        self.completed = 0
        self.failed = 0
        self.total = total_accounts
        self.lock = threading.Lock()

    def update_progress(self, success: bool):
        with self.lock:
            if success:
                self.completed += 1
            else:
                self.failed += 1

            elapsed = (datetime.now() - self.start_time).total_seconds() / 3600
            # max(1, ...) guards against division by zero; it also floors the
            # elapsed time at one hour and the rate at one account/hour, so
            # early estimates are rough
            completion_rate = self.completed / max(1, elapsed)
            estimated_remaining = (self.total - self.completed) / max(1, completion_rate)

            self.log_progress(elapsed, estimated_remaining)

    def log_progress(self, elapsed: float, estimated_remaining: float):
        logger.info(
            f"Progress: {self.completed}/{self.total} ({self.completed / max(1, self.total) * 100:.2f}%)\n"
            f"Failed: {self.failed}\n"
            f"Elapsed Hours: {elapsed:.2f}\n"
            f"Estimated Remaining Hours: {estimated_remaining:.2f}"
        )

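# Worked example of the estimate above: 120 accounts completed after 2.0
# elapsed hours gives completion_rate = 120 / 2.0 = 60 accounts/hour; with
# 1000 accounts total, estimated_remaining = (1000 - 120) / 60 ≈ 14.7 hours.
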
class RcloneManager:
    def __init__(self, config: Config):
        self.config = config
        # rclone on Windows reads its config from %APPDATA%\rclone\rclone.conf
        self.config_path = Path(os.getenv('APPDATA')) / 'rclone' / 'rclone.conf'
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        if not self.config_path.exists():
            self.config_path.touch()
        self.lock = threading.Lock()  # Serialize read-modify-write cycles on the shared config

    def update_config(self, email: str, one_drive_id: str, token: Dict) -> None:
        try:
            with self.lock:
                config_content = self.config_path.read_text() if self.config_path.exists() else ""

                # Normalize email for use as an rclone remote name
                safe_email = email.replace('@', '_').replace('.', '_').lower()

                new_config = f"""
[onedrive_auto_{safe_email}]
type = onedrive
client_id = {self.config.client_id}
client_secret = {self.config.client_secret}
auth_url = https://login.microsoftonline.com/{self.config.tenant_id}/oauth2/v2.0/authorize
token_url = https://login.microsoftonline.com/{self.config.tenant_id}/oauth2/v2.0/token
drive_id = {one_drive_id}
drive_type = business
token = {json.dumps(token)}
"""
                # Remove any existing section for this email
                sections = config_content.split('[')
                filtered_sections = [s for s in sections if not s.startswith(f'onedrive_auto_{safe_email}')]
                config_content = '['.join(filtered_sections)

                # Append the new section and write the updated config
                config_content += new_config
                self.config_path.write_text(config_content.strip() + '\n')

            logger.info(f"Updated rclone config for {email}")
            logger.debug(f"Config file contents for {email}:\n{self.config_path.read_text()}")

        except Exception as e:
            logger.error(f"Error updating rclone config for {email}: {str(e)}")
            raise

class ThrottleManager:
    def __init__(self):
        self.last_check_time = time.time()
        self.threshold_speed = 1 * 1024 * 1024  # 1 MB/s in bytes
        self.check_interval = 10  # Check every 10 seconds
        self.speed_history = []
        self.history_size = 5
        self.throttle_threshold = 0.8
        self.cooloff_duration = 100  # Seconds to pause when throttling is detected
        self.is_throttled = False
        self.active_processes = []
        self.lock = threading.Lock()

    def parse_rclone_progress(self, line: str) -> float:
        """Parse a line of rclone output; return the transfer speed in bytes/s, or 0 if none found."""
        try:
            logger.debug(f"Raw rclone output: {line}")

            if "Transferred:" in line:
                # The stats line is comma-separated; the speed is the part ending in /s
                for part in line.strip().split(','):
                    part = part.strip()

                    if any(unit in part for unit in ['/s', 'B/s', 'iB/s']):
                        # Extract the numeric value and unit
                        speed_parts = part.split()
                        if len(speed_parts) >= 2:
                            try:
                                value = float(speed_parts[0])
                                unit = speed_parts[1]

                                # Convert to bytes based on unit
                                if 'GB' in unit or 'GiB' in unit:
                                    value *= 1024 ** 3
                                elif 'MB' in unit or 'MiB' in unit:
                                    value *= 1024 ** 2
                                elif 'KB' in unit or 'KiB' in unit:
                                    value *= 1024

                                logger.debug(f"Parsed speed: {value / 1024 / 1024:.2f} MB/s")
                                return value
                            except ValueError:
                                logger.debug(f"Failed to parse speed value from: {speed_parts}")
                                continue

            return 0.0
        except Exception as e:
            logger.debug(f"Error parsing rclone progress: {str(e)}")
            return 0.0

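    # For reference, a typical rclone stats line this parser targets (the exact
    # layout can vary between rclone versions; treat this as an illustrative
    # sample, not guaranteed output):
    #
    #   Transferred:       64.256 MiB / 146.943 MiB, 44%, 10.294 MiB/s, ETA 8s
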
    def monitor_process_output(self, process):
        """Drain rclone output on a background thread and record observed speeds.

        Reading on a dedicated thread keeps the pipe from filling up and keeps
        blocking readline() calls out of the monitoring loop.
        """
        for line in process.stdout:
            speed = self.parse_rclone_progress(line)
            # Only record non-zero speeds in the history
            if speed > 0:
                with self.lock:
                    self.speed_history.append(speed)
                    if len(self.speed_history) > self.history_size:
                        self.speed_history.pop(0)

    def check_throttling(self) -> bool:
        """Check if we're being throttled, based on recently observed transfer speeds"""
        current_time = time.time()
        if current_time - self.last_check_time >= self.check_interval:
            self.last_check_time = current_time
            try:
                with self.lock:
                    history = list(self.speed_history)

                logger.debug(f"Speed history (MB/s): {[s / 1024 / 1024 for s in history]}")

                # Only check for throttling once we have enough history
                if len(history) >= self.history_size:
                    avg_speed = sum(history) / len(history)
                    if avg_speed < self.threshold_speed:
                        with self.lock:
                            self.is_throttled = True
                        logger.warning(f"Throttling detected! Average speed: {avg_speed / 1024 / 1024:.2f} MB/s")
                        return True
            except Exception as e:
                logger.debug(f"Error in throttle checking: {str(e)}")
        return False

    def handle_throttling(self):
        """Handle throttling by stopping all processes and waiting out a cooloff period"""
        logger.warning("Starting cooloff period...")

        # Stop all active processes
        with self.lock:
            for process in self.active_processes:
                try:
                    process.terminate()
                    process.wait(timeout=10)
                except Exception as e:
                    logger.error(f"Error terminating process: {e}")
            self.active_processes.clear()

        # Wait out the cooloff period
        time.sleep(self.cooloff_duration)

        with self.lock:
            self.is_throttled = False
            self.speed_history.clear()  # Discard stale slow samples before resuming
        logger.info("Cooloff period completed, resuming operations")

    def register_process(self, process):
        """Register a new rclone process"""
        with self.lock:
            self.active_processes.append(process)

    def unregister_process(self, process):
        """Unregister a completed rclone process"""
        with self.lock:
            if process in self.active_processes:
                self.active_processes.remove(process)

class DownloadManager:
    def __init__(self, config: Config, progress_tracker: ProgressTracker):
        self.config = config
        self.progress_tracker = progress_tracker
        self.token_manager = TokenManager(config)
        self.rclone_manager = RcloneManager(config)
        self.throttle_manager = ThrottleManager()

    def download_with_retry(self, email: str, one_drive_id: str) -> bool:
        normalized_email = email.replace('@', '_').replace('.', '_').lower()
        retry_count = 0

        while retry_count < self.config.max_retries:
            try:
                # Wait out any cooloff period before starting
                if self.throttle_manager.is_throttled:
                    self.throttle_manager.handle_throttling()

                token = self.token_manager.get_token(email)
                self.rclone_manager.update_config(email, one_drive_id, token)

                download_path = Path(self.config.download_path) / normalized_email
                download_path.mkdir(parents=True, exist_ok=True)

                # Start rclone; its stats/log lines go to stderr, so merge that
                # stream into stdout for the throttle monitor to read
                process = subprocess.Popen([
                    'rclone', 'copy',
                    f'onedrive_auto_{normalized_email}:',
                    str(download_path),
                    '--user-agent', 'ISV|rclone.org|rclone/1.68.2',
                    '--transfers', '20',
                    '--checkers', '20',
                    '--tpslimit', '2000',
                    '--retries', '3',
                    '--low-level-retries', '10',
                    '--ignore-size',
                    '--ignore-checksum',
                    '--update',
                    '--stats', '1s',  # Stats every second, as parseable log lines
                    '-v'              # Verbose output
                ], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                   text=True, encoding='utf-8', errors='replace', bufsize=1)

                self.throttle_manager.register_process(process)

                # Drain output on a background thread so the pipe never fills
                threading.Thread(
                    target=self.throttle_manager.monitor_process_output,
                    args=(process,),
                    daemon=True
                ).start()

                throttled = False
                while process.poll() is None:
                    if self.throttle_manager.check_throttling():
                        process.terminate()
                        throttled = True
                        break
                    time.sleep(1)

                self.throttle_manager.unregister_process(process)

                if throttled:
                    self.throttle_manager.handle_throttling()
                    raise Exception("Transfer aborted after throttling was detected")

                if process.returncode == 0:
                    logger.info(f"Download completed for {email}")
                    self.progress_tracker.update_progress(True)
                    return True
                raise Exception(f"rclone exited with code {process.returncode}")

            except Exception as e:
                retry_count += 1
                delay = 60
                logger.error(f"Attempt {retry_count} failed for {email}. Retrying in {delay} seconds. Error: {e}")
                time.sleep(delay)

        logger.error(f"Failed to download for {email} after {self.config.max_retries} attempts")
        self.progress_tracker.update_progress(False)
        return False

class OneDriveSyncer:
    def __init__(self, config: Config, csv_path: str):
        self.config = config
        self.csv_path = csv_path
        self.status_file = Path('onedrive_status.json')
        self.checkpoint_file = Path('sync_checkpoint.json')
        self.download_manager = None
        self.should_stop = False
        self.lock = threading.Lock()  # Guards status-file writes from worker threads

    def load_status_data(self) -> Dict:
        if self.status_file.exists():
            return {item['onedriveid']: item for item in json.loads(self.status_file.read_text())}
        return {}

    def save_status_data(self, status_data: Dict):
        # Write to a temporary file in the same directory, then rename atomically
        with tempfile.NamedTemporaryFile(mode='w', dir=str(self.status_file.parent), delete=False) as tmp_file:
            tmp_file.write(json.dumps(list(status_data.values()), indent=4))
            tmp_file_path = tmp_file.name
        os.replace(tmp_file_path, self.status_file)

    def process_csv(self):
        status_data = self.load_status_data()
        new_entries = []

        with open(self.csv_path, newline='', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                onedriveid = row['onedriveid']
                email = row['email']
                if onedriveid not in status_data:
                    entry = {'onedriveid': onedriveid, 'email': email, 'status': 'pending'}
                    status_data[onedriveid] = entry
                    new_entries.append(entry)

        # Persist any new entries to the status file
        if new_entries:
            self.save_status_data(status_data)

        return status_data

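    # Illustrative shape of onedrive_status.json as written by save_status_data
    # (values are placeholders):
    # [
    #     {
    #         "onedriveid": "<drive-id>",
    #         "email": "user@contoso.com",
    #         "status": "pending"
    #     }
    # ]
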
    def process_batch(self, batch: List[Dict]):
        with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
            futures_to_items = {}
            # Submit every pending item in the batch at once
            for item in batch:
                if self.should_stop:
                    break
                if item['status'] == 'pending':
                    future = executor.submit(
                        self.download_manager.download_with_retry,
                        item['email'],
                        item['onedriveid']
                    )
                    futures_to_items[future] = item

            # Record results as downloads finish
            for future in as_completed(futures_to_items):
                item = futures_to_items[future]
                try:
                    success = future.result()
                    item['status'] = 'completed' if success else 'failed'
                except Exception as e:
                    logger.error(f"Unexpected error in download thread for {item['email']}: {e}")
                    item['status'] = 'failed'
                # Persist status after each item
                with self.lock:
                    self.save_status_data(self.status_data)

    def signal_handler(self, signum, frame):
        logger.info("Received shutdown signal. Completing current batch...")
        self.should_stop = True

    def run(self):
        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Initialize status
        self.status_data = self.process_csv()
        pending_items = [item for item in self.status_data.values() if item['status'] == 'pending']

        # Initialize progress tracker and download manager
        self.progress_tracker = ProgressTracker(len(pending_items))
        self.download_manager = DownloadManager(self.config, self.progress_tracker)

        # Load checkpoint. The index refers to the current pending list, which
        # shrinks between runs as items complete; the status file, not the
        # checkpoint, is the authoritative record of what is done.
        start_index = 0
        if self.checkpoint_file.exists():
            checkpoint_data = json.loads(self.checkpoint_file.read_text())
            start_index = checkpoint_data.get('last_processed_index', 0)
            logger.info(f"Resuming from index {start_index}")

        # Process in batches
        for i in range(start_index, len(pending_items), self.config.batch_size):
            if self.should_stop:
                break

            batch = pending_items[i:i + self.config.batch_size]
            logger.info(f"Processing batch {i // self.config.batch_size + 1}")

            self.process_batch(batch)

            # Update checkpoint after each batch
            with self.lock:
                self.checkpoint_file.write_text(json.dumps({'last_processed_index': i + len(batch)}))

        logger.info("Sync completed or stopped")

def load_secrets(secrets_path: str) -> Dict[str, str]:
    """Load secrets from a JSON file."""
    with open(secrets_path, 'r') as f:
        return json.load(f)

def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Sync OneDrive accounts.")
    parser.add_argument("--folder", type=str, required=True, help="Name of the destination folder (e.g., 'batch1').")
    parser.add_argument("--csv", type=str, required=True, help="Path to the CSV file containing OneDrive ID details.")
    args = parser.parse_args()

    # Load secrets from secrets.json
    secrets = load_secrets("secrets.json")
    tenant_id = secrets.get("tenant_id")
    client_id = secrets.get("client_id")
    client_secret = secrets.get("client_secret")

    if not all([tenant_id, client_id, client_secret]):
        raise ValueError("Missing required secrets in secrets.json.")

    # Create the downloads directory: <static root>/<folder name>
    static_download_path = "Z:/data/"
    download_path = Path(static_download_path) / args.folder
    download_path.mkdir(parents=True, exist_ok=True)

    config = Config(
        tenant_id=tenant_id,
        client_id=client_id,
        client_secret=client_secret,
        download_path=str(download_path)
    )

    syncer = OneDriveSyncer(config, args.csv)
    syncer.run()

if __name__ == "__main__":
    main()
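
Example invocation, assuming the paste is saved as onedrive_sync.py (no filename is given here) and rclone is available on PATH:

    python onedrive_sync.py --folder batch1 --csv accounts.csv

The --folder value becomes a subdirectory of the hardcoded Z:/data/ root, and the status and checkpoint JSON files are written to the current working directory.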