Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import argparse
- import logging.config
- import asyncpg
- import aiofiles
- import ujson as json
- from lxml import etree
- from egrul_nipple.storage.models import db, History, LegalEntity, RelationKind
- from egrul_nipple import transformer
- from egrul_nipple import config
# Set the level of the logger configured under the empty name to INFO, apply
# the project's logging configuration, then create this module's logger.
config.LOGGING['loggers'][''].update(level='INFO')
logging.config.dictConfig(config.LOGGING)
logger = logging.getLogger(__name__)
def to_element(tag: str, obj: dict) -> etree.Element:
    """Build an XML element named *tag* from a JSON-like value.

    A string *obj* becomes the element's text.  Otherwise *obj* is treated
    as a mapping: string values become attributes, a nested mapping becomes
    one child element, and a list becomes one child element per item (each
    child is named after the key).  Values of any other type are skipped.
    """
    node = etree.Element(tag)
    if isinstance(obj, str):
        node.text = obj
        return node

    for name, value in obj.items():
        if isinstance(value, str):
            node.set(name, value)
            continue
        # Treat a single mapping as a one-item list so both shapes share
        # the same child-building loop.
        if isinstance(value, dict):
            children = [value]
        elif isinstance(value, list):
            children = value
        else:
            continue
        for child in children:
            node.append(to_element(name, child))
    return node
# Module-level lookup of relation-kind ids keyed by (kind, subkind).
# It starts empty; main() fills it and reapply_entities_links() reads it.
relations: dict = {}
async def reapply_entities_links(data, company_id):
    """Collapse a company's link snapshots into one row per distinct link.

    Args:
        data: iterable of snapshot dicts with the keys ``'date'``,
            ``'entities'`` (dicts carrying an ``'ogrn'``) and
            ``'entity_links'`` (one list of link dicts per entity, matched
            to ``'entities'`` by position).  Snapshots are processed in the
            order given.
        company_id: value stored in the ``legal_entity`` column of every row.

    Returns:
        A list of ``(legal_entity, entity, relation_kind, created_date,
        inactive_date)`` tuples.  ``created_date`` is the date of the first
        snapshot containing the link.  ``inactive_date`` is the date of the
        first later snapshot that lacks it, or ``''`` if it was never absent.

    Raises:
        KeyError: if an entity's ``ogrn`` is not found in the database or a
            link's ``(kind, subkind)`` pair is missing from ``relations``.
    """
    ogrns = {entity['ogrn'] for snapshot in data for entity in snapshot['entities']}

    # Skip the database round-trip (and an empty IN list) when the snapshots
    # reference no entities at all.
    ogrn2id = {}
    if ogrns:
        found = await db.execute(
            LegalEntity.select(LegalEntity.ogrn, LegalEntity.id).where(LegalEntity.ogrn << ogrns)
        )
        ogrn2id = {le.ogrn: le.id for le in found}

    all_links = {}
    for links in data:
        entity_links_keys = set()
        date = links['date']
        for entity, links_data in zip(links['entities'], links['entity_links']):
            entity_id = ogrn2id[entity['ogrn']]
            for link in links_data:
                relation_kind = relations[link['kind'], link.get('subkind')]
                key = entity_id, relation_kind
                entity_links_keys.add(key)
                if key not in all_links:
                    all_links[key] = {
                        'legal_entity': company_id,
                        'entity': entity_id,
                        'created_date': date,
                        'relation_kind': relation_kind,
                    }
        # Links seen in an earlier snapshot but missing from this one are
        # marked inactive as of this snapshot.  Only the first absence is
        # recorded; a link that reappears later keeps its inactive_date.
        for key in all_links.keys() - entity_links_keys:
            if all_links[key].get('inactive_date') is None:
                all_links[key]['inactive_date'] = date

    fields = 'legal_entity', 'entity', 'relation_kind', 'created_date', 'inactive_date'
    return [tuple(d.get(f, '') for f in fields) for d in all_links.values()]
async def process(in_q, out_q):
    """Worker loop: turn queued company snapshots into output rows.

    Repeatedly takes an ``(entity_links, company_id)`` item from *in_q*,
    converts it with ``reapply_entities_links`` and puts the resulting rows
    on *out_q*.  The loop never returns on its own; it is expected to be
    stopped by cancelling the task.

    Every item taken from *in_q* is acknowledged with ``task_done()`` even
    when processing fails.  Otherwise a single failing company would leave
    ``in_q.join()`` waiting forever.  Failures are logged and the worker
    carries on with the next item.
    """
    while True:
        entity_links, company_id = await in_q.get()
        try:
            rows = await reapply_entities_links(entity_links, company_id)
            await out_q.put(rows)
        except asyncio.CancelledError:
            # Never swallow cancellation (CancelledError is an Exception
            # subclass on Python < 3.8).
            raise
        except Exception:
            logger.exception('Failed to process company %s', company_id)
        finally:
            in_q.task_done()
async def writer(fp, queue, delimiter=','):
    """Consumer loop: write queued row batches to *fp* as delimited text.

    Args:
        fp: object with an awaitable ``write(str)`` method.
        queue: ``asyncio.Queue`` whose items are iterables of rows; each row
            is an iterable of values.
        delimiter: string placed between the values of a row.

    Each value is converted with ``str`` and each row ends with ``'\\n'``;
    no quoting or escaping is applied.  The loop never returns on its own;
    it is expected to be stopped by cancelling the task.
    """
    count = 0
    while True:
        rows_items = await queue.get()
        # Build the whole batch in memory and issue a single write instead
        # of awaiting one write per row; the emitted text is identical.
        payload = ''.join(
            f'{delimiter.join(map(str, row))}\n' for row in rows_items
        )
        if payload:
            await fp.write(payload)
        queue.task_done()
        # NOTE(review): the counter advances once per batch taken from the
        # queue, not once per row, although the message says "rows".
        count += 1
        if count % 1000 == 0:
            logger.info('%s rows done', count)
def _extract_links_snapshot(raw_data):
    """Parse one stored history record into an entity-link snapshot dict."""
    data = json.loads(raw_data)
    if 'СвЮЛ' in data:
        element = to_element('СвЮЛ', data['СвЮЛ'])
        t = transformer.EGRULTransformer(element)
    else:
        element = to_element('СвИП', data['СвИП'])
        t = transformer.EGRIPTransformer(element)
    date = t.get_build_date()
    # noinspection PyProtectedMember
    entities, entity_links = t._get_entity_links_data()
    return {
        'entity_links': entity_links,
        'entities': entities,
        'date': date,
    }


async def scan_companies(from_id, to_id, queue):
    """Stream history records from the database, grouped by company.

    Rows are read ordered by ``legal_entity_id`` and ``date``.  Consecutive
    rows for the same company are collected into a list of snapshot dicts,
    and each finished group is put on *queue* as ``(snapshots, company_id)``.

    Args:
        from_id: inclusive lower bound on ``legal_entity_id``; ``None``
            means no lower bound.
        to_id: exclusive upper bound on ``legal_entity_id``; ``None`` means
            no upper bound.
        queue: ``asyncio.Queue`` receiving the per-company groups.
    """
    conn = await asyncpg.connect(
        database=config.DATABASE['database'],
        user=config.DATABASE['user'],
        password=config.DATABASE['password'],
        host=config.DATABASE['host'],
    )
    try:
        query = History.select(History.legal_entity_id, History.jsondata)
        # Compare against None so that an explicit bound of 0 is honoured
        # instead of being treated as "no bound".
        if from_id is not None:
            query = query.where(History.legal_entity_id >= from_id)
        if to_id is not None:
            query = query.where(History.legal_entity_id < to_id)
        query = query.order_by(History.legal_entity_id, History.date)
        # Render the query to a plain SQL string with its parameters bound,
        # because it is executed through the separate asyncpg connection.
        sql = db.database.get_cursor().mogrify(*query.sql()).decode()

        current_company = None
        buffer = []
        async with conn.transaction():
            async for company_id, raw_data in conn.cursor(sql):
                # A change of company id closes the previous group.
                if company_id != current_company and buffer:
                    await queue.put((buffer, current_company))
                    buffer = []
                current_company = company_id
                buffer.append(_extract_links_snapshot(raw_data))
        # Flush the last company, which no later row can close.
        if buffer:
            await queue.put((buffer, current_company))
    finally:
        # Release the connection even if the scan fails or is cancelled.
        await conn.close()
async def main(from_id, to_id, n_tasks, filepath):
    """Run the scan -> process -> write pipeline and wait until it drains.

    Args:
        from_id: passed to ``scan_companies`` as the lower id bound.
        to_id: passed to ``scan_companies`` as the upper id bound.
        n_tasks: number of concurrent ``process`` workers to start.
        filepath: path of the output file, opened for writing (truncated).
    """
    # Populate the module-level (kind, subkind) -> id lookup used by the
    # workers.
    for rel in (await db.execute(RelationKind.select())):
        relations[(rel.kind, rel.subkind)] = rel.id

    # The small bound makes the scanner wait once four companies are queued
    # and not yet taken by a worker.
    in_q = asyncio.Queue(maxsize=4)
    out_q = asyncio.Queue()

    # `async with` closes the file on every exit path, including errors.
    async with aiofiles.open(filepath, 'w') as fp:
        # ensure_future schedules on the running loop, so this coroutine
        # does not depend on a module-level `loop` variable being defined.
        scanner_task = asyncio.ensure_future(scan_companies(from_id, to_id, in_q))
        writer_task = asyncio.ensure_future(writer(fp, out_q))
        processor_tasks = [
            asyncio.ensure_future(process(in_q, out_q)) for _ in range(n_tasks)
        ]
        tasks = [scanner_task, writer_task, *processor_tasks]
        try:
            await scanner_task
            await in_q.join()
            await out_q.join()
        finally:
            # The worker and writer loops never finish on their own.  Cancel
            # everything and wait for the cancellations to be delivered
            # before the file is closed.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == '__main__':
    # Command-line options as (flag, type, default).
    cli_options = (
        ('--from-id', int, None),
        ('--to-id', int, None),
        ('--n-tasks', int, 100),
        ('--filepath', str, 'out.csv'),
    )
    parser = argparse.ArgumentParser()
    for flag, value_type, fallback in cli_options:
        parser.add_argument(flag, type=value_type, default=fallback)
    cli_args = parser.parse_args()

    # `loop` stays a module-level name: main() looks it up as a global.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(**vars(cli_args)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement