#!/usr/bin/env python3
"""
migration_full.py (FIXED VERSION)
Key fixes:
- Uses DB-rebuilt physician map instead of local map
- Optimized batch_create to fetch schema once per batch (not per doc)
- Skips rows with invalid bookingDate
- Logs patient errors
- Increased booked_tests concurrency after optimization
"""
import time
import random
import json
import sys
import re
import logging
from datetime import datetime, timezone, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Any, Tuple
from collections import defaultdict
import requests
from requests.adapters import HTTPAdapter, Retry
# Appwrite SDK
from appwrite.client import Client
from appwrite.services.databases import Databases
from appwrite.services.users import Users
from appwrite.id import ID
from appwrite.query import Query
# Google Sheets
import gspread
from oauth2client.service_account import ServiceAccountCredentials
# robust date parsing (optional)
try:
    from dateutil import parser as dateutil_parser
except Exception:
    dateutil_parser = None
import secrets
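# NOTE: 'secrets' here is the project-local secrets.py config module holding the
# APPWRITE_*/DATABASE_ID/Google Sheets settings read below, not the stdlib
# 'secrets' module of the same name.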
# ---------- Config ----------
APPWRITE_ENDPOINT = getattr(secrets, "APPWRITE_ENDPOINT", None)
PROJECT = getattr(secrets, "APPWRITE_PROJECT_ID", None)
API_KEY = getattr(secrets, "APPWRITE_API_KEY", None)
DATABASE_ID = getattr(secrets, "DATABASE_ID", None)
GS_CREDS = getattr(secrets, "GS_CREDS", None) or getattr(secrets, "GSHEETS_CREDENTIALS_JSON", None)
SPREADSHEET_ID = getattr(secrets, "SPREADSHEET_ID", None)
COLLECTION_HINTS = getattr(secrets, "COLLECTION_HINTS", {})
LOGICAL_COLLECTIONS = [
    "users", "clinics", "physicians", "patients", "bookings",
    "booked_tests", "payments_ledger", "expenses", "internal_transfers"
]
# concurrency tuning
CONCURRENCY = 16
PER_COLLECTION_CONCURRENCY = {
    "patients": 4,  # reduced further for localhost socket limits
    "bookings": 6,
    "booked_tests": 12,
    "payments_ledger": 6,
    "users": 4,
    "physicians": 6,
    "clinics": 6,
    "expenses": 4,
    "internal_transfers": 4,
}
RETRY_MAX = 4
RETRY_BACKOFF_BASE = 0.6  # increased backoff for socket recovery
ERROR_SHOW_LIMIT = 6
BATCH_SIZE_DEFAULT = 200
BATCH_JITTER_SLEEP = 0.05  # increased sleep between batches
# ---------- Logging ----------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("migration")
# ---------- HTTP session ----------
session = requests.Session()
session.mount("https://", HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.5)))
session.mount("http://", HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.5)))
session.headers.update({
    "X-Appwrite-Project": PROJECT,
    "X-Appwrite-Key": API_KEY,
    "Content-Type": "application/json",
})
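# The raw requests session is only used for schema/metadata reads over REST
# (listing collections and their attributes); document create/list calls go
# through the Appwrite SDK client set up below.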
# ---------- Appwrite SDK ----------
client = Client()
client.set_endpoint(APPWRITE_ENDPOINT).set_project(PROJECT).set_key(API_KEY)
db = Databases(client)
users_srv = Users(client)
# ---------- sanitizer cache ----------
COLLECTION_ATTRS_CACHE = {}
SANITIZED_COUNTERS = defaultdict(int)
SANITIZED_REMOVED_SAMPLE = defaultdict(list)
# ---------- Helpers ----------
def id_unique():
    try:
        return ID.unique()
    except Exception:
        return f"{int(time.time()*1000)}{random.randint(1000,9999)}"
def open_sheet():
    scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
    if isinstance(GS_CREDS, dict):
        gc = gspread.service_account_from_dict(GS_CREDS)
    else:
        gc = gspread.service_account(filename=GS_CREDS)
    return gc.open_by_key(SPREADSHEET_ID)
def appwrite_collections_list():
    url = f"{APPWRITE_ENDPOINT}/databases/{DATABASE_ID}/collections"
    r = session.get(url)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to fetch collections via REST: {r.status_code} {r.text}")
    return r.json()
def build_collection_map() -> Dict[str, str]:
    logger.info("-> Auto-mapping logical collections to real collection IDs...")
    data = appwrite_collections_list()
    collections = []
    if isinstance(data, dict) and "collections" in data:
        collections = data["collections"]
    elif isinstance(data, list):
        collections = data
    else:
        for v in data.values():
            if isinstance(v, list):
                collections = v
                break
    by_id = {}
    by_name = {}
    for c in collections:
        cid = c.get("$id") or c.get("collectionId")
        name = (c.get("name") or "").strip().lower()
        if cid:
            by_id[cid] = c
        if name:
            by_name[name] = c
    mapping = {}
    # hints first
    for logical in LOGICAL_COLLECTIONS:
        hint = COLLECTION_HINTS.get(logical)
        if hint and hint in by_id:
            mapping[logical] = hint
    for logical in LOGICAL_COLLECTIONS:
        if mapping.get(logical):
            continue
        if logical in by_id:
            mapping[logical] = logical
            continue
        if logical.lower() in by_name:
            mapping[logical] = by_name[logical.lower()].get("$id") or by_name[logical.lower()].get("collectionId")
            continue
        # contains
        for nm, c in by_name.items():
            if logical.lower() in nm:
                mapping[logical] = c.get("$id") or c.get("collectionId")
                break
    logger.info("-> Collection mapping:")
    for k in LOGICAL_COLLECTIONS:
        logger.info(" %s -> %s", k.ljust(15), mapping.get(k))
    missing = [k for k in LOGICAL_COLLECTIONS if not mapping.get(k)]
    if missing:
        raise RuntimeError(f"Unmapped collections: {missing}. Add to COLLECTION_HINTS if they exist.")
    return mapping
def get_collection_attributes(collection_id: str):
    if collection_id in COLLECTION_ATTRS_CACHE:
        return COLLECTION_ATTRS_CACHE[collection_id]
    try:
        url = f"{APPWRITE_ENDPOINT}/databases/{DATABASE_ID}/collections/{collection_id}/attributes"
        r = session.get(url)
        if r.status_code != 200:
            logger.debug("get_collection_attributes non-200 %s for %s", r.status_code, collection_id)
            COLLECTION_ATTRS_CACHE[collection_id] = None
            return None
        body = r.json()
        attrs = []
        if isinstance(body, dict) and "attributes" in body:
            attrs = body["attributes"]
        elif isinstance(body, list):
            attrs = body
        else:
            for v in body.values():
                if isinstance(v, list):
                    attrs = v
                    break
        keys = set()
        for a in attrs:
            if isinstance(a, dict):
                k = a.get("key")
                if k:
                    keys.add(k)
        COLLECTION_ATTRS_CACHE[collection_id] = keys
        return keys
    except Exception as e:
        logger.debug("get_collection_attributes exception for %s: %s", collection_id, e)
        COLLECTION_ATTRS_CACHE[collection_id] = None
        return None
def create_doc_with_retry(collection_id: str, doc_id: str, data: Dict[str, Any]) -> Tuple[bool, str]:
    last_msg = ""
    for attempt in range(RETRY_MAX):
        try:
            res = db.create_document(DATABASE_ID, collection_id, doc_id, data)
            # The Appwrite Python SDK returns the created document as a dict
            created_id = res.get("$id", doc_id) if isinstance(res, dict) else doc_id
            return True, str(created_id)
        except Exception as e:
            last_msg = str(e)
            sleep_time = (RETRY_BACKOFF_BASE * (2 ** attempt)) + random.random() * 0.2
            time.sleep(sleep_time)
    return False, last_msg
def batch_create(collection_id: str, docs: List[Dict[str, Any]], src_map: Dict[str, Tuple[int, str, str]] = None, concurrency: int = None) -> Tuple[int, List[str]]:
    """
    Create docs concurrently. src_map maps doc_id -> (sheet_row_index, sNo, row_sample)
    for enriched error messages.
    Returns (success_count, enriched_error_list).
    """
    if concurrency is None:
        concurrency = PER_COLLECTION_CONCURRENCY.get(collection_id, CONCURRENCY)
    total = len(docs)
    if total == 0:
        return 0, []
    # Fetch schema ONCE for entire batch
    allowed_attrs = get_collection_attributes(collection_id)

    # Sanitize all documents in bulk
    tasks = []
    removed_keys_sample = []
    sanitized_count = 0
    start_time = time.time()

    if allowed_attrs is None:
        tasks = [{"$id": d["$id"], "data": dict(d.get("data", {}))} for d in docs]
    else:
        for d in docs:
            docid = d["$id"]
            data = d.get("data", {})
            cleaned = {k: v for k, v in data.items() if k in allowed_attrs}
            if len(cleaned) < len(data):
                sanitized_count += 1
                removed = [k for k in data if k not in allowed_attrs]
                for r in removed:
                    if r not in removed_keys_sample and len(removed_keys_sample) < 6:
                        removed_keys_sample.append(r)
            tasks.append({"$id": docid, "data": cleaned})

    # Log sanitization summary
    if sanitized_count > 0:
        SANITIZED_COUNTERS[collection_id] += sanitized_count
        ex = SANITIZED_REMOVED_SAMPLE.get(collection_id, [])
        for key in removed_keys_sample:
            if key not in ex and len(ex) < 6:
                ex.append(key)
        SANITIZED_REMOVED_SAMPLE[collection_id] = ex
        logger.info("Sanitized %d/%d docs for %s in %.2f seconds", sanitized_count, total, collection_id, time.time() - start_time)

    successes = 0
    errors = []
    with ThreadPoolExecutor(max_workers=concurrency) as ex:
        futures = {ex.submit(create_doc_with_retry, collection_id, t["$id"], t["data"]): t for t in tasks}
        for i, fut in enumerate(as_completed(futures), 1):
            t = futures[fut]
            try:
                ok, msg = fut.result()
            except Exception as e:
                ok, msg = False, str(e)
            if ok:
                successes += 1
            else:
                # Enrich with source row if available
                src_ctx = ""
                if src_map and t["$id"] in src_map:
                    row_idx, sNo, sample = src_map[t["$id"]]
                    src_ctx = f" [sheet_row_index={row_idx} sNo={sNo} sample={sample}]"
                if len(errors) < 300:
                    errors.append(f"doc {t['$id']} -> {msg}{src_ctx}")
            # Log progress every 100 documents
            if i % 100 == 0:
                logger.info("Processed %d/%d documents for %s (success: %d, errors: %d)", i, total, collection_id, successes, len(errors))

    time.sleep(BATCH_JITTER_SLEEP + random.random() * BATCH_JITTER_SLEEP)
    logger.info("Completed batch for %s: %d/%d in %.2f seconds", collection_id, successes, total, time.time() - start_time)
    return successes, errors
# ---------- helpers for parsing ----------
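# Example: parse_date("01/02/2024", "14:30") -> "2024-02-01T14:30:00Z"
# (dates are treated as day-first; returns None when nothing parses).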
def parse_date(date_str: str, time_str: str = ""):
    if not date_str and not time_str:
        return None
    ds = (date_str or "").strip()
    ts = (time_str or "").strip()
    # try dateutil
    if dateutil_parser:
        try:
            combined = ds
            if ts:
                combined = combined + " " + ts
            dt = dateutil_parser.parse(combined, dayfirst=True, default=datetime(2000, 1, 1))
            if dt.year < 1000:
                dt = dt.replace(year=2000)
            return dt.replace(microsecond=0).isoformat() + "Z"
        except Exception:
            pass
    # try common formats
    formats = [
        "%d/%m/%Y", "%d-%m-%Y", "%Y-%m-%d", "%d-%b-%y", "%d-%b-%Y",
        "%m/%d/%Y", "%b %d, %Y", "%d %b %Y"
    ]
    for fmt in formats:
        try:
            d = datetime.strptime(ds, fmt)
            if ts:
                if ":" in ts:
                    parts = ts.split(":")
                    h = int(parts[0]); m = int(parts[1]) if len(parts) > 1 else 0; s = int(parts[2]) if len(parts) > 2 else 0
                    d = d.replace(hour=h, minute=m, second=s)
            return d.replace(microsecond=0).isoformat() + "Z"
        except Exception:
            continue
    # fallback: if ds is iso-like
    try:
        d = datetime.fromisoformat(ds)
        if ts:
            if ":" in ts:
                parts = ts.split(":")
                h = int(parts[0]); m = int(parts[1]) if len(parts) > 1 else 0; s = int(parts[2]) if len(parts) > 2 else 0
                d = d.replace(hour=h, minute=m, second=s)
        return d.replace(microsecond=0).isoformat() + "Z"
    except Exception:
        return None
def parse_float_safe(val):
    if val is None:
        return 0.0
    s = str(val).strip()
    if s == "":
        return 0.0
    s_clean = re.sub(r'[^0-9.\-]', '', s)
    if s_clean in ("", ".", "-"):
        return 0.0
    try:
        return float(s_clean)
    except Exception:
        return 0.0
# header helpers
def header_index_map(rows: List[List[str]]):
    if not rows or len(rows) == 0:
        return {}
    hdr = [(h or "").strip().lower() for h in rows[0]]
    mapping = {}
    for i, h in enumerate(hdr):
        if not h:
            continue
        mapping[h] = i
    return mapping
def find_index_by_keywords(mapping: Dict[str, int], keywords: List[str]):
    if not mapping:
        return None
    for kw in keywords:
        for h, idx in mapping.items():
            if kw in h:
                return idx
    return None
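# Example: header_index_map([["S.No", "Date", "Paid Amt"]]) -> {"s.no": 0, "date": 1, "paid amt": 2},
# and find_index_by_keywords({"s.no": 0, "date": 1}, ["date"]) -> 1 (first substring match wins).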
# ---------- Prepare docs (with source mapping) ----------
def prepare_users_docs():
    users_list = [
        {'name': 'AFREEN', 'role': 'receptionist', 'isActive': False},
        {'name': 'ALIZA', 'role': 'radiology_technician', 'isActive': True},
        {'name': 'AMREEN', 'role': 'receptionist', 'isActive': False},
        {'name': 'DR. DANISH HUSHAIN', 'role': 'doctor', 'isActive': True},
        {'name': 'DR. PULKIT MARU', 'role': 'doctor', 'isActive': True},
        {'name': 'FAIYAZ', 'role': 'supervisor', 'isActive': False},
        {'name': 'FATIMA', 'role': 'nursing_assistant', 'isActive': True},
        {'name': 'FIZA', 'role': 'receptionist', 'isActive': True},
        {'name': 'HUZRA', 'role': 'nursing_assistant', 'isActive': False},
        {'name': 'LAIBA', 'role': 'receptionist', 'isActive': True},
        {'name': 'MANZAR', 'role': 'supervisor', 'isActive': True},
        {'name': 'MUSKAN', 'role': 'receptionist', 'isActive': False},
        {'name': 'OTHER', 'role': 'system', 'isActive': False},
        {'name': 'RAHILA', 'role': 'receptionist', 'isActive': False},
        {'name': 'SANJU', 'role': 'nursing_assistant', 'isActive': True},
        {'name': 'SHILPI', 'role': 'typist', 'isActive': False},
        {'name': 'TAHURA', 'role': 'receptionist', 'isActive': False},
        {'name': 'ZEBA ZARIN', 'role': 'receptionist', 'isActive': False},
    ]
    docs = []
    for u in users_list:
        doc_id = id_unique()
        data = {
            "name": u['name'],
            "role": u['role'],
            "isActive": u['isActive'],
            "createdAt": datetime.now(timezone.utc).replace(microsecond=0).isoformat()
        }
        docs.append({"$id": doc_id, "data": data})
    return docs
def prepare_clinics_physicians_docs(refphy_rows):
    clinic_docs = []
    clinic_map = {}
    if not refphy_rows:
        return clinic_docs, []
    def norm(s): return s.strip().lower() if s else ""
    for idx, row in enumerate(refphy_rows[1:], start=2):
        if len(row) < 5:
            continue
        cn = (row[4] or "").strip()
        if not cn:
            continue
        key = norm(cn)
        if key in clinic_map:
            continue
        cid = id_unique()
        clinic_map[key] = cid
        data = {"name": cn, "address": row[7].strip() if len(row) > 7 else "", "isActive": True}
        clinic_docs.append({"$id": cid, "data": data})

    phys_docs = []
    for row in refphy_rows[1:]:
        if len(row) < 16:
            continue
        full = (row[1] or "").strip()
        doccode = (row[15] or "").strip()  # Column P (index 15)
        if not doccode:
            continue
        pid = id_unique()
        clinic_name = row[4].strip() if len(row) > 4 else ""
        clinic_id = clinic_map.get(norm(clinic_name))
        data = {
            "title": row[0].strip() if row else "",
            "fullName": full,
            "specialty": row[3].strip() if len(row) > 3 else "",
            "contact": row[5].strip() if len(row) > 5 else "",
            "isActive": True,
            "clinics": [clinic_id] if clinic_id else [],
            "registrationNumber": row[13].strip() if len(row) > 13 else "",
            "DocCode": doccode  # Column P
        }
        phys_docs.append({"$id": pid, "data": data})
    return clinic_docs, phys_docs
def prepare_patients_docs(patient_rows):
    docs = []
    patient_map = {}
    if not patient_rows:
        return docs, patient_map
    for idx, row in enumerate(patient_rows[1:], start=2):
        if not row or not (len(row) > 0 and (row[0] or "").strip()):
            continue
        doc_id = id_unique()
        pid = (row[0] or "").strip()
        patient_map[pid] = doc_id
        data = {
            "patientId": pid,
            "pCarePatientId": row[14].strip() if len(row) > 14 else "",
            "firstName": row[11].strip() if len(row) > 11 and row[11].strip() else "Unknown",
            "middleName": row[12].strip() if len(row) > 12 else "",
            "lastName": row[13].strip() if len(row) > 13 else "",
            "sex": "Male" if (len(row) > 2 and row[2].strip().upper() == "M") else "Female",
            "age": int(row[3].strip()) if len(row) > 3 and row[3].strip().isdigit() else None,
            "ageUnit": row[4].strip() if len(row) > 4 else "",
            "dob": parse_date(row[5].strip() if len(row) > 5 else "", ""),
            "mobileNo": row[6].strip() if len(row) > 6 else "",
            "altMobileNo": "",
            "houseNo": row[7].strip() if len(row) > 7 else "",
            "streetAddress": row[18].strip() if len(row) > 18 else (row[7].strip() if len(row) > 7 else ""),
            "city": row[15].strip() if len(row) > 15 else "",
            "state": row[16].strip() if len(row) > 16 else "",
            "pincode": row[17].strip() if len(row) > 17 else "",
            "referralPhysicianName": row[8].strip() if len(row) > 8 else "",
            "referralPhysicianName2": row[9].strip() if len(row) > 9 else "",
            "nameOfSpouseOrFather": row[10].strip() if len(row) > 10 else "",
            "createdAt": datetime.now(timezone.utc).replace(microsecond=0).isoformat(),
            "marketingConsent": False,
            "registeredOn": None
        }
        docs.append({"$id": doc_id, "data": data})
    return docs, patient_map
def prepare_bookings_tests_payments(todaycash_rows, patient_map, phys_by_code, phys_by_name, self_phys_id, user_map):
    """
    Builds bookings, tests, payments lists.
    - booking.referringPhysician: mapped by DocCode (column U)
    - test.doctor: mapped by name (column E), fallback to Self
    """
    bookings = []
    tests = []
    payments = []
    src_map = {}
    if not todaycash_rows:
        return bookings, tests, payments, src_map
    hdr_map = header_index_map(todaycash_rows)
    idx_sno = find_index_by_keywords(hdr_map, ["s.no", "s.no.", "sno"]) or 0
    idx_entry_by = find_index_by_keywords(hdr_map, ["entry by", "entryby"])
    idx_pat = find_index_by_keywords(hdr_map, ["patid", "pat id", "patientid"]) or find_index_by_keywords(hdr_map, ["pcarepatid"])
    idx_doccode = find_index_by_keywords(hdr_map, ["doccode", "doc code"])  # Column U
    idx_doctor = find_index_by_keywords(hdr_map, ["doctor", "dr", "referred by"]) or 4  # Column E
    idx_bookingid = find_index_by_keywords(hdr_map, ["bookingid", "booking id"])
    idx_date = find_index_by_keywords(hdr_map, ["date"])
    idx_time = find_index_by_keywords(hdr_map, ["time"])
    idx_amount = find_index_by_keywords(hdr_map, ["amount", "paid amt", "paidamt"])
    idx_paid_amt = find_index_by_keywords(hdr_map, ["paid amt", "paidamt"])
    idx_testname = find_index_by_keywords(hdr_map, ["type", "testname", "test name", "subtype"]) or 2
    idx_modality = find_index_by_keywords(hdr_map, ["type"]) or 1
    skipped_count = 0
    for row_index, row in enumerate(todaycash_rows[1:], start=2):
        if not any((cell or "").strip() for cell in row):
            continue

        sNo = (row[idx_sno].strip() if idx_sno is not None and idx_sno < len(row) else str(row_index))
        booking_docid = id_unique()
        # Map patient id
        patval = (row[idx_pat].strip() if idx_pat is not None and idx_pat < len(row) else "")
        patient_docid = patient_map.get(patval) if patval and patval != "0" else None
        # Map DocCode (column U) -> physician id for booking.referringPhysician
        doccode = (row[idx_doccode].strip() if idx_doccode is not None and idx_doccode < len(row) else "")
        referring_phys_id = phys_by_code.get(doccode) if doccode else None
        # Map entry by
        entry_name_raw = (row[idx_entry_by].strip() if idx_entry_by is not None and idx_entry_by < len(row) else "")
        entry_by_id = None
        if entry_name_raw:
            key = entry_name_raw.strip().lower()
            entry_by_id = user_map.get(entry_name_raw) or user_map.get(key)
        # Parse booking date/time
        raw_date = (row[idx_date].strip() if idx_date is not None and idx_date < len(row) else "")
        raw_time = (row[idx_time].strip() if idx_time is not None and idx_time < len(row) else "")
        booking_iso = parse_date(raw_date, raw_time)

        if not booking_iso:
            logger.warning("Skipping row %d (sNo=%s): Invalid date. raw_date='%s' raw_time='%s'",
                           row_index, sNo, raw_date, raw_time)
            skipped_count += 1
            continue
        booking_data = {
            "bookingId": (row[idx_bookingid].strip() if idx_bookingid is not None and idx_bookingid < len(row) and (row[idx_bookingid] or "").strip() else f"BOOK-{sNo}"),
            "bookingDate": booking_iso,
            "status": "Completed",
            "isHomeCollection": False,
            "entryBy": entry_by_id,
            "patient": patient_docid,
            "referringPhysician": referring_phys_id,  # Mapped by DocCode
        }
        bookings.append({"$id": booking_docid, "data": booking_data})
        src_map[booking_docid] = (row_index, sNo, " | ".join((col or "") for col in row[:6]))
        # Map doctor name (column E) -> physician id for test.doctor
        doctor_name_raw = (row[idx_doctor].strip() if idx_doctor is not None and idx_doctor < len(row) else "")
        test_doctor_id = None
        if doctor_name_raw:
            # Normalize: remove DR./Dr., trim, lowercase
            normalized = doctor_name_raw.upper().replace("DR.", "").replace("DR ", "").strip().lower()
            test_doctor_id = phys_by_name.get(normalized)

        # Fallback to Self if no match
        if not test_doctor_id and self_phys_id:
            test_doctor_id = self_phys_id
        # Create test
        modality = (row[idx_modality].strip() if idx_modality is not None and idx_modality < len(row) else "")
        testname = (row[idx_testname].strip() if idx_testname is not None and idx_testname < len(row) else "")
        price_str = (row[idx_amount].strip() if idx_amount is not None and idx_amount < len(row) else "0")
        price = parse_float_safe(price_str)
        test_docid = id_unique()
        test_data = {
            "modality": modality,
            "testName": testname,
            "testPrice": price,
            "doctor": test_doctor_id,  # Mapped by name with fallback to Self
            "status": "Completed",
            "booking": booking_docid,
            "entryBy": entry_by_id,
        }
        tests.append({"$id": test_docid, "data": test_data})
        src_map[test_docid] = (row_index, sNo, " | ".join((col or "") for col in row[:6]))
        # Payment
        paid_str = (row[idx_paid_amt].strip() if idx_paid_amt is not None and idx_paid_amt < len(row) else "")
        paid = parse_float_safe(paid_str)
        if paid and paid > 0:
            pay_docid = id_unique()
            pay_data = {
                "internalTxnId": f"PAY-{(row[idx_bookingid].strip() if idx_bookingid is not None and idx_bookingid < len(row) and (row[idx_bookingid] or '').strip() else sNo)}",
                "amount": paid,
                "paymentMethod": (row[idx_paid_amt - 1].strip() if idx_paid_amt is not None and 0 <= idx_paid_amt - 1 < len(row) else "Cash"),
                "transactionDate": booking_iso,
                "status": "Completed",
                "entryBy": entry_by_id,
                "booking": booking_docid,
            }
            payments.append({"$id": pay_docid, "data": pay_data})
            src_map[pay_docid] = (row_index, sNo, " | ".join((col or "") for col in row[:6]))
    if skipped_count > 0:
        logger.info("Skipped %d rows due to invalid dates", skipped_count)

    return bookings, tests, payments, src_map
def prepare_expenses_docs(expenses_rows):
    docs = []
    if not expenses_rows:
        return docs
    hdr_map = header_index_map(expenses_rows)
    idx_desc = find_index_by_keywords(hdr_map, ["description", "desc", "detail"])
    idx_amount = find_index_by_keywords(hdr_map, ["amount", "amt"])
    idx_from = find_index_by_keywords(hdr_map, ["from account", "from", "account"])
    idx_entry = find_index_by_keywords(hdr_map, ["entry by", "entry", "entered by"])
    idx_approved = find_index_by_keywords(hdr_map, ["approved", "approved by"])
    idx_date = find_index_by_keywords(hdr_map, ["date"])
    for row_idx, row in enumerate(expenses_rows[1:], start=2):
        if not any((c or "").strip() for c in row):
            continue
        desc = row[idx_desc].strip() if idx_desc is not None and idx_desc < len(row) else "Expense"
        amount = parse_float_safe(row[idx_amount]) if idx_amount is not None and idx_amount < len(row) else 0.0
        fromacc = row[idx_from].strip() if idx_from is not None and idx_from < len(row) else ""
        entry_by = row[idx_entry].strip() if idx_entry is not None and idx_entry < len(row) else ""
        approved_by = row[idx_approved].strip() if idx_approved is not None and idx_approved < len(row) else ""
        dt = parse_date(row[idx_date].strip()) if idx_date is not None and idx_date < len(row) and (row[idx_date] or "").strip() else None
        data = {
            "expenseId": f"EXP-{id_unique()}",
            "expenseDate": dt,
            "description": desc,
            "type": "",
            "subtype": "",
            "amount": amount,
            "fromAccount": fromacc,
            "approvedBy": approved_by,
            "entryBy": entry_by,
        }
        docs.append({"$id": id_unique(), "data": data})
    return docs
def prepare_internal_transfers_docs(transfers_rows):
    docs = []
    if not transfers_rows:
        return docs
    hdr_map = header_index_map(transfers_rows)
    idx_date = find_index_by_keywords(hdr_map, ["date"])
    idx_from = find_index_by_keywords(hdr_map, ["from"])
    idx_to = find_index_by_keywords(hdr_map, ["to"])
    idx_amount = find_index_by_keywords(hdr_map, ["amount", "amt"])
    idx_entry = find_index_by_keywords(hdr_map, ["entry", "entry by"])
    for row_idx, row in enumerate(transfers_rows[1:], start=2):
        if not any((c or "").strip() for c in row):
            continue
        dt = parse_date(row[idx_date].strip()) if idx_date is not None and idx_date < len(row) and (row[idx_date] or "").strip() else None
        frm = row[idx_from].strip() if idx_from is not None and idx_from < len(row) else ""
        to = row[idx_to].strip() if idx_to is not None and idx_to < len(row) else ""
        amount = parse_float_safe(row[idx_amount]) if idx_amount is not None and idx_amount < len(row) else 0.0
        entry_by = row[idx_entry].strip() if idx_entry is not None and idx_entry < len(row) else ""
        data = {
            "transferId": f"TR-{id_unique()}",
            "transferDate": dt,
            "fromAccount": frm,
            "toAccount": to,
            "amount": amount,
            "entryBy": entry_by,
        }
        docs.append({"$id": id_unique(), "data": data})
    return docs
# ---------- build maps from DB (used after uploading base collections) ----------
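# These maps are rebuilt from the database after the base collections are uploaded
# (see the "Key fixes" note at the top) so the entryBy/physician/patient IDs used
# below reference documents that actually exist server-side.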
def build_user_doc_map(collection_map):
    logger.info("Building user map from DB (case-insensitive keys).")
    users_map = {}
    coll = collection_map["users"]
    offset = 0
    while True:
        queries = [Query.limit(100), Query.offset(offset)]
        res = db.list_documents(DATABASE_ID, coll, queries=queries)
        docs = res.get('documents', [])
        for d in docs:
            name = (d.get("name") or "").strip()
            if not name:
                continue
            users_map[name] = d["$id"]
            users_map[name.strip().lower()] = d["$id"]
            users_map[re.sub(r'\s+', ' ', name.strip().lower())] = d["$id"]
        if len(docs) < 100:
            break
        offset += 100
    logger.info(" -> Mapped %d users", len(set(users_map.values())))
    return users_map
def build_phys_code_map(collection_map):
    """Build physician maps by DocCode AND by normalized name."""
    logger.info("Building physician code map from DB.")
    phys_by_code = {}  # DocCode -> physician ID
    phys_by_name = {}  # Normalized name -> physician ID
    coll = collection_map["physicians"]
    cursor = None
    self_physician_id = None

    while True:
        queries = [Query.limit(100)]
        if cursor:
            queries.append(Query.cursor_after(cursor))
        res = db.list_documents(DATABASE_ID, coll, queries=queries)
        docs = res.get('documents', [])
        for d in docs:
            doc_id = d["$id"]

            # Map by DocCode attribute
            doccode = (d.get("DocCode") or "").strip()
            if doccode:
                phys_by_code[doccode] = doc_id
                # Check if this is the "Self" physician (DocCode 14882)
                if doccode == "14882":
                    self_physician_id = doc_id

            # Map by normalized full name (strip DR./Dr. prefix, lowercase, trimmed)
            full_name = (d.get("fullName") or "").strip()
            if full_name:
                # Remove DR./Dr. prefix and normalize
                normalized = full_name.upper().replace("DR.", "").replace("DR ", "").strip().lower()
                if normalized:
                    phys_by_name[normalized] = doc_id

        if len(docs) < 100:
            break
        cursor = docs[-1]["$id"]  # Use last document ID as cursor

    logger.info(" -> Mapped %d physician codes, %d unique names, Self physician ID: %s",
                len(phys_by_code), len(phys_by_name), self_physician_id or "NOT FOUND")

    if not self_physician_id:
        logger.error("CRITICAL: 'Self' physician (DocCode 14882) not found in database!")

    return phys_by_code, phys_by_name, self_physician_id
def build_patient_map(collection_map):
    logger.info("Building patient map from DB with cursor-based pagination.")
    patient_map = {}
    coll = collection_map["patients"]
    cursor = None
    while True:
        queries = [Query.limit(100)]
        if cursor:
            queries.append(Query.cursor_after(cursor))
        res = db.list_documents(DATABASE_ID, coll, queries=queries)
        docs = res.get('documents', [])
        for d in docs:
            pid = (d.get("patientId") or "").strip()
            if pid:
                patient_map[pid] = d["$id"]
        if len(docs) < 100:
            break
        cursor = docs[-1]["$id"]  # Use last document ID as cursor
    logger.info(" -> Mapped %d patients", len(patient_map))
    return patient_map
# ---------- fetch sheets ----------
def fetch_all_sheets(ss):
    sheets = {}
    sheet_names = ['RefPhy', 'Patients', 'TodayCash', 'TodayExpenses', 'IntTr']
    for name in sheet_names:
        logger.info("Fetching sheet: %s", name)
        try:
            ws = ss.worksheet(name)
            sheets[name] = ws.get_all_values()
        except Exception as e:
            logger.warning("Sheet '%s' not found or error: %s", name, e)
            sheets[name] = []
        time.sleep(0.5)
    return sheets
# ---------- main run ----------
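# Overall order: upload the base collections (users -> clinics -> physicians -> patients),
# rebuild lookup maps from the DB, then upload bookings/booked_tests/payments_ledger,
# and finally expenses and internal_transfers.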
def run():
    # open spreadsheet
    try:
        ss = open_sheet()
    except Exception as e:
        logger.exception("Failed to open spreadsheet: %s", e)
        return
    collection_map = build_collection_map()
    sheets = fetch_all_sheets(ss)
    # Prepare users/clinics/physicians/patients docs locally
    logger.info("Preparing base documents locally...")
    user_docs = prepare_users_docs()
    clinic_docs, phys_docs = prepare_clinics_physicians_docs(sheets.get('RefPhy', []))
    patient_docs, _ = prepare_patients_docs(sheets.get('Patients', []))
    # Upload base collections in order (users -> clinics -> physicians -> patients)
    upload_order = [
        ("users", user_docs),
        ("clinics", clinic_docs),
        ("physicians", phys_docs),
        ("patients", patient_docs),
    ]

    for logical, docs in upload_order:
        if not docs:
            logger.info("No docs for %s", logical)
            continue
        coll_id = collection_map.get(logical)
        logger.info("Uploading %s (%d docs) -> %s", logical, len(docs), coll_id)
        # chunk and create
        batch_size = BATCH_SIZE_DEFAULT if logical != "patients" else 300
        created_total = 0
        error_total = 0
        all_errors = []
        for start in range(0, len(docs), batch_size):
            batch = docs[start:start+batch_size]
            succ, errs = batch_create(coll_id, batch, src_map=None, concurrency=PER_COLLECTION_CONCURRENCY.get(logical))
            created_total += succ
            error_total += len(errs)
            all_errors.extend(errs)
            logger.info("%s created so far: %d/%d (errors total: %d)", logical, created_total, len(docs), error_total)
            # Extra sleep for patients to let sockets recover
            if logical == "patients":
                time.sleep(0.3)

        # Log error samples for debugging
        if all_errors:
            logger.warning("%s had %d errors. Sample errors:", logical, len(all_errors))
            for err in all_errors[:ERROR_SHOW_LIMIT]:
                logger.warning(" %s", err)

        # single sanitizer summary print for base collection (if any sanitized)
        sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
        if sanitized_count:
            logger.info("Sanitized for %s: %d docs had removed keys (sample removed keys: %s)", logical, sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
        logger.info("%s final: %d/%d, errors: %d", logical, created_total, len(docs), error_total)
        time.sleep(0.2)

    # Rebuild maps from DB AFTER base upload (this prevents race conditions / missing mapping)
    logger.info("Rebuilding maps from DB after base collection uploads...")
    user_map = build_user_doc_map(collection_map)
    phys_by_code, phys_by_name, self_phys_id = build_phys_code_map(collection_map)
    patient_map = build_patient_map(collection_map)

    # Prepare bookings/tests/payments using the DB maps (ensures correct entryBy & physician mapping)
    bookings, tests, payments, src_map = prepare_bookings_tests_payments(
        sheets.get('TodayCash', []),
        patient_map,
        phys_by_code,   # DocCode mapping for booking.referringPhysician
        phys_by_name,   # Name mapping for test.doctor
        self_phys_id,   # Fallback "Self" physician
        user_map
    )
    # Upload collections: bookings, booked_tests, payments_ledger
    logger.info("Uploading bookings/tests/payments...")
    # bookings
    coll_bookings = collection_map.get("bookings")
    total = len(bookings)
    created_total = 0
    error_total = 0
    batch_size = 300
    for start in range(0, total, batch_size):
        batch = bookings[start:start+batch_size]
        # prepare src_map subset for these docs
        subset_src_map = {d["$id"]: src_map.get(d["$id"]) for d in batch if d["$id"] in src_map}
        succ, errs = batch_create(coll_bookings, batch, src_map=subset_src_map, concurrency=PER_COLLECTION_CONCURRENCY.get("bookings"))
        created_total += succ
        error_total += len(errs)
        # print errors with row context (first few)
        if errs:
            logger.warning("bookings errors sample: %s", errs[:ERROR_SHOW_LIMIT])
        logger.info("bookings created so far: %d/%d (errors total: %d)", created_total, total, error_total)
    # sanitizer summary once
    coll_id = coll_bookings
    sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
    if sanitized_count:
        logger.info("Sanitized for bookings: %d docs had removed keys (sample removed keys: %s)", sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
    logger.info("bookings final: %d/%d, errors: %d", created_total, total, error_total)
    # booked_tests
    coll_tests = collection_map.get("booked_tests")
    total = len(tests)
    created_total = 0
    error_total = 0
    # bigger batch size to speed up
    batch_size = 1000
    for start in range(0, total, batch_size):
        batch = tests[start:start+batch_size]
        subset_src_map = {d["$id"]: src_map.get(d["$id"]) for d in batch if d["$id"] in src_map}
        succ, errs = batch_create(coll_tests, batch, src_map=subset_src_map, concurrency=PER_COLLECTION_CONCURRENCY.get("booked_tests"))
        created_total += succ
        error_total += len(errs)
        if errs:
            logger.warning("booked_tests errors sample: %s", errs[:ERROR_SHOW_LIMIT])
        logger.info("booked_tests created so far: %d/%d (errors total: %d)", created_total, total, error_total)
    # sanitizer summary once
    coll_id = coll_tests
    sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
    if sanitized_count:
        logger.info("Sanitized for booked_tests: %d docs had removed keys (sample removed keys: %s)", sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
    logger.info("booked_tests final: %d/%d, errors: %d", created_total, total, error_total)
    # payments_ledger
    coll_payments = collection_map.get("payments_ledger")
    total = len(payments)
    created_total = 0
    error_total = 0
    batch_size = 300
    for start in range(0, total, batch_size):
        batch = payments[start:start+batch_size]
        subset_src_map = {d["$id"]: src_map.get(d["$id"]) for d in batch if d["$id"] in src_map}
        succ, errs = batch_create(coll_payments, batch, src_map=subset_src_map, concurrency=PER_COLLECTION_CONCURRENCY.get("payments_ledger"))
        created_total += succ
        error_total += len(errs)
        if errs:
            logger.warning("payments errors sample: %s", errs[:ERROR_SHOW_LIMIT])
        logger.info("payments created so far: %d/%d (errors total: %d)", created_total, total, error_total)
    coll_id = coll_payments
    sanitized_count = SANITIZED_COUNTERS.get(coll_id, 0)
    if sanitized_count:
        logger.info("Sanitized for payments_ledger: %d docs had removed keys (sample removed keys: %s)", sanitized_count, SANITIZED_REMOVED_SAMPLE.get(coll_id, [])[:6])
    logger.info("payments final: %d/%d, errors: %d", created_total, total, error_total)
    # expenses and transfers
    coll_exp = collection_map.get("expenses")
    exp_docs = prepare_expenses_docs(sheets.get("TodayExpenses", []))
    if exp_docs:
        succ, errs = batch_create(coll_exp, exp_docs, src_map=None, concurrency=PER_COLLECTION_CONCURRENCY.get("expenses"))
        logger.info("Expenses created: %d/%d errors: %d", succ, len(exp_docs), len(errs))
    coll_tr = collection_map.get("internal_transfers")
    tr_docs = prepare_internal_transfers_docs(sheets.get("IntTr", []))
    if tr_docs:
        succ, errs = batch_create(coll_tr, tr_docs, src_map=None, concurrency=PER_COLLECTION_CONCURRENCY.get("internal_transfers"))
        logger.info("Transfers created: %d/%d errors: %d", succ, len(tr_docs), len(errs))
    logger.info("=== MIGRATION COMPLETE ===")
if __name__ == "__main__":
    try:
        run()
    except KeyboardInterrupt:
        logger.warning("Interrupted by user")
    except Exception as e:
        logger.exception("Fatal error: %s", e)
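# Usage (a sketch, assuming a secrets.py importable from the working directory):
#   python3 migration_full.py
# secrets.py should define APPWRITE_ENDPOINT, APPWRITE_PROJECT_ID, APPWRITE_API_KEY,
# DATABASE_ID, SPREADSHEET_ID, GS_CREDS (service-account file path or dict), and
# optionally COLLECTION_HINTS.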