Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import csv
- import logging
- import os
- import re
- from collections import namedtuple
- from datetime import date
- from time import sleep
- from typing import Tuple, List, Union, Dict
- from urllib.parse import urlparse
- import requests
- import xlsxwriter
- from django.contrib.auth import authenticate, login
- from django.contrib.auth.models import User, Group
- from django.core.mail import EmailMessage
- from django.db import connection
- from .settings import HOME_PAGE, SECRET_AUTH_STUB
- logger = logging.getLogger('django')
- def sql(command: str, result_type: str="list") -> Union[
- Tuple[List[tuple], bool],
- List[Dict[str, Union[date, int]]],
- List[tuple]
- ]:
- """
- Gets raw SQL command and returns result as Tuple[List[tuple], bool].
- :param command: Raw SQL command.
- :return: Tuple[List[tuple], bool]
- """
- def dict_fetch_all(cursor):
- """Return all rows from a cursor as a dict."""
- columns = [col[0] for col in cursor.description]
- return [
- dict(zip(columns, row))
- for row in cursor.fetchall()
- ]
- def named_tuple_fetch_all(cursor):
- """Return all rows from a cursor as a namedtuple."""
- desc = cursor.description
- nt_result = namedtuple('Result', [col[0] for col in desc])
- return [nt_result(*row) for row in cursor.fetchall()]
- if isinstance(command, str) and command:
- cursor = connection.cursor()
- cursor.execute(command)
- if result_type == "list":
- return cursor.fetchall(), True
- if result_type == "dict":
- return dict_fetch_all(cursor=cursor)
- if result_type == "namedtuple":
- return named_tuple_fetch_all(cursor=cursor)
- return [], False
- def pars_pex(text, xml_teg):
- xml_name = re.search(r"<({})>(.+?)<".format(xml_teg), text)\
- .group(0)\
- .replace('<{}>'.format(xml_teg), '')\
- .replace('<', '')
- return xml_name
- def array_split(lst, sz):
- """
- For example, split array by 2 elements in arrays:
- [1, 2, 3, 4, 5, 6, 7] -> [[1, 2], [3, 4], [5, 6], [7]]
- """
- return [lst[i:i+sz] for i in range(0, len(lst), sz)]
- class RegUsers:
- _G_URL = "https://apps.smvgroup.com/smvservices/ldapservices.asmx"
- def __init__(self, login_name, password, request):
- self.login_name = login_name.lower()
- self.password = password
- self.request = request
- def get_user_to_site(self):
- try:
- User.objects.get(username=self.login_name)
- except (User.DoesNotExist, ValueError):
- return False
- return True
- def reg_users(self, grop_list):
- if self.get_user_to_site():
- is_user = User.objects.get(username=self.login_name)
- is_user.set_password(SECRET_AUTH_STUB)
- is_user.save()
- else:
- user_add = User(username=self.login_name)
- user_add.set_password(SECRET_AUTH_STUB)
- user_add.save()
- for gorup_name in grop_list:
- try:
- group = Group.objects.get(name=gorup_name)
- except Group.DoesNotExist:
- raise Exception(f'Group "{gorup_name}" does not exist. '
- f'Perhaps, you need to "manage.py create_default_groups"')
- user_add.groups.add(group)
- user_add.save()
- def get_user_is_portal(self):
- url = self._G_URL
- data = "<?xml version=\"1.0\" encoding=\"utf-8\"?><soap12:Envelope xmlns:xsi=\"http://www.w3.org/2001/" \
- "XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:soap12=\"http://www.w3.org" \
- "/2003/05/soap-envelope\"><soap12:Body><LDapAuthenticateUser xmlns=\"http://services." \
- "starcom.mediavest.group/\"><message><UserCredentials><Username>{}</U" \
- "sername><Password><![CDATA[{}]]></Password></UserCredentials><Access" \
- "Key>maksim.nekrasov@vivaki.ru</AccessKey></message></LDapAuthenticateUser>" \
- "</soap12:Body></soap12:Envelope>"
- headers = {'content-type': "application/soap+xml; charset=utf-8"}
- data = data.format(self.login_name, self.password)
- req_auth = requests.post(url=url, data=data.encode('utf-8'), headers=headers)
- if req_auth.status_code == 200:
- error_message = pars_pex(req_auth.text, 'ErrorMessage')
- authe_result = pars_pex(req_auth.text, 'AutheticationResult')
- if authe_result == 'Successful':
- return True, None
- return False, error_message
- return False, None
- def authenticate_users(self, is_staff=False):
- user_auth = authenticate(username=self.login_name, password=self.password if is_staff else SECRET_AUTH_STUB)
- if user_auth:
- login(self.request, user_auth)
- return True
- return False
- def get_next_url(self):
- url_return = HOME_PAGE
- if 'next' in self.request.GET:
- url_pars = urlparse(self.request.GET.get('next'))
- if url_pars[0] == '' and url_pars[1] == '' and url_pars[2] != '':
- url_return = url_pars[2]
- return url_return
- def is_staff(self):
- try:
- user = User.objects.get(username=self.login_name)
- except (User.DoesNotExist, ValueError):
- return False
- if user.is_staff:
- return True
- return False
- def xlsx_from_csv_nearby(csv_filename: str):
- excel_filename = csv_filename.replace(".csv", ".xlsx")
- with open(csv_filename, encoding="utf-8") as csv_file:
- workbook = xlsxwriter.workbook.Workbook(excel_filename)
- worksheet = workbook.add_worksheet()
- reader = csv.reader(csv_file, delimiter=";")
- for r, row in enumerate(reader):
- for c, col in enumerate(row):
- worksheet.write(r, c, col)
- workbook.close()
- return excel_filename
- def csv_from_dicts_list(dicts_list: list, filename: str):
- with open(filename, 'w', encoding='utf-8') as output_file:
- keys = dicts_list[0].keys()
- dict_writer = csv.DictWriter(output_file, list(keys), delimiter=";")
- dict_writer.writeheader()
- chunked_dicts = array_split(dicts_list, 100) # Don't pass huge lists to write, celery can't wait so long
- logger.info(f'CSV: wrote headers, writing {len(dicts_list)} rows in {len(chunked_dicts)} chunks')
- for i, chunk in enumerate(chunked_dicts):
- dict_writer.writerows(chunk)
- def xlsx_from_dicts_list(dicts_list: list, filename: str):
- workbook = xlsxwriter.workbook.Workbook(filename)
- worksheet = workbook.add_worksheet()
- dicts_keys = sorted(dicts_list[0].keys())
- for c, col in enumerate(dicts_keys):
- worksheet.write(0, c, col)
- for r, stat in enumerate(dicts_list):
- for c, dict_key in enumerate(dicts_keys):
- worksheet.write(r + 1, c, str(stat[dict_key]))
- workbook.close()
- return filename
- def send_xlsx_email(email: str, title: str = '', body: str = '', attachments: list = []):
- email = EmailMessage(title, body, ['TOOLS@PFX'], [email])
- for attachment in attachments:
- with open(attachment, 'rb') as send_file:
- f = send_file.read()
- email.attach(
- attachment.split(os.sep)[-1],
- f, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
- )
- tries = 0
- while tries < 4:
- try:
- email.send()
- return
- except Exception as e:
- tries += 1
- logger.error(f'Email send error (try: {tries})\n{e}')
- sleep(30)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement