Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
import concurrent
import concurrent.futures
import csv
import multiprocessing as mp
import os
from argparse import ArgumentParser
from collections import Counter
from functools import reduce
from multiprocessing import Process, Lock

import numpy as np
import pandas as pd
from pymongo import MongoClient
def get_args():
    """Parse command-line options for the data-analytics sync/report script.

    Returns the argparse namespace holding the local and AWS MongoDB
    connection settings, the collections to process (``--colls``), the
    client CSV path (``--csv``) and the worker-pool size (``--workers``).
    """
    parser = ArgumentParser('data-analytics')
    parser.add_argument('--host',
                        dest='host',
                        default='localhost')
    # int literal instead of a string default: the original only worked
    # because argparse re-parses string defaults through type=int.
    parser.add_argument('--port',
                        dest='port',
                        default=34120,
                        type=int)
    parser.add_argument('--db',
                        dest='db',
                        default='tribunais_extracao')
    parser.add_argument('--aws-host',
                        dest='aws_host',
                        default='ec2-34-194-75-71.compute-1.amazonaws.com')
    parser.add_argument('--aws-port',
                        dest='aws_port',
                        default=3000,
                        type=int)
    parser.add_argument('--aws-user',
                        dest='aws_user',
                        default='intelivix')
    # SECURITY NOTE(review): a real credential is hard-coded as a default;
    # it should live in an environment variable or a secrets store, not in
    # source control. Kept as-is to preserve the CLI contract.
    parser.add_argument('--aws-passwd',
                        dest='aws_passwd',
                        default='cBcUWHAwYqZ6TYPoM6ekPPoW')
    parser.add_argument('--aws-db',
                        dest='aws_db',
                        default='tribunais')
    parser.add_argument('--colls',
                        dest='colls',
                        nargs='+',
                        required=True)
    parser.add_argument('--csv',
                        dest='csv',
                        required=True)
    parser.add_argument('--workers',
                        dest='workers',
                        default=16,
                        type=int)
    return parser.parse_args()
def get_mongodb_local(args):
    """Build a MongoClient for the local MongoDB described by *args*."""
    options = {'host': args.host, 'port': args.port}
    return MongoClient(**options)
def get_mongodb_aws(args):
    """Build an authenticated MongoClient for the AWS MongoDB in *args*."""
    options = {
        'host': args.aws_host,
        'port': args.aws_port,
        'username': args.aws_user,
        'password': args.aws_passwd,
    }
    return MongoClient(**options)
async def show_stats(collection_name=None,
                     local_collection=None,
                     aws_collection=None,
                     **kwargs):
    """Print the AWS and local document counts for *collection_name*.

    ``local_collection`` and ``aws_collection`` are dicts mapping
    collection names to collection objects. Any lookup/count failure is
    reported to stdout instead of propagating, so one bad collection does
    not abort the whole fan-out.

    ``async def`` replaces ``@asyncio.coroutine``, which was deprecated in
    Python 3.8 and removed in 3.11; it is a drop-in for
    ``asyncio.ensure_future``.
    """
    print(f'collection: {collection_name}')
    try:
        # NOTE(review): parameterless Collection.count() is deprecated in
        # pymongo 3.7+ (count_documents({}) is the replacement) — kept
        # as-is to avoid changing behavior on the pinned driver version.
        print(f'aws.count: '
              f'{aws_collection[collection_name].count()}')
        print(f'local.count: '
              f'{local_collection[collection_name].count()}',
              end='\n\n')
    except Exception as e:
        print('\n\nErro:')
        print(f'{e}', end='\n\n\n')
async def asynchronous(param=None,
                       method_async=None,
                       **kwargs):
    """Fan *method_async* out over the values in *param* and await them all.

    For each ``key: value`` in *param*: when *value* is a list/tuple or a
    generator (detected via its ``gi_running`` attribute), one task is
    scheduled per item with ``kwargs[key] = item``; otherwise a single task
    is scheduled with ``kwargs[key] = value``. The shared *kwargs* dict is
    mutated between schedules, exactly as in the original implementation.

    ``async def``/``await`` replace the ``@asyncio.coroutine`` /
    ``yield from`` pair, deprecated in Python 3.8 and removed in 3.11.
    """
    tasks = []

    def _schedule(key, value):
        # Mutate the shared kwargs, build the coroutine, wrap it in a Task.
        kwargs[key] = value
        tasks.append(asyncio.ensure_future(method_async(**kwargs)))

    for key, value in param.items():
        if isinstance(value, (list, tuple)) or hasattr(value, 'gi_running'):
            for item in value:
                _schedule(key, item)
        else:
            _schedule(key, value)
    # asyncio.wait raises on an empty set, hence the guard.
    if tasks:
        await asyncio.wait(tasks)
def run_async(kwargs_async=None, workers_max=16):
    """Run the ``asynchronous`` fan-out on a fresh event loop and block
    until every scheduled task has completed.

    A ProcessPoolExecutor sized by *workers_max* is installed as the
    loop's default executor. NOTE(review): nothing here calls
    ``run_in_executor``, so the pool appears unused — confirm before
    removing it. The try/finally guarantees the executor and the loop are
    released even when a task raises (the original leaked both on error).
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    executor = concurrent.futures.ProcessPoolExecutor(workers_max)
    loop.set_default_executor(executor)
    try:
        loop.run_until_complete(asynchronous(**kwargs_async))
    finally:
        executor.shutdown(wait=True)
        loop.close()
async def upsert_collection_local(collection_name=None,
                                  local_collection=None,
                                  items=None,
                                  **kwargs):
    """Upsert one document into the local copy of *collection_name*.

    *items* is a dict carrying a ``query`` filter and an ``update``
    operation (e.g. ``{'$set': doc}``); ``upsert=True`` inserts when no
    document matches the filter.

    ``async def`` replaces ``@asyncio.coroutine``, deprecated in Python
    3.8 and removed in 3.11.
    """
    collec = local_collection[collection_name]
    collec.update_one(
        items.get('query'),
        items.get('update'),
        upsert=True
    )
def check(key, collection=None):
    """Report processes missing *key* and dump them to ``sem_<key>.csv``.

    Queries for documents where *key* does not exist, writes one CSV row
    per document (numero, first client term, estado) and prints the total.
    *collection* defaults to the module-level ``collections[xcoll]`` set up
    in the ``__main__`` section, which keeps the original call sites
    (``check('partes')``) working unchanged.
    """
    if collection is None:
        collection = collections[xcoll]
    find = {
        key: {
            '$exists': False
        }
    }
    criteria = {
        'estado': 1,
        'numero': 1,
        'termos_cliente': 1,
        'fonte': 1,
        'spider': 1
    }
    query = collection.find(find, criteria)
    headers = ['numero', 'termo_cliente', 'estado']
    # newline='' is the documented way to open a file for csv.writer
    # (prevents spurious blank rows on Windows).
    with open('sem_{}.csv'.format(key), 'w', newline='') as csvfile:
        webshooter_input = csv.writer(csvfile, delimiter=',')
        webshooter_input.writerow(headers)
        for processo in query:
            # 'termos_cliente' may be absent (.get -> None, so [0] raises
            # TypeError — the original only caught IndexError and crashed
            # here) or an empty list (IndexError); both mean "no term".
            try:
                termo_cliente = processo.get('termos_cliente')[0]
            except (IndexError, TypeError):
                termo_cliente = None
            row = [
                processo.get('numero'),
                termo_cliente,
                processo.get('estado')
            ]
            webshooter_input.writerow(row)
    # NOTE(review): Cursor.count() is deprecated in pymongo 3.7+.
    print(f'sem {key}: {query.count()}')
if __name__ == '__main__':
    args = get_args()

    # Connect both ends: the local mirror and the AWS source database.
    connect = get_mongodb_local(args)
    db = connect.get_database(args.db)
    aws_connect = get_mongodb_aws(args)
    aws_db = aws_connect.get_database(args.aws_db)

    # NOTE(review): collection_names() is deprecated in pymongo 3.7+
    # (list_collection_names() is the replacement).
    collection_names = aws_db.collection_names()

    # Build name -> collection maps for both sides, creating any missing
    # local collection first.
    colls = collection_names
    collections = {}
    for coll in colls:
        # NOTE(review): pymongo Collection objects raise
        # NotImplementedError on truth-testing, so this `if not` check
        # looks broken on pymongo 3.x — confirm against the driver version
        # actually pinned for this script.
        if not db.get_collection(coll):
            db.create_collection(coll)
        collections.update({coll: db[coll]})
    aws_colls = collection_names
    aws_collections = {}
    for aws_coll in aws_colls:
        aws_collections.update({aws_coll: aws_db[aws_coll]})

    # List every AWS collection for the operator, numbered from 1.
    for i, _coll in enumerate(sorted(collection_names)):
        print(f'{i+1} - Collection_name: {_coll}')

    # Only the collections requested on the command line are processed.
    xcolls = args.colls

    # Print AWS/local document counts for each requested collection via
    # the async fan-out (one show_stats task per collection name).
    kwargs_async = dict(param={'collection_name': xcolls},
                        local_collection=collections,
                        aws_collection=aws_collections,
                        method_async=None)
    method = dict(method_async=show_stats)
    kwargs_async.update(method)
    run_async(kwargs_async=kwargs_async)

    # Mirror each requested collection from AWS into the local database,
    # upserting by _id — in batches of 100 documents when large.
    for collection_name in xcolls:
        aws_collection = aws_collections[collection_name]
        collection = collections[collection_name]
        # NOTE(review): Cursor.count() is deprecated in pymongo 3.7+.
        count = aws_collections[collection_name].find({}).count()
        batch = 100
        if count > batch:
            for b in range(0, count, batch):
                # Lazy generator of upsert specs for one batch; consumed
                # inside asynchronous() via the gi_running check.
                param = {
                    'items': (
                        {
                            'query': {'_id': doc['_id']},
                            'update': {'$set': doc}
                        }
                        for doc in aws_collection.find({}).skip(b).limit(batch)
                    )
                }
                kwargs_async = dict(param=param,
                                    collection_name=collection_name,
                                    local_collection=collections,
                                    method_async=upsert_collection_local)
                run_async(kwargs_async=kwargs_async, workers_max=args.workers)
        else:
            # Small collection: one pass over every document.
            param = {
                'items': (
                    {
                        'query': {'_id': doc['_id']},
                        'update': {'$set': doc}
                    }
                    for doc in aws_collection.find({})
                )
            }
            kwargs_async = dict(param=param,
                                collection_name=collection_name,
                                local_collection=collections,
                                method_async=upsert_collection_local)
            run_async(kwargs_async=kwargs_async, workers_max=args.workers)

    # The reporting below only covers the FIRST requested collection.
    xcoll = xcolls[0]

    # Client-provided CSV of expected processes: semicolon-separated, no
    # header row; column names are assigned here.
    path = f'{args.csv}'
    xcsv = pd.read_csv(path,
                       header=None,
                       names=["idextracao_alpha",
                              "numeroextracao_alpha",
                              "comarcaextracao_alpha",
                              "ufextracao_alpha",
                              "termos_cliente",
                              "data.digito",
                              "estado",
                              "data.origem",
                              "numero",
                              "is_valid",
                              "data.tribunal",
                              "data.ano",
                              "data.justica",
                              "data.sequencial",
                              "justica",
                              "uf_final"
                              ],
                       sep=";")
    # DataFrame.count() here prints per-column non-null counts.
    print(
        f'Quantidade de termos_cliente no csv: '
        f'{xcsv.count()}'
    )

    # Projection of the mirrored collection (all documents).
    find = {}
    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }
    df = pd.DataFrame(list(collections[xcoll].find(find, criteria)))
    print(df.count())

    # Documents with no 'termos_cliente' field at all.
    find = {
        'termos_cliente': {
            '$exists': False
        }
    }
    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }
    df_sem_termos_cliente = pd.DataFrame(
        list(collections[xcoll].find(find, criteria))
    )
    print(df_sem_termos_cliente.count())

    # Documents with no 'partes' field — the cursor is bound but never
    # consumed below (the check() calls redo the query themselves).
    find = {
        'partes': {
            '$exists': False
        }
    }
    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }
    df_sem_partes = collections[xcoll].find(find, criteria)

    # Dump CSV reports of processes missing these keys (sem_partes.csv,
    # sem_andamentos.csv).
    check('partes')
    check('andamentos')

    # Documents that DO carry 'termos_cliente'.
    find = {
        'termos_cliente': {
            '$exists': True
        }
    }
    criteria = {
        'estado': 1,
        'numero':1,
        'termos_cliente':1,
        'fonte':1,
        'spider':1
    }
    df_com_termos_cliente = pd.DataFrame(
        list(collections[xcoll].find(find, criteria))
    )

    # DB-side term lists that contain (substring/membership via `in`) at
    # least one CSV term. NOTE(review): the comprehension variable `check`
    # shadows the check() function inside this expression (harmless in
    # Python 3 — comprehension scope — but worth renaming).
    found_termos_cliente = df_com_termos_cliente.termos_cliente[
        [
            any((check in s) for check in xcsv.termos_cliente)
            for s in df_com_termos_cliente.termos_cliente
        ]
    ]
    print(f'termos_cliente found: {found_termos_cliente.count()}')

    # Not-found = (with-terms + without-terms) - matched.
    qtd_not_found_termos_cliente = (
        (
            df_com_termos_cliente.termos_cliente.count()
            + len(df_sem_termos_cliente)
        )
        - found_termos_cliente.count()
    )
    print(f'termos_cliente not found: {qtd_not_found_termos_cliente}')
    total = (
        df_com_termos_cliente.termos_cliente.count()
        + len(df_sem_termos_cliente)
    )
    print(f'Total: {total}')

    # Inverse direction: CSV terms that appear in some matched DB term
    # list. Same `check` shadowing note as above.
    extraido_csv_cliente = xcsv.termos_cliente[
        (
            [
                any((check in stc) for stc in (s for s in found_termos_cliente))
                for check in xcsv.termos_cliente
            ]
        )
    ]
    print(
        f'Itens extraidos base cliente: '
        f'{extraido_csv_cliente.count()}'
    )
    extraido_csv_cliente.to_csv(
        f'./extracted-collection_name-{xcoll}.csv',
        index=False
    )

    # CSV terms NOT extracted = complement of the extracted set.
    nao_extraido_csv_cliente = xcsv.termos_cliente[
        ~(xcsv.termos_cliente.isin(extraido_csv_cliente))
    ]
    print(
        f'Itens não extraidos base cliente: '
        f'{nao_extraido_csv_cliente.count()}'
    )
    nao_extraido_csv_cliente.to_csv(
        f'./not-extracted-collection_name-{xcoll}.csv',
        index=False
    )

    # Final match-rate summary over the client CSV.
    total_termos_cliente_csv = (
        extraido_csv_cliente.count()
        + nao_extraido_csv_cliente.count()
    )
    print(f'Total: {total_termos_cliente_csv}')
    print(f'Porcentagem do Match: '
          f'{(extraido_csv_cliente.count()/total_termos_cliente_csv)*100:.2f}%')
Add Comment
Please, Sign In to add comment