iamrealnurs

Untitled

Aug 4th, 2025
392
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.61 KB | Source Code | 0 0
  1. import requests
  2. import threading
  3. import time
  4. import json
  5. import websocket  # pip install websocket-client
  6. import random
  7. from pprint import pprint
  8.  
  9. # 🔐 Настройки авторизации для Employee
  10. BASE_URL_EMPLOYEE = "http://127.0.0.1:8000/api/v1/employee_endpoints/authentication"
  11. WS_URL = "ws://127.0.0.1:8000/ws/notifications/"
  12. USERNAME = "svetlana_62"
  13. PASSWORD = "string"
  14.  
  15. # 🌐 Настройки API для пациента
  16. BASE_URL_PATIENT = "http://127.0.0.1:8000/api/v1/patients_endpoints"
  17. AUTH_URL = f"{BASE_URL_PATIENT}/authentication"
  18. VISIT_URL = f"{BASE_URL_PATIENT}/visits/create-visit/"
  19. DOCTORS_URL = f"{BASE_URL_PATIENT}/city_id:1/all-doctors/"
  20.  
  21. # 👤 Тестовые пользователи
  22. USERS = [
  23.     {
  24.         "phone_number": "+7 (768) 045-3987",
  25.         "password": "string",
  26.         "authenticated": True
  27.     },
  28.     {
  29.         "phone_number": "+7 579 730 93 15",
  30.         "first_name": "Тест",
  31.         "last_name": "Тестович",
  32.         "authenticated": False
  33.     }
  34. ]
  35.  
  36. MAX_RETRIES = 3
  37. TIMEOUT_SECONDS = 30
  38.  
  39.  
  40. # 📤 Универсальный отправщик запросов
  41. def send_request(name, url, method, data=None, headers=None):
  42.     pprint(f"\n🟡 {name}")
  43.     pprint(f"➡️ URL: {url}")
  44.     pprint(f"➡️ Метод: {method}")
  45.     pprint(f"➡️ Данные: {data}")
  46.  
  47.     request_func = requests.get if method == "GET" else requests.post
  48.     headers = headers or {}
  49.  
  50.     for attempt in range(1, MAX_RETRIES + 1):
  51.         try:
  52.             response = request_func(url, json=data, headers=headers, timeout=TIMEOUT_SECONDS)
  53.             pprint(f"📥 Ответ: {response.status_code} (Попытка {attempt}/{MAX_RETRIES})")
  54.  
  55.             if response.status_code in [200, 201]:
  56.                 try:
  57.                     return response.json()
  58.                 except Exception:
  59.                     pprint("⚠️ Ошибка парсинга JSON")
  60.                     return None
  61.             else:
  62.                 pprint(f"❌ Ошибка: {response.text}")
  63.                 return None
  64.         except requests.exceptions.Timeout:
  65.             pprint(f"⚠️ Тайм-аут (Попытка {attempt}) - ожидание 5 сек...")
  66.             time.sleep(5)
  67.         except requests.exceptions.RequestException as e:
  68.             pprint(f"❌ Ошибка соединения: {e}")
  69.             return None
  70.     return None
  71.  
  72.  
  73. # 🩺 Получение списка врачей
  74. def get_doctors():
  75.     return send_request("📍 Получение врачей", DOCTORS_URL, "GET")
  76.  
  77.  
  78. # 📲 Авторизация пациента
  79. def login(phone, password):
  80.     data = {"phone_number": phone, "password": password}
  81.     response = send_request("📲 Авторизация пациента", f"{AUTH_URL}/login/", "POST", data)
  82.     return response.get("access") if response else None
  83.  
  84.  
  85. # 👤 Получение профиля пациента
  86. def get_profile(access_token):
  87.     headers = {"Authorization": f"Bearer {access_token}"}
  88.     return send_request("👤 Получение профиля", f"{AUTH_URL}/me/", "GET", headers=headers)
  89.  
  90.  
  91. # 📝 Создание визита
  92. def create_visit(user, doctor, procedure, clinic, visit_date, time_slot_id):
  93.     scenario = "Авторизованный" if user["authenticated"] else "Неавторизованный"
  94.     visit_data = {
  95.         "doctor_id": doctor["id"],
  96.         "procedure_id": str(procedure["medical_procedure_id"]),
  97.         "clinic_id": int(clinic["clinic_id"]),
  98.         "date": visit_date,
  99.         "time_slot_id": time_slot_id
  100.     }
  101.  
  102.     headers = {}
  103.     if user["authenticated"]:
  104.         if "profile" in user:
  105.             visit_data.update({
  106.                 "first_name": user["profile"]["first_name"],
  107.                 "last_name": user["profile"]["last_name"],
  108.                 "phone_number": user["profile"]["phone_number"]
  109.             })
  110.         if "access_token" in user:
  111.             headers["Authorization"] = f"Bearer {user['access_token']}"
  112.     else:
  113.         visit_data.update({
  114.             "first_name": user["first_name"],
  115.             "last_name": user["last_name"],
  116.             "phone_number": user["phone_number"]
  117.         })
  118.  
  119.     pprint(f"📘 {scenario} пользователь создает визит")
  120.     return send_request("📝 Создание визита", VISIT_URL, "POST", visit_data, headers=headers)
  121.  
  122.  
  123. # 🎧 Подключение к WebSocket для уведомлений
  124. def listen_notifications(token):
  125.     def on_message(ws, message):
  126.         pprint(f"🔔 Уведомление: {message}")
  127.  
  128.     def on_error(ws, error):
  129.         pprint(f"❌ Ошибка WebSocket: {error}")
  130.  
  131.     def on_close(ws, code, msg):
  132.         pprint("🔌 WebSocket отключен")
  133.  
  134.     def on_open(ws):
  135.         pprint("🟢 WebSocket подключен")
  136.  
  137.     ws = websocket.WebSocketApp(
  138.         WS_URL,
  139.         on_message=on_message,
  140.         on_error=on_error,
  141.         on_close=on_close,
  142.         on_open=on_open,
  143.         header=[f"Authorization: Bearer {token}"]
  144.     )
  145.     ws.run_forever()
  146.  
  147.  
  148. # 👨‍⚕️ Имитация создания визита от пациента
  149. def simulate_patient_visit():
  150.     doctors_data = get_doctors()
  151.     if not doctors_data:
  152.         return
  153.     doctors = doctors_data.get("results", [])
  154.     if not doctors:
  155.         return
  156.  
  157.     doctor = doctors[0]
  158.     procedures = doctor.get("procedures", [])
  159.     weekly_schedule = doctor.get("weekly_schedule", [])
  160.     if not procedures or not weekly_schedule:
  161.         return
  162.  
  163.     schedule_block = weekly_schedule[0]
  164.     schedules = schedule_block.get("schedules", [])
  165.     if not schedules:
  166.         return
  167.  
  168.     schedule = schedules[0]
  169.     working_hours = schedule.get("working_hours_list", [])
  170.     if not working_hours:
  171.         return
  172.  
  173.     procedure = random.choice(procedures)
  174.     time_slot = random.choice(working_hours)
  175.     clinic = {
  176.         "clinic_id": schedule["clinic_id"],
  177.         "clinic_title": schedule["clinic_title"]
  178.     }
  179.     visit_date = schedule["date"]
  180.  
  181.     user = USERS[0]
  182.     if user["authenticated"]:
  183.         access_token = login(user["phone_number"], user["password"])
  184.         if not access_token:
  185.             pprint("❌ Не удалось авторизоваться пациентом")
  186.             return
  187.         user["access_token"] = access_token
  188.         user["profile"] = get_profile(access_token)
  189.  
  190.     create_visit(user, doctor, procedure, clinic, visit_date, time_slot["time_slot_id"])
  191.  
  192.  
  193. # 🧑‍💼 Получение токена для сотрудника
  194. def get_employee_token():
  195.     login_data = {"username": USERNAME, "password": PASSWORD}
  196.     response = requests.post(f"{BASE_URL_EMPLOYEE}/login/", json=login_data)
  197.     return response.json().get("access") if response.ok else None
  198.  
  199.  
  200. # 🧪 Запуск теста
  201. if __name__ == "__main__":
  202.     pprint("🚀 Запуск теста уведомлений при создании визита")
  203.     token = get_employee_token()
  204.     if not token:
  205.         pprint("⛔ Не удалось авторизовать сотрудника")
  206.         exit()
  207.  
  208.     # 🔌 Запуск WebSocket клиента
  209.     t = threading.Thread(target=listen_notifications, args=(token,))
  210.     t.start()
  211.  
  212.     pprint("⏳ Ждем 10 секунд перед созданием визита пациентом...")
  213.     time.sleep(10)
  214.  
  215.     # 🧑‍⚕️ Создание визита пациентом
  216.     simulate_patient_visit()
  217.  
  218.     time.sleep(10)  # Подождать для получения уведомления
  219.     pprint("🏁 Тест завершен")
  220.  
  221.  
Advertisement
Add Comment
Please, Sign In to add comment