from __future__ import annotations

import asyncio
import html
import os
import re
import time
from argparse import ArgumentParser
from contextlib import contextmanager
from typing import Callable, Dict, Iterator, List, Optional, Sequence

import httpx
import requests
import uvloop
from lxml import etree

import get_changed
from mapping import (
    XML_ROOT_ELEMENT,
    CALC_PARTITION_SIZE,
)
from nostr_data_logging import NoStrDataLogger
from storage_handler import StorageData
from environment_handler import Environment
from spark_interfax_parser import SparkInterfaxParser

environment = Environment()
def _validate(response: str, pattern: str) -> Optional[str]:
    """
    First-pass validation of a response from the SPARK service.

    Args:
        response (str): XML response from the SPARK service;
        pattern (str): regular expression used for the first-pass validation
            of the response. Defined in the configuration file.

    Returns:
        Optional[str]: on successful validation, and if the inner XML of the
            response actually contains data, returns the inner XML with named
            and numeric character references converted to the corresponding
            Unicode characters; otherwise None.
    """
    match = re.fullmatch(pattern, response)
    if match and match.group(1) == 'True':
        return html.unescape(match.group(2))
    return None
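# A minimal illustration with a hypothetical pattern (real patterns come from
# the configuration file via environment.get_response_validation_pattern):
#
#     pattern = r'<Response success="(True|False)">(.*)</Response>'
#     _validate('<Response success="True">&lt;Data/&gt;</Response>', pattern)
#     # -> '<Data/>'
#     _validate('<Response success="False"></Response>', pattern)
#     # -> None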
@contextmanager
def _report_time(logger: NoStrDataLogger, test: str) -> Iterator[None]:
    """
    Log the wall-clock time spent executing the code inside the context.

    Args:
        logger (NoStrDataLogger): logger used to record the execution time;
        test (str): name of the operation performed inside the context,
            used to identify it in the logs.
    """
    t0 = time.time()
    yield
    logger.info('Time needed for %s: %.2fs' % (test, time.time() - t0))
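# Usage sketch:
#
#     with _report_time(logger, 'GetCompanyLicenses'):
#         await _run(parser, xml_validator, logger, method, companies)
#     # logs e.g. "Time needed for GetCompanyLicenses: 12.34s"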
async def _produce(
    client: httpx.AsyncClient,
    queue: asyncio.Queue,
    semaphore: asyncio.Semaphore,
    method: str,
    kwarg: Dict[str, str],
    logger: NoStrDataLogger,
    retries: int = 3
) -> None:
    """
    Response provider for the connecting asynchronous queue. Sends a request
    to the SPARK service over the pre-opened client and puts the tuple
    <request arguments, service response> into the queue.

    Args:
        client (httpx.AsyncClient): client for talking to the SPARK service;
            encapsulates the connection pool;
        queue (asyncio.Queue): asynchronous queue connecting the provider
            and the consumer of service responses;
        semaphore (asyncio.Semaphore): limits the number of simultaneous
            requests to the SPARK service;
        method (str): name of the SPARK service method;
        kwarg (Dict[str, str]): keyword arguments of the POST request
            to the SPARK service;
        logger (NoStrDataLogger): logger used for messages of the current method;
        retries (int): maximum number of attempts per request. The original
            re-scheduled failed requests indefinitely via asyncio.ensure_future,
            a fire-and-forget retry that could race queue.join() in _run; a
            bounded in-place retry avoids both problems.
    """
    async with semaphore:
        rdata = environment.get_request_data(method, kwarg).encode()
        for attempt in range(1, retries + 1):
            try:
                response = await client.post(environment.url, content=rdata)
                queue.put_nowait((kwarg, response.text))
                return
            except httpx.RequestError as exc:
                logger.warning('%s: %r (attempt %d/%d)' % (kwarg, exc, attempt, retries))
        logger.warning('%s: giving up after %d failed attempts' % (kwarg, retries))
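# Note: httpx can also retry failed *connection* attempts at the transport
# level, e.g. (sketch):
#
#     transport = httpx.AsyncHTTPTransport(retries=2)
#     client = httpx.AsyncClient(transport=transport)
#
# but that only covers connect errors; timeouts and protocol errors mid-request
# still need an explicit loop like the one above.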
async def _consume(
    queue: asyncio.Queue,
    parser: SparkInterfaxParser,
    xml_validator: etree.XMLParser,
    root: str,
    pattern: str,
    logger: NoStrDataLogger
) -> None:
    """
    Response consumer for the connecting asynchronous queue. Takes the tuple
    <request arguments, service response> out of the queue and processes it.

    Args:
        queue (asyncio.Queue): asynchronous queue connecting the provider
            and the consumer of service responses;
        parser (SparkInterfaxParser): parser for the inner XML documents that
            contain the required data. Also acts as the container for the
            resulting set of tables;
        xml_validator (etree.XMLParser): validator checking the inner XML
            against the XSD schema;
        root (str): root element of the inner XML; the extracted data sits
            below it in the XML structure. Set explicitly per method in the
            XML_ROOT_ELEMENT variable in mapping.py
            (defaults to 'Data/Report' if not set);
        pattern (str): regular expression used for the first-pass validation
            of the response. Defined in the configuration file;
        logger (NoStrDataLogger): logger used for messages of the current method.
    """
    while True:
        kwarg, response = await queue.get()
        _handle_response(kwarg, response, parser, xml_validator, root, pattern, logger)
        queue.task_done()
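# Consumer lifecycle (see _run below): the consumer task loops forever and is
# cancelled only after queue.join() confirms every queued response was marked
# done:
#
#     consumer = asyncio.create_task(_consume(...))
#     await asyncio.gather(*producers)
#     await queue.join()
#     consumer.cancel()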
def _handle_response(
    kwarg: Dict[str, str],
    response: str,
    parser: SparkInterfaxParser,
    xml_validator: etree.XMLParser,
    root: str,
    pattern: str,
    logger: NoStrDataLogger
) -> None:
    """
    Processing of XML responses from the SPARK service.

    Args:
        kwarg (Dict[str, str]): arguments the request was sent with. Needed to
            be able to log processing errors, such as an XSD validation error;
        response (str): XML response from the SPARK service;
        parser (SparkInterfaxParser): parser for the inner XML documents that
            contain the required data. Also acts as the container for the
            resulting set of tables;
        xml_validator (etree.XMLParser): validator checking the inner XML
            against the XSD schema;
        root (str): root element of the inner XML; the extracted data sits
            below it in the XML structure. Set explicitly per method in the
            XML_ROOT_ELEMENT variable in mapping.py
            (defaults to 'Data/Report' if not set);
        pattern (str): regular expression used for the first-pass validation
            of the response. Defined in the configuration file;
        logger (NoStrDataLogger): logger used for messages of the current method.
    """
    validated = _validate(response, pattern)
    if validated is not None:
        try:
            eltree = etree.fromstring(
                environment.remove_prefix(validated, '<?xml version="1.0" encoding="UTF-8"?>'),
                xml_validator
            )
            parser.parse(eltree.find(root))
        except etree.XMLSyntaxError as err:
            logger.warning(str(kwarg) + ': ' + repr(err))
async def _run(
    parser: SparkInterfaxParser,
    xml_validator: etree.XMLParser,
    logger: NoStrDataLogger,
    method: str,
    args: Sequence[Dict[str, str]]
) -> None:
    """
    Process a list of input arguments for a given method.

    Args:
        parser (SparkInterfaxParser): parser for the inner XML documents that
            contain the required data. Also acts as the container for the
            resulting set of tables;
        xml_validator (etree.XMLParser): validator checking the inner XML
            against the XSD schema;
        logger (NoStrDataLogger): logger used for messages of the current method;
        method (str): name of the SPARK service method;
        args (Sequence[Dict[str, str]]): list of keyword-argument dicts for
            which responses from the SPARK service are required.
    """
    root = XML_ROOT_ELEMENT.get(method, 'Data/Report')
    pattern = environment.get_response_validation_pattern(method)
    semaphore = asyncio.Semaphore(100)
    queue = asyncio.Queue()
    async with httpx.AsyncClient(
        headers=environment.headers,
        cookies=environment.cookies,
        timeout=httpx.Timeout(10.0),
        limits=httpx.Limits(max_connections=50, max_keepalive_connections=10),
        verify=False  # kept from the original: TLS verification is disabled,
                      # presumably because the SPARK endpoint is internal
    ) as client:
        consumer = asyncio.create_task(
            _consume(queue, parser, xml_validator, root, pattern, logger))
        tasks = [
            _produce(client, queue, semaphore, method, arg, logger)
            for arg in args
        ]
        await asyncio.gather(*tasks)
        await queue.join()
        consumer.cancel()


def _split(data: Sequence, partition_size: int) -> List[Sequence]:
    """
    Split `data` into roughly equal partitions no larger than `partition_size`.

    The partition count is fixed first, then the size is evened out, so the
    last partition is about the same size as the others instead of holding
    only the remainder.
    """
    num_partitions = (len(data) // partition_size) + 1
    partition_size = len(data) // num_partitions
    splitted = []
    for i in range(num_partitions):
        if i == num_partitions - 1:
            splitted.append(data[i * partition_size:])
        else:
            splitted.append(data[i * partition_size:(i + 1) * partition_size])
    return splitted


async def _process(
    methods: Sequence[str],
    method: str,
    args: Sequence[Dict[str, str]],
    call_method: Optional[str] = None,
    collect: Optional[Callable[[Dict[str, list]], None]] = None
) -> None:
    """
    Run one SPARK service method over `args` and persist the results,
    partitioning the argument list when it exceeds the method's
    CALC_PARTITION_SIZE. Consolidates the per-method block that was repeated
    verbatim for every method in main().

    Args:
        methods (Sequence[str]): methods requested on the command line;
            `method` is skipped unless listed there;
        method (str): name used for logging, parsing, schema lookup and storage;
        args (Sequence[Dict[str, str]]): request arguments for the method;
        call_method (Optional[str]): method name actually sent to the service
            when it differs from `method` (the GetBankAccountingReport101102_*
            blocks). Also used as the CALC_PARTITION_SIZE key in that case;
        collect (Optional[Callable]): callback invoked with parser.result after
            each run; used to harvest attributes for follow-up methods.
    """
    if method not in methods:
        return
    call_method = call_method or method
    logger = environment.get_logger(method)
    parser = SparkInterfaxParser(method)
    partition_size = CALC_PARTITION_SIZE.get(call_method, 200000)
    xml_validator = etree.XMLParser(
        schema=etree.XMLSchema(
            etree.parse(environment.get_xsd_schema_file(method))
        ),
        huge_tree=True
    )
    logger.info(f'Len {method} arguments list: {len(args)}')
    if partition_size > len(args):
        with _report_time(logger, method):
            await _run(parser, xml_validator, logger, call_method, args)
        if collect is not None:
            collect(parser.result)
        StorageData.save_tables(method, parser.result)
    else:
        splitted = _split(args, partition_size)
        for i, data_partition in enumerate(splitted):
            with _report_time(logger, f'{method} part ({i+1}/{len(splitted)})'):
                await _run(parser, xml_validator, logger, call_method, data_partition)
            if collect is not None:
                collect(parser.result)
            StorageData.save_partitions(method, parser.result, i)
            parser.cleanup()
        StorageData.concat_partitions(method)
        StorageData.cleanup_tmp_dir()
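# Partitioning arithmetic in _split, worked through: 450,000 arguments with a
# CALC_PARTITION_SIZE of 200,000 give num_partitions = 450_000 // 200_000 + 1
# = 3 and an evened-out partition_size of 450_000 // 3 = 150_000, i.e. three
# partitions of 150,000 each rather than 200,000 + 200,000 + 50,000.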
def _fetch_changed(method: str) -> Optional[str]:
    """
    Fetch the list of changed companies/entrepreneurs for the current/target
    date with a single synchronous request (the async machinery is unnecessary
    for one call) and run it through the first-pass validation.
    """
    request_data = environment.get_request_data(
        method,
        {
            'changeDate': environment.change_date() + 'T00:00:00.000',
            'changeType': environment.change_type()
        }
    )
    pattern = environment.get_response_validation_pattern(method)
    return _validate(
        requests.post(
            environment.url,
            headers=environment.headers,
            cookies=environment.cookies,
            data=request_data
        ).text,
        pattern
    )


async def main(methods: Sequence[str]) -> None:
    """
    Driver for the SPARK service methods. Performs:
    - fetching the list of companies/entrepreneurs that have any changes
      on the current/target date;
    - calling the selected methods over that list;
    - saving the results of the service method calls.

    Args:
        methods (Sequence[str]): list of methods for which data should be
            requested from the SPARK service.
    """
    logger = environment.get_logger('General')

    method = 'GetChangedCompanies'
    companies = []
    changed = _fetch_changed(method)
    if changed is not None:
        try:
            companies, companies_method_result = get_changed.companies(changed)
            if method in methods:
                StorageData.save_tables(method, companies_method_result)
            del companies_method_result
        except etree.XMLSyntaxError as err:
            logger.error(err)
            companies = []

    method = 'GetChangedEntrepreneurs'
    entrepreneurs = []
    changed = _fetch_changed(method)
    if changed is not None:
        try:
            entrepreneurs, entrepreneurs_method_result = get_changed.entrepreneurs(changed)
            if method in methods:
                StorageData.save_tables(method, entrepreneurs_method_result)
            del entrepreneurs_method_result
        except etree.XMLSyntaxError as err:
            logger.error(err)
            entrepreneurs = []
    # -------------------------------------------------------------------------
    # -------------------------------- 1 --------------------------------------
    # -------------------------------------------------------------------------
    await _process(methods, 'GetCompanyArbitrationSummary', companies)
    await _process(methods, 'GetEntrepreneurArbitrationSummary', entrepreneurs)
    # Phase one of the leasing flow: harvest (id, contractNumber) pairs from
    # the 'Leasings' result table (row[1] and row[4] are consumed below as
    # 'id' and 'contractNumber' by GetLeasingReport).
    leasing_report_attrs = []

    def _collect_leasings(result):
        leasing_report_attrs.extend(
            (row[1], row[4]) for row in result.get('Leasings', [])
        )

    method = 'GetCompanyLeasings'
    if method in methods:
        await _process(
            methods, method,
            [{**company, 'leasingStatus': '0'} for company in companies],
            collect=_collect_leasings
        )

    method = 'GetEntrepreneurLeasings'
    if method in methods:
        await _process(
            methods, method,
            [{**entrepreneur, 'leasingStatus': '0'} for entrepreneur in entrepreneurs],
            collect=_collect_leasings
        )

    # Phase two: de-duplicate the harvested pairs and replay them as arguments.
    method = 'GetLeasingReport'
    if method in methods:
        await _process(
            methods, method,
            [
                {'id': attr[0], 'contractNumber': attr[1]}
                for attr in set(leasing_report_attrs)
            ]
        )
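    # The same two-phase pattern repeats for pledges below: the per-company
    # and per-entrepreneur methods emit attribute tuples, set() removes
    # duplicates, and the tuples are replayed as arguments to the report
    # method.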
    await _process(methods, 'GetCompanySparkRisksReportXML', companies)
    # -------------------------------------------------------------------------
    # -------------------------------- 2 --------------------------------------
    # -------------------------------------------------------------------------
    pledge_report_attrs = []

    def _collect_pledges(result):
        pledge_report_attrs.extend(
            (row[1], row[3], row[5]) for row in result.get('Pledges', [])
        )

    method = 'GetCompanyPledges'
    if method in methods:
        await _process(
            methods, method,
            [{**company, 'pledgeStatus': '0'} for company in companies],
            collect=_collect_pledges
        )

    method = 'GetEntrepreneurPledges'
    if method in methods:
        await _process(
            methods, method,
            [{**entrepreneur, 'pledgeStatus': '0'} for entrepreneur in entrepreneurs],
            collect=_collect_pledges
        )

    method = 'GetPledgeReport'
    if method in methods:
        await _process(
            methods, method,
            [
                {'id': attr[0], 'notificationNumber': attr[1], 'contractNumber': attr[2]}
                for attr in set(pledge_report_attrs)
            ]
        )
    await _process(methods, 'GetCompanyExecutionProceedings', companies)
    await _process(methods, 'GetEntrepreneurExecutionProceedings', entrepreneurs)
    await _process(methods, 'GetCompanyCounterparties', companies)
    # -------------------------------------------------------------------------
    # -------------------------------- 3 --------------------------------------
    # -------------------------------------------------------------------------
    await _process(methods, 'GetCompanyLicenses', companies)
    await _process(methods, 'GetCompanyFinancialAnalysis', companies)
    await _process(methods, 'GetCompanyRiskFactors', companies)
    # Both GetBankAccountingReport101102_* blocks call the service method
    # 'GetBankAccountingReport101102'; the _101/_102 suffix only selects the
    # argument list and the storage name. The original blocks partitioned over
    # `companies` instead of the bank argument list (an apparent copy-paste
    # slip) and disagreed on the CALC_PARTITION_SIZE key; _process partitions
    # over the actual arguments and looks the size up under the service-facing
    # name.
    method = 'GetBankAccountingReport101102_101'
    if method in methods:
        await _process(
            methods, method,
            environment.get_bank_accounting_report_arguments_list('101'),
            call_method=method[:-4]
        )

    method = 'GetBankAccountingReport101102_102'
    if method in methods:
        await _process(
            methods, method,
            environment.get_bank_accounting_report_arguments_list('102'),
            call_method=method[:-4]
        )
    await _process(methods, 'GetCompanyPaymentDiscipline', companies)
    await _process(methods, 'GetCompanyAccountingReport', companies)
    await _process(methods, 'GetEntrepreneurShortReport', entrepreneurs)
    await _process(methods, 'GetCompanyExtendedReport', companies)
    # -------------------------------------------------------------------------
    # -------------------------------- 4 --------------------------------------
    # -------------------------------------------------------------------------
    await _process(methods, 'GetOwnershipHierarchy', companies)
    await _process(methods, 'GetCompanyStructure', companies)
    await _process(methods, 'GetCompanyPredecessorSuccessor', companies)
if __name__ == '__main__':
    parser = ArgumentParser(prog=__file__)
    parser.add_argument(
        '--methods',
        default=environment.all_methods,
        nargs='*',
        help='method names separated by spaces; by default all methods are called'
    )
    args = parser.parse_args()
    # An upstream authentication step is assumed to drop a
    # '<ASP.NET_SessionId>.ok' marker file into environment.tmp_dir;
    # the first one found supplies the session cookie.
    session_ids = [
        filename[:-3]
        for filename in os.listdir(environment.tmp_dir)
        if filename.endswith('.ok')
    ]
    try:
        environment.cookies = {'ASP.NET_SessionId': session_ids[0]}
    except IndexError:
        raise RuntimeError('Session identifier not found') from None
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.run(main(args.methods))
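# Example invocation (the script filename is illustrative):
#
#     python spark_loader.py --methods GetCompanyLicenses GetLeasingReport
#
# Without --methods, every method in environment.all_methods is called.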