Guest User

Untitled

a guest
Jan 18th, 2018
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 11.51 KB | None | 0 0
import asyncio
import concurrent
import concurrent.futures  # `import concurrent` alone does not load the futures submodule
import csv
import multiprocessing as mp
import os
from argparse import ArgumentParser
from collections import Counter
from functools import reduce
from multiprocessing import Process, Lock

import numpy as np
import pandas as pd
from pymongo import MongoClient
  14.  
  15.  
  16. def get_args():
  17. parser = ArgumentParser('data-analytics')
  18.  
  19. parser.add_argument('--host',
  20. dest='host',
  21. default='localhost')
  22. parser.add_argument('--port',
  23. dest='port',
  24. default='34120',
  25. type=int)
  26. parser.add_argument('--db',
  27. dest='db',
  28. default='tribunais_extracao')
  29.  
  30. parser.add_argument('--aws-host',
  31. dest='aws_host',
  32. default='ec2-34-194-75-71.compute-1.amazonaws.com')
  33. parser.add_argument('--aws-port',
  34. dest='aws_port',
  35. default='3000',
  36. type=int)
  37. parser.add_argument('--aws-user',
  38. dest='aws_user',
  39. default='intelivix')
  40. parser.add_argument('--aws-passwd',
  41. dest='aws_passwd',
  42. default='cBcUWHAwYqZ6TYPoM6ekPPoW')
  43. parser.add_argument('--aws-db',
  44. dest='aws_db',
  45. default='tribunais')
  46.  
  47. parser.add_argument('--colls',
  48. dest='colls',
  49. nargs='+',
  50. required=True)
  51.  
  52. parser.add_argument('--csv',
  53. dest='csv',
  54. required=True)
  55.  
  56. parser.add_argument('--workers',
  57. dest='workers',
  58. default=16,
  59. type=int)
  60.  
  61. return parser.parse_args()
  62.  
  63.  
  64. def get_mongodb_local(args):
  65. return MongoClient(host=args.host,
  66. port=args.port)
  67.  
  68.  
  69. def get_mongodb_aws(args):
  70. return MongoClient(host=args.aws_host,
  71. port=args.aws_port,
  72. username=args.aws_user,
  73. password=args.aws_passwd)
  74.  
  75.  
  76. @asyncio.coroutine
  77. def show_stats(collection_name=None,
  78. local_collection=None,
  79. aws_collection=None,
  80. **kwargs):
  81.  
  82. print(f'collection: {collection_name}')
  83.  
  84. try:
  85. print(f'aws.count: '
  86. f'{aws_collection[collection_name].count()}')
  87. print(f'local.count: '
  88. f'{local_collection[collection_name].count()}',
  89. end='\n\n')
  90. except Exception as e:
  91. print('\n\nErro:')
  92. print(f'{e}', end='\n\n\n')
  93.  
  94.  
  95. @asyncio.coroutine
  96. def asynchronous(param=None,
  97. method_async=None,
  98. **kwargs):
  99.  
  100. tasks = []
  101.  
  102. for key, value in param.items():
  103. if isinstance(value, (list, tuple)) \
  104. or hasattr(value, ('gi_running')):
  105. for item in value:
  106. kwarg = {key: item}
  107. kwargs.update(kwarg)
  108.  
  109. fetch_async = method_async(**kwargs)
  110. tasks.append(asyncio.ensure_future(fetch_async))
  111. else:
  112. kwarg = {key: value}
  113. kwargs.update(kwarg)
  114.  
  115. fetch_async = method_async(**kwargs)
  116. tasks.append(asyncio.ensure_future(fetch_async))
  117. if tasks:
  118. yield from asyncio.wait(tasks)
  119.  
  120.  
  121. def run_async(kwargs_async=None, workers_max=16):
  122. loop = asyncio.new_event_loop()
  123.  
  124. asyncio.set_event_loop(loop)
  125.  
  126. ioloop = asyncio.get_event_loop()
  127.  
  128. executor = concurrent.futures.ProcessPoolExecutor(workers_max)
  129.  
  130. ioloop.set_default_executor(executor)
  131. ioloop.run_until_complete(asynchronous(**kwargs_async))
  132.  
  133. executor.shutdown(wait=True)
  134.  
  135. ioloop.close()
  136.  
  137.  
  138. @asyncio.coroutine
  139. def upsert_collection_local(collection_name=None,
  140. local_collection=None,
  141. items=None,
  142. **kwargs):
  143.  
  144. collec = local_collection[collection_name]
  145.  
  146. collec.update_one(
  147. items.get('query'),
  148. items.get('update'),
  149. upsert=True
  150. )
  151.  
  152.  
  153. def check(key):
  154.  
  155. find = {
  156. key: {
  157. '$exists': False
  158. }
  159. }
  160.  
  161. criteria = {
  162. 'estado': 1,
  163. 'numero':1,
  164. 'termos_cliente':1,
  165. 'fonte':1,
  166. 'spider':1
  167. }
  168.  
  169. query = collections[xcoll].find(find, criteria)
  170.  
  171. headers = ['numero', 'termo_cliente', 'estado']
  172. with open('sem_{}.csv'.format(key), 'w') as csvfile:
  173. webshooter_input = csv.writer(csvfile, delimiter=',')
  174.  
  175. webshooter_input.writerow(headers)
  176. for processo in query:
  177.  
  178. try:
  179. termo_cliente = processo.get('termos_cliente')[0]
  180. except IndexError:
  181. termo_cliente = None
  182.  
  183. row = [
  184. processo.get('numero'),
  185. termo_cliente,
  186. processo.get('estado')
  187. ]
  188. webshooter_input.writerow(row)
  189.  
  190. print(f'sem {key}: {query.count()}')
  191.  
  192.  
if __name__ == '__main__':
    # --- connection setup -------------------------------------------------
    args = get_args()
    connect = get_mongodb_local(args)
    db = connect.get_database(args.db)

    aws_connect = get_mongodb_aws(args)
    aws_db = aws_connect.get_database(args.aws_db)

    # NOTE(review): collection_names() is the pre-3.7 pymongo spelling of
    # list_collection_names() -- confirm the installed pymongo still has it.
    collection_names = aws_db.collection_names()

    colls = collection_names

    # Mirror every remote collection name locally (creating missing ones)
    # and keep a name -> Collection map for each endpoint.
    collections = {}
    for coll in colls:
        if not db.get_collection(coll):
            db.create_collection(coll)
        collections.update({coll: db[coll]})

    aws_colls = collection_names

    aws_collections = {}
    for aws_coll in aws_colls:
        aws_collections.update({aws_coll: aws_db[aws_coll]})

    for i, _coll in enumerate(sorted(collection_names)):
        print(f'{i+1} - Collection_name: {_coll}')

    # Only the collections named on the command line (--colls) are processed.
    xcolls = args.colls

    # Pass 1: print per-collection document counts on both endpoints.
    kwargs_async = dict(param={'collection_name': xcolls},
                        local_collection=collections,
                        aws_collection=aws_collections,
                        method_async=None)

    method = dict(method_async=show_stats)
    kwargs_async.update(method)

    run_async(kwargs_async=kwargs_async)

    # Pass 2: copy every AWS document into the local mirror, upserting by
    # _id, paging `batch` documents at a time for large collections.
    for collection_name in xcolls:

        aws_collection = aws_collections[collection_name]
        collection = collections[collection_name]
        # NOTE(review): Cursor.count() was removed in pymongo 4
        # (count_documents is the replacement) -- verify the pymongo version.
        count = aws_collections[collection_name].find({}).count()
        batch = 100

        if count > batch:
            for b in range(0, count, batch):

                # Lazy generator of {'query', 'update'} upsert specs consumed
                # by upsert_collection_local via run_async.
                param = {
                    'items': (
                        {
                            'query': {'_id': doc['_id']},
                            'update': {'$set': doc}
                        }
                        for doc in aws_collection.find({}).skip(b).limit(batch)
                    )
                }

                kwargs_async = dict(param=param,
                                    collection_name=collection_name,
                                    local_collection=collections,
                                    method_async=upsert_collection_local)

                run_async(kwargs_async=kwargs_async, workers_max=args.workers)
        else:
            param = {
                'items': (
                    {
                        'query': {'_id': doc['_id']},
                        'update': {'$set': doc}
                    }
                    for doc in aws_collection.find({})
                )
            }

            kwargs_async = dict(param=param,
                                collection_name=collection_name,
                                local_collection=collections,
                                method_async=upsert_collection_local)

            run_async(kwargs_async=kwargs_async, workers_max=args.workers)

    # --- analysis: only the FIRST requested collection is analysed ---------
    xcoll = xcolls[0]
    path = f'{args.csv}'

    # Client-supplied CSV: semicolon-separated, headerless; column names are
    # assigned positionally here.
    xcsv = pd.read_csv(path,
                       header=None,
                       names=["idextracao_alpha",
                              "numeroextracao_alpha",
                              "comarcaextracao_alpha",
                              "ufextracao_alpha",
                              "termos_cliente",
                              "data.digito",
                              "estado",
                              "data.origem",
                              "numero",
                              "is_valid",
                              "data.tribunal",
                              "data.ano",
                              "data.justica",
                              "data.sequencial",
                              "justica",
                              "uf_final"
                              ],
                       sep=";")

    print(
        f'Quantidade de termos_cliente no csv: '
        f'{xcsv.count()}'
    )

    # Full dump of the fields of interest from the local collection.
    find = {}

    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }

    df = pd.DataFrame(list(collections[xcoll].find(find, criteria)))
    print(df.count())

    # Documents that never received a termos_cliente value.
    find = {
        'termos_cliente': {
            '$exists': False
        }
    }

    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }

    df_sem_termos_cliente = pd.DataFrame(
        list(collections[xcoll].find(find, criteria))
    )

    print(df_sem_termos_cliente.count())

    # Cursor over documents missing 'partes'; the CSV report itself is
    # produced by check() below.
    find = {
        'partes': {
            '$exists': False
        }
    }

    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }

    df_sem_partes = collections[xcoll].find(find, criteria)

    # check() reads the module-level globals `collections` and `xcoll`.
    check('partes')

    check('andamentos')

    # Documents that do carry a termos_cliente field.
    find = {
        'termos_cliente': {
            '$exists': True
        }
    }

    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }

    df_com_termos_cliente = pd.DataFrame(
        list(collections[xcoll].find(find, criteria))
    )

    # Rows whose termos_cliente contains any term from the client CSV.
    # NOTE(review): the comprehension variable `check` shadows the check()
    # function inside this expression only.
    found_termos_cliente = df_com_termos_cliente.termos_cliente[
        [
            any((check in s) for check in xcsv.termos_cliente)
            for s in df_com_termos_cliente.termos_cliente
        ]
    ]

    print(f'termos_cliente found: {found_termos_cliente.count()}')

    qtd_not_found_termos_cliente = (
        (
            df_com_termos_cliente.termos_cliente.count()
            + len(df_sem_termos_cliente)
        )
        - found_termos_cliente.count()
    )

    print(f'termos_cliente not found: {qtd_not_found_termos_cliente}')

    total = (
        df_com_termos_cliente.termos_cliente.count()
        + len(df_sem_termos_cliente)
    )

    print(f'Total: {total}')

    # Reverse direction: CSV terms matched by at least one extracted document.
    extraido_csv_cliente = xcsv.termos_cliente[
        (
            [
                any((check in stc) for stc in (s for s in found_termos_cliente))
                for check in xcsv.termos_cliente
            ]
        )
    ]

    print(
        f'Itens extraidos base cliente: '
        f'{extraido_csv_cliente.count()}'
    )

    extraido_csv_cliente.to_csv(
        f'./extracted-collection_name-{xcoll}.csv',
        index=False
    )

    # CSV terms never matched by any extracted document.
    nao_extraido_csv_cliente = xcsv.termos_cliente[
        ~(xcsv.termos_cliente.isin(extraido_csv_cliente))
    ]

    print(
        f'Itens não extraidos base cliente: '
        f'{nao_extraido_csv_cliente.count()}'
    )

    nao_extraido_csv_cliente.to_csv(
        f'./not-extracted-collection_name-{xcoll}.csv',
        index=False
    )

    total_termos_cliente_csv = (
        extraido_csv_cliente.count()
        + nao_extraido_csv_cliente.count()
    )

    print(f'Total: {total_termos_cliente_csv}')

    print(f'Porcentagem do Match: '
          f'{(extraido_csv_cliente.count()/total_termos_cliente_csv)*100:.2f}%')
Add Comment
Please, Sign In to add comment