Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import json
- import time
- import base64
- import email
- import vobject
- import pickle
- import mimetypes
- import ics
- from pathlib import Path
- from datetime import datetime, timedelta
- from googleapiclient.discovery import build
- from googleapiclient.http import MediaFileUpload
- from google_auth_oauthlib.flow import InstalledAppFlow
- from google.auth.transport.requests import Request
- from tqdm import tqdm
class GoogleTakeoutImporter:
    """Import an extracted Google Takeout archive back into Google services.

    Handles Drive files, Gmail messages (``.mbox``), Calendar events
    (``.ics``) and Contacts (``.vcf``). Every import is best-effort: an item
    that cannot be imported is reported on stdout and skipped so the rest of
    the archive is still processed. API clients are built lazily, the first
    time an import method needs them.
    """

    # Define the scopes needed for different services
    SCOPES = [
        'https://www.googleapis.com/auth/drive',
        'https://www.googleapis.com/auth/gmail.modify',
        'https://www.googleapis.com/auth/calendar',
        'https://www.googleapis.com/auth/contacts'
    ]

    # Limits of the Calendar API ``reminders.overrides`` field; an event that
    # exceeds them is rejected as a whole, so reminders are clamped to fit.
    MAX_REMINDER_OVERRIDES = 5
    MAX_REMINDER_MINUTES = 40320

    def __init__(self, client_secret_file, takeout_dir):
        """Initialize the importer with client secret and takeout directory.

        Args:
            client_secret_file: Path to the OAuth client secret JSON file.
            takeout_dir: Path to the extracted Takeout directory.

        Note:
            Credentials are obtained immediately, which may open a browser
            window for the OAuth consent flow.
        """
        self.client_secret_file = client_secret_file
        self.takeout_dir = Path(takeout_dir)
        self.credentials = self._get_credentials()
        # Initialize services as None, to be created on demand
        self.drive_service = None
        self.gmail_service = None
        self.calendar_service = None
        self.contacts_service = None

    def _get_credentials(self):
        """Load cached OAuth 2.0 credentials, refreshing or re-authorizing as needed.

        Returns:
            The credentials object loaded from ``token.pickle`` (in the
            current working directory) or produced by the installed-app flow.
        """
        creds = None
        token_file = 'token.pickle'
        # Check if we have saved credentials
        if os.path.exists(token_file):
            # NOTE(security): unpickling runs arbitrary code embedded in the
            # file. Only safe while token.pickle is written exclusively by
            # this program and not writable by other users.
            with open(token_file, 'rb') as token:
                creds = pickle.load(token)
        # If credentials don't exist or are invalid, get new ones
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.client_secret_file, self.SCOPES)
                creds = flow.run_local_server(port=0)
            # Save credentials for next run
            with open(token_file, 'wb') as token:
                pickle.dump(creds, token)
        return creds

    def _get_drive_service(self):
        """Get or create the Drive API service."""
        if self.drive_service is None:
            self.drive_service = build('drive', 'v3', credentials=self.credentials)
        return self.drive_service

    def _get_gmail_service(self):
        """Get or create the Gmail API service."""
        if self.gmail_service is None:
            self.gmail_service = build('gmail', 'v1', credentials=self.credentials)
        return self.gmail_service

    def _get_calendar_service(self):
        """Get or create the Calendar API service."""
        if self.calendar_service is None:
            self.calendar_service = build('calendar', 'v3', credentials=self.credentials)
        return self.calendar_service

    def _get_contacts_service(self):
        """Get or create the People (Contacts) API service."""
        if self.contacts_service is None:
            self.contacts_service = build('people', 'v1', credentials=self.credentials)
        return self.contacts_service

    def _exponential_backoff(self, attempt, max_attempts=5, initial_delay=1):
        """Sleep for ``initial_delay * 2 ** attempt`` seconds.

        Args:
            attempt: Zero-based index of the attempt that just failed.
            max_attempts: Number of attempts allowed in total.
            initial_delay: Delay in seconds after the first failure.

        Raises:
            Exception: If ``attempt`` is not below ``max_attempts``.
        """
        if attempt >= max_attempts:
            raise Exception(f"Maximum retry attempts ({max_attempts}) exceeded")
        delay = initial_delay * (2 ** attempt)
        time.sleep(delay)

    @staticmethod
    def _is_retryable(error):
        """Return False for errors that another attempt cannot fix.

        Errors carrying an HTTP response (``error.resp.status``) with a 4xx
        status are permanent, except 403/408/429 which can signal rate
        limiting or a timeout. Anything without a status is assumed to be
        transient (network trouble and the like).
        """
        status = getattr(getattr(error, 'resp', None), 'status', None)
        try:
            status = int(status)
        except (TypeError, ValueError):
            return True
        if 400 <= status < 500:
            return status in (403, 408, 429)
        return True

    def _call_with_retry(self, action, description, max_attempts=5, initial_delay=1):
        """Call ``action()`` until it succeeds, backing off between failures.

        Args:
            action: Zero-argument callable performing one API request.
            description: Text used in the error line, e.g. ``"creating contact"``.
            max_attempts: Maximum number of calls to ``action``.
            initial_delay: Backoff delay in seconds after the first failure.

        Returns:
            ``(True, result)`` on success, ``(False, None)`` once the attempts
            are exhausted or a non-retryable error is seen. Never raises for
            a failing ``action``; the failure is printed instead.
        """
        for attempt in range(max_attempts):
            try:
                return True, action()
            except Exception as e:
                # Deliberately broad: this is the best-effort boundary around
                # the API client, whose failures come in many exception types.
                print(f"Error {description}, attempt {attempt+1}: {e}")
                if not self._is_retryable(e):
                    break
                # No point sleeping after the final attempt.
                if attempt + 1 < max_attempts:
                    self._exponential_backoff(attempt, max_attempts, initial_delay)
        return False, None

    def import_drive_files(self, folder_path=None):
        """Import files from the Takeout Drive folder to Google Drive.

        Args:
            folder_path: Directory to upload; defaults to ``<takeout_dir>/Drive``.
        """
        drive_dir = Path(folder_path) if folder_path else self.takeout_dir / "Drive"
        if not drive_dir.exists():
            print(f"Drive folder not found at {drive_dir}")
            return
        service = self._get_drive_service()
        # Start the upload process
        self._upload_drive_folder(service, drive_dir, 'root')
        print("Drive file import completed!")

    def _upload_drive_folder(self, service, local_dir, parent_id):
        """Recreate ``local_dir`` under the Drive folder ``parent_id``.

        Sub-folders are created (and recursed into) first, then the files of
        ``local_dir`` itself are uploaded.
        """
        print(f"Processing folder: {local_dir}")
        entries = sorted(local_dir.iterdir())

        for item in entries:
            if not item.is_dir():
                continue
            print(f"Creating folder: {item.name}")
            folder_metadata = {
                'name': item.name,
                'mimeType': 'application/vnd.google-apps.folder',
                'parents': [parent_id]
            }
            created, folder = self._call_with_retry(
                lambda body=folder_metadata: service.files().create(
                    body=body,
                    fields='id'
                ).execute(),
                f"creating folder {item.name}"
            )
            if not created:
                print(f"Skipping folder {item.name}: it could not be created")
                continue
            # Recurse outside the retry so that a failure further down the
            # tree can never cause this folder to be created a second time.
            self._upload_drive_folder(service, item, folder.get('id'))

        for item in entries:
            if not item.is_file():
                continue
            print(f"Uploading file: {item.name}")
            mimetype = mimetypes.guess_type(str(item))[0] or 'application/octet-stream'
            file_metadata = {
                'name': item.name,
                'parents': [parent_id]
            }
            try:
                media = MediaFileUpload(
                    str(item),
                    mimetype=mimetype,
                    resumable=True
                )
            except OSError as e:
                # An unreadable local file is not worth retrying.
                print(f"Error reading file {item.name}: {e}")
                continue
            self._call_with_retry(
                lambda body=file_metadata, upload=media: service.files().create(
                    body=body,
                    media_body=upload,
                    fields='id'
                ).execute(),
                f"uploading file {item.name}"
            )

    @staticmethod
    def _iter_mbox_messages(mbox_file):
        """Yield each message of an mbox file as raw RFC 822 bytes.

        The mbox ``From `` envelope line is not part of the yielded bytes.
        Messages are read as bytes, so non-UTF-8 content is left untouched.
        """
        # Imported locally so this method does not depend on the module's
        # top-level import list.
        import mailbox

        # create=False: never create an empty mailbox for a wrong path.
        box = mailbox.mbox(str(mbox_file), create=False)
        try:
            for key in box.iterkeys():
                yield box.get_bytes(key)
        finally:
            box.close()

    def import_gmail_messages(self, mbox_path=None):
        """Import emails from the Takeout Mail folder to Gmail.

        Args:
            mbox_path: A directory searched recursively for ``.mbox`` files,
                or a single mbox file; defaults to ``<takeout_dir>/Mail``.
        """
        mail_path = Path(mbox_path) if mbox_path else self.takeout_dir / "Mail"
        if not mail_path.exists():
            print(f"Mail folder not found at {mail_path}")
            return
        service = self._get_gmail_service()
        # Accept a single mbox file as well as a directory of them
        if mail_path.is_file():
            mbox_files = [mail_path]
        else:
            mbox_files = sorted(mail_path.glob("**/*.mbox"))
        if not mbox_files:
            print("No .mbox files found")
            return
        for mbox_file in mbox_files:
            print(f"Processing mail file: {mbox_file}")
            seen = 0
            imported = 0
            try:
                for idx, raw_message in enumerate(self._iter_mbox_messages(mbox_file)):
                    if not raw_message.strip():
                        continue
                    seen += 1
                    # Gmail expects the whole message, base64url-encoded
                    body = {
                        'raw': base64.urlsafe_b64encode(raw_message).decode('ascii')
                    }
                    succeeded, _ = self._call_with_retry(
                        lambda payload=body: service.users().messages().import_(
                            userId='me',
                            body=payload
                        ).execute(),
                        f"importing message {idx}"
                    )
                    if succeeded:
                        imported += 1
                        if imported % 10 == 0:
                            print(f"Imported {imported} messages...")
            except Exception as e:
                print(f"Error reading mail file {mbox_file}: {e}")
            print(f"Imported {imported} out of {seen} messages from {mbox_file}")
        print("Gmail message import completed!")

    def import_calendar_events(self, calendar_dir=None):
        """Import calendar events from the Takeout Calendar folder.

        Prompts on stdin for the destination calendar.

        Args:
            calendar_dir: Directory searched recursively for ``.ics`` files;
                defaults to ``<takeout_dir>/Calendar``.
        """
        calendar_path = Path(calendar_dir) if calendar_dir else self.takeout_dir / "Calendar"
        if not calendar_path.exists():
            print(f"Calendar folder not found at {calendar_path}")
            return
        service = self._get_calendar_service()
        # Find all .ics files
        ics_files = sorted(calendar_path.glob("**/*.ics"))
        if not ics_files:
            print("No .ics files found")
            return
        print(f"Found {len(ics_files)} calendar files")

        # (summary, id) pairs; a list keeps calendars that share a name apart.
        calendars = []
        try:
            page_token = None
            while True:
                page = service.calendarList().list(pageToken=page_token).execute()
                for entry in page.get('items', []):
                    calendars.append((entry.get('summary', entry['id']), entry['id']))
                page_token = page.get('nextPageToken')
                if not page_token:
                    break
        except Exception as e:
            print(f"Error fetching calendars: {e}")
            return

        # Ask user which calendar to use
        print("\nPlease choose a calendar to import events into:")
        for i, (calendar_name, _) in enumerate(calendars, 1):
            print(f"{i}. {calendar_name}")
        choice = input("\nEnter calendar number (or press Enter for primary): ").strip()
        calendar_id = 'primary'
        if not choice:
            print("Using primary calendar")
        else:
            try:
                position = int(choice)
            except ValueError:
                position = 0
            # Explicit range check: 0 or a negative number must not fall
            # through to Python's negative indexing.
            if 1 <= position <= len(calendars):
                calendar_name, calendar_id = calendars[position - 1]
                print(f"Selected calendar: {calendar_name}")
            else:
                print("Invalid choice, using primary calendar")

        # Process each calendar file
        total_events = 0
        imported_events = 0
        for ics_file in ics_files:
            print(f"Processing calendar file: {ics_file}")
            try:
                with open(ics_file, 'r', encoding='utf-8', errors='ignore') as f:
                    ics_content = f.read()
                # Parse the calendar file
                calendar = ics.Calendar(ics_content)
                for event in calendar.events:
                    total_events += 1
                    try:
                        google_event = self._convert_ics_to_google_event(event)
                    except Exception as e:
                        print(f"Error processing event: {e}")
                        continue
                    if 'start' not in google_event:
                        print("Skipping event without a start time")
                        continue
                    succeeded, _ = self._call_with_retry(
                        lambda body=google_event: service.events().insert(
                            calendarId=calendar_id,
                            body=body
                        ).execute(),
                        "importing event"
                    )
                    if succeeded:
                        imported_events += 1
                        if imported_events % 10 == 0:
                            print(f"Imported {imported_events} events...")
            except Exception as e:
                print(f"Error reading calendar file {ics_file}: {e}")
        print(f"Calendar import completed! Imported {imported_events} out of {total_events} events.")

    def _convert_ics_to_google_event(self, ics_event):
        """Convert an ICS event to Google Calendar API format.

        Args:
            ics_event: Event object exposing ``name``, ``description``,
                ``location``, ``begin`` and ``end`` (each with a ``.datetime``)
                and optionally ``all_day``, ``rrule``, ``attendee``, ``alarms``.

        Returns:
            A dict usable as the body of ``events().insert``. It has no
            ``start``/``end`` keys when the event has no begin time.
        """
        event = {}
        # Basic event details
        if ics_event.name:
            event['summary'] = ics_event.name
        if ics_event.description:
            event['description'] = ics_event.description
        if ics_event.location:
            event['location'] = ics_event.location

        # Handle start and end times
        if ics_event.begin:
            start_time = ics_event.begin.datetime
            # Trust the event's own all-day flag when it has one; a timed
            # event that merely starts at midnight is not an all-day event.
            all_day = getattr(ics_event, 'all_day', None)
            if all_day is None:
                all_day = (start_time.hour == 0 and start_time.minute == 0
                           and start_time.second == 0)
            if all_day:
                start_date = start_time.date()
                if ics_event.end:
                    end_date = ics_event.end.datetime.date()
                else:
                    end_date = start_date + timedelta(days=1)
                # For all-day events the end date is exclusive in Google
                # Calendar, so it must fall after the start date.
                if end_date <= start_date:
                    end_date = start_date + timedelta(days=1)
                event['start'] = {'date': start_date.isoformat()}
                event['end'] = {'date': end_date.isoformat()}
            else:
                if ics_event.end:
                    end_time = ics_event.end.datetime
                else:
                    end_time = start_time + timedelta(hours=1)
                event['start'] = {
                    'dateTime': start_time.isoformat(),
                    'timeZone': 'UTC'  # Default to UTC if no timezone info
                }
                event['end'] = {
                    'dateTime': end_time.isoformat(),
                    'timeZone': 'UTC'
                }

        # Handle recurrence
        if hasattr(ics_event, 'rrule') and ics_event.rrule:
            event['recurrence'] = [f"RRULE:{ics_event.rrule}"]

        # Handle attendees (if available)
        if hasattr(ics_event, 'attendee') and ics_event.attendee:
            # Keep only "mailto:<address>" entries, stripped of the scheme
            attendees = [
                {'email': attendee[len('mailto:'):]}
                for attendee in ics_event.attendee
                if attendee.startswith('mailto:')
            ]
            if attendees:
                event['attendees'] = attendees

        # Handle reminders (if available)
        if hasattr(ics_event, 'alarms') and ics_event.alarms:
            overrides = []
            for alarm in ics_event.alarms:
                trigger = getattr(alarm, 'trigger', None)
                # Only relative triggers (timedelta) are handled; absolute
                # trigger times are ignored.
                if not isinstance(trigger, timedelta):
                    continue
                seconds_before = -trigger.total_seconds()
                if seconds_before <= 0:
                    continue
                minutes = int(seconds_before // 60)
                if minutes <= 0:
                    continue
                overrides.append({
                    'method': 'popup',
                    'minutes': min(minutes, self.MAX_REMINDER_MINUTES)
                })
                if len(overrides) >= self.MAX_REMINDER_OVERRIDES:
                    break
            if overrides:
                event['reminders'] = {
                    'useDefault': False,
                    'overrides': overrides
                }
        return event

    def import_contacts(self, contacts_dir=None):
        """Import contacts from the Takeout Contacts folder.

        Args:
            contacts_dir: Directory searched recursively for ``.vcf`` files;
                defaults to ``<takeout_dir>/Contacts``.
        """
        contacts_path = Path(contacts_dir) if contacts_dir else self.takeout_dir / "Contacts"
        if not contacts_path.exists():
            print(f"Contacts folder not found at {contacts_path}")
            return
        service = self._get_contacts_service()
        # Find all .vcf files
        vcf_files = sorted(contacts_path.glob("**/*.vcf"))
        if not vcf_files:
            print("No .vcf files found")
            return
        print(f"Found {len(vcf_files)} contact files")
        for vcf_file in vcf_files:
            print(f"Processing contact file: {vcf_file}")
            try:
                with open(vcf_file, 'r', encoding='utf-8', errors='ignore') as f:
                    vcf_content = f.read()
                # Parse all vCards in the file
                for vcard in vobject.readComponents(vcf_content):
                    try:
                        person = self._convert_vcard_to_person(vcard)
                    except Exception as e:
                        print(f"Error processing contact: {e}")
                        continue
                    if not person:
                        # Nothing was recognised in this vCard, so there is
                        # nothing to send.
                        print("Skipping contact with no importable fields")
                        continue
                    self._call_with_retry(
                        lambda body=person: service.people().createContact(
                            body=body
                        ).execute(),
                        "creating contact"
                    )
            except Exception as e:
                print(f"Error reading contact file {vcf_file}: {e}")
        print("Contacts import completed!")

    @staticmethod
    def _as_text(value):
        """Return ``value`` as one string; list values are joined with spaces."""
        if value is None:
            return ''
        if isinstance(value, (list, tuple)):
            return ' '.join(str(part) for part in value if part)
        return str(value)

    @staticmethod
    def _vcard_type_text(component):
        """Return the component's TYPE parameter values, lower-cased and comma-joined.

        Reads ``type_paramlist`` when present, otherwise ``type_param``.
        Returns ``''`` when the component has neither.
        """
        values = getattr(component, 'type_paramlist', None)
        if values is None:
            values = getattr(component, 'type_param', None)
        if not values:
            return ''
        if isinstance(values, str):
            values = [values]
        return ','.join(str(value) for value in values).lower()

    def _convert_vcard_to_person(self, vcard):
        """Convert a vCard object to People API person format.

        Args:
            vcard: Parsed vCard exposing ``n``, ``fn``, ``org``, ``title`` and
                the ``tel_list`` / ``email_list`` / ``adr_list`` collections;
                any of them may be absent.

        Returns:
            A dict for ``people().createContact``; empty when none of the
            supported fields is present.
        """
        person = {}

        # Names
        if hasattr(vcard, 'n'):
            parts = vcard.n.value
            name = {
                'givenName': self._as_text(parts.given),
                'familyName': self._as_text(parts.family),
                'middleName': self._as_text(parts.additional),
                'honorificPrefix': self._as_text(parts.prefix),
                'honorificSuffix': self._as_text(parts.suffix)
            }
            # If there's a formatted name (FN), use it
            if hasattr(vcard, 'fn'):
                name['displayName'] = vcard.fn.value
            person['names'] = [name]

        # Phone numbers
        phones = []
        for tel in getattr(vcard, 'tel_list', []):
            # Compared case-insensitively: TYPE values are commonly
            # written in upper case (CELL, WORK, HOME).
            types = self._vcard_type_text(tel)
            if 'cell' in types or 'mobile' in types:
                phone_type = 'mobile'
            elif 'work' in types:
                phone_type = 'work'
            elif 'home' in types:
                phone_type = 'home'
            else:
                phone_type = 'other'
            phones.append({
                'value': tel.value,
                'type': phone_type
            })
        if phones:
            person['phoneNumbers'] = phones

        # Email addresses
        emails = []
        for address in getattr(vcard, 'email_list', []):
            types = self._vcard_type_text(address)
            if 'work' in types:
                email_type = 'work'
            elif 'home' in types:
                email_type = 'home'
            else:
                email_type = 'other'
            emails.append({
                'value': address.value,
                'type': email_type
            })
        if emails:
            person['emailAddresses'] = emails

        # Addresses
        addresses = []
        for adr in getattr(vcard, 'adr_list', []):
            types = self._vcard_type_text(adr)
            if 'work' in types:
                address_type = 'work'
            elif 'home' in types:
                address_type = 'home'
            else:
                address_type = 'other'
            addresses.append({
                'type': address_type,
                'streetAddress': self._as_text(adr.value.street),
                'city': self._as_text(adr.value.city),
                'region': self._as_text(adr.value.region),
                'postalCode': self._as_text(adr.value.code),
                'country': self._as_text(adr.value.country)
            })
        if addresses:
            person['addresses'] = addresses

        # Organizations
        if hasattr(vcard, 'org'):
            org_value = vcard.org.value
            if isinstance(org_value, str):
                org_name = org_value
            else:
                # A list of organisation levels: the first is the name
                org_name = org_value[0] if org_value else ''
            person['organizations'] = [{
                'name': org_name,
                'title': vcard.title.value if hasattr(vcard, 'title') else ''
            }]
        return person
def main():
    """Interactively run a Takeout import.

    Prompts for the client secret file and the extracted Takeout directory,
    authenticates, then asks which services to import and runs them in the
    order Drive, Gmail, Calendar, Contacts. Problems are reported on stdout
    and end the run; nothing is raised for bad user input.
    """
    # Get input from user; surrounding whitespace is a common paste artefact
    client_secret_file = input("Enter the path to your client_secret.json file: ").strip()
    takeout_dir = input("Enter the path to your extracted Takeout directory: ").strip()
    if not os.path.isfile(client_secret_file):
        print(f"Error: client_secret.json not found at {client_secret_file}")
        return
    if not os.path.isdir(takeout_dir):
        print(f"Error: Takeout directory not found at {takeout_dir}")
        return

    # Initialize the importer
    importer = GoogleTakeoutImporter(client_secret_file, takeout_dir)

    # Ask which services to import
    print("\nWhich data would you like to import?")
    print("1. Google Drive files")
    print("2. Gmail messages")
    print("3. Google Calendar events")
    print("4. Contacts")
    print("5. All of the above")
    choice = input("\nEnter your choice (1-5): ").strip()

    # Menu number -> import to run; '5' runs every entry in this order.
    steps = [
        ('1', importer.import_drive_files),
        ('2', importer.import_gmail_messages),
        ('3', importer.import_calendar_events),
        ('4', importer.import_contacts),
    ]
    if choice != '5' and choice not in {key for key, _ in steps}:
        # Do not report a completed import when nothing was run.
        print(f"Invalid choice: {choice!r}. Nothing was imported.")
        return
    for key, run_import in steps:
        if choice in (key, '5'):
            run_import()
    print("\nImport process completed!")


if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment