Advertisement
chuuupa

process

Oct 19th, 2021
727
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 60.55 KB | None | 0 0
  1. from __future__ import annotations
  2.  
  3. import asyncio
  4. import html
  5. import os
  6. import re
  7. import time
  8. from argparse import ArgumentParser
  9. from contextlib import contextmanager
  10. from datetime import datetime
  11. from typing import Dict, Iterable, Optional, Tuple
  12.  
  13. import aiohttp
  14. import httpx
  15. import requests
  16. from nostr_data_logging import NoStrDataLogger
  17. import uvloop
  18. from lxml import etree
  19.  
  20. import get_changed
  21.  
  22.  
  23. from mapping import (
  24.     XML_ROOT_ELEMENT,
  25.     CALC_PARTITION_SIZE
  26. )
  27.  
  28. from storage_handler import StorageData
  29. from environment_handler import Environment
  30. from spark_interfax_parser import SparkInterfaxParser
  31.  
  32.  
  33. environment = Environment()
  34. # CONCURRENCY = 1000
  35. # RESPONSE_VA
  36.  
  37.  
  38. def _validate(response: str, pattern: str) -> Optional[str]:
  39.     """
  40.    Первичная валидация ответа от сервиса СПАРК.
  41.  
  42.    Args:
  43.        response (str): xml ответ от сервиса СПАРК;
  44.  
  45.        pattern (str): регулярное выражение по которому происходит первичная валидация ответа.
  46.            Описывается в конфигурационном файле.
  47.  
  48.    Returns:
  49.        Optional[str]: в случае успешной валидации И присутствия данных во внутренней xml ответа
  50.            вернется внутренняя xml с конвертированными ссылками на именнованные и числовые символы
  51.            в соответствующие Unicode символы.
  52.    """
  53.  
  54.     match = re.fullmatch(pattern, response)
  55.  
  56.     if match and match.group(1) == 'True':
  57.         return html.unescape(match.group(2))
  58.  
  59.  
  60. @contextmanager
  61. def _report_time(logger: NoStrDataLogger, test: str) -> None:
  62.     """
  63.    Логирование времени затраченного на выполнение кода в контексте.
  64.  
  65.    Args:
  66.        logger (NoStrDataLogger): логер, используемый для записи времени выполенения;
  67.  
  68.        test (str): проводимая операция (в контексте) для идентификации в логах.
  69.    """
  70.  
  71.     t0 = time.time()
  72.     yield
  73.     logger.info('Time needed for %s: %.2fs' % (test, time.time() - t0))
  74.  
  75.  
  76. async def _produce(
  77.     # session: aiohttp.ClientSession,
  78.     client: httpx.AsyncClient,
  79.     queue: asyncio.Queue,
  80.     semaphore: asyncio.Semaphore,
  81.     method: str,
  82.     kwarg: Dict[str, str],
  83.     logger: NoStrDataLogger
  84. ) -> None:
  85.     """
  86.    Провайдер ответов для связующей асинхронной очереди. Отправляет запрос
  87.        с использованием предварительно открытой сессии в сервис СПАРК и
  88.        складывает кортеж <аргументы запроса, ответ сервиса> в очередь.
  89.  
  90.    Args:
  91.        session (aiohttp.ClientSession): сессия общения с сервисом СПАРК.
  92.            Является инкапсуляцией пула соединений с сервисом;
  93.  
  94.        queue (asyncio.Queue): асинхронная очередь,
  95.            связующая провайдер и потребитель ответов от сервиса;
  96.        
  97.        semaphore (asyncio.Semaphore): ограничитель на количество одновременных
  98.            запросов в сервис СПАРК;
  99.  
  100.        method (str): наименование метода в сервисе СПАРК;
  101.  
  102.        kwarg (Dict[str, str]): словарь аргументов для совершения POST-запроса в сервис СПАРК;
  103.  
  104.        logger (NoStrDataLogger): логер, используемый для сообщений по текущему методу.
  105.    """
  106.  
  107.     async with semaphore:
  108.         try:
  109.             rdata = environment.get_request_data(method, kwarg).encode()
  110.             response = await client.post(environment.url, content=rdata)
  111.             print(response.elapsed.total_seconds())
  112.             queue.put_nowait((kwarg, response.text))
  113.         except httpx.RequestError as exc:
  114.             logger.warning(str(kwarg) + ': ' + repr(exc))
  115.             asyncio.ensure_future(_produce(client, queue, semaphore, method, kwarg, logger))
  116.  
  117.         # except (aiohttp.client_exceptions.ClientConnectionError,
  118.         #         aiohttp.client_exceptions.ClientPayloadError) as clierr:
  119.         #     logger.warning(str(kwarg) + ': ' + repr(clierr))
  120.         #     asyncio.ensure_future(_produce(session, queue, semaphore, method, kwarg, logger))
  121.  
  122.         # except asyncio.exceptions.TimeoutError as timeouterr:
  123.         #     logger.warning(str(kwarg) + ': ' + repr(timeouterr))
  124.         #     asyncio.ensure_future(_produce(session, queue, method, semaphore, kwarg, logger))
  125.  
  126.  
  127. async def _consume(
  128.     queue: asyncio.Queue,
  129.     parser: SparkInterfaxParser,
  130.     xml_validator: etree.XMLParser,
  131.     root: str,
  132.     pattern: str,
  133.     logger: NoStrDataLogger
  134. ) -> None:
  135.     """
  136.    Потребитель ответов для связующей асинхронной очереди.
  137.        Достает из оредеди кортеж <аргументы запроса, ответ сервиса> и обрабатывает его.
  138.  
  139.    Args:
  140.        queue (asyncio.Queue): асинхронная очередь,
  141.            связующая провайдер и потребитель ответов от сервиса;
  142.  
  143.        parser (SparkInterfaxParser): парсер для внутренних xml, содержащих необходимые данные.
  144.            Также является контейнером для результирующего множества таблиц;
  145.  
  146.        xml_validator (etree.XMLParser): валидатор на соответствие xsd схеме для внутренних xml;
  147.  
  148.        root (str): элемент, являющийся корневым во внутренней xml.
  149.            Ниже его в структуре xml находятся непосредственно извлекаемые данные.
  150.            Явно прописываемый параметр в файле mapping.py в переменной XML_ROOT_ELEMENT
  151.                для каждого метода (если не указан, будет использован 'Data/Report');
  152.  
  153.        pattern (str): Регулярное выражение по которому происходит первичная валидация ответа.
  154.            Описывается в конфигурационном файле;
  155.  
  156.        logger (NoStrDataLogger): логер, используемый для сообщений по текущему методу.
  157.    """
  158.  
  159.     while True:
  160.         kwarg, response = await queue.get()
  161.         _handle_response(kwarg, response, parser, xml_validator, root, pattern, logger)
  162.         queue.task_done()
  163.  
  164.  
  165. def _handle_response(
  166.     kwarg: Dict[str, str],
  167.     response: str,
  168.     parser: SparkInterfaxParser,
  169.     xml_validator: etree.XMLParser,
  170.     root: str,
  171.     pattern: str,
  172.     logger: NoStrDataLogger
  173. ) -> None:
  174.     """
  175.    Обработка xml-ответов от сервиса СПАРК.
  176.  
  177.    Args:
  178.        kwarg (Dict[str, str]): аргументы, с которыми отправлялся запрос.
  179.            Необходимы для возможности логирования ошибок при обработке, как например ошибка  
  180.                валидации по xsd схеме;
  181.  
  182.        response (str): xml ответ от сервиса СПАРК;
  183.  
  184.        parser (SparkInterfaxParser): парсер для внутренних xml, содержащих необходимые данные.
  185.            Также является контейнером для результирующего множества таблиц;
  186.  
  187.        xml_validator (etree.XMLParser): валидатор на соответствие xsd схеме для внутренних xml;
  188.  
  189.        root (str): элемент, являющийся корневым во внутренней xml.
  190.            Ниже его в структуре xml находятся непосредственно извлекаемые данные.
  191.            Явно прописываемый параметр в файле mapping.py в переменной XML_ROOT_ELEMENT
  192.                для каждого метода (если не указан, будет использован 'Data/Report');
  193.  
  194.        pattern (str): Регулярное выражение по которому происходит первичная валидация ответа.
  195.            Описывается в конфигурационном файле;
  196.  
  197.        logger (NoStrDataLogger): логер, используемый для сообщений по текущему методу.
  198.    """
  199.  
  200.     validated = _validate(response, pattern)
  201.  
  202.     if validated is not None:
  203.         try:
  204.             eltree = etree.fromstring(
  205.                 environment.remove_prefix(validated, '<?xml version="1.0" encoding="UTF-8"?>'),
  206.                 xml_validator
  207.             )
  208.             parser.parse(eltree.find(root))
  209.  
  210.         except etree.XMLSyntaxError as err:
  211.             logger.warning(str(kwarg) + ': ' + repr(err))
  212.  
  213.  
  214. async def _run(
  215.     parser: SparkInterfaxParser,
  216.     xml_validator: etree.XMLParser,
  217.     logger: NoStrDataLogger,
  218.     method: str,
  219.     args: Iterable[Dict[str, str]]
  220. ) -> None:
  221.     """
  222.    Инициализация обработки списка входных аргументов по определенному методу.
  223.  
  224.    Args:
  225.        parser (SparkInterfaxParser): парсер для внутренних xml, содержащих необходимые данные.
  226.            Также является контейнером результирующего множества таблиц;
  227.  
  228.        xml_validator (etree.XMLParser): валидатор на соответствие xsd схеме для внутренних xml;
  229.  
  230.        logger (NoStrDataLogger): логер, используемый для сообщений по текущему методу.
  231.  
  232.        method (str): наименование метода в сервисе СПАРК;
  233.  
  234.        args (Iterable[Dict[str, str]]): список именнованных аргументов по которым требуется
  235.            получить ответ от сервиса СПАРК.
  236.    """
  237.  
  238.     root = XML_ROOT_ELEMENT.get(method, 'Data/Report')
  239.     pattern = environment.get_response_validation_pattern(method)
  240.  
  241.     # timeout = aiohttp.ClientTimeout(total=40)
  242.     # connector = aiohttp.TCPConnector(limit=10)
  243.     semaphore = asyncio.Semaphore(100)
  244.  
  245.     queue = asyncio.Queue()
  246.  
  247.     # async with aiohttp.ClientSession(
  248.     #     headers=environment.headers,
  249.     #     cookies=environment.cookies,
  250.     #     connector=connector,
  251.     #     timeout=timeout
  252.     # ) as session:
  253.  
  254.     async with httpx.AsyncClient(
  255.         headers=environment.headers,
  256.         cookies=environment.cookies,
  257.         timeout=httpx.Timeout(10.0),
  258.         limits=httpx.Limits(max_connections=50, max_keepalive_connections=10),
  259.         verify=False
  260.     ) as client:
  261.  
  262.         consumer = asyncio.ensure_future(
  263.             _consume(queue, parser, xml_validator, root, pattern, logger))
  264.        
  265.         tasks = [
  266.             _produce(client, queue, semaphore, method, arg, logger)
  267.             for arg in args
  268.         ]
  269.  
  270.         await asyncio.gather(*tasks)
  271.         await queue.join()
  272.        
  273.         consumer.cancel()
  274.  
  275.  
  276. async def main(methods: Iterable[str]) -> None:
  277.     """
  278.    Обработчик методов сервиса СПАРК. Производит:
  279.        - получение списка компаний, имеющих на текущую/заданную дату какие-либо изменения;
  280.        - вызов определенных методов по данному списку;
  281.        - сохранение результатов вызовов методов сервиса.
  282.  
  283.    Args:
  284.        methods (Iterable[str]): список методов,
  285.            по которым необходимо запросить данные от сервиса СПАРК.
  286.    """
  287.    
  288.     method = 'GetChangedCompanies'
  289.  
  290.     gcc_request_data = environment.get_request_data(
  291.         method,
  292.         {
  293.             'changeDate': environment.change_date() + 'T00:00:00.000',
  294.             'changeType': environment.change_type()
  295.         }
  296.     )
  297.  
  298.     pattern = environment.get_response_validation_pattern(method)
  299.  
  300.     companies = _validate(
  301.         requests.post(
  302.             environment.url,
  303.             headers=environment.headers,
  304.             cookies=environment.cookies,
  305.             data=gcc_request_data
  306.         ).text,
  307.         pattern
  308.     )
  309.    
  310.     logger = environment.get_logger('General')
  311.    
  312.     if companies is not None:
  313.         try:
  314.             companies, companies_method_result = get_changed.companies(companies)
  315.             if method in methods:
  316.                 StorageData.save_tables(method, companies_method_result)
  317.             del companies_method_result
  318.         except etree.XMLSyntaxError as err:
  319.             logger.error(err)
  320.             companies = []
  321.     else:
  322.         companies = []
  323.  
  324.     num_companies = len(companies)
  325.  
  326.     method = 'GetChangedEntrepreneurs'
  327.  
  328.     gce_request_data = environment.get_request_data(
  329.         method,
  330.         {
  331.             'changeDate': environment.change_date() + 'T00:00:00.000',
  332.             'changeType': environment.change_type()
  333.         }
  334.     )
  335.  
  336.     pattern = environment.get_response_validation_pattern(method)
  337.  
  338.     entrepreneurs = _validate(
  339.         requests.post(
  340.             environment.url,
  341.             headers=environment.headers,
  342.             cookies=environment.cookies,
  343.             data=gce_request_data
  344.             ).text,
  345.             pattern
  346.         )
  347.  
  348.     if entrepreneurs is not None:
  349.         try:
  350.             entrepreneurs, entrepreneurs_method_result = get_changed.entrepreneurs(entrepreneurs)
  351.             if method in methods:
  352.                 StorageData.save_tables(method, entrepreneurs_method_result)
  353.             del entrepreneurs_method_result
  354.         except etree.XMLSyntaxError as err:
  355.             logger.error(err)
  356.             entrepreneurs = []
  357.     else:
  358.         entrepreneurs = []
  359.  
  360.     num_entrepreneurs = len(entrepreneurs)
  361.  
  362.  
  363.     # -------------------------------------------------------------------------
  364.     # -------------------------------- 1 --------------------------------------
  365.     # -------------------------------------------------------------------------
  366.  
  367.     # =========================================================================
  368.     method = 'GetCompanyArbitrationSummary'
  369.     # =========================================================================
  370.  
  371.     if method in methods:
  372.         logger = environment.get_logger(method)
  373.         parser = SparkInterfaxParser(method)
  374.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  375.         xml_validator = etree.XMLParser(
  376.             schema=etree.XMLSchema(
  377.                 etree.parse(environment.get_xsd_schema_file(method))
  378.             ),
  379.             huge_tree=True
  380.         )
  381.         logger.info(f'Len {method} arguments list: {num_companies}')
  382.  
  383.         if partition_size > num_companies:
  384.             with _report_time(logger, method):
  385.                 await _run(parser, xml_validator, logger, method, companies)
  386.             StorageData.save_tables(method, parser.result)
  387.  
  388.         else:
  389.             splitted = []
  390.             num_partitions = (num_companies // partition_size) + 1
  391.             partition_size = num_companies // num_partitions
  392.  
  393.             for i in range(num_partitions):
  394.                 if i == num_partitions - 1:
  395.                     splitted.append(companies[i*partition_size:])
  396.                 else:
  397.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  398.  
  399.             for i, data_partition in enumerate(splitted):
  400.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  401.                     await _run(parser, xml_validator, logger, method, data_partition)
  402.            
  403.                 StorageData.save_partitions(method, parser.result, i)
  404.                 parser.cleanup()
  405.  
  406.             StorageData.concat_partitions(method)
  407.             StorageData.cleanup_tmp_dir()
  408.  
  409.     # =========================================================================
  410.     method = 'GetEntrepreneurArbitrationSummary'
  411.     # =========================================================================
  412.  
  413.     if method in methods:
  414.         logger = environment.get_logger(method)
  415.         parser = SparkInterfaxParser(method)
  416.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  417.         xml_validator = etree.XMLParser(
  418.             schema=etree.XMLSchema(
  419.                 etree.parse(environment.get_xsd_schema_file(method))
  420.             ),
  421.             huge_tree=True
  422.         )
  423.         logger.info(f'Len {method} arguments list: {num_entrepreneurs}')
  424.  
  425.         if partition_size > num_entrepreneurs:
  426.             with _report_time(logger, method):
  427.                 await _run(parser, xml_validator, logger, method, entrepreneurs)
  428.             StorageData.save_tables(method, parser.result)
  429.  
  430.         else:
  431.             splitted = []
  432.             num_partitions = (num_entrepreneurs // partition_size) + 1
  433.             partition_size = num_entrepreneurs // num_partitions
  434.  
  435.             for i in range(num_partitions):
  436.                 if i == num_partitions - 1:
  437.                     splitted.append(entrepreneurs[i*partition_size:])
  438.                 else:
  439.                     splitted.append(entrepreneurs[i*partition_size : (i+1)*partition_size])
  440.  
  441.             for i, data_partition in enumerate(splitted):
  442.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  443.                     await _run(parser, xml_validator, logger, method, data_partition)
  444.            
  445.                 StorageData.save_partitions(method, parser.result, i)
  446.                 parser.cleanup()
  447.  
  448.             StorageData.concat_partitions(method)
  449.             StorageData.cleanup_tmp_dir()
  450.  
  451.     # =========================================================================
  452.     method = 'GetCompanyLeasings'
  453.     # =========================================================================
  454.  
  455.     leasing_report_attrs = []
  456.  
  457.     if method in methods:
  458.         args = [{**company, 'leasingStatus': '0'} for company in companies]
  459.         logger = environment.get_logger(method)
  460.         parser = SparkInterfaxParser(method)
  461.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  462.         xml_validator = etree.XMLParser(
  463.             schema=etree.XMLSchema(
  464.                 etree.parse(environment.get_xsd_schema_file(method))
  465.             ),
  466.             huge_tree=True
  467.         )
  468.         logger.info(f'Len {method} arguments list: {len(args)}')
  469.  
  470.         if partition_size > len(args):
  471.             with _report_time(logger, method):
  472.                 await _run(parser, xml_validator, logger, method, args)
  473.  
  474.                 for row in parser.result.get('Leasings', []):
  475.                     leasing_report_attrs.append((row[1], row[4]))
  476.  
  477.             StorageData.save_tables(method, parser.result)
  478.  
  479.         else:
  480.             splitted = []
  481.             num_partitions = (len(args) // partition_size) + 1
  482.             partition_size = len(args) // num_partitions
  483.  
  484.             for i in range(num_partitions):
  485.                 if i == num_partitions - 1:
  486.                     splitted.append(args[i*partition_size:])
  487.                 else:
  488.                     splitted.append(args[i*partition_size : (i+1)*partition_size])
  489.  
  490.             for i, data_partition in enumerate(splitted):
  491.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  492.                     await _run(parser, xml_validator, logger, method, data_partition)
  493.  
  494.                 for row in parser.result.get('Leasings', []):
  495.                     leasing_report_attrs.append((row[1], row[4]))
  496.            
  497.                 StorageData.save_partitions(method, parser.result, i)
  498.                 parser.cleanup()
  499.  
  500.             StorageData.concat_partitions(method)
  501.             StorageData.cleanup_tmp_dir()
  502.    
  503.     # =========================================================================
  504.     method = 'GetEntrepreneurLeasings'
  505.     # =========================================================================
  506.  
  507.     if method in methods:
  508.         args = [{**entrepreneur, 'leasingStatus': '0'} for entrepreneur in entrepreneurs]
  509.         logger = environment.get_logger(method)
  510.         parser = SparkInterfaxParser(method)
  511.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  512.         xml_validator = etree.XMLParser(
  513.             schema=etree.XMLSchema(
  514.                 etree.parse(environment.get_xsd_schema_file(method))
  515.             ),
  516.             huge_tree=True
  517.         )
  518.         logger.info(f'Len {method} arguments list: {len(args)}')
  519.  
  520.         if partition_size > len(args):
  521.             with _report_time(logger, method):
  522.                 await _run(parser, xml_validator, logger, method, args)
  523.  
  524.                 for row in parser.result.get('Leasings', []):
  525.                     leasing_report_attrs.append((row[1], row[4]))
  526.  
  527.             StorageData.save_tables(method, parser.result)
  528.  
  529.         else:
  530.             splitted = []
  531.             num_partitions = (len(args) // partition_size) + 1
  532.             partition_size = len(args) // num_partitions
  533.  
  534.             for i in range(num_partitions):
  535.                 if i == num_partitions - 1:
  536.                     splitted.append(args[i*partition_size:])
  537.                 else:
  538.                     splitted.append(args[i*partition_size : (i+1)*partition_size])
  539.  
  540.             for i, data_partition in enumerate(splitted):
  541.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  542.                     await _run(parser, xml_validator, logger, method, data_partition)
  543.                
  544.                 for row in parser.result.get('Leasings', []):
  545.                     leasing_report_attrs.append((row[1], row[4]))
  546.            
  547.                 StorageData.save_partitions(method, parser.result, i)
  548.                 parser.cleanup()
  549.  
  550.             StorageData.concat_partitions(method)
  551.             StorageData.cleanup_tmp_dir()
  552.        
  553.     # =========================================================================
  554.     method = 'GetLeasingReport'
  555.     # =========================================================================
  556.  
  557.     if method in methods:
  558.         leasing_report_attrs = [
  559.             {'id': attr[0], 'contractNumber': attr[1]}
  560.             for attr in set(leasing_report_attrs)
  561.         ]
  562.  
  563.         logger = environment.get_logger(method)
  564.         parser = SparkInterfaxParser(method)
  565.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  566.         xml_validator = etree.XMLParser(
  567.             schema=etree.XMLSchema(
  568.                 etree.parse(environment.get_xsd_schema_file(method))
  569.             ),
  570.             huge_tree=True
  571.         )
  572.         logger.info(f'Len {method} arguments list: {len(leasing_report_attrs)}')
  573.  
  574.         if partition_size > len(leasing_report_attrs):
  575.             with _report_time(logger, method):
  576.                 await _run(parser, xml_validator, logger, method, leasing_report_attrs)
  577.             StorageData.save_tables(method, parser.result)
  578.  
  579.         else:
  580.             splitted = []
  581.             num_partitions = (len(leasing_report_attrs) // partition_size) + 1
  582.             partition_size = len(leasing_report_attrs) // num_partitions
  583.  
  584.             for i in range(num_partitions):
  585.                 if i == num_partitions - 1:
  586.                     splitted.append(leasing_report_attrs[i*partition_size:])
  587.                 else:
  588.                     splitted.append(leasing_report_attrs[i*partition_size : (i+1)*partition_size])
  589.  
  590.             for i, data_partition in enumerate(splitted):
  591.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  592.                     await _run(parser, xml_validator, logger, method, data_partition)
  593.            
  594.                 StorageData.save_partitions(method, parser.result, i)
  595.                 parser.cleanup()
  596.  
  597.             StorageData.concat_partitions(method)
  598.             StorageData.cleanup_tmp_dir()
  599.  
  600.     # =========================================================================
  601.     method = 'GetCompanySparkRisksReportXML'
  602.     # =========================================================================
  603.  
  604.     if method in methods:
  605.         logger = environment.get_logger(method)
  606.         parser = SparkInterfaxParser(method)
  607.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  608.         xml_validator = etree.XMLParser(
  609.             schema=etree.XMLSchema(
  610.                 etree.parse(environment.get_xsd_schema_file(method))
  611.             ),
  612.             huge_tree=True
  613.         )
  614.         logger.info(f'Len {method} arguments list: {num_companies}')
  615.  
  616.         if partition_size > num_companies:
  617.             with _report_time(logger, method):
  618.                 await _run(parser, xml_validator, logger, method, companies)
  619.             StorageData.save_tables(method, parser.result)
  620.  
  621.         else:
  622.             splitted = []
  623.             num_partitions = (num_companies // partition_size) + 1
  624.             partition_size = num_companies // num_partitions
  625.  
  626.             for i in range(num_partitions):
  627.                 if i == num_partitions - 1:
  628.                     splitted.append(companies[i*partition_size:])
  629.                 else:
  630.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  631.  
  632.             for i, data_partition in enumerate(splitted):
  633.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  634.                     await _run(parser, xml_validator, logger, method, data_partition)
  635.            
  636.                 StorageData.save_partitions(method, parser.result, i)
  637.                 parser.cleanup()
  638.  
  639.             StorageData.concat_partitions(method)
  640.             StorageData.cleanup_tmp_dir()
  641.  
  642.  
  643.     # -------------------------------------------------------------------------
  644.     # -------------------------------- 2 --------------------------------------
  645.     # -------------------------------------------------------------------------
  646.  
  647.     # =========================================================================
  648.     method = 'GetCompanyPledges'
  649.     # =========================================================================
  650.  
  651.     pledge_report_attrs = []
  652.  
  653.     if method in methods:
  654.         args = [{**company, 'pledgeStatus': '0'} for company in companies]
  655.         logger = environment.get_logger(method)
  656.         parser = SparkInterfaxParser(method)
  657.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  658.         xml_validator = etree.XMLParser(
  659.             schema=etree.XMLSchema(
  660.                 etree.parse(environment.get_xsd_schema_file(method))
  661.             ),
  662.             huge_tree=True
  663.         )
  664.         logger.info(f'Len {method} arguments list: {len(args)}')
  665.  
  666.         if partition_size > len(args):
  667.             with _report_time(logger, method):
  668.                 await _run(parser, xml_validator, logger, method, args)
  669.  
  670.                 for row in parser.result.get('Pledges', []):
  671.                     pledge_report_attrs.append((row[1], row[3], row[5]))
  672.  
  673.             StorageData.save_tables(method, parser.result)
  674.  
  675.         else:
  676.             splitted = []
  677.             num_partitions = (len(args) // partition_size) + 1
  678.             partition_size = len(args) // num_partitions
  679.  
  680.             for i in range(num_partitions):
  681.                 if i == num_partitions - 1:
  682.                     splitted.append(args[i*partition_size:])
  683.                 else:
  684.                     splitted.append(args[i*partition_size : (i+1)*partition_size])
  685.  
  686.             for i, data_partition in enumerate(splitted):
  687.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  688.                     await _run(parser, xml_validator, logger, method, data_partition)
  689.  
  690.                 for row in parser.result.get('Pledges', []):
  691.                     pledge_report_attrs.append((row[1], row[3], row[5]))
  692.            
  693.                 StorageData.save_partitions(method, parser.result, i)
  694.                 parser.cleanup()
  695.  
  696.             StorageData.concat_partitions(method)
  697.             StorageData.cleanup_tmp_dir()
  698.    
  699.     # =========================================================================
  700.     method = 'GetEntrepreneurPledges'
  701.     # =========================================================================
  702.  
  703.     if method in methods:
  704.         args = [{**entrepreneur, 'pledgeStatus': '0'} for entrepreneur in entrepreneurs]
  705.         logger = environment.get_logger(method)
  706.         parser = SparkInterfaxParser(method)
  707.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  708.         xml_validator = etree.XMLParser(
  709.             schema=etree.XMLSchema(
  710.                 etree.parse(environment.get_xsd_schema_file(method))
  711.             ),
  712.             huge_tree=True
  713.         )
  714.         logger.info(f'Len {method} arguments list: {len(args)}')
  715.  
  716.         if partition_size > len(args):
  717.             with _report_time(logger, method):
  718.                 await _run(parser, xml_validator, logger, method, args)
  719.  
  720.                 for row in parser.result.get('Pledges', []):
  721.                     pledge_report_attrs.append((row[1], row[3], row[5]))
  722.  
  723.             StorageData.save_tables(method, parser.result)
  724.  
  725.         else:
  726.             splitted = []
  727.             num_partitions = (len(args) // partition_size) + 1
  728.             partition_size = len(args) // num_partitions
  729.  
  730.             for i in range(num_partitions):
  731.                 if i == num_partitions - 1:
  732.                     splitted.append(args[i*partition_size:])
  733.                 else:
  734.                     splitted.append(args[i*partition_size : (i+1)*partition_size])
  735.  
  736.             for i, data_partition in enumerate(splitted):
  737.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  738.                     await _run(parser, xml_validator, logger, method, data_partition)
  739.                
  740.                 for row in parser.result.get('Pledges', []):
  741.                     pledge_report_attrs.append((row[1], row[3], row[5]))
  742.            
  743.                 StorageData.save_partitions(method, parser.result, i)
  744.                 parser.cleanup()
  745.  
  746.             StorageData.concat_partitions(method)
  747.             StorageData.cleanup_tmp_dir()
  748.        
  749.     # =========================================================================
  750.     method = 'GetPledgeReport'
  751.     # =========================================================================
  752.  
  753.     if method in methods:
  754.         pledge_report_attrs = [
  755.             {'id': attr[0], 'notificationNumber': attr[1], 'contractNumber': attr[2]}
  756.             for attr in set(pledge_report_attrs)
  757.         ]
  758.  
  759.         logger = environment.get_logger(method)
  760.         parser = SparkInterfaxParser(method)
  761.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  762.         xml_validator = etree.XMLParser(
  763.             schema=etree.XMLSchema(
  764.                 etree.parse(environment.get_xsd_schema_file(method))
  765.             ),
  766.             huge_tree=True
  767.         )
  768.         logger.info(f'Len {method} arguments list: {len(pledge_report_attrs)}')
  769.  
  770.         if partition_size > len(pledge_report_attrs):
  771.             with _report_time(logger, method):
  772.                 await _run(parser, xml_validator, logger, method, pledge_report_attrs)
  773.             StorageData.save_tables(method, parser.result)
  774.  
  775.         else:
  776.             splitted = []
  777.             num_partitions = (len(pledge_report_attrs) // partition_size) + 1
  778.             partition_size = len(pledge_report_attrs) // num_partitions
  779.  
  780.             for i in range(num_partitions):
  781.                 if i == num_partitions - 1:
  782.                     splitted.append(pledge_report_attrs[i*partition_size:])
  783.                 else:
  784.                     splitted.append(pledge_report_attrs[i*partition_size : (i+1)*partition_size])
  785.  
  786.             for i, data_partition in enumerate(splitted):
  787.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  788.                     await _run(parser, xml_validator, logger, method, data_partition)
  789.            
  790.                 StorageData.save_partitions(method, parser.result, i)
  791.                 parser.cleanup()
  792.  
  793.             StorageData.concat_partitions(method)
  794.             StorageData.cleanup_tmp_dir()
  795.  
  796.     # =========================================================================
  797.     method = 'GetCompanyExecutionProceedings'
  798.     # =========================================================================
  799.  
  800.     if method in methods:
  801.         logger = environment.get_logger(method)
  802.         parser = SparkInterfaxParser(method)
  803.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  804.         xml_validator = etree.XMLParser(
  805.             schema=etree.XMLSchema(
  806.                 etree.parse(environment.get_xsd_schema_file(method))
  807.             ),
  808.             huge_tree=True
  809.         )
  810.         logger.info(f'Len {method} arguments list: {num_companies}')
  811.  
  812.         if partition_size > num_companies:
  813.             with _report_time(logger, method):
  814.                 await _run(parser, xml_validator, logger, method, companies)
  815.             StorageData.save_tables(method, parser.result)
  816.  
  817.         else:
  818.             splitted = []
  819.             num_partitions = (num_companies // partition_size) + 1
  820.             partition_size = num_companies // num_partitions
  821.  
  822.             for i in range(num_partitions):
  823.                 if i == num_partitions - 1:
  824.                     splitted.append(companies[i*partition_size:])
  825.                 else:
  826.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  827.  
  828.             for i, data_partition in enumerate(splitted):
  829.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  830.                     await _run(parser, xml_validator, logger, method, data_partition)
  831.            
  832.                 StorageData.save_partitions(method, parser.result, i)
  833.                 parser.cleanup()
  834.  
  835.             StorageData.concat_partitions(method)
  836.             StorageData.cleanup_tmp_dir()
  837.  
  838.     # =========================================================================
  839.     method = 'GetEntrepreneurExecutionProceedings'
  840.     # =========================================================================
  841.    
  842.     if method in methods:
  843.         logger = environment.get_logger(method)
  844.         parser = SparkInterfaxParser(method)
  845.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  846.         xml_validator = etree.XMLParser(
  847.             schema=etree.XMLSchema(
  848.                 etree.parse(environment.get_xsd_schema_file(method))
  849.             ),
  850.             huge_tree=True
  851.         )
  852.         logger.info(f'Len {method} arguments list: {num_entrepreneurs}')
  853.  
  854.         if partition_size > num_entrepreneurs:
  855.             with _report_time(logger, method):
  856.                 await _run(parser, xml_validator, logger, method, entrepreneurs)
  857.             StorageData.save_tables(method, parser.result)
  858.  
  859.         else:
  860.             splitted = []
  861.             num_partitions = (num_entrepreneurs // partition_size) + 1
  862.             partition_size = num_entrepreneurs // num_partitions
  863.  
  864.             for i in range(num_partitions):
  865.                 if i == num_partitions - 1:
  866.                     splitted.append(entrepreneurs[i*partition_size:])
  867.                 else:
  868.                     splitted.append(entrepreneurs[i*partition_size : (i+1)*partition_size])
  869.  
  870.             for i, data_partition in enumerate(splitted):
  871.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  872.                     await _run(parser, xml_validator, logger, method, data_partition)
  873.            
  874.                 StorageData.save_partitions(method, parser.result, i)
  875.                 parser.cleanup()
  876.  
  877.             StorageData.concat_partitions(method)
  878.             StorageData.cleanup_tmp_dir()
  879.  
  880.     # =========================================================================
  881.     method = 'GetCompanyCounterparties'
  882.     # =========================================================================
  883.    
  884.     if method in methods:
  885.         logger = environment.get_logger(method)
  886.         parser = SparkInterfaxParser(method)
  887.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  888.         xml_validator = etree.XMLParser(
  889.             schema=etree.XMLSchema(
  890.                 etree.parse(environment.get_xsd_schema_file(method))
  891.             ),
  892.             huge_tree=True
  893.         )
  894.         logger.info(f'Len {method} arguments list: {num_companies}')
  895.  
  896.         if partition_size > num_companies:
  897.             with _report_time(logger, method):
  898.                 await _run(parser, xml_validator, logger, method, companies)
  899.             StorageData.save_tables(method, parser.result)
  900.  
  901.         else:
  902.             splitted = []
  903.             num_partitions = (num_companies // partition_size) + 1
  904.             partition_size = num_companies // num_partitions
  905.  
  906.             for i in range(num_partitions):
  907.                 if i == num_partitions - 1:
  908.                     splitted.append(companies[i*partition_size:])
  909.                 else:
  910.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  911.  
  912.             for i, data_partition in enumerate(splitted):
  913.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  914.                     await _run(parser, xml_validator, logger, method, data_partition)
  915.            
  916.                 StorageData.save_partitions(method, parser.result, i)
  917.                 parser.cleanup()
  918.  
  919.             StorageData.concat_partitions(method)
  920.             StorageData.cleanup_tmp_dir()
  921.  
  922.  
  923.     # -------------------------------------------------------------------------
  924.     # -------------------------------- 3 --------------------------------------
  925.     # -------------------------------------------------------------------------
  926.  
  927.     # =========================================================================
  928.     method = 'GetCompanyLicenses'
  929.     # =========================================================================
  930.    
  931.     if method in methods:
  932.         logger = environment.get_logger(method)
  933.         parser = SparkInterfaxParser(method)
  934.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  935.         xml_validator = etree.XMLParser(
  936.             schema=etree.XMLSchema(
  937.                 etree.parse(environment.get_xsd_schema_file(method))
  938.             ),
  939.             huge_tree=True
  940.         )
  941.         logger.info(f'Len {method} arguments list: {num_companies}')
  942.  
  943.         if partition_size > num_companies:
  944.             with _report_time(logger, method):
  945.                 await _run(parser, xml_validator, logger, method, companies)
  946.             StorageData.save_tables(method, parser.result)
  947.  
  948.         else:
  949.             splitted = []
  950.             num_partitions = (num_companies // partition_size) + 1
  951.             partition_size = num_companies // num_partitions
  952.  
  953.             for i in range(num_partitions):
  954.                 if i == num_partitions - 1:
  955.                     splitted.append(companies[i*partition_size:])
  956.                 else:
  957.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  958.  
  959.             for i, data_partition in enumerate(splitted):
  960.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  961.                     await _run(parser, xml_validator, logger, method, data_partition)
  962.            
  963.                 StorageData.save_partitions(method, parser.result, i)
  964.                 parser.cleanup()
  965.  
  966.             StorageData.concat_partitions(method)
  967.             StorageData.cleanup_tmp_dir()
  968.  
  969.     # =========================================================================
  970.     method = 'GetCompanyFinancialAnalysis'
  971.     # =========================================================================
  972.  
  973.     if method in methods:
  974.         logger = environment.get_logger(method)
  975.         parser = SparkInterfaxParser(method)
  976.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  977.         xml_validator = etree.XMLParser(
  978.             schema=etree.XMLSchema(
  979.                 etree.parse(environment.get_xsd_schema_file(method))
  980.             ),
  981.             huge_tree=True
  982.         )
  983.         logger.info(f'Len {method} arguments list: {num_companies}')
  984.  
  985.         if partition_size > num_companies:
  986.             with _report_time(logger, method):
  987.                 await _run(parser, xml_validator, logger, method, companies)
  988.             StorageData.save_tables(method, parser.result)
  989.  
  990.         else:
  991.             splitted = []
  992.             num_partitions = (num_companies // partition_size) + 1
  993.             partition_size = num_companies // num_partitions
  994.  
  995.             for i in range(num_partitions):
  996.                 if i == num_partitions - 1:
  997.                     splitted.append(companies[i*partition_size:])
  998.                 else:
  999.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1000.  
  1001.             for i, data_partition in enumerate(splitted):
  1002.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1003.                     await _run(parser, xml_validator, logger, method, data_partition)
  1004.            
  1005.                 StorageData.save_partitions(method, parser.result, i)
  1006.                 parser.cleanup()
  1007.  
  1008.             StorageData.concat_partitions(method)
  1009.             StorageData.cleanup_tmp_dir()
  1010.  
  1011.     # =========================================================================
  1012.     method = 'GetCompanyRiskFactors'
  1013.     # =========================================================================
  1014.    
  1015.     if method in methods:
  1016.         logger = environment.get_logger(method)
  1017.         parser = SparkInterfaxParser(method)
  1018.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1019.         xml_validator = etree.XMLParser(
  1020.             schema=etree.XMLSchema(
  1021.                 etree.parse(environment.get_xsd_schema_file(method))
  1022.             ),
  1023.             huge_tree=True
  1024.         )
  1025.         logger.info(f'Len {method} arguments list: {num_companies}')
  1026.  
  1027.         if partition_size > num_companies:
  1028.             with _report_time(logger, method):
  1029.                 await _run(parser, xml_validator, logger, method, companies)
  1030.             StorageData.save_tables(method, parser.result)
  1031.  
  1032.         else:
  1033.             splitted = []
  1034.             num_partitions = (num_companies // partition_size) + 1
  1035.             partition_size = num_companies // num_partitions
  1036.  
  1037.             for i in range(num_partitions):
  1038.                 if i == num_partitions - 1:
  1039.                     splitted.append(companies[i*partition_size:])
  1040.                 else:
  1041.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1042.  
  1043.             for i, data_partition in enumerate(splitted):
  1044.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1045.                     await _run(parser, xml_validator, logger, method, data_partition)
  1046.            
  1047.                 StorageData.save_partitions(method, parser.result, i)
  1048.                 parser.cleanup()
  1049.  
  1050.             StorageData.concat_partitions(method)
  1051.             StorageData.cleanup_tmp_dir()
  1052.  
  1053.     # =========================================================================
  1054.     method = 'GetBankAccountingReport101102_101'
  1055.     # =========================================================================
  1056.  
  1057.     if method in methods:
  1058.         args = environment.get_bank_accounting_report_arguments_list('101')
  1059.         logger = environment.get_logger(method)
  1060.         parser = SparkInterfaxParser(method)
  1061.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1062.         xml_validator = etree.XMLParser(
  1063.             schema=etree.XMLSchema(
  1064.                 etree.parse(environment.get_xsd_schema_file(method))
  1065.             ),
  1066.             huge_tree=True
  1067.         )
  1068.         logger.info(f'Len {method} arguments list: {len(args)}')
  1069.  
  1070.         if partition_size > len(args):
  1071.             with _report_time(logger, method):
  1072.                 await _run(parser, xml_validator, logger, method[:-4], args)
  1073.             StorageData.save_tables(method, parser.result)
  1074.  
  1075.         else:
  1076.             splitted = []
  1077.             num_partitions = (len(args) // partition_size) + 1
  1078.             partition_size = len(args) // num_partitions
  1079.  
  1080.             for i in range(num_partitions):
  1081.                 if i == num_partitions - 1:
  1082.                     splitted.append(companies[i*partition_size:])
  1083.                 else:
  1084.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1085.  
  1086.             for i, data_partition in enumerate(splitted):
  1087.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1088.                     await _run(parser, xml_validator, logger, method[:-4], data_partition)
  1089.            
  1090.                 StorageData.save_partitions(method, parser.result, i)
  1091.                 parser.cleanup()
  1092.  
  1093.             StorageData.concat_partitions(method)
  1094.             StorageData.cleanup_tmp_dir()
  1095.  
  1096.     # =========================================================================
  1097.     method = 'GetBankAccountingReport101102_102'
  1098.     # =========================================================================
  1099.    
  1100.     if method in methods:
  1101.         args = environment.get_bank_accounting_report_arguments_list('102')
  1102.         logger = environment.get_logger(method)
  1103.         parser = SparkInterfaxParser(method)
  1104.         partition_size = CALC_PARTITION_SIZE.get(method[:-4], 200000)
  1105.         xml_validator = etree.XMLParser(
  1106.             schema=etree.XMLSchema(
  1107.                 etree.parse(environment.get_xsd_schema_file(method))
  1108.             ),
  1109.             huge_tree=True
  1110.         )
  1111.         logger.info(f'Len {method} arguments list: {len(args)}')
  1112.  
  1113.         if partition_size > len(args):
  1114.             with _report_time(logger, method):
  1115.                 await _run(parser, xml_validator, logger, method[:-4], args)
  1116.             StorageData.save_tables(method, parser.result)
  1117.  
  1118.         else:
  1119.             splitted = []
  1120.             num_partitions = (len(args) // partition_size) + 1
  1121.             partition_size = len(args) // num_partitions
  1122.  
  1123.             for i in range(num_partitions):
  1124.                 if i == num_partitions - 1:
  1125.                     splitted.append(companies[i*partition_size:])
  1126.                 else:
  1127.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1128.  
  1129.             for i, data_partition in enumerate(splitted):
  1130.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1131.                     await _run(parser, xml_validator, logger, method[:-4], data_partition)
  1132.            
  1133.                 StorageData.save_partitions(method, parser.result, i)
  1134.                 parser.cleanup()
  1135.  
  1136.             StorageData.concat_partitions(method)
  1137.             StorageData.cleanup_tmp_dir()
  1138.  
  1139.     # =========================================================================
  1140.     method = 'GetCompanyPaymentDiscipline'
  1141.     # =========================================================================
  1142.  
  1143.     if method in methods:
  1144.         logger = environment.get_logger(method)
  1145.         parser = SparkInterfaxParser(method)
  1146.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1147.         xml_validator = etree.XMLParser(
  1148.             schema=etree.XMLSchema(
  1149.                 etree.parse(environment.get_xsd_schema_file(method))
  1150.             ),
  1151.             huge_tree=True
  1152.         )
  1153.         logger.info(f'Len {method} arguments list: {num_companies}')
  1154.  
  1155.         if partition_size > num_companies:
  1156.             with _report_time(logger, method):
  1157.                 await _run(parser, xml_validator, logger, method, companies)
  1158.  
  1159.             StorageData.save_tables(method, parser.result)
  1160.  
  1161.         else:
  1162.             splitted = []
  1163.             num_partitions = (num_companies // partition_size) + 1
  1164.             partition_size = num_companies // num_partitions
  1165.  
  1166.             for i in range(num_partitions):
  1167.                 if i == num_partitions - 1:
  1168.                     splitted.append(companies[i*partition_size:])
  1169.                 else:
  1170.                     splitted.append(companies[i*partition_size:(i+1)*partition_size])
  1171.  
  1172.             for i, data_partition in enumerate(splitted):
  1173.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1174.                     await _run(parser, xml_validator, logger, method, data_partition)
  1175.            
  1176.                 StorageData.save_partitions(method, parser.result, i)
  1177.                 parser.cleanup()
  1178.  
  1179.             StorageData.concat_partitions(method)
  1180.             StorageData.cleanup_tmp_dir()
  1181.  
  1182.     # =========================================================================
  1183.     method = 'GetCompanyAccountingReport'
  1184.     # =========================================================================
  1185.  
  1186.     if method in methods:
  1187.         logger = environment.get_logger(method)
  1188.         parser = SparkInterfaxParser(method)
  1189.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1190.         xml_validator = etree.XMLParser(
  1191.             schema=etree.XMLSchema(
  1192.                 etree.parse(environment.get_xsd_schema_file(method))
  1193.             ),
  1194.             huge_tree=True
  1195.         )
  1196.         logger.info(f'Len {method} arguments list: {num_companies}')
  1197.  
  1198.         if partition_size > num_companies:
  1199.             with _report_time(logger, method):
  1200.                 await _run(parser, xml_validator, logger, method, companies)
  1201.             StorageData.save_tables(method, parser.result)
  1202.  
  1203.         else:
  1204.             splitted = []
  1205.             num_partitions = (num_companies // partition_size) + 1
  1206.             partition_size = num_companies // num_partitions
  1207.  
  1208.             for i in range(num_partitions):
  1209.                 if i == num_partitions - 1:
  1210.                     splitted.append(companies[i*partition_size:])
  1211.                 else:
  1212.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1213.  
  1214.             for i, data_partition in enumerate(splitted):
  1215.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1216.                     await _run(parser, xml_validator, logger, method, data_partition)
  1217.            
  1218.                 StorageData.save_partitions(method, parser.result, i)
  1219.                 parser.cleanup()
  1220.  
  1221.             StorageData.concat_partitions(method)
  1222.             StorageData.cleanup_tmp_dir()
  1223.  
  1224.     # =========================================================================
  1225.     method = 'GetEntrepreneurShortReport'
  1226.     # =========================================================================
  1227.    
  1228.     if method in methods:
  1229.         logger = environment.get_logger(method)
  1230.         parser = SparkInterfaxParser(method)
  1231.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1232.         xml_validator = etree.XMLParser(
  1233.             schema=etree.XMLSchema(
  1234.                 etree.parse(environment.get_xsd_schema_file(method))
  1235.             ),
  1236.             huge_tree=True
  1237.         )
  1238.         logger.info(f'Len {method} arguments list: {num_entrepreneurs}')
  1239.  
  1240.         if partition_size > num_entrepreneurs:
  1241.             with _report_time(logger, method):
  1242.                 await _run(parser, xml_validator, logger, method, entrepreneurs)
  1243.             StorageData.save_tables(method, parser.result)
  1244.  
  1245.         else:
  1246.             splitted = []
  1247.             num_partitions = (num_entrepreneurs // partition_size) + 1
  1248.             partition_size = num_entrepreneurs // num_partitions
  1249.  
  1250.             for i in range(num_partitions):
  1251.                 if i == num_partitions - 1:
  1252.                     splitted.append(entrepreneurs[i*partition_size:])
  1253.                 else:
  1254.                     splitted.append(entrepreneurs[i*partition_size : (i+1)*partition_size])
  1255.  
  1256.             for i, data_partition in enumerate(splitted):
  1257.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1258.                     await _run(parser, xml_validator, logger, method, data_partition)
  1259.            
  1260.                 StorageData.save_partitions(method, parser.result, i)
  1261.                 parser.cleanup()
  1262.  
  1263.             StorageData.concat_partitions(method)
  1264.             StorageData.cleanup_tmp_dir()
  1265.  
  1266.     # =========================================================================
  1267.     method = 'GetCompanyExtendedReport'
  1268.     # =========================================================================
  1269.  
  1270.     if method in methods:
  1271.         logger = environment.get_logger(method)
  1272.         parser = SparkInterfaxParser(method)
  1273.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1274.         xml_validator = etree.XMLParser(
  1275.             schema=etree.XMLSchema(
  1276.                 etree.parse(environment.get_xsd_schema_file(method))
  1277.             ),
  1278.             huge_tree=True
  1279.         )
  1280.         logger.info(f'Len {method} arguments list: {num_companies}')
  1281.  
  1282.         if partition_size > num_companies:
  1283.             with _report_time(logger, method):
  1284.                 await _run(parser, xml_validator, logger, method, companies)
  1285.             StorageData.save_tables(method, parser.result)
  1286.  
  1287.         else:
  1288.             splitted = []
  1289.             num_partitions = (num_companies // partition_size) + 1
  1290.             partition_size = num_companies // num_partitions
  1291.  
  1292.             for i in range(num_partitions):
  1293.                 if i == num_partitions - 1:
  1294.                     splitted.append(companies[i*partition_size:])
  1295.                 else:
  1296.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1297.  
  1298.             for i, data_partition in enumerate(splitted):
  1299.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1300.                     await _run(parser, xml_validator, logger, method, data_partition)
  1301.            
  1302.                 StorageData.save_partitions(method, parser.result, i)
  1303.                 parser.cleanup()
  1304.  
  1305.             StorageData.concat_partitions(method)
  1306.             StorageData.cleanup_tmp_dir()
  1307.  
  1308.    
  1309.     # -------------------------------------------------------------------------
  1310.     # -------------------------------- 4 --------------------------------------
  1311.     # -------------------------------------------------------------------------
  1312.  
  1313.     # =========================================================================
  1314.     method = 'GetOwnershipHierarchy'
  1315.     # =========================================================================
  1316.  
  1317.     if method in methods:
  1318.         logger = environment.get_logger(method)
  1319.         parser = SparkInterfaxParser(method)
  1320.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1321.         xml_validator = etree.XMLParser(
  1322.             schema=etree.XMLSchema(
  1323.                 etree.parse(environment.get_xsd_schema_file(method))
  1324.             ),
  1325.             huge_tree=True
  1326.         )
  1327.         logger.info(f'Len {method} arguments list: {num_companies}')
  1328.  
  1329.         if partition_size > num_companies:
  1330.             with _report_time(logger, method):
  1331.                 await _run(parser, xml_validator, logger, method, companies)
  1332.             StorageData.save_tables(method, parser.result)
  1333.  
  1334.         else:
  1335.             splitted = []
  1336.             num_partitions = (num_companies // partition_size) + 1
  1337.             partition_size = num_companies // num_partitions
  1338.  
  1339.             for i in range(num_partitions):
  1340.                 if i == num_partitions - 1:
  1341.                     splitted.append(companies[i*partition_size:])
  1342.                 else:
  1343.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1344.  
  1345.             for i, data_partition in enumerate(splitted):
  1346.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1347.                     await _run(parser, xml_validator, logger, method, data_partition)
  1348.            
  1349.                 StorageData.save_partitions(method, parser.result, i)
  1350.                 parser.cleanup()
  1351.  
  1352.             StorageData.concat_partitions(method)
  1353.             StorageData.cleanup_tmp_dir()
  1354.  
  1355.     # =========================================================================
  1356.     method = 'GetCompanyStructure'
  1357.     # =========================================================================
  1358.  
  1359.     if method in methods:
  1360.         logger = environment.get_logger(method)
  1361.         parser = SparkInterfaxParser(method)
  1362.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1363.         xml_validator = etree.XMLParser(
  1364.             schema=etree.XMLSchema(
  1365.                 etree.parse(environment.get_xsd_schema_file(method))
  1366.             ),
  1367.             huge_tree=True
  1368.         )
  1369.         logger.info(f'Len {method} arguments list: {num_companies}')
  1370.  
  1371.         if partition_size > num_companies:
  1372.             with _report_time(logger, method):
  1373.                 await _run(parser, xml_validator, logger, method, companies)
  1374.             StorageData.save_tables(method, parser.result)
  1375.  
  1376.         else:
  1377.             splitted = []
  1378.             num_partitions = (num_companies // partition_size) + 1
  1379.             partition_size = num_companies // num_partitions
  1380.  
  1381.             for i in range(num_partitions):
  1382.                 if i == num_partitions - 1:
  1383.                     splitted.append(companies[i*partition_size:])
  1384.                 else:
  1385.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1386.  
  1387.             for i, data_partition in enumerate(splitted):
  1388.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1389.                     await _run(parser, xml_validator, logger, method, data_partition)
  1390.            
  1391.                 StorageData.save_partitions(method, parser.result, i)
  1392.                 parser.cleanup()
  1393.  
  1394.             StorageData.concat_partitions(method)
  1395.             StorageData.cleanup_tmp_dir()
  1396.  
  1397.     # =========================================================================
  1398.     method = 'GetCompanyPredecessorSuccessor'
  1399.     # =========================================================================
  1400.  
  1401.     if method in methods:
  1402.         logger = environment.get_logger(method)
  1403.         parser = SparkInterfaxParser(method)
  1404.         partition_size = CALC_PARTITION_SIZE.get(method, 200000)
  1405.         xml_validator = etree.XMLParser(
  1406.             schema=etree.XMLSchema(
  1407.                 etree.parse(environment.get_xsd_schema_file(method))
  1408.             ),
  1409.             huge_tree=True
  1410.         )
  1411.         logger.info(f'Len {method} arguments list: {num_companies}')
  1412.  
  1413.         if partition_size > num_companies:
  1414.             with _report_time(logger, method):
  1415.                 await _run(parser, xml_validator, logger, method, companies)
  1416.             StorageData.save_tables(method, parser.result)
  1417.  
  1418.         else:
  1419.             splitted = []
  1420.             num_partitions = (num_companies // partition_size) + 1
  1421.             partition_size = num_companies // num_partitions
  1422.  
  1423.             for i in range(num_partitions):
  1424.                 if i == num_partitions - 1:
  1425.                     splitted.append(companies[i*partition_size:])
  1426.                 else:
  1427.                     splitted.append(companies[i*partition_size : (i+1)*partition_size])
  1428.  
  1429.             for i, data_partition in enumerate(splitted):
  1430.                 with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
  1431.                     await _run(parser, xml_validator, logger, method, data_partition)
  1432.            
  1433.                 StorageData.save_partitions(method, parser.result, i)
  1434.                 parser.cleanup()
  1435.  
  1436.             StorageData.concat_partitions(method)
  1437.             StorageData.cleanup_tmp_dir()
  1438.  
  1439.  
  1440. if __name__ == '__main__':
  1441.     parser = ArgumentParser(prog=__file__)
  1442.  
  1443.     parser.add_argument(
  1444.         '--methods',
  1445.         default=environment.all_methods,
  1446.         nargs='*',
  1447.         help='method names separated with space; by default all methods are called'
  1448.     )
  1449.  
  1450.     args = parser.parse_args()
  1451.  
  1452.     session_ids = [
  1453.         filename[:-3]
  1454.         for filename in os.listdir(environment.tmp_dir)
  1455.         if filename.endswith('.ok')
  1456.     ]
  1457.     try:
  1458.         environment.cookies = {'ASP.NET_SessionId': session_ids[0]}
  1459.     except IndexError:
  1460.         raise RuntimeError('Session identifier not found')
  1461.  
  1462.     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  1463.     asyncio.run(main(args.methods))
  1464.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement