Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # coding: utf-8
- import pymysql
- from pymysql.cursors import DictCursor
- from contextlib import closing
- from elasticsearch import Elasticsearch
- from elasticsearch import helpers
def doc_generator(record, es_index):
    """Wrap one row as an Elasticsearch bulk action.

    Args:
        record: mapping used verbatim (not copied) as the document body.
        es_index: name of the target index.

    Returns:
        A new dict with keys ``_index`` then ``_source``, the action shape
        this script hands to ``helpers.bulk``.
    """
    action = dict(_index=es_index)
    action["_source"] = record
    return action
def clean(record):
    """Remove falsy-valued fields from *record* in place and return it.

    Every key whose value satisfies ``not value`` (``None``, ``""``, ``0``,
    ``False``, empty containers) is deleted, so such columns are absent from
    the resulting document rather than stored as empty values.

    Args:
        record: mutable mapping (one database row); modified in place.

    Returns:
        The same *record* object, so the call can be used inline.
    """
    # Snapshot the keys first: deleting from a dict while iterating its live
    # items() view raises "RuntimeError: dictionary changed size during
    # iteration" in Python 3.
    # NOTE(review): this also drops legitimate zero/False values, e.g. a
    # ``deleted`` flag of 0 -- confirm that is intended.
    empty_keys = [key for key, value in record.items() if not value]
    for key in empty_keys:
        del record[key]
    return record
def get_salon_ids():
    """Yield every distinct ``salon_id`` from the ``clients`` table.

    All ids are fetched and the connection is closed before the first one is
    yielded. The caller can therefore open its own connections while iterating
    (e.g. one query per salon) without this one staying open.

    Yields:
        Each distinct ``salon_id`` value, in the order the server returns them.
    """
    # Bind the connection with ``as`` so the cursor below can use it.
    with closing(pymysql.connect(host='127.0.0.1',
                                 user='',
                                 password='',
                                 db='',
                                 charset='utf8mb4',
                                 cursorclass=DictCursor)) as connection:
        with connection.cursor() as cursor:
            cursor.execute('SELECT distinct salon_id FROM clients')
            # fetchall(), not fetchone(): every salon must be exported, and an
            # empty table simply yields nothing instead of failing on None.
            rows = cursor.fetchall()
    for row in rows:
        yield row["salon_id"]
def get_records_for_salon(salon_id):
    """Yield every ``clients`` row belonging to *salon_id*.

    Args:
        salon_id: value matched against ``clients.salon_id``; it is passed to
            the driver as a bound parameter, never spliced into the SQL text.

    Yields:
        One dict per row (``DictCursor``) with keys ``id``, ``salon_id``,
        ``phone``, ``fullname``, ``email``, ``additional_phone``, ``deleted``.
    """
    query = ("SELECT id, salon_id, phone, fullname, email, additional_phone, deleted "
             "FROM clients WHERE salon_id = %s")
    # Bind the connection with ``as`` so the cursor below can use it.
    with closing(pymysql.connect(host='127.0.0.1',
                                 user='',
                                 password='',
                                 db='',
                                 charset='utf8mb4',
                                 cursorclass=DictCursor)) as connection:
        with connection.cursor() as cursor:
            # Parameterized: pymysql escapes salon_id, preventing SQL injection
            # and quoting errors for non-numeric ids.
            cursor.execute(query, (salon_id,))
            # fetchall(), not fetchone(): every client of the salon is needed.
            rows = cursor.fetchall()
    for row in rows:
        yield row
def buffer(size, gen):
    """Group items from *gen* into lists of up to *size* elements.

    A batch is emitted once it already holds *size* items and another item
    arrives; whatever remains at the end is emitted as a final, shorter batch.
    Nothing is emitted for an empty input.

    Args:
        size: batch length threshold.
        gen: any iterable; it is consumed lazily.

    Yields:
        Lists of consecutive items from *gen*.
    """
    batch = []
    for element in gen:
        if len(batch) < size:
            batch.append(element)
            continue
        yield batch
        batch = [element]
    if batch:
        yield batch
if __name__ == "__main__":
    # NOTE(review): placeholders -- the cluster address (local node on the
    # default Elasticsearch port 9200) and the index name (named after the
    # source table) are assumptions; confirm before running.
    es = Elasticsearch("http://127.0.0.1:9200")
    es_index = "clients"

    # Generator expression, not a list: documents are produced lazily, so the
    # full export is never materialized in memory before batching.
    docs = (doc_generator(clean(record), es_index)
            for salon_id in get_salon_ids()
            for record in get_records_for_salon(salon_id))
    for batch in buffer(500, docs):
        helpers.bulk(es, batch)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement