Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import csv
- import io
- from collections import defaultdict, deque
- from csv import DictReader
- import graphene
- from graphene.types import InputObjectType
- from django.core.exceptions import ValidationError
- from ...core.mutations import ModelMutation, BaseMutation
- from ...core.types import Upload
- from ...core.types.common import TargetError
- from ....account.models import User
- from ....core.permissions import TargetPermissions
- from ....partner.models import Partner
- from ....target import models
- from ....target.error_codes import TargetErrorCode
- from dateutil import parser, relativedelta
- from datetime import datetime as dt
- from ....target.models import Achievement, TargetUser, PartnerTargetSales
- from ...utils import check_if_attempted_from_valid_parent
# Input payload for the CreateTarget mutation: the agent the target belongs
# to, the month it applies to, and the order/sales figures to reach.
# NOTE: documented with comments rather than a class docstring so the
# GraphQL schema description of this type is left untouched.
class CreateTargetInput(graphene.InputObjectType):
    # Global ID of the agent the target is assigned to.
    user = graphene.ID(
        required=True,
        name="user",
        description="ID of the Agent.",
    )
    # Month the target applies to.
    month = graphene.Date(
        description="Target month(YYYY-MM)",
        required=True,
    )
    # Order count the agent is expected to reach.
    target_orders = graphene.Int(
        required=True,
        description="Number of orders for the target.",
    )
    # Sales amount the agent is expected to reach.
    target_sales = graphene.Int(
        required=True,
        description="Amount of sales for the target.",
    )
class CreateTarget(ModelMutation):
    """Create a monthly target for an agent.

    Validates the requested figures, normalises the month to its first day,
    rejects a second target for the same user and month, and after saving
    links an already existing ``Achievement`` of that user/month to the new
    target.
    """

    class Arguments:
        input = CreateTargetInput(
            required=True, description="Fields required to create a target."
        )

    class Meta:
        description = "Creates a new target of an agent."
        model = models.Target
        permissions = (TargetPermissions.MANAGE_TARGETS,)
        error_type_class = TargetError
        error_type_field = "target_errors"

    @staticmethod
    def _require_positive(cleaned_input, field, message):
        """Raise a field-level ``ValidationError`` unless the value is >= 1."""
        if cleaned_input[field] < 1:
            raise ValidationError(
                {field: ValidationError(message, code=TargetErrorCode.INVALID)}
            )

    @classmethod
    def clean_input(cls, info, instance, data):
        """Validate and normalise the mutation input.

        Args:
            info: GraphQL resolve info; ``info.context.user`` is the requester.
            instance: The ``Target`` instance being created.
            data: Raw input dictionary of the mutation.

        Returns:
            The cleaned input with ``month`` replaced by the first day of the
            requested month.

        Raises:
            ValidationError: If a figure is below 1 or a target for the same
                user and month already exists.
        """
        user = cls.get_node_or_error(info, data["user"])
        requestor = info.context.user

        # A requester without any group used to crash here with an
        # AttributeError on ``None.name``; such a requester is simply not one
        # of the hierarchy-restricted roles.
        # NOTE(review): only the first group is inspected, as before - confirm
        # that users never belong to more than one group.
        first_group = requestor.groups.first()
        if first_group is not None and first_group.name in ("dco", "dcm", "cm"):
            check_if_attempted_from_valid_parent(requestor, user)

        cleaned_input = super().clean_input(info, instance, data)

        cls._require_positive(
            cleaned_input, "target_orders", "Target order should be positive number"
        )
        cls._require_positive(
            cleaned_input, "target_sales", "Target sales should be positive number"
        )

        # Targets are stored per month, so pin the date to the first day.
        month = parser.parse(str(cleaned_input["month"])).replace(day=1)
        cleaned_input["month"] = month

        # ``exists()`` asks the database for a single row instead of loading
        # every matching target just to test truthiness.
        already_defined = models.Target.objects.filter(
            user=cleaned_input["user"], month=month
        ).exists()
        if already_defined:
            raise ValidationError(
                {
                    "month": ValidationError(
                        "Target for specified month already exists.",
                        code=TargetErrorCode.ALREADY_EXISTS,
                    )
                }
            )
        return cleaned_input

    @classmethod
    def save(cls, info, instance, cleaned_input):
        """Persist the target and attach a matching achievement, if any.

        Args:
            info: GraphQL resolve info (unused).
            instance: The ``Target`` instance to save.
            cleaned_input: Output of ``clean_input``; provides user and month.
        """
        instance.save()
        achievement = Achievement.objects.filter(
            user=cleaned_input["user"], month=cleaned_input["month"]
        ).first()
        if achievement:
            achievement.target = instance
            achievement.save()
# Input payload for the UpdateTarget mutation. Both figures are optional
# at the schema level.
# NOTE: documented with comments rather than a class docstring so the
# GraphQL schema description of this type is left untouched.
class UpdateTargetInput(InputObjectType):
    # New order count for the target.
    target_orders = graphene.Int(
        required=False,
        description="Number of orders for the target.",
    )
    # New sales amount for the target.
    target_sales = graphene.Int(
        required=False,
        description="Amount of sales for the target.",
    )
class UpdateTarget(ModelMutation):
    """Update the order and/or sales figure of an existing target.

    Targets of past months are immutable. At least one of the two figures
    must be supplied, and every supplied figure must be at least 1.
    """

    class Arguments:
        id = graphene.ID(required=True, description="ID of a target to update.")
        input = UpdateTargetInput(
            required=True, description="Fields required to update a target."
        )

    class Meta:
        description = "Updates a new target of an agent."
        model = models.Target
        permissions = (TargetPermissions.MANAGE_TARGETS,)
        error_type_class = TargetError
        error_type_field = "target_errors"

    @classmethod
    def clean_input(cls, info, instance, data):
        """Validate the update payload.

        Args:
            info: GraphQL resolve info.
            instance: The ``Target`` being updated; its ``month`` decides
                whether it may still be modified.
            data: Raw input dictionary of the mutation.

        Returns:
            The cleaned input produced by the parent implementation.

        Raises:
            ValidationError: If the target belongs to a past month, if
                neither figure is supplied, or if a supplied figure is
                below 1.
        """
        today = dt.now().date()
        # Compare (year, month) pairs: a target is locked once its month is
        # strictly before the current one.
        if (instance.month.year, instance.month.month) < (today.year, today.month):
            raise ValidationError(
                {
                    "id": ValidationError(
                        "This item can not be modified.",
                        code=TargetErrorCode.INVALID,
                    )
                }
            )

        target_orders = data.get("target_orders")
        target_sales = data.get("target_sales")

        if not target_sales and not target_orders:
            raise ValidationError(
                {
                    "target_sales": ValidationError(
                        "Target sales can not be empty",
                        code=TargetErrorCode.INVALID,
                    ),
                    "target_orders": ValidationError(
                        "Target orders can not be empty",
                        code=TargetErrorCode.INVALID,
                    ),
                }
            )

        # Each figure is optional, so only range-check the ones that were
        # actually sent; comparing ``None < 1`` raises TypeError on Python 3.
        if target_orders is not None and target_orders < 1:
            raise ValidationError(
                {
                    "target_orders": ValidationError(
                        "Target order should be positive number",
                        code=TargetErrorCode.INVALID,
                    )
                }
            )
        if target_sales is not None and target_sales < 1:
            raise ValidationError(
                {
                    "target_sales": ValidationError(
                        "Target sales should be positive number",
                        code=TargetErrorCode.INVALID,
                    )
                }
            )
        return super().clean_input(info, instance, data)
