Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import requests
- import threading
- import time
- import json
- import websocket # pip install websocket-client
- import random
- from pprint import pprint
- # 🔐 Настройки авторизации для Employee
- BASE_URL_EMPLOYEE = "http://127.0.0.1:8000/api/v1/employee_endpoints/authentication"
- WS_URL = "ws://127.0.0.1:8000/ws/notifications/"
- USERNAME = "svetlana_62"
- PASSWORD = "string"
- # 🌐 Настройки API для пациента
- BASE_URL_PATIENT = "http://127.0.0.1:8000/api/v1/patients_endpoints"
- AUTH_URL = f"{BASE_URL_PATIENT}/authentication"
- VISIT_URL = f"{BASE_URL_PATIENT}/visits/create-visit/"
- DOCTORS_URL = f"{BASE_URL_PATIENT}/city_id:1/all-doctors/"
- # 👤 Тестовые пользователи
- USERS = [
- {
- "phone_number": "+7 (768) 045-3987",
- "password": "string",
- "authenticated": True
- },
- {
- "phone_number": "+7 579 730 93 15",
- "first_name": "Тест",
- "last_name": "Тестович",
- "authenticated": False
- }
- ]
- MAX_RETRIES = 3
- TIMEOUT_SECONDS = 30
- # 📤 Универсальный отправщик запросов
- def send_request(name, url, method, data=None, headers=None):
- pprint(f"\n🟡 {name}")
- pprint(f"➡️ URL: {url}")
- pprint(f"➡️ Метод: {method}")
- pprint(f"➡️ Данные: {data}")
- request_func = requests.get if method == "GET" else requests.post
- headers = headers or {}
- for attempt in range(1, MAX_RETRIES + 1):
- try:
- response = request_func(url, json=data, headers=headers, timeout=TIMEOUT_SECONDS)
- pprint(f"📥 Ответ: {response.status_code} (Попытка {attempt}/{MAX_RETRIES})")
- if response.status_code in [200, 201]:
- try:
- return response.json()
- except Exception:
- pprint("⚠️ Ошибка парсинга JSON")
- return None
- else:
- pprint(f"❌ Ошибка: {response.text}")
- return None
- except requests.exceptions.Timeout:
- pprint(f"⚠️ Тайм-аут (Попытка {attempt}) - ожидание 5 сек...")
- time.sleep(5)
- except requests.exceptions.RequestException as e:
- pprint(f"❌ Ошибка соединения: {e}")
- return None
- return None
- # 🩺 Получение списка врачей
- def get_doctors():
- return send_request("📍 Получение врачей", DOCTORS_URL, "GET")
- # 📲 Авторизация пациента
- def login(phone, password):
- data = {"phone_number": phone, "password": password}
- response = send_request("📲 Авторизация пациента", f"{AUTH_URL}/login/", "POST", data)
- return response.get("access") if response else None
- # 👤 Получение профиля пациента
- def get_profile(access_token):
- headers = {"Authorization": f"Bearer {access_token}"}
- return send_request("👤 Получение профиля", f"{AUTH_URL}/me/", "GET", headers=headers)
- # 📝 Создание визита
- def create_visit(user, doctor, procedure, clinic, visit_date, time_slot_id):
- scenario = "Авторизованный" if user["authenticated"] else "Неавторизованный"
- visit_data = {
- "doctor_id": doctor["id"],
- "procedure_id": str(procedure["medical_procedure_id"]),
- "clinic_id": int(clinic["clinic_id"]),
- "date": visit_date,
- "time_slot_id": time_slot_id
- }
- headers = {}
- if user["authenticated"]:
- if "profile" in user:
- visit_data.update({
- "first_name": user["profile"]["first_name"],
- "last_name": user["profile"]["last_name"],
- "phone_number": user["profile"]["phone_number"]
- })
- if "access_token" in user:
- headers["Authorization"] = f"Bearer {user['access_token']}"
- else:
- visit_data.update({
- "first_name": user["first_name"],
- "last_name": user["last_name"],
- "phone_number": user["phone_number"]
- })
- pprint(f"📘 {scenario} пользователь создает визит")
- return send_request("📝 Создание визита", VISIT_URL, "POST", visit_data, headers=headers)
- # 🎧 Подключение к WebSocket для уведомлений
- def listen_notifications(token):
- def on_message(ws, message):
- pprint(f"🔔 Уведомление: {message}")
- def on_error(ws, error):
- pprint(f"❌ Ошибка WebSocket: {error}")
- def on_close(ws, code, msg):
- pprint("🔌 WebSocket отключен")
- def on_open(ws):
- pprint("🟢 WebSocket подключен")
- ws = websocket.WebSocketApp(
- WS_URL,
- on_message=on_message,
- on_error=on_error,
- on_close=on_close,
- on_open=on_open,
- header=[f"Authorization: Bearer {token}"]
- )
- ws.run_forever()
- # 👨⚕️ Имитация создания визита от пациента
- def simulate_patient_visit():
- doctors_data = get_doctors()
- if not doctors_data:
- return
- doctors = doctors_data.get("results", [])
- if not doctors:
- return
- doctor = doctors[0]
- procedures = doctor.get("procedures", [])
- weekly_schedule = doctor.get("weekly_schedule", [])
- if not procedures or not weekly_schedule:
- return
- schedule_block = weekly_schedule[0]
- schedules = schedule_block.get("schedules", [])
- if not schedules:
- return
- schedule = schedules[0]
- working_hours = schedule.get("working_hours_list", [])
- if not working_hours:
- return
- procedure = random.choice(procedures)
- time_slot = random.choice(working_hours)
- clinic = {
- "clinic_id": schedule["clinic_id"],
- "clinic_title": schedule["clinic_title"]
- }
- visit_date = schedule["date"]
- user = USERS[0]
- if user["authenticated"]:
- access_token = login(user["phone_number"], user["password"])
- if not access_token:
- pprint("❌ Не удалось авторизоваться пациентом")
- return
- user["access_token"] = access_token
- user["profile"] = get_profile(access_token)
- create_visit(user, doctor, procedure, clinic, visit_date, time_slot["time_slot_id"])
- # 🧑💼 Получение токена для сотрудника
- def get_employee_token():
- login_data = {"username": USERNAME, "password": PASSWORD}
- response = requests.post(f"{BASE_URL_EMPLOYEE}/login/", json=login_data)
- return response.json().get("access") if response.ok else None
- # 🧪 Запуск теста
- if __name__ == "__main__":
- pprint("🚀 Запуск теста уведомлений при создании визита")
- token = get_employee_token()
- if not token:
- pprint("⛔ Не удалось авторизовать сотрудника")
- exit()
- # 🔌 Запуск WebSocket клиента
- t = threading.Thread(target=listen_notifications, args=(token,))
- t.start()
- pprint("⏳ Ждем 10 секунд перед созданием визита пациентом...")
- time.sleep(10)
- # 🧑⚕️ Создание визита пациентом
- simulate_patient_visit()
- time.sleep(10) # Подождать для получения уведомления
- pprint("🏁 Тест завершен")
Advertisement
Add Comment
Please, Sign In to add comment