#!/usr/bin/env python3
"""
migration_full.py (fixed version)

Key fixes:
- Uses a DB-rebuilt physician map instead of a local map
- batch_create fetches the collection schema once per batch (not once per doc)
- Skips rows with an invalid bookingDate
- Logs patient errors with source-row context
- Increased booked_tests concurrency after the optimization
"""
import time
import random
import json
import sys
import re
import logging
from datetime import datetime, timezone, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Any, Tuple
from collections import defaultdict

import requests
from requests.adapters import HTTPAdapter, Retry

# Appwrite SDK
from appwrite.client import Client
from appwrite.services.databases import Databases
from appwrite.services.users import Users
from appwrite.id import ID
from appwrite.query import Query

# Google Sheets
import gspread
from oauth2client.service_account import ServiceAccountCredentials

# Robust date parsing (optional)
try:
    from dateutil import parser as dateutil_parser
except Exception:
    dateutil_parser = None

# Local secrets/config module sitting next to this script (shadows the stdlib `secrets`)
import secrets
# ---------- Config ----------
APPWRITE_ENDPOINT = getattr(secrets, "APPWRITE_ENDPOINT", None)
PROJECT = getattr(secrets, "APPWRITE_PROJECT_ID", None)
API_KEY = getattr(secrets, "APPWRITE_API_KEY", None)
DATABASE_ID = getattr(secrets, "DATABASE_ID", None)
GS_CREDS = getattr(secrets, "GS_CREDS", None) or getattr(secrets, "GSHEETS_CREDENTIALS_JSON", None)
SPREADSHEET_ID = getattr(secrets, "SPREADSHEET_ID", None)
COLLECTION_HINTS = getattr(secrets, "COLLECTION_HINTS", {})

LOGICAL_COLLECTIONS = [
    "users", "clinics", "physicians", "patients", "bookings",
    "booked_tests", "payments_ledger", "expenses", "internal_transfers"
]

# Concurrency tuning
CONCURRENCY = 16
PER_COLLECTION_CONCURRENCY = {
    "patients": 4,          # reduced further for localhost socket limits
    "bookings": 6,
    "booked_tests": 12,
    "payments_ledger": 6,
    "users": 4,
    "physicians": 6,
    "clinics": 6,
    "expenses": 4,
    "internal_transfers": 4,
}
RETRY_MAX = 4
RETRY_BACKOFF_BASE = 0.6    # increased backoff for socket recovery
ERROR_SHOW_LIMIT = 6
BATCH_SIZE_DEFAULT = 200
BATCH_JITTER_SLEEP = 0.05   # increased sleep between batches
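
# Illustrative only: the local secrets module is expected to expose the names read above.
# A minimal sketch of such a secrets.py with placeholder values (every value below is an
# assumption, not taken from the real project):
#
#   APPWRITE_ENDPOINT = "http://localhost/v1"
#   APPWRITE_PROJECT_ID = "your-project-id"
#   APPWRITE_API_KEY = "your-api-key"
#   DATABASE_ID = "your-database-id"
#   GS_CREDS = "service_account.json"          # path or dict accepted by open_sheet()
#   SPREADSHEET_ID = "your-spreadsheet-id"
#   COLLECTION_HINTS = {"booked_tests": "6501abc..."}   # logical name -> real collection $id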
# ---------- Logging ----------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("migration")

# ---------- HTTP session (used for raw REST calls) ----------
session = requests.Session()
session.mount("https://", HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.5)))
session.mount("http://", HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.5)))
session.headers.update({
    "X-Appwrite-Project": PROJECT,
    "X-Appwrite-Key": API_KEY,
    "Content-Type": "application/json",
})

# ---------- Appwrite SDK ----------
client = Client()
client.set_endpoint(APPWRITE_ENDPOINT).set_project(PROJECT).set_key(API_KEY)
db = Databases(client)
users_srv = Users(client)

# ---------- Sanitizer cache ----------
COLLECTION_ATTRS_CACHE = {}
SANITIZED_COUNTERS = defaultdict(int)
SANITIZED_REMOVED_SAMPLE = defaultdict(list)
# ---------- Helpers ----------
def id_unique():
    try:
        return ID.unique()
    except Exception:
        return f"{int(time.time() * 1000)}{random.randint(1000, 9999)}"


def open_sheet():
    scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
    if isinstance(GS_CREDS, dict):
        gc = gspread.service_account_from_dict(GS_CREDS)
    else:
        gc = gspread.service_account(filename=GS_CREDS)
    return gc.open_by_key(SPREADSHEET_ID)
def appwrite_collections_list():
    url = f"{APPWRITE_ENDPOINT}/databases/{DATABASE_ID}/collections"
    r = session.get(url)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to fetch collections via REST: {r.status_code} {r.text}")
    return r.json()


def build_collection_map() -> Dict[str, str]:
    logger.info("-> Auto-mapping logical collections to real collection IDs...")
    data = appwrite_collections_list()
    collections = []
    if isinstance(data, dict) and "collections" in data:
        collections = data["collections"]
    elif isinstance(data, list):
        collections = data
    else:
        for v in data.values():
            if isinstance(v, list):
                collections = v
                break
    by_id = {}
    by_name = {}
    for c in collections:
        cid = c.get("$id") or c.get("collectionId")
        name = (c.get("name") or "").strip().lower()
        if cid:
            by_id[cid] = c
        if name:
            by_name[name] = c
    mapping = {}
    # Hints first
    for logical in LOGICAL_COLLECTIONS:
        hint = COLLECTION_HINTS.get(logical)
        if hint and hint in by_id:
            mapping[logical] = hint
    for logical in LOGICAL_COLLECTIONS:
        if mapping.get(logical):
            continue
        if logical in by_id:
            mapping[logical] = logical
            continue
        if logical.lower() in by_name:
            mapping[logical] = by_name[logical.lower()].get("$id") or by_name[logical.lower()].get("collectionId")
            continue
        # Substring match on collection names
        for nm, c in by_name.items():
            if logical.lower() in nm:
                mapping[logical] = c.get("$id") or c.get("collectionId")
                break
    logger.info("-> Collection mapping:")
    for k in LOGICAL_COLLECTIONS:
        logger.info("   %s -> %s", k.ljust(15), mapping.get(k))
    missing = [k for k in LOGICAL_COLLECTIONS if not mapping.get(k)]
    if missing:
        raise RuntimeError(f"Unmapped collections: {missing}. Add to COLLECTION_HINTS if they exist.")
    return mapping
def get_collection_attributes(collection_id: str):
    if collection_id in COLLECTION_ATTRS_CACHE:
        return COLLECTION_ATTRS_CACHE[collection_id]
    try:
        url = f"{APPWRITE_ENDPOINT}/databases/{DATABASE_ID}/collections/{collection_id}/attributes"
        r = session.get(url)
        if r.status_code != 200:
            logger.debug("get_collection_attributes non-200 %s for %s", r.status_code, collection_id)
            COLLECTION_ATTRS_CACHE[collection_id] = None
            return None
        body = r.json()
        attrs = []
        if isinstance(body, dict) and "attributes" in body:
            attrs = body["attributes"]
        elif isinstance(body, list):
            attrs = body
        else:
            for v in body.values():
                if isinstance(v, list):
                    attrs = v
                    break
        keys = set()
        for a in attrs:
            if isinstance(a, dict):
                k = a.get("key")
                if k:
                    keys.add(k)
        COLLECTION_ATTRS_CACHE[collection_id] = keys
        return keys
    except Exception as e:
        logger.debug("get_collection_attributes exception for %s: %s", collection_id, e)
        COLLECTION_ATTRS_CACHE[collection_id] = None
        return None
def create_doc_with_retry(collection_id: str, doc_id: str, data: Dict[str, Any]) -> Tuple[bool, str]:
    last_msg = ""
    for attempt in range(RETRY_MAX):
        try:
            res = db.create_document(DATABASE_ID, collection_id, doc_id, data)
            try:
                # The SDK returns a dict-like payload; fall back to the requested id.
                if isinstance(res, dict):
                    created_id = res.get("$id") or doc_id
                else:
                    created_id = getattr(res, "id", None) or doc_id
            except Exception:
                created_id = doc_id
            return True, str(created_id)
        except Exception as e:
            last_msg = str(e)
            sleep_time = (RETRY_BACKOFF_BASE * (2 ** attempt)) + random.random() * 0.2
            time.sleep(sleep_time)
    return False, last_msg
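
# With RETRY_MAX = 4 and RETRY_BACKOFF_BASE = 0.6 the retry sleeps work out to roughly
# 0.6s, 1.2s, 2.4s and 4.8s (each plus up to 0.2s of jitter) before the error is returned.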
def batch_create(collection_id: str, docs: List[Dict[str, Any]], src_map: Dict[str, Tuple[int, str, str]] = None, concurrency: int = None) -> Tuple[int, List[str]]:
    """
    Create docs concurrently. src_map maps doc_id -> (sheet_row_index, sNo, sample) for enriched errors.
    Returns (success_count, enriched_error_list).
    """
    if concurrency is None:
        concurrency = PER_COLLECTION_CONCURRENCY.get(collection_id, CONCURRENCY)
    total = len(docs)
    if total == 0:
        return 0, []
    # Fetch schema ONCE for the entire batch
    allowed_attrs = get_collection_attributes(collection_id)
    # Sanitize all documents in bulk
    tasks = []
    removed_keys_sample = []
    sanitized_count = 0
    start_time = time.time()
    if allowed_attrs is None:
        tasks = [{"$id": d["$id"], "data": dict(d.get("data", {}))} for d in docs]
    else:
        for d in docs:
            docid = d["$id"]
            data = d.get("data", {})
            cleaned = {k: v for k, v in data.items() if k in allowed_attrs}
            if len(cleaned) < len(data):
                sanitized_count += 1
                removed = [k for k in data if k not in allowed_attrs]
                for r in removed:
                    if r not in removed_keys_sample and len(removed_keys_sample) < 6:
                        removed_keys_sample.append(r)
            tasks.append({"$id": docid, "data": cleaned})
    # Log sanitization summary
    if sanitized_count > 0:
        SANITIZED_COUNTERS[collection_id] += sanitized_count
        sample_keys = SANITIZED_REMOVED_SAMPLE.get(collection_id, [])
        for key in removed_keys_sample:
            if key not in sample_keys and len(sample_keys) < 6:
                sample_keys.append(key)
        SANITIZED_REMOVED_SAMPLE[collection_id] = sample_keys
        logger.info("Sanitized %d/%d docs for %s in %.2f seconds", sanitized_count, total, collection_id, time.time() - start_time)
    successes = 0
    errors = []
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = {pool.submit(create_doc_with_retry, collection_id, t["$id"], t["data"]): t for t in tasks}
        for i, fut in enumerate(as_completed(futures), 1):
            t = futures[fut]
            try:
                ok, msg = fut.result()
            except Exception as e:
                ok, msg = False, str(e)
            if ok:
                successes += 1
            else:
                # Enrich with source row if available
                src_ctx = ""
                if src_map and t["$id"] in src_map:
                    row_idx, sNo, sample = src_map[t["$id"]]
                    src_ctx = f" [sheet_row_index={row_idx} sNo={sNo} sample={sample}]"
                if len(errors) < 300:
                    errors.append(f"doc {t['$id']} -> {msg}{src_ctx}")
            # Log progress (and pause briefly) every 100 documents
            if i % 100 == 0:
                logger.info("Processed %d/%d documents for %s (success: %d, errors: %d)", i, total, collection_id, successes, len(errors))
                time.sleep(BATCH_JITTER_SLEEP + random.random() * BATCH_JITTER_SLEEP)
    logger.info("Completed batch for %s: %d/%d in %.2f seconds", collection_id, successes, total, time.time() - start_time)
    return successes, errors
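
# Illustrative usage (a sketch, not part of the migration flow; the collection id and
# payload below are hypothetical placeholders):
#
#   docs = [{"$id": id_unique(), "data": {"name": "Test clinic", "isActive": True}}]
#   created, errs = batch_create("clinics_collection_id", docs, concurrency=2)
#   logger.info("created=%d errors=%s", created, errs)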
# ---------- Helpers for parsing ----------
def parse_date(date_str: str, time_str: str = ""):
    if not date_str and not time_str:
        return None
    ds = (date_str or "").strip()
    ts = (time_str or "").strip()
    # Try dateutil first
    if dateutil_parser:
        try:
            combined = ds
            if ts:
                combined = combined + " " + ts
            dt = dateutil_parser.parse(combined, dayfirst=True, default=datetime(2000, 1, 1))
            if dt.year < 1000:
                dt = dt.replace(year=2000)
            return dt.replace(microsecond=0).isoformat() + "Z"
        except Exception:
            pass
    # Try common formats
    formats = [
        "%d/%m/%Y", "%d-%m-%Y", "%Y-%m-%d", "%d-%b-%y", "%d-%b-%Y",
        "%m/%d/%Y", "%b %d, %Y", "%d %b %Y"
    ]
    for fmt in formats:
        try:
            d = datetime.strptime(ds, fmt)
            if ts and ":" in ts:
                parts = ts.split(":")
                h = int(parts[0])
                m = int(parts[1]) if len(parts) > 1 else 0
                s = int(parts[2]) if len(parts) > 2 else 0
                d = d.replace(hour=h, minute=m, second=s)
            return d.replace(microsecond=0).isoformat() + "Z"
        except Exception:
            continue
    # Fallback: ds is ISO-like
    try:
        d = datetime.fromisoformat(ds)
        if ts and ":" in ts:
            parts = ts.split(":")
            h = int(parts[0])
            m = int(parts[1]) if len(parts) > 1 else 0
            s = int(parts[2]) if len(parts) > 2 else 0
            d = d.replace(hour=h, minute=m, second=s)
        return d.replace(microsecond=0).isoformat() + "Z"
    except Exception:
        return None


def parse_float_safe(val):
    if val is None:
        return 0.0
    s = str(val).strip()
    if s == "":
        return 0.0
    s_clean = re.sub(r'[^0-9.\-]', '', s)
    if s_clean in ("", ".", "-"):
        return 0.0
    try:
        return float(s_clean)
    except Exception:
        return 0.0
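
# A few illustrative inputs/outputs (dates are parsed day-first):
#   parse_date("05/08/2024", "14:30")  -> "2024-08-05T14:30:00Z"
#   parse_date("", "")                 -> None
#   parse_float_safe("1,250.50")       -> 1250.5
#   parse_float_safe("")               -> 0.0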
# Header helpers
def header_index_map(rows: List[List[str]]):
    if not rows or len(rows) == 0:
        return {}
    hdr = [(h or "").strip().lower() for h in rows[0]]
    mapping = {}
    for i, h in enumerate(hdr):
        if not h:
            continue
        mapping[h] = i
    return mapping


def find_index_by_keywords(mapping: Dict[str, int], keywords: List[str]):
    if not mapping:
        return None
    for kw in keywords:
        for h, idx in mapping.items():
            if kw in h:
                return idx
    return None
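
# For example, given a header row ["S.No", "Date", "Type", "Doctor"] (names here are illustrative),
# header_index_map returns {"s.no": 0, "date": 1, "type": 2, "doctor": 3} and
# find_index_by_keywords(mapping, ["doctor", "dr"]) returns 3.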
# ---------- Prepare docs (with source mapping) ----------
def prepare_users_docs():
    users_list = [
        {'name': 'AFREEN', 'role': 'receptionist', 'isActive': False},
        {'name': 'ALIZA', 'role': 'radiology_technician', 'isActive': True},
        {'name': 'AMREEN', 'role': 'receptionist', 'isActive': False},
        {'name': 'DR. DANISH HUSHAIN', 'role': 'doctor', 'isActive': True},
        {'name': 'DR. PULKIT MARU', 'role': 'doctor', 'isActive': True},
        {'name': 'FAIYAZ', 'role': 'supervisor', 'isActive': False},
        {'name': 'FATIMA', 'role': 'nursing_assistant', 'isActive': True},
        {'name': 'FIZA', 'role': 'receptionist', 'isActive': True},
        {'name': 'HUZRA', 'role': 'nursing_assistant', 'isActive': False},
        {'name': 'LAIBA', 'role': 'receptionist', 'isActive': True},
        {'name': 'MANZAR', 'role': 'supervisor', 'isActive': True},
        {'name': 'MUSKAN', 'role': 'receptionist', 'isActive': False},
        {'name': 'OTHER', 'role': 'system', 'isActive': False},
        {'name': 'RAHILA', 'role': 'receptionist', 'isActive': False},
        {'name': 'SANJU', 'role': 'nursing_assistant', 'isActive': True},
        {'name': 'SHILPI', 'role': 'typist', 'isActive': False},
        {'name': 'TAHURA', 'role': 'receptionist', 'isActive': False},
        {'name': 'ZEBA ZARIN', 'role': 'receptionist', 'isActive': False},
    ]
    docs = []
    for u in users_list:
        doc_id = id_unique()
        data = {
            "name": u['name'],
            "role": u['role'],
            "isActive": u['isActive'],
            "createdAt": datetime.now(timezone.utc).replace(microsecond=0).isoformat()
        }
        docs.append({"$id": doc_id, "data": data})
    return docs
def prepare_clinics_physicians_docs(refphy_rows):
    clinic_docs = []
    clinic_map = {}
    if not refphy_rows:
        return clinic_docs, []

    def norm(s): return s.strip().lower() if s else ""

    for idx, row in enumerate(refphy_rows[1:], start=2):
        if len(row) < 5:
            continue
        cn = (row[4] or "").strip()
        if not cn:
            continue
        key = norm(cn)
        if key in clinic_map:
            continue
        cid = id_unique()
        clinic_map[key] = cid
        data = {"name": cn, "address": row[7].strip() if len(row) > 7 else "", "isActive": True}
        clinic_docs.append({"$id": cid, "data": data})
    phys_docs = []
    for row in refphy_rows[1:]:
        if len(row) < 16:
            continue
        full = (row[1] or "").strip()
        doccode = (row[15] or "").strip()  # Column P (index 15)
        if not doccode:
            continue
        pid = id_unique()
        clinic_name = row[4].strip() if len(row) > 4 else ""
        clinic_id = clinic_map.get(norm(clinic_name))
        data = {
            "title": row[0].strip() if row else "",
            "fullName": full,
            "specialty": row[3].strip() if len(row) > 3 else "",
            "contact": row[5].strip() if len(row) > 5 else "",
            "isActive": True,
            "clinics": [clinic_id] if clinic_id else [],
            "registrationNumber": row[13].strip() if len(row) > 13 else "",
            "DocCode": doccode,  # Column P, used later to map bookings by DocCode
        }
        phys_docs.append({"$id": pid, "data": data})
    return clinic_docs, phys_docs
def prepare_patients_docs(patient_rows):
    docs = []
    patient_map = {}
    if not patient_rows:
        return docs, patient_map
    for idx, row in enumerate(patient_rows[1:], start=2):
        if not row or not (len(row) > 0 and (row[0] or "").strip()):
            continue
        doc_id = id_unique()
        pid = (row[0] or "").strip()
        patient_map[pid] = doc_id
        data = {
            "patientId": pid,
            "pCarePatientId": row[14].strip() if len(row) > 14 else "",
            "firstName": row[11].strip() if len(row) > 11 and row[11].strip() else "Unknown",
            "middleName": row[12].strip() if len(row) > 12 else "",
            "lastName": row[13].strip() if len(row) > 13 else "",
            "sex": "Male" if (len(row) > 2 and row[2].strip().upper() == "M") else "Female",
            "age": int(row[3].strip()) if len(row) > 3 and row[3].strip().isdigit() else None,
            "ageUnit": row[4].strip() if len(row) > 4 else "",
            "dob": parse_date(row[5].strip() if len(row) > 5 else "", ""),
            "mobileNo": row[6].strip() if len(row) > 6 else "",
            "altMobileNo": "",
            "houseNo": row[7].strip() if len(row) > 7 else "",
            "streetAddress": row[18].strip() if len(row) > 18 else (row[7].strip() if len(row) > 7 else ""),
            "city": row[15].strip() if len(row) > 15 else "",
            "state": row[16].strip() if len(row) > 16 else "",
            "pincode": row[17].strip() if len(row) > 17 else "",
            "referralPhysicianName": row[8].strip() if len(row) > 8 else "",
            "referralPhysicianName2": row[9].strip() if len(row) > 9 else "",
            "nameOfSpouseOrFather": row[10].strip() if len(row) > 10 else "",
            "createdAt": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
            "marketingConsent": False,
            "registeredOn": None
        }
        docs.append({"$id": doc_id, "data": data})
    return docs, patient_map
def prepare_bookings_tests_payments(todaycash_rows, patient_map, phys_by_code, phys_by_name, self_phys_id, user_map):
    """
    Builds bookings, tests and payments lists.
    - booking.referringPhysician: mapped by DocCode (column U)
    - test.doctor: mapped by name (column E), falling back to the "Self" physician
    """
    bookings = []
    tests = []
    payments = []
    src_map = {}
    if not todaycash_rows:
        return bookings, tests, payments, src_map
    hdr_map = header_index_map(todaycash_rows)
    idx_sno = find_index_by_keywords(hdr_map, ["s.no", "s.no.", "sno"]) or 0
    idx_entry_by = find_index_by_keywords(hdr_map, ["entry by", "entryby"])
    idx_pat = find_index_by_keywords(hdr_map, ["patid", "pat id", "patientid"]) or find_index_by_keywords(hdr_map, ["pcarepatid"])
    idx_doccode = find_index_by_keywords(hdr_map, ["doccode", "doc code"])  # Column U
    idx_doctor = find_index_by_keywords(hdr_map, ["doctor", "dr", "referred by"]) or 4  # Column E
    idx_bookingid = find_index_by_keywords(hdr_map, ["bookingid", "booking id"])
    idx_date = find_index_by_keywords(hdr_map, ["date"])
    idx_time = find_index_by_keywords(hdr_map, ["time"])
    idx_amount = find_index_by_keywords(hdr_map, ["amount", "paid amt", "paidamt"])
    idx_paid_amt = find_index_by_keywords(hdr_map, ["paid amt", "paidamt"])
    idx_testname = find_index_by_keywords(hdr_map, ["type", "testname", "test name", "subtype"]) or 2
    idx_modality = find_index_by_keywords(hdr_map, ["type"]) or 1
    skipped_count = 0
    for row_index, row in enumerate(todaycash_rows[1:], start=2):
        if not any((cell or "").strip() for cell in row):
            continue
        sNo = (row[idx_sno].strip() if idx_sno is not None and idx_sno < len(row) else str(row_index))
        booking_docid = id_unique()
        # Map patient id
        patval = (row[idx_pat].strip() if idx_pat is not None and idx_pat < len(row) else "")
        patient_docid = patient_map.get(patval) if patval and patval != "0" else None
        # Map DocCode (column U) -> physician id for booking.referringPhysician
        doccode = (row[idx_doccode].strip() if idx_doccode is not None and idx_doccode < len(row) else "")
        referring_phys_id = phys_by_code.get(doccode) if doccode else None
        # Map entry by
        entry_name_raw = (row[idx_entry_by].strip() if idx_entry_by is not None and idx_entry_by < len(row) else "")
        entry_by_id = None
        if entry_name_raw:
            key = entry_name_raw.strip().lower()
            entry_by_id = user_map.get(entry_name_raw) or user_map.get(key)
        # Parse booking date/time
        raw_date = (row[idx_date].strip() if idx_date is not None and idx_date < len(row) else "")
        raw_time = (row[idx_time].strip() if idx_time is not None and idx_time < len(row) else "")
        booking_iso = parse_date(raw_date, raw_time)
        if not booking_iso:
            logger.warning("Skipping row %d (sNo=%s): Invalid date. raw_date='%s' raw_time='%s'",
                           row_index, sNo, raw_date, raw_time)
            skipped_count += 1
            continue
        booking_data = {
            "bookingId": (row[idx_bookingid].strip() if idx_bookingid is not None and idx_bookingid < len(row) and (row[idx_bookingid] or "").strip() else f"BOOK-{sNo}"),
            "bookingDate": booking_iso,
            "status": "Completed",
            "isHomeCollection": False,
            "entryBy": entry_by_id,
            "patient": patient_docid,
            "referringPhysician": referring_phys_id,  # Mapped by DocCode
        }
        bookings.append({"$id": booking_docid, "data": booking_data})
        src_map[booking_docid] = (row_index, sNo, " | ".join((col or "") for col in row[:6]))
        # Map doctor name (column E) -> physician id for test.doctor
        doctor_name_raw = (row[idx_doctor].strip() if idx_doctor is not None and idx_doctor < len(row) else "")
        test_doctor_id = None
        if doctor_name_raw:
            # Normalize: remove DR./Dr. prefix, trim, lowercase
            normalized = doctor_name_raw.upper().replace("DR.", "").replace("DR ", "").strip().lower()
            test_doctor_id = phys_by_name.get(normalized)
        # Fall back to "Self" if no match
        if not test_doctor_id and self_phys_id:
            test_doctor_id = self_phys_id
        # Create test
        modality = (row[idx_modality].strip() if idx_modality is not None and idx_modality < len(row) else "")
        testname = (row[idx_testname].strip() if idx_testname is not None and idx_testname < len(row) else "")
        price_str = (row[idx_amount].strip() if idx_amount is not None and idx_amount < len(row) else "0")
        price = parse_float_safe(price_str)
        test_docid = id_unique()
        test_data = {
            "modality": modality,
            "testName": testname,
            "testPrice": price,
            "doctor": test_doctor_id,  # Mapped by name, falling back to Self
            "status": "Completed",
            "booking": booking_docid,
            "entryBy": entry_by_id,
        }
        tests.append({"$id": test_docid, "data": test_data})
        src_map[test_docid] = (row_index, sNo, " | ".join((col or "") for col in row[:6]))
        # Payment
        paid_str = (row[idx_paid_amt].strip() if idx_paid_amt is not None and idx_paid_amt < len(row) else "")
        paid = parse_float_safe(paid_str)
        if paid and paid > 0:
            pay_docid = id_unique()
            pay_data = {
                "internalTxnId": f"PAY-{(row[idx_bookingid].strip() if idx_bookingid is not None and idx_bookingid < len(row) and (row[idx_bookingid] or '').strip() else sNo)}",
                "amount": paid,
                "paymentMethod": (row[idx_paid_amt - 1].strip() if idx_paid_amt is not None and 0 <= idx_paid_amt - 1 < len(row) else "Cash"),
                "transactionDate": booking_iso,
                "status": "Completed",
                "entryBy": entry_by_id,
                "booking": booking_docid,
            }
            payments.append({"$id": pay_docid, "data": pay_data})
            src_map[pay_docid] = (row_index, sNo, " | ".join((col or "") for col in row[:6]))
    if skipped_count > 0:
        logger.info("Skipped %d rows due to invalid dates", skipped_count)
    return bookings, tests, payments, src_map
def prepare_expenses_docs(expenses_rows):
    docs = []
    if not expenses_rows:
        return docs
    hdr_map = header_index_map(expenses_rows)
    idx_desc = find_index_by_keywords(hdr_map, ["description", "desc", "detail"])
    idx_amount = find_index_by_keywords(hdr_map, ["amount", "amt"])
    idx_from = find_index_by_keywords(hdr_map, ["from account", "from", "account"])
    idx_entry = find_index_by_keywords(hdr_map, ["entry by", "entry", "entered by"])
    idx_approved = find_index_by_keywords(hdr_map, ["approved", "approved by"])
    idx_date = find_index_by_keywords(hdr_map, ["date"])
    for row_idx, row in enumerate(expenses_rows[1:], start=2):
        if not any((c or "").strip() for c in row):
            continue
        desc = row[idx_desc].strip() if idx_desc is not None and idx_desc < len(row) else "Expense"
        amount = parse_float_safe(row[idx_amount]) if idx_amount is not None and idx_amount < len(row) else 0.0
        fromacc = row[idx_from].strip() if idx_from is not None and idx_from < len(row) else ""
        entry_by = row[idx_entry].strip() if idx_entry is not None and idx_entry < len(row) else ""
        approved_by = row[idx_approved].strip() if idx_approved is not None and idx_approved < len(row) else ""
        dt = parse_date(row[idx_date].strip()) if idx_date is not None and idx_date < len(row) and (row[idx_date] or "").strip() else None
        data = {
            "expenseId": f"EXP-{id_unique()}",
            "expenseDate": dt,
            "description": desc,
            "type": "",
            "subtype": "",
            "amount": amount,
            "fromAccount": fromacc,
            "approvedBy": approved_by,
            "entryBy": entry_by,
        }
        docs.append({"$id": id_unique(), "data": data})
    return docs
def prepare_internal_transfers_docs(transfers_rows):
    docs = []
    if not transfers_rows:
        return docs
    hdr_map = header_index_map(transfers_rows)
    idx_date = find_index_by_keywords(hdr_map, ["date"])
    idx_from = find_index_by_keywords(hdr_map, ["from"])
    idx_to = find_index_by_keywords(hdr_map, ["to"])
    idx_amount = find_index_by_keywords(hdr_map, ["amount", "amt"])
    idx_entry = find_index_by_keywords(hdr_map, ["entry", "entry by"])
    for row_idx, row in enumerate(transfers_rows[1:], start=2):
        if not any((c or "").strip() for c in row):
            continue
        dt = parse_date(row[idx_date].strip()) if idx_date is not None and idx_date < len(row) and (row[idx_date] or "").strip() else None
        frm = row[idx_from].strip() if idx_from is not None and idx_from < len(row) else ""
        to = row[idx_to].strip() if idx_to is not None and idx_to < len(row) else ""
        amount = parse_float_safe(row[idx_amount]) if idx_amount is not None and idx_amount < len(row) else 0.0
        entry_by = row[idx_entry].strip() if idx_entry is not None and idx_entry < len(row) else ""
        data = {
            "transferId": f"TR-{id_unique()}",
            "transferDate": dt,
            "fromAccount": frm,
            "toAccount": to,
            "amount": amount,
            "entryBy": entry_by,
        }
        docs.append({"$id": id_unique(), "data": data})
    return docs
# ---------- Build maps from DB (used after uploading base collections) ----------
def build_user_doc_map(collection_map):
    logger.info("Building user map from DB (case-insensitive keys).")
    users_map = {}
    coll = collection_map["users"]
    offset = 0
    while True:
        queries = [Query.limit(100), Query.offset(offset)]
        res = db.list_documents(DATABASE_ID, coll, queries=queries)
        docs = res.get('documents', [])
        for d in docs:
            name = (d.get("name") or "").strip()
            if not name:
                continue
            users_map[name] = d["$id"]
            users_map[name.strip().lower()] = d["$id"]
            users_map[re.sub(r'\s+', ' ', name.strip().lower())] = d["$id"]
        if len(docs) < 100:
            break
        offset += 100
    logger.info(" -> Mapped %d users", len(set(users_map.values())))
    return users_map
def build_phys_code_map(collection_map):
    """Build physician maps by DocCode AND by normalized name."""
    logger.info("Building physician code map from DB.")
    phys_by_code = {}   # DocCode -> physician ID
    phys_by_name = {}   # Normalized name -> physician ID
    coll = collection_map["physicians"]
    cursor = None
    self_physician_id = None
    while True:
        queries = [Query.limit(100)]
        if cursor:
            queries.append(Query.cursor_after(cursor))
        res = db.list_documents(DATABASE_ID, coll, queries=queries)
        docs = res.get('documents', [])
        for d in docs:
            doc_id = d["$id"]
            # Map by DocCode attribute
            doccode = (d.get("DocCode") or "").strip()
            if doccode:
                phys_by_code[doccode] = doc_id
                # Check if this is the "Self" physician (DocCode 14882)
                if doccode == "14882":
                    self_physician_id = doc_id
            # Map by normalized full name (strip DR./Dr. prefix, lowercase, trim)
            full_name = (d.get("fullName") or "").strip()
            if full_name:
                normalized = full_name.upper().replace("DR.", "").replace("DR ", "").strip().lower()
                if normalized:
                    phys_by_name[normalized] = doc_id
        if len(docs) < 100:
            break
        cursor = docs[-1]["$id"]  # Use the last document ID as the cursor
    logger.info(" -> Mapped %d physician codes, %d unique names, Self physician ID: %s",
                len(phys_by_code), len(phys_by_name), self_physician_id or "NOT FOUND")
    if not self_physician_id:
        logger.error("CRITICAL: 'Self' physician (DocCode 14882) not found in database!")
    return phys_by_code, phys_by_name, self_physician_id
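
# The same normalization is applied on both sides of the name lookup, so a sheet cell like
# "DR. PULKIT MARU" and a stored fullName "Dr. Pulkit Maru" both reduce to "pulkit maru"
# (the name here just reuses one from the seeded users list; real matches depend on the data).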
def build_patient_map(collection_map):
    logger.info("Building patient map from DB with cursor-based pagination.")
    patient_map = {}
    coll = collection_map["patients"]
    cursor = None
    while True:
        queries = [Query.limit(100)]
        if cursor:
            queries.append(Query.cursor_after(cursor))
        res = db.list_documents(DATABASE_ID, coll, queries=queries)
        docs = res.get('documents', [])
        for d in docs:
            pid = (d.get("patientId") or "").strip()
            if pid:
                patient_map[pid] = d["$id"]
        if len(docs) < 100:
            break
        cursor = docs[-1]["$id"]  # Use the last document ID as the cursor
    logger.info(" -> Mapped %d patients", len(patient_map))
    return patient_map
# ---------- Fetch sheets ----------
def fetch_all_sheets(ss):
    sheets = {}
    sheet_names = ['RefPhy', 'Patients', 'TodayCash', 'TodayExpenses', 'IntTr']
    for name in sheet_names:
        logger.info("Fetching sheet: %s", name)
        try:
            ws = ss.worksheet(name)
            sheets[name] = ws.get_all_values()
        except Exception as e:
            logger.warning("Sheet '%s' not found or error: %s", name, e)
            sheets[name] = []
        time.sleep(0.5)
    return sheets
# ---------- Main run ----------
def run():
    # Open spreadsheet
    try:
        ss = open_sheet()
    except Exception as e:
        logger.exception("Failed to open spreadsheet: %s", e)
        return
    collection_map = build_collection_map()
    sheets = fetch_all_sheets(ss)

    # Prepare users/clinics/physicians/patients docs locally
    logger.info("Preparing base documents locally...")
    user_docs = prepare_users_docs()
    clinic_docs, phys_docs = prepare_clinics_physicians_docs(sheets.get('RefPhy', []))
    patient_docs, _ = prepare_patients_docs(sheets.get('Patients', []))

    # Upload base collections in order (users -> clinics -> physicians -> patients)
    upload_order = [
        ("users", user_docs),
        ("clinics", clinic_docs),
        ("physicians", phys_docs),
        ("patients", patient_docs),
    ]
    for logical, docs in upload_order:
        if not docs:
            logger.info("No docs for %s", logical)
            continue
        coll_id = collection_map.get(logical)
        logger.info("Uploading %s (%d docs) -> %s", logical, len(docs), coll_id)
        # Chunk and create
        batch_size = BATCH_SIZE_DEFAULT if logical != "patients" else 300
        created_total = 0
        error_total = 0
        all_errors = []
        for start in range(0, len(docs), batch_size):
            batch = docs[start:start + batch_size]
            succ, errs = batch_create(coll_id, batch, src_map=None, concurrency=PER_COLLECTION_CONCURRENCY.get(logical))
            created_total += succ
            error_total += len(errs)
            all_errors.extend(errs)
            logger.info("%s created so far: %d/%d (errors total: %d)", logical, created_total, len(docs), error_total)
            # Extra sleep for patients to let sockets recover
            if logical == "patients":
                time.sleep(0.3)
        # Log error samples for debugging
        if all_errors:
            logger.warning("%s had %d errors. Sample errors:", logical, len(all_errors))
            for err in all_errors[:ERROR_SHOW_LIMIT]:
                logger.warning("  %s", err)
        # Single sanitizer summary print per base collection (if anything was sanitized)
        sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
        if sanitized_count:
            logger.info("Sanitized for %s: %d docs had removed keys (sample removed keys: %s)", logical, sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
        logger.info("%s final: %d/%d, errors: %d", logical, created_total, len(docs), error_total)
        time.sleep(0.2)

    # Rebuild maps from the DB AFTER the base upload (prevents race conditions / missing mappings)
    logger.info("Rebuilding maps from DB after base collection uploads...")
    user_map = build_user_doc_map(collection_map)
    phys_by_code, phys_by_name, self_phys_id = build_phys_code_map(collection_map)
    patient_map = build_patient_map(collection_map)

    # Prepare bookings/tests/payments using the DB maps (ensures correct entryBy & physician mapping)
    bookings, tests, payments, src_map = prepare_bookings_tests_payments(
        sheets.get('TodayCash', []),
        patient_map,
        phys_by_code,   # DocCode mapping for booking.referringPhysician
        phys_by_name,   # Name mapping for test.doctor
        self_phys_id,   # Fallback "Self" physician
        user_map
    )
    # Upload collections: bookings, booked_tests, payments_ledger
    logger.info("Uploading bookings/tests/payments...")

    # bookings
    coll_bookings = collection_map.get("bookings")
    total = len(bookings)
    created_total = 0
    error_total = 0
    batch_size = 300
    for start in range(0, total, batch_size):
        batch = bookings[start:start + batch_size]
        # Prepare the src_map subset for these docs
        subset_src_map = {d["$id"]: src_map.get(d["$id"]) for d in batch if d["$id"] in src_map}
        succ, errs = batch_create(coll_bookings, batch, src_map=subset_src_map, concurrency=PER_COLLECTION_CONCURRENCY.get("bookings"))
        created_total += succ
        error_total += len(errs)
        # Print errors with row context (first few)
        if errs:
            logger.warning("bookings errors sample: %s", errs[:ERROR_SHOW_LIMIT])
        logger.info("bookings created so far: %d/%d (errors total: %d)", created_total, total, error_total)
    # Sanitizer summary once
    coll_id = coll_bookings
    sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
    if sanitized_count:
        logger.info("Sanitized for bookings: %d docs had removed keys (sample removed keys: %s)", sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
    logger.info("bookings final: %d/%d, errors: %d", created_total, total, error_total)

    # booked_tests
    coll_tests = collection_map.get("booked_tests")
    total = len(tests)
    created_total = 0
    error_total = 0
    # Bigger batch size to speed things up
    batch_size = 1000
    for start in range(0, total, batch_size):
        batch = tests[start:start + batch_size]
        subset_src_map = {d["$id"]: src_map.get(d["$id"]) for d in batch if d["$id"] in src_map}
        succ, errs = batch_create(coll_tests, batch, src_map=subset_src_map, concurrency=PER_COLLECTION_CONCURRENCY.get("booked_tests"))
        created_total += succ
        error_total += len(errs)
        if errs:
            logger.warning("booked_tests errors sample: %s", errs[:ERROR_SHOW_LIMIT])
        logger.info("booked_tests created so far: %d/%d (errors total: %d)", created_total, total, error_total)
    # Sanitizer summary once
    coll_id = coll_tests
    sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
    if sanitized_count:
        logger.info("Sanitized for booked_tests: %d docs had removed keys (sample removed keys: %s)", sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
    logger.info("booked_tests final: %d/%d, errors: %d", created_total, total, error_total)

    # payments_ledger
    coll_payments = collection_map.get("payments_ledger")
    total = len(payments)
    created_total = 0
    error_total = 0
    batch_size = 300
    for start in range(0, total, batch_size):
        batch = payments[start:start + batch_size]
        subset_src_map = {d["$id"]: src_map.get(d["$id"]) for d in batch if d["$id"] in src_map}
        succ, errs = batch_create(coll_payments, batch, src_map=subset_src_map, concurrency=PER_COLLECTION_CONCURRENCY.get("payments_ledger"))
        created_total += succ
        error_total += len(errs)
        if errs:
            logger.warning("payments errors sample: %s", errs[:ERROR_SHOW_LIMIT])
        logger.info("payments created so far: %d/%d (errors total: %d)", created_total, total, error_total)
    coll_id = coll_payments
    sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
    if sanitized_count:
        logger.info("Sanitized for payments_ledger: %d docs had removed keys (sample removed keys: %s)", sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
    logger.info("payments final: %d/%d, errors: %d", created_total, total, error_total)

    # Expenses and transfers
    coll_exp = collection_map.get("expenses")
    exp_docs = prepare_expenses_docs(sheets.get("TodayExpenses", []))
    if exp_docs:
        succ, errs = batch_create(coll_exp, exp_docs, src_map=None, concurrency=PER_COLLECTION_CONCURRENCY.get("expenses"))
        logger.info("Expenses created: %d/%d errors: %d", succ, len(exp_docs), len(errs))
    coll_tr = collection_map.get("internal_transfers")
    tr_docs = prepare_internal_transfers_docs(sheets.get("IntTr", []))
    if tr_docs:
        succ, errs = batch_create(coll_tr, tr_docs, src_map=None, concurrency=PER_COLLECTION_CONCURRENCY.get("internal_transfers"))
        logger.info("Transfers created: %d/%d errors: %d", succ, len(tr_docs), len(errs))

    logger.info("=== MIGRATION COMPLETE ===")
if __name__ == "__main__":
    try:
        run()
    except KeyboardInterrupt:
        logger.warning("Interrupted by user")
    except Exception as e:
        logger.exception("Fatal error: %s", e)