class CreateTargetsInBulk(BaseMutation):
    """Create ``TargetUser`` and ``PartnerTargetSales`` rows from a CSV upload.

    Row layout as consumed by ``perform_mutation``: column 0 holds the agent
    e-mail, column 1 the target month, and the remaining columns come in
    ``partner id, target sales`` pairs. The first row is a header; only its
    length is used, to bound how many pairs are read per row.

    For every data row one ``TargetUser`` is created for the agent and one
    for each of the agent's ancestors in the user hierarchy, and each of
    those receives the row's partner sales targets.
    """

    data1 = graphene.String()
    # Child user id -> list of parent user ids. Rebuilt at the start of every
    # upload so that hierarchy changes are picked up.
    graph = defaultdict(list)

    class Arguments:
        file = Upload(
            required=True,
            description="A file.... later",
        )

    class Meta:
        description = "Upload file(csv) to create targets in bulk."
        error_type_class = TargetError
        error_type_field = "target_errors"
        permissions = (TargetPermissions.MANAGE_TARGETS,)

    @staticmethod
    def _file_error(message):
        """Build a ``ValidationError`` attached to the ``file`` argument."""
        return ValidationError(
            {"file": ValidationError(message, code=TargetErrorCode.INVALID)}
        )

    @classmethod
    def _load_parent_graph(cls):
        """Reload the user hierarchy from the database into ``cls.graph``."""
        graph = defaultdict(list)
        for user in User.objects.all():
            if user.parent_id:
                graph[user.id].append(user.parent_id)
        cls.graph = graph
        return graph

    @classmethod
    def get_parents_list(cls, source):
        """Return the ids of all ancestors of ``source``, nearest first.

        Performs a breadth-first walk over ``cls.graph``; the ``visited`` set
        protects against cycles in the hierarchy.

        Args:
            source: Id of the user whose ancestors are wanted.

        Returns:
            List of ancestor user ids, excluding ``source`` itself.
        """
        if not cls.graph:
            cls._load_parent_graph()

        parents_list = []
        visited = {source}
        queue = deque([source])
        while queue:
            current = queue.popleft()
            # ``.get`` instead of indexing: indexing a defaultdict would
            # insert an empty entry for every user that has no parent.
            for parent_id in cls.graph.get(current, ()):
                if parent_id not in visited:
                    visited.add(parent_id)
                    parents_list.append(parent_id)
                    queue.append(parent_id)
        return parents_list

    @classmethod
    def _read_rows(cls, csv_file):
        """Decode the upload and return ``(header, [(line_no, row), ...])``.

        Rows whose cells are all blank are skipped.

        Raises:
            ValidationError: If no file was sent, it is not a ``.csv`` file,
                it is not UTF-8 text, or it is empty.
        """
        if csv_file is None:
            raise cls._file_error("No file was uploaded.")
        if not csv_file.name.endswith(".csv"):
            raise cls._file_error("Only .csv files are supported.")
        try:
            content = csv_file.read().decode("UTF-8")
        except UnicodeDecodeError:
            raise cls._file_error("File must be UTF-8 encoded.") from None

        reader = csv.reader(io.StringIO(content), delimiter=",")
        header = next(reader, None)
        if header is None:
            raise cls._file_error("File is empty.")

        rows = []
        for row in reader:
            if any(cell.strip() for cell in row):
                rows.append((reader.line_num, row))
        return header, rows

    @classmethod
    def perform_mutation(cls, _root, info, file):
        """Validate the uploaded CSV and create all targets atomically.

        Args:
            _root: Unused GraphQL root value.
            info: GraphQL resolve info; the upload is read from
                ``info.context.FILES``.
            file: Key of the uploaded file inside ``info.context.FILES``.

        Returns:
            A ``CreateTargetsInBulk`` payload with a confirmation message.

        Raises:
            ValidationError: If the file is unusable or a row references an
                unknown agent or partner. Nothing is written in that case.
        """
        # Local import: keeps this block self-contained without touching the
        # module's import section.
        from django.db import transaction

        header, rows = cls._read_rows(info.context.FILES.get(file))
        no_of_columns = len(header)

        agents = {
            agent.email: agent for agent in User.objects.filter(groups__name="agent")
        }
        # Keyed by ``str`` because CSV cells are always strings, whatever the
        # type of ``Partner.partner_id`` is.
        partners = {
            str(partner.partner_id): partner for partner in Partner.objects.all()
        }
        cls._load_parent_graph()

        # Pass 1: validate every row before anything is written.
        parsed_rows = []
        for line_no, row in rows:
            if len(row) < 2:
                raise cls._file_error(
                    f"Line {line_no}: expected at least an e-mail and a month."
                )
            email = row[0].strip()
            # NOTE(review): the month is handed to the model exactly as
            # written in the file - confirm the format TargetUser.month expects.
            month = row[1].strip()

            agent = agents.get(email)
            if agent is None:
                raise cls._file_error(f"Line {line_no}: unknown agent '{email}'.")

            partner_targets = []
            # Never read past the end of a row that is shorter than the header.
            last_column = min(no_of_columns, len(row)) - 1
            for index in range(2, last_column, 2):
                partner_id = row[index].strip()
                if not partner_id:
                    continue
                partner = partners.get(partner_id)
                if partner is None:
                    raise cls._file_error(
                        f"Line {line_no}: unknown partner '{partner_id}'."
                    )
                partner_targets.append((partner, row[index + 1]))

            owner_ids = [agent.id, *cls.get_parents_list(agent.id)]
            parsed_rows.append((owner_ids, month, partner_targets))

        # Pass 2: write everything, or nothing if a statement fails.
        with transaction.atomic():
            target_users = []
            targets_per_row = []
            for owner_ids, month, partner_targets in parsed_rows:
                row_targets = [
                    TargetUser(user_id=owner_id, month=month) for owner_id in owner_ids
                ]
                target_users.extend(row_targets)
                targets_per_row.append((row_targets, partner_targets))
            TargetUser.objects.bulk_create(target_users)

            # Built only after the TargetUser rows exist so the foreign keys
            # can point at saved objects.
            # NOTE(review): relies on bulk_create populating primary keys,
            # which depends on the database backend - confirm.
            partner_sales = []
            for row_targets, partner_targets in targets_per_row:
                for target_user in row_targets:
                    for partner, target_sales in partner_targets:
                        partner_sales.append(
                            PartnerTargetSales(
                                target_user=target_user,
                                partner=partner,
                                target_sales=target_sales,
                            )
                        )
            PartnerTargetSales.objects.bulk_create(partner_sales)

        return CreateTargetsInBulk(data1="uploading functionality is working")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement