Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import traceback
- from django.core.exceptions import ValidationError
- from rest_framework.response import Response
- from rest_framework.views import APIView
- from accounts.services.authentication import CustomBasicAuthentication
- from common.mixins import ExceptionHandlerMixin
- from common.services import create_log
- from ..serializers.partner_enquiry import (
- CRMViewAPISerializer,
- CustomerDetailsAPISerializer,
- FeasibilityCheckAPISerializer,
- InstallationAPISerializer,
- RaiseComplainAPISerializer,
- RechargeAPISerializer,
- SRDetailsGetAPISerializer,
- AdlCustomerCheckAPISerializer,
- )
- from ..services.partner_enquiry import (
- CustomException,
- decrypt,
- encrypt,
- installation,
- customer_details_get,
- )
- from drf_yasg.utils import swagger_auto_schema
class InstallationAPI(ExceptionHandlerMixin, APIView):
    """After getting the feasibility and payment confirmation from the customer,
    This API is called to confirm that customer is ready to connect further for installation.
    """

    # Authentication is done by CustomBasicAuthentication; no DRF permission
    # classes are layered on top (the handler checks is_authenticated itself).
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=InstallationAPISerializer,
        responses={
            200: """{
                "data": {"status": "Success", "viProspectId": "ENQ-1"},
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Flow: authenticate -> decrypt request.data["data"] -> validate with
        # InstallationAPISerializer -> installation(user, **validated_data).
        # Success returns {"data": <service result>, "message": {...}}.
        # Any failure is logged through create_log() and answered with HTTP 406
        # and an empty "data" object plus a code/message pair.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            user = request.user
            if not user or not user.is_authenticated:
                raise CustomException("FAILED", "Unauthorized Access")

            # A body without the encrypted "data" envelope is a client mistake.
            # Report it as a retryable validation error instead of letting the
            # KeyError/TypeError surface as the generic "Something went wrong".
            try:
                encrypted_payload = request.data["data"]
            except (KeyError, TypeError):
                raise CustomException(
                    "RETRY", "VALIDATIONERROR! data: This field is required."
                ) from None

            serializer = InstallationAPISerializer(data=decrypt(encrypted_payload))
            serializer.is_valid()
            errors = serializer.errors
            if errors:
                # Only the first failing field is reported to the caller, so
                # there is no need to format the remaining ones.
                field, details = next(iter(errors.items()))
                raise CustomException(
                    "RETRY", f"VALIDATIONERROR! {field}: {details[0]}"
                )

            response_data = installation(user, **serializer.validated_data)
            return Response(
                {
                    "data": response_data,
                    "message": {"message": "", "code": "Success"},
                }
            )
        except Exception as e:
            # Top-level boundary: every error is converted into the partner
            # response envelope. Defaults describe an unexpected failure.
            code = "FAILED"
            msg = "Something went wrong.Please contact the administrator."
            error_info = "\n".join(traceback.format_exception(*sys.exc_info()))
            print(error_info)
            if isinstance(e, ValidationError):
                # Django validation errors are safe to show to the caller.
                error_info = "\n".join(e.messages)
                code = "RETRY"
                msg = error_info
            # CustomException carries (code, message) in its args; guard the
            # length so a malformed raise cannot crash the error handler itself.
            if isinstance(e, CustomException) and len(e.args) >= 2:
                code = e.args[0]
                msg = e.args[1]
            create_log(
                action="VI Installation",
                action_details=f"VI Installation API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            return Response(
                status=406,
                data={"data": {}, "message": {"message": str(msg), "code": code}},
            )
class CustomerDetailsAPI(ExceptionHandlerMixin, APIView):
    """Fetch Registered and Closed Customer details based on provided date."""

    # Authentication is done by CustomBasicAuthentication; the permission list
    # is intentionally empty and the handler checks is_authenticated itself.
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=CustomerDetailsAPISerializer,
        responses={
            200: """{
                "data": [
                    {
                        "MSISDN": "8849574265",
                        "AccountNo": "1234567",
                        "Status ": "Registered",
                        "LastStatusDate": "21-12-2022 14:00:00",
                    },
                    {
                        "MSISDN": "8849574364",
                        "AccountNo": "1234568",
                        "Status ": "Closed",
                        "LastStatusDate": "01-01-2023 00:00:00",
                    },
                    {
                        "MSISDN": "8849574367",
                        "AccountNo": "1234566",
                        "Status ": "Closed",
                        "LastStatusDate": "02-01-2022 01:00:01",
                    },
                    {
                        "MSISDN": "8849574364",
                        "AccountNo": "1234564",
                        "Status ": "Registered",
                        "LastStatusDate": "02-08-2022 02:02:03",
                    },
                ],
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Validates the body with CustomerDetailsAPISerializer and forwards the
        # validated fields to customer_details_get(). Failures are logged with
        # create_log() and returned in the same {"data", "message"} envelope;
        # no explicit HTTP status is set on the error response.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            requester = request.user
            if not (requester and requester.is_authenticated):
                raise CustomException("FAILED", "Unauthorized Access")

            serializer = CustomerDetailsAPISerializer(data=request.data)
            serializer.is_valid()
            field_errors = serializer.errors
            if field_errors:
                problems = []
                for field_name in field_errors:
                    problems.append(
                        f"VALIDATIONERROR! {field_name}: {field_errors[field_name][0]}"
                    )
                # Only the first validation problem is sent back.
                raise CustomException("RETRY", problems[0])

            envelope = {
                "data": customer_details_get(requester, **serializer.validated_data),
                "message": {"message": "", "code": "Success"},
            }
            return Response(envelope)
        except Exception as exc:
            # Defaults for an unexpected failure; refined for known error types.
            code, msg = "FAILED", "Something went wrong.Please contact the administrator."
            error_info = "\n".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )
            print(error_info)
            if isinstance(exc, ValidationError):
                error_info = "\n".join(exc.messages)
                code, msg = "RETRY", error_info
            if isinstance(exc, CustomException):
                # CustomException is raised as (code, message).
                code, msg = exc.args[0], exc.args[1]
            create_log(
                action="VI Customer Detail",
                action_details=f"VI Customer Detail API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            failure = {"data": {}, "message": {"message": str(msg), "code": code}}
            return Response(failure)
class RechargeAPI(ExceptionHandlerMixin, APIView):
    """API to Initiate payment based on given account no, plan poid,
    mast online plan poid, payment amount and merchant trans id.
    It will return unique YOUBB order Id."""

    # Authentication is done by CustomBasicAuthentication; the permission list
    # is intentionally empty and the handler checks is_authenticated itself.
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=RechargeAPISerializer,
        responses={
            200: """{
                "data":{
                    "paymentStatus": "SUCCESS",
                    "youBBorderId": "10859125",
                    "planPoid": "1415188680",
                    "merchantOrderId": "1415179779",
                },
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Validates the body with RechargeAPISerializer, then answers with a
        # fixed sample payload: the validated input is not used to build it.
        # Failures are logged with create_log() and returned in the same
        # {"data", "message"} envelope without an explicit HTTP status.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            requester = request.user
            if not (requester and requester.is_authenticated):
                raise CustomException("FAILED", "Unauthorized Access")

            serializer = RechargeAPISerializer(data=request.data)
            serializer.is_valid()
            field_errors = serializer.errors
            if field_errors:
                problems = []
                for field_name in field_errors:
                    problems.append(
                        f"VALIDATIONERROR! {field_name}: {field_errors[field_name][0]}"
                    )
                # Only the first validation problem is sent back.
                raise CustomException("RETRY", problems[0])

            envelope = {
                "data": {
                    "paymentStatus": "SUCCESS",
                    "youBBorderId": "10859125",
                    "planPoid": "1415188680",
                    "merchantOrderId": "1415179779",
                },
                "message": {"message": "", "code": "Success"},
            }
            return Response(envelope)
        except Exception as exc:
            # Defaults for an unexpected failure; refined for known error types.
            code, msg = "FAILED", "Something went wrong.Please contact the administrator."
            error_info = "\n".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )
            print(error_info)
            if isinstance(exc, ValidationError):
                error_info = "\n".join(exc.messages)
                code, msg = "RETRY", error_info
            if isinstance(exc, CustomException):
                # CustomException is raised as (code, message).
                code, msg = exc.args[0], exc.args[1]
            create_log(
                action="VI Recharge",
                action_details=f"VI Recharge API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            failure = {"data": {}, "message": {"message": str(msg), "code": code}}
            return Response(failure)
class FeasibilityCheckAPI(ExceptionHandlerMixin, APIView):
    """API to check the Feasibility of the lead generated."""

    # Authentication is done by CustomBasicAuthentication; the permission list
    # is intentionally empty and the handler checks is_authenticated itself.
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=FeasibilityCheckAPISerializer,
        responses={
            200: """{
                "data":{
                    "prospectId": "123456789",
                    "IsFeasible": "Y",
                    "message": "Lead is generated.",
                },
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Validates the body with FeasibilityCheckAPISerializer, then answers
        # with a fixed sample payload: the validated input is not used to
        # build it. Failures are logged with create_log() and returned in the
        # same {"data", "message"} envelope without an explicit HTTP status.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            requester = request.user
            if not (requester and requester.is_authenticated):
                raise CustomException("FAILED", "Unauthorized Access")

            serializer = FeasibilityCheckAPISerializer(data=request.data)
            serializer.is_valid()
            field_errors = serializer.errors
            if field_errors:
                problems = []
                for field_name in field_errors:
                    problems.append(
                        f"VALIDATIONERROR! {field_name}: {field_errors[field_name][0]}"
                    )
                # Only the first validation problem is sent back.
                raise CustomException("RETRY", problems[0])

            envelope = {
                "data": {
                    "prospectId": "123456789",
                    "IsFeasible": "Y",
                    "message": "Lead is generated.",
                },
                "message": {"message": "", "code": "Success"},
            }
            return Response(envelope)
        except Exception as exc:
            # Defaults for an unexpected failure; refined for known error types.
            code, msg = "FAILED", "Something went wrong.Please contact the administrator."
            error_info = "\n".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )
            print(error_info)
            if isinstance(exc, ValidationError):
                error_info = "\n".join(exc.messages)
                code, msg = "RETRY", error_info
            if isinstance(exc, CustomException):
                # CustomException is raised as (code, message).
                code, msg = exc.args[0], exc.args[1]
            create_log(
                action="VI Feasibility Check",
                action_details=f"VI Feasibility Check API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            failure = {"data": {}, "message": {"message": str(msg), "code": code}}
            return Response(failure)
class CRMViewAPI(ExceptionHandlerMixin, APIView):
    """API to Fetch Customer’s account details who is on convergent plan"""

    # Authentication is done by CustomBasicAuthentication; the permission list
    # is intentionally empty and the handler checks is_authenticated itself.
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=CRMViewAPISerializer,
        responses={
            200: """{
                "data":{
                    "MSISDN": "8849574256",
                    "IsExistingMember": "Y",
                    "customerDetails": {
                        "customerName": "Ravi Kumar",
                        "accountNumber": "2694963",
                        "userId": "",
                        "registeredNumber": "7016655858",
                        "registeredEmailId": "ravikumar@gmail.com",
                        "installedCity": "",
                        "installationAddress": "",
                        "dateOfInstallation": "",
                        "accountStatus": "",
                        "planName": "10053/YOU JACKAL/10Mbps-1Mbps/300GB/3Months/Rs1859",
                        "planValidity": "-90",
                        "balance": {"mbBalance": "307200.0", "dayBalance": "90.0"},
                        "dateOfLastRenewal": "",
                        "framedIPStatus": "",
                    },
                    "installationStatus": {},
                },
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Validates the body with CRMViewAPISerializer, then answers with a
        # fixed sample payload: the validated input is not used to build it.
        # Failures are logged with create_log() and returned in the same
        # {"data", "message"} envelope without an explicit HTTP status.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            requester = request.user
            if not (requester and requester.is_authenticated):
                raise CustomException("FAILED", "Unauthorized Access")

            serializer = CRMViewAPISerializer(data=request.data)
            serializer.is_valid()
            field_errors = serializer.errors
            if field_errors:
                problems = []
                for field_name in field_errors:
                    problems.append(
                        f"VALIDATIONERROR! {field_name}: {field_errors[field_name][0]}"
                    )
                # Only the first validation problem is sent back.
                raise CustomException("RETRY", problems[0])

            customer_details = {
                "customerName": "Ravi Kumar",
                "accountNumber": "2694963",
                "userId": "",
                "registeredNumber": "7016655858",
                "registeredEmailId": "ravikumar@gmail.com",
                "installedCity": "",
                "installationAddress": "",
                "dateOfInstallation": "",
                "accountStatus": "",
                "planName": "10053/YOU JACKAL/10Mbps-1Mbps/300GB/3Months/Rs1859",
                "planValidity": "-90",
                "balance": {"mbBalance": "307200.0", "dayBalance": "90.0"},
                "dateOfLastRenewal": "",
                "framedIPStatus": "",
            }
            envelope = {
                "data": {
                    "MSISDN": "8849574256",
                    "IsExistingMember": "Y",
                    "customerDetails": customer_details,
                    "installationStatus": {},
                },
                "message": {"message": "", "code": "Success"},
            }
            return Response(envelope)
        except Exception as exc:
            # Defaults for an unexpected failure; refined for known error types.
            code, msg = "FAILED", "Something went wrong.Please contact the administrator."
            error_info = "\n".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )
            print(error_info)
            if isinstance(exc, ValidationError):
                error_info = "\n".join(exc.messages)
                code, msg = "RETRY", error_info
            if isinstance(exc, CustomException):
                # CustomException is raised as (code, message).
                code, msg = exc.args[0], exc.args[1]
            create_log(
                action="VI CRM View",
                action_details=f"VI CRM View API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            failure = {"data": {}, "message": {"message": str(msg), "code": code}}
            return Response(failure)
class SRDetailsGetAPI(ExceptionHandlerMixin, APIView):
    """API to fetch SR details (SR id, customer name, sub process code, description, status and date)."""

    # NOTE: the previous docstring was a copy of the CRM view's description
    # ("account details ... convergent plan"); this endpoint returns "SRValues"
    # records, so the description above now matches the response.

    # Authentication is done by CustomBasicAuthentication; the permission list
    # is intentionally empty and the handler checks is_authenticated itself.
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=SRDetailsGetAPISerializer,
        responses={
            200: """{
                "data":{
                    "SRValues": [
                        {
                            "srID": "2023060129880",
                            "customerName": "SAGAR KHATRI ",
                            "subProcessCode": "4.K",
                            "description": "ViTopazComplaint",
                            "SRStatus": "Pending",
                            "SRDate": "01-Jun-2023 13:06",
                        },
                        {
                            "srID": "2023053129869",
                            "customerName": "SAGAR KHATRI ",
                            "subProcessCode": "2.D",
                            "description": "ViTopazComplaint",
                            "SRStatus": "Closed",
                            "SRDate": "31-May-2023 10:55",
                        },
                    ]
                },
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Validates the body with SRDetailsGetAPISerializer, then answers with
        # a fixed sample payload: the validated input is not used to build it.
        # Failures are logged with create_log() and returned in the same
        # {"data", "message"} envelope without an explicit HTTP status.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            user = request.user
            if not user or not user.is_authenticated:
                raise CustomException("FAILED", "Unauthorized Access")

            serializer = SRDetailsGetAPISerializer(data=request.data)
            serializer.is_valid()
            errors = serializer.errors
            if errors:
                # Only the first failing field is reported to the caller, so
                # there is no need to format the remaining ones.
                field, details = next(iter(errors.items()))
                raise CustomException(
                    "RETRY", f"VALIDATIONERROR! {field}: {details[0]}"
                )

            response_data = {
                "SRValues": [
                    {
                        "srID": "2023060129880",
                        "customerName": "SAGAR KHATRI ",
                        "subProcessCode": "4.K",
                        "description": "ViTopazComplaint",
                        "SRStatus": "Pending",
                        "SRDate": "01-Jun-2023 13:06",
                    },
                    {
                        "srID": "2023053129869",
                        "customerName": "SAGAR KHATRI ",
                        "subProcessCode": "2.D",
                        "description": "ViTopazComplaint",
                        "SRStatus": "Closed",
                        "SRDate": "31-May-2023 10:55",
                    },
                ]
            }
            return Response(
                {
                    "data": response_data,
                    "message": {"message": "", "code": "Success"},
                }
            )
        except Exception as e:
            # Top-level boundary: every error is converted into the partner
            # response envelope. Defaults describe an unexpected failure.
            code = "FAILED"
            msg = "Something went wrong.Please contact the administrator."
            error_info = "\n".join(traceback.format_exception(*sys.exc_info()))
            print(error_info)
            if isinstance(e, ValidationError):
                # Django validation errors are safe to show to the caller.
                error_info = "\n".join(e.messages)
                code = "RETRY"
                msg = error_info
            # CustomException carries (code, message) in its args; guard the
            # length so a malformed raise cannot crash the error handler itself.
            if isinstance(e, CustomException) and len(e.args) >= 2:
                code = e.args[0]
                msg = e.args[1]
            create_log(
                action="VI SR Details",
                action_details=f"VI SR Details API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            return Response(
                {"data": {}, "message": {"message": str(msg), "code": code}}
            )
class RaiseComplainAPI(ExceptionHandlerMixin, APIView):
    """Convergent customer can raise complaint if facing any issue with Adl Broadband."""

    # Authentication is done by CustomBasicAuthentication; the permission list
    # is intentionally empty and the handler checks is_authenticated itself.
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=RaiseComplainAPISerializer,
        responses={
            200: """{
                "data":{
                    "outageavail": "True",
                    "outageval": {
                        "outageId": "",
                        "downtime": "",
                        "city": "",
                        "node": "",
                        "area": "",
                        "comments": "",
                        "exp_up_time": "",
                        "status": "",
                        "act_up_time": "",
                        "up_comment": "",
                        "down_uid": "",
                        "up_uid": "",
                        "actdowntime": "",
                        "oms_id": "",
                        "reason": "",
                        "cust_affected": "",
                        "hrs": "24 ",
                        "mins": "",
                    },
                    "compgenerate": "False",
                    "complainId": "",
                    "message": "Outage in Area",
                },
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Validates the body with RaiseComplainAPISerializer, then answers with
        # a fixed sample payload: the validated input is not used to build it.
        # Failures are logged with create_log() and returned in the same
        # {"data", "message"} envelope without an explicit HTTP status.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            requester = request.user
            if not (requester and requester.is_authenticated):
                raise CustomException("FAILED", "Unauthorized Access")

            serializer = RaiseComplainAPISerializer(data=request.data)
            serializer.is_valid()
            field_errors = serializer.errors
            if field_errors:
                problems = []
                for field_name in field_errors:
                    problems.append(
                        f"VALIDATIONERROR! {field_name}: {field_errors[field_name][0]}"
                    )
                # Only the first validation problem is sent back.
                raise CustomException("RETRY", problems[0])

            outage_details = {
                "outageId": "",
                "downtime": "",
                "city": "",
                "node": "",
                "area": "",
                "comments": "",
                "exp_up_time": "",
                "status": "",
                "act_up_time": "",
                "up_comment": "",
                "down_uid": "",
                "up_uid": "",
                "actdowntime": "",
                "oms_id": "",
                "reason": "",
                "cust_affected": "",
                "hrs": "24 ",
                "mins": "",
            }
            envelope = {
                "data": {
                    "outageavail": "True",
                    "outageval": outage_details,
                    "compgenerate": "False",
                    "complainId": "",
                    "message": "Outage in Area",
                },
                "message": {"message": "", "code": "Success"},
            }
            return Response(envelope)
        except Exception as exc:
            # Defaults for an unexpected failure; refined for known error types.
            code, msg = "FAILED", "Something went wrong.Please contact the administrator."
            error_info = "\n".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )
            print(error_info)
            if isinstance(exc, ValidationError):
                error_info = "\n".join(exc.messages)
                code, msg = "RETRY", error_info
            if isinstance(exc, CustomException):
                # CustomException is raised as (code, message).
                code, msg = exc.args[0], exc.args[1]
            create_log(
                action="VI Raise Complain",
                action_details=f"VI Raise Complain API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            failure = {"data": {}, "message": {"message": str(msg), "code": code}}
            return Response(failure)
class AdlCustomerCheckAPI(ExceptionHandlerMixin, APIView):
    """Fetch Customer details of Adl based on given account no,username or mobileno."""

    # Authentication is done by CustomBasicAuthentication; the permission list
    # is intentionally empty and the handler checks is_authenticated itself.
    authentication_classes = [CustomBasicAuthentication]
    permission_classes = []

    @swagger_auto_schema(
        request_body=AdlCustomerCheckAPISerializer,
        responses={
            200: """{
                "data":{
                    "MSISDN": "8849574256",
                    "IsExistingMember": "Y",
                    "customerDetails": [
                        {
                            "accountNo": "2694963",
                            "name": "WOLF GREY",
                            "plan": "10053/YOU JACKAL/10Mbps-1Mbps/300GB/3Months/Rs1859",
                            "planDescription": "YOU JACKAL 300 GB 3 Months",
                            "planValidity": "-90",
                            "planMB": "2048",
                            "bandwidth": "102399",
                            "lowerBandwidth": "208",
                            "status": "INSERVICE",
                            "rcTag": "MBNOCF",
                            "city": "SURAT",
                            "suspendedDays": "0",
                            "balance": {
                                "mbBalance": "307200.0",
                                "hourBalance": "0",
                                "dayBalance": "90.0",
                            },
                            "isCurrentPlanRenewable": "true",
                            "currentPlanPoid": "1412875777",
                            "subscriptionAmt": "2500",
                            "RBSQueue": {},
                        }
                    ],
                },
                "message": {"message": "", "code": "Success"},
            }"""
        },
    )
    def post(self, request):
        # Validates the body with AdlCustomerCheckAPISerializer, then answers
        # with a fixed sample payload (the validated input is not used to build
        # it) that is passed through encrypt() before being returned.
        # NOTE(review): the swagger example above shows the plain structure,
        # while the actual "data" value is whatever encrypt() produces - confirm
        # which one partners should expect.
        # Failures are logged with create_log() and returned unencrypted in the
        # same {"data", "message"} envelope without an explicit HTTP status.
        # (Kept as a comment: a method docstring could replace the class
        # docstring in the generated API docs.)
        try:
            requester = request.user
            if not (requester and requester.is_authenticated):
                raise CustomException("FAILED", "Unauthorized Access")

            serializer = AdlCustomerCheckAPISerializer(data=request.data)
            serializer.is_valid()
            field_errors = serializer.errors
            if field_errors:
                problems = []
                for field_name in field_errors:
                    problems.append(
                        f"VALIDATIONERROR! {field_name}: {field_errors[field_name][0]}"
                    )
                # Only the first validation problem is sent back.
                raise CustomException("RETRY", problems[0])

            customer_record = {
                "accountNo": "2694963",
                "name": "WOLF GREY",
                "plan": "10053/YOU JACKAL/10Mbps-1Mbps/300GB/3Months/Rs1859",
                "planDescription": "YOU JACKAL 300 GB 3 Months",
                "planValidity": "-90",
                "planMB": "2048",
                "bandwidth": "102399",
                "lowerBandwidth": "208",
                "status": "INSERVICE",
                "rcTag": "MBNOCF",
                "city": "SURAT",
                "suspendedDays": "0",
                "balance": {
                    "mbBalance": "307200.0",
                    "hourBalance": "0",
                    "dayBalance": "90.0",
                },
                "isCurrentPlanRenewable": "true",
                "currentPlanPoid": "1412875777",
                "subscriptionAmt": "2500",
                "RBSQueue": {},
            }
            plain_payload = {
                "MSISDN": "8849574256",
                "IsExistingMember": "Y",
                "customerDetails": [customer_record],
            }
            envelope = {
                "data": encrypt(plain_payload),
                "message": {"message": "", "code": "Success"},
            }
            return Response(envelope)
        except Exception as exc:
            # Defaults for an unexpected failure; refined for known error types.
            code, msg = "FAILED", "Something went wrong.Please contact the administrator."
            error_info = "\n".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            )
            print(error_info)
            if isinstance(exc, ValidationError):
                error_info = "\n".join(exc.messages)
                code, msg = "RETRY", error_info
            if isinstance(exc, CustomException):
                # CustomException is raised as (code, message).
                code, msg = exc.args[0], exc.args[1]
            create_log(
                action="VI Adl Check",
                action_details=f"VI Adl Check API ran into the following error.{error_info}",
                action_by="Anonymous User",
                log_type="ERROR",
            )
            failure = {"data": {}, "message": {"message": str(msg), "code": code}}
            return Response(failure)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement