Advertisement
Guest User

py-helpers

a guest
Feb 6th, 2019
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.76 KB | None | 0 0
  1. import csv
  2. import logging
  3. import os
  4. import re
  5. from collections import namedtuple
  6. from datetime import date
  7. from time import sleep
  8. from typing import Tuple, List, Union, Dict
  9. from urllib.parse import urlparse
  10.  
  11. import requests
  12. import xlsxwriter
  13. from django.contrib.auth import authenticate, login
  14. from django.contrib.auth.models import User, Group
  15. from django.core.mail import EmailMessage
  16. from django.db import connection
  17.  
  18. from .settings import HOME_PAGE, SECRET_AUTH_STUB
  19.  
  20. logger = logging.getLogger('django')
  21.  
  22.  
  23. def sql(command: str, result_type: str="list") -> Union[
  24.     Tuple[List[tuple], bool],
  25.     List[Dict[str, Union[date, int]]],
  26.     List[tuple]
  27. ]:
  28.     """
  29.    Gets raw SQL command and returns result as Tuple[List[tuple], bool].
  30.    :param command: Raw SQL command.
  31.    :return: Tuple[List[tuple], bool]
  32.    """
  33.  
  34.     def dict_fetch_all(cursor):
  35.         """Return all rows from a cursor as a dict."""
  36.         columns = [col[0] for col in cursor.description]
  37.         return [
  38.             dict(zip(columns, row))
  39.             for row in cursor.fetchall()
  40.         ]
  41.  
  42.     def named_tuple_fetch_all(cursor):
  43.         """Return all rows from a cursor as a namedtuple."""
  44.         desc = cursor.description
  45.         nt_result = namedtuple('Result', [col[0] for col in desc])
  46.         return [nt_result(*row) for row in cursor.fetchall()]
  47.  
  48.     if isinstance(command, str) and command:
  49.         cursor = connection.cursor()
  50.         cursor.execute(command)
  51.  
  52.         if result_type == "list":
  53.             return cursor.fetchall(), True
  54.         if result_type == "dict":
  55.             return dict_fetch_all(cursor=cursor)
  56.         if result_type == "namedtuple":
  57.             return named_tuple_fetch_all(cursor=cursor)
  58.     return [], False
  59.  
  60.  
  61. def pars_pex(text, xml_teg):
  62.     xml_name = re.search(r"<({})>(.+?)<".format(xml_teg), text)\
  63.         .group(0)\
  64.         .replace('<{}>'.format(xml_teg), '')\
  65.         .replace('<', '')
  66.  
  67.     return xml_name
  68.  
  69.  
  70. def array_split(lst, sz):
  71.     """
  72.    For example, split array by 2 elements in arrays:
  73.    [1, 2, 3, 4, 5, 6, 7] -> [[1, 2], [3, 4], [5, 6], [7]]
  74.    """
  75.     return [lst[i:i+sz] for i in range(0, len(lst), sz)]
  76.  
  77.  
  78. class RegUsers:
  79.     _G_URL = "https://apps.smvgroup.com/smvservices/ldapservices.asmx"
  80.  
  81.     def __init__(self, login_name, password, request):
  82.         self.login_name = login_name.lower()
  83.         self.password = password
  84.         self.request = request
  85.  
  86.     def get_user_to_site(self):
  87.         try:
  88.             User.objects.get(username=self.login_name)
  89.         except (User.DoesNotExist, ValueError):
  90.             return False
  91.  
  92.         return True
  93.  
  94.     def reg_users(self, grop_list):
  95.         if self.get_user_to_site():
  96.             is_user = User.objects.get(username=self.login_name)
  97.             is_user.set_password(SECRET_AUTH_STUB)
  98.             is_user.save()
  99.         else:
  100.             user_add = User(username=self.login_name)
  101.             user_add.set_password(SECRET_AUTH_STUB)
  102.             user_add.save()
  103.  
  104.             for gorup_name in grop_list:
  105.                 try:
  106.                     group = Group.objects.get(name=gorup_name)
  107.                 except Group.DoesNotExist:
  108.                     raise Exception(f'Group "{gorup_name}" does not exist. '
  109.                                     f'Perhaps, you need to "manage.py create_default_groups"')
  110.                 user_add.groups.add(group)
  111.             user_add.save()
  112.  
  113.     def get_user_is_portal(self):
  114.         url = self._G_URL
  115.         data = "<?xml version=\"1.0\" encoding=\"utf-8\"?><soap12:Envelope xmlns:xsi=\"http://www.w3.org/2001/" \
  116.                "XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:soap12=\"http://www.w3.org" \
  117.                "/2003/05/soap-envelope\"><soap12:Body><LDapAuthenticateUser xmlns=\"http://services." \
  118.                "starcom.mediavest.group/\"><message><UserCredentials><Username>{}</U" \
  119.                "sername><Password><![CDATA[{}]]></Password></UserCredentials><Access" \
  120.                "Key>maksim.nekrasov@vivaki.ru</AccessKey></message></LDapAuthenticateUser>" \
  121.                "</soap12:Body></soap12:Envelope>"
  122.         headers = {'content-type': "application/soap+xml; charset=utf-8"}
  123.  
  124.         data = data.format(self.login_name, self.password)
  125.         req_auth = requests.post(url=url, data=data.encode('utf-8'), headers=headers)
  126.  
  127.         if req_auth.status_code == 200:
  128.             error_message = pars_pex(req_auth.text, 'ErrorMessage')
  129.             authe_result = pars_pex(req_auth.text, 'AutheticationResult')
  130.  
  131.             if authe_result == 'Successful':
  132.                 return True, None
  133.  
  134.             return False, error_message
  135.  
  136.         return False, None
  137.  
  138.     def authenticate_users(self, is_staff=False):
  139.         user_auth = authenticate(username=self.login_name, password=self.password if is_staff else SECRET_AUTH_STUB)
  140.  
  141.         if user_auth:
  142.             login(self.request, user_auth)
  143.             return True
  144.  
  145.         return False
  146.  
  147.     def get_next_url(self):
  148.         url_return = HOME_PAGE
  149.         if 'next' in self.request.GET:
  150.             url_pars = urlparse(self.request.GET.get('next'))
  151.             if url_pars[0] == '' and url_pars[1] == '' and url_pars[2] != '':
  152.                 url_return = url_pars[2]
  153.  
  154.         return url_return
  155.  
  156.     def is_staff(self):
  157.         try:
  158.             user = User.objects.get(username=self.login_name)
  159.         except (User.DoesNotExist, ValueError):
  160.             return False
  161.  
  162.         if user.is_staff:
  163.             return True
  164.  
  165.         return False
  166.  
  167.  
  168. def xlsx_from_csv_nearby(csv_filename: str):
  169.     excel_filename = csv_filename.replace(".csv", ".xlsx")
  170.     with open(csv_filename, encoding="utf-8") as csv_file:
  171.         workbook = xlsxwriter.workbook.Workbook(excel_filename)
  172.         worksheet = workbook.add_worksheet()
  173.         reader = csv.reader(csv_file, delimiter=";")
  174.         for r, row in enumerate(reader):
  175.             for c, col in enumerate(row):
  176.                 worksheet.write(r, c, col)
  177.         workbook.close()
  178.     return excel_filename
  179.  
  180.  
  181. def csv_from_dicts_list(dicts_list: list, filename: str):
  182.     with open(filename, 'w', encoding='utf-8') as output_file:
  183.         keys = dicts_list[0].keys()
  184.         dict_writer = csv.DictWriter(output_file, list(keys), delimiter=";")
  185.         dict_writer.writeheader()
  186.         chunked_dicts = array_split(dicts_list, 100)  # Don't pass huge lists to write, celery can't wait so long
  187.         logger.info(f'CSV: wrote headers, writing {len(dicts_list)} rows in {len(chunked_dicts)} chunks')
  188.         for i, chunk in enumerate(chunked_dicts):
  189.             dict_writer.writerows(chunk)
  190.  
  191.  
  192. def xlsx_from_dicts_list(dicts_list: list, filename: str):
  193.     workbook = xlsxwriter.workbook.Workbook(filename)
  194.     worksheet = workbook.add_worksheet()
  195.  
  196.     dicts_keys = sorted(dicts_list[0].keys())
  197.  
  198.     for c, col in enumerate(dicts_keys):
  199.         worksheet.write(0, c, col)
  200.  
  201.     for r, stat in enumerate(dicts_list):
  202.         for c, dict_key in enumerate(dicts_keys):
  203.             worksheet.write(r + 1, c, str(stat[dict_key]))
  204.  
  205.     workbook.close()
  206.     return filename
  207.  
  208.  
  209. def send_xlsx_email(email: str, title: str = '', body: str = '', attachments: list = []):
  210.     email = EmailMessage(title, body, ['TOOLS@PFX'], [email])
  211.     for attachment in attachments:
  212.         with open(attachment, 'rb') as send_file:
  213.             f = send_file.read()
  214.             email.attach(
  215.                 attachment.split(os.sep)[-1],
  216.                 f, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
  217.             )
  218.     tries = 0
  219.     while tries < 4:
  220.         try:
  221.             email.send()
  222.             return
  223.         except Exception as e:
  224.             tries += 1
  225.             logger.error(f'Email send error (try: {tries})\n{e}')
  226.             sleep(30)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement