Advertisement
Guest User

Untitled

a guest
Jul 11th, 2018
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.65 KB | None | 0 0
  1. import asyncio
  2. import argparse
  3. import logging.config
  4.  
  5. import asyncpg
  6. import aiofiles
  7. import ujson as json
  8. from lxml import etree
  9.  
  10. from egrul_nipple.storage.models import db, History, LegalEntity, RelationKind
  11. from egrul_nipple import transformer
  12. from egrul_nipple import config
  13.  
  14.  
  15. config.LOGGING['loggers']['']['level'] = 'INFO'
  16. logging.config.dictConfig(config.LOGGING)
  17.  
  18. logger = logging.getLogger(__name__)
  19.  
  20.  
  21. def to_element(tag: str, obj: dict) -> etree.Element:
  22.     el = etree.Element(tag)
  23.     if isinstance(obj, str):
  24.         el.text = obj
  25.     else:
  26.         for k, v in obj.items():
  27.             if isinstance(v, str):
  28.                 el.set(k, v)
  29.             elif isinstance(v, dict):
  30.                 el.append(to_element(k, v))
  31.             elif isinstance(v, list):
  32.                 for item in v:
  33.                     el.append(to_element(k, item))
  34.     return el
  35.  
  36.  
  37. relations = {}
  38.  
  39.  
  40. async def reapply_entities_links(data, company_id):
  41.     ogrns = {i['ogrn'] for item in data for i in item['entities']}
  42.     ogrn2id = {
  43.         le.ogrn: le.id for le in
  44.         await db.execute(LegalEntity.select(LegalEntity.ogrn, LegalEntity.id).where(LegalEntity.ogrn << ogrns))
  45.     }
  46.  
  47.     all_links = {}
  48.     for links in data:
  49.         entity_links_keys = set()
  50.  
  51.         date = links['date']
  52.         for entity, links_data in zip(links['entities'], links['entity_links']):
  53.             entity_id = ogrn2id[entity['ogrn']]
  54.             for link in links_data:
  55.                 relation_kind = relations[link['kind'], link.get('subkind')]
  56.                 key = entity_id, relation_kind
  57.                 entity_links_keys.add(key)
  58.  
  59.                 if key not in all_links:
  60.                     all_links[key] = {
  61.                         'legal_entity': company_id,
  62.                         'entity': entity_id,
  63.                         'created_date': date,
  64.                         'relation_kind': relation_kind
  65.                     }
  66.  
  67.         for key in all_links.keys() - entity_links_keys:
  68.             if all_links[key].get('inactive_date') is None:
  69.                 all_links[key]['inactive_date'] = date
  70.  
  71.     fields = 'legal_entity', 'entity', 'relation_kind', 'created_date', 'inactive_date'
  72.     return [tuple(d.get(f, '') for f in fields) for d in all_links.values()]
  73.  
  74.  
  75. async def process(in_q, out_q):
  76.     while True:
  77.         entity_links, company_id = await in_q.get()
  78.         rows = await reapply_entities_links(entity_links, company_id)
  79.         await out_q.put(rows)
  80.         in_q.task_done()
  81.  
  82.  
  83. async def writer(fp, queue, delimiter=','):
  84.     count = 0
  85.     while True:
  86.         rows_items = await queue.get()
  87.         for row in rows_items:
  88.             await fp.write(f'{delimiter.join(map(str, row))}\n')
  89.         queue.task_done()
  90.         count += 1
  91.         if count % 1000 == 0:
  92.             logger.info(f'{count} rows done')
  93.  
  94.  
  95. async def scan_companies(from_id, to_id, queue):
  96.  
  97.     conn = await asyncpg.connect(
  98.         database=config.DATABASE['database'],
  99.         user=config.DATABASE['user'],
  100.         password=config.DATABASE['password'],
  101.         host=config.DATABASE['host'],
  102.     )
  103.    
  104.     query = History.select(History.legal_entity_id, History.jsondata)
  105.     if from_id:
  106.         query = query.where(History.legal_entity_id >= from_id)
  107.     if to_id:
  108.         query = query.where(History.legal_entity_id < to_id)
  109.     query = query.order_by(History.legal_entity_id, History.date)
  110.  
  111.     sql = db.database.get_cursor().mogrify(*query.sql()).decode()
  112.     current_company = None
  113.     buffer = []
  114.  
  115.     async with conn.transaction():
  116.         async for company_id, raw_data in conn.cursor(sql):
  117.             if company_id != current_company and buffer:
  118.                 await queue.put((buffer, current_company))
  119.                 buffer = []
  120.  
  121.             current_company = company_id
  122.             data = json.loads(raw_data)
  123.  
  124.             if 'СвЮЛ' in data:
  125.                 element = to_element('СвЮЛ', data['СвЮЛ'])
  126.                 t = transformer.EGRULTransformer(element)
  127.             else:
  128.                 element = to_element('СвИП', data['СвИП'])
  129.                 t = transformer.EGRIPTransformer(element)
  130.  
  131.             date = t.get_build_date()
  132.  
  133.             # noinspection PyProtectedMember
  134.             entities, entity_links = t._get_entity_links_data()
  135.             result = {
  136.                 'entity_links': entity_links,
  137.                 'entities': entities,
  138.                 'date': date,
  139.             }
  140.  
  141.             buffer.append(result)
  142.  
  143.         if buffer:
  144.             await queue.put((buffer, current_company))
  145.  
  146.  
  147. async def main(from_id, to_id, n_tasks, filepath):
  148.     for rel in (await db.execute(RelationKind.select())):
  149.         relations[(rel.kind, rel.subkind)] = rel.id
  150.  
  151.     fp = await aiofiles.open(filepath, 'w')
  152.     in_q = asyncio.Queue(maxsize=4)
  153.     out_q = asyncio.Queue()
  154.     scanner_task = loop.create_task(scan_companies(from_id, to_id, in_q))
  155.     writer_task = loop.create_task(writer(fp, out_q))
  156.     processor_tasks = [loop.create_task(process(in_q, out_q)) for _ in range(n_tasks)]
  157.  
  158.     await scanner_task
  159.     await in_q.join()
  160.     await out_q.join()
  161.     await fp.close()
  162.     for task in [scanner_task, writer_task, *processor_tasks]:
  163.         task.cancel()
  164.  
  165.  
  166. if __name__ == '__main__':
  167.     parser = argparse.ArgumentParser()
  168.     parser.add_argument('--from-id', type=int, default=None)
  169.     parser.add_argument('--to-id', type=int, default=None)
  170.     parser.add_argument('--n-tasks', type=int, default=100)
  171.     parser.add_argument('--filepath', type=str, default='out.csv')
  172.  
  173.     args = parser.parse_args()
  174.  
  175.     loop = asyncio.get_event_loop()
  176.     loop.run_until_complete(main(**vars(args)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement