Advertisement
Guest User

Untitled

a guest
May 14th, 2019
125
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.08 KB | None | 0 0
  1. #!/usr/bin/env python
  2. # coding: utf-8
  3.  
  4.  
  5. import pymysql
  6. from pymysql.cursors import DictCursor
  7. from contextlib import closing
  8. from elasticsearch import Elasticsearch
  9. from elasticsearch import helpers
  10.  
  11.  
  12. def doc_generator(record, es_index):
  13.     return {
  14.         "_index": es_index,
  15.         "_source": record,
  16.     }
  17.  
  18.  
  19. def clean(record):
  20.     for key, value in record.items():
  21.         if not value:
  22.             del record[key]
  23.     return record
  24.  
  25.  
  26. def get_salon_ids():
  27.     with closing(pymysql.connect(host='127.0.0.1',
  28.                                  user='',
  29.                                  password='',
  30.                                  db='',
  31.                                  charset='utf8mb4',
  32.                                  cursorclass=DictCursor)):
  33.         with connection.cursor() as cursor:
  34.             query = 'SELECT distinct salon_id FROM clients'
  35.             cursor.execute(query)
  36.             yield cursor.fetchone()["salon_id"]
  37.  
  38.  
  39. def get_records_for_salon(salon_id):
  40.     with closing(pymysql.connect(host='127.0.0.1',
  41.                                  user='',
  42.                                  password='',
  43.                                  db='',
  44.                                  charset='utf8mb4',
  45.                                  cursorclass=DictCursor)):
  46.         with connection.cursor() as cursor:
  47.             query = "SELECT id, salon_id, phone, fullname, email, additional_phone, deleted FROM clients WHERE salon_id = " + str(
  48.                 salon_id)
  49.             cursor.execute(query)
  50.             yield cursor.fetchone()
  51.  
  52.  
  53. def buffer(size, gen):
  54.     chunk = []
  55.  
  56.     for item in gen:
  57.         if len(chunk) >= size:
  58.             yield chunk
  59.             chunk = [item]
  60.         else:
  61.             chunk.append(item)
  62.  
  63.     if chunk:
  64.         yield chunk
  65.  
  66.  
  67. if __name__ == "__main__":
  68.     docs = [doc_generator(clean(record), es_index) for salon_id in get_salon_ids()
  69.                                                 for record in get_records_for_salon(salon_id)]
  70.     buffered = buffer(500, docs)
  71.     for batch in buffered:
  72.         helpers.bulk(es, batch)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement