Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import sqlite3
- import json
- import unicodedata
- from elasticsearch import Elasticsearch
# Sync script: push rows from the local SQLite table ``data`` into the
# Elasticsearch index, continuing after the highest ``invoice_id`` that the
# index already contains.

DB_PATH = 'SK_Prison_and_Court_Guard.sqlite'
ES_INDEX = 'testpy'
ES_DOC_TYPE = 'testpy'

# Placeholder values found in the source table and what they are indexed as.
NULL_TEXT = 'null'
MISSING_AMOUNT = 999999.99


def clean_document(row):
    """Return ``row`` as a plain dict with placeholder values normalised.

    ``row`` may be a ``dict`` or a ``sqlite3.Row`` (anything offering
    ``keys()`` and item access by column name).

    * the text ``'null'`` becomes a real ``None`` (JSON ``null``);
    * the sentinel ``999999.99`` becomes ``0.0`` (the same sentinel stored
      as text becomes ``'0.0'``).

    Only whole values are rewritten.  The previous implementation edited the
    serialised JSON text, which also stripped every ``[`` / ``]`` inside
    field values and rewrote numbers that merely contained the sentinel.
    """
    document = {}
    for key in row.keys():
        value = row[key]
        if value == NULL_TEXT:
            value = None
        elif value == MISSING_AMOUNT:
            value = 0.0
        elif value == str(MISSING_AMOUNT):
            value = '0.0'
        document[key] = value
    return document


def last_indexed_id(es):
    """Return the highest ``invoice_id`` in the index, or 0 if it has no hits.

    Returning 0 for an empty index makes the sync start from the first
    invoice instead of failing on an undefined variable.
    """
    response = es.search(
        index=ES_INDEX,
        body={
            "query": {
                "bool": {
                    "must": [{"match": {"_type": "testpy"}}]
                }
            },
            "aggs": {
                "max_price": {"max": {"field": "invoice_id"}}},
            "sort": {"invoice_id": {"order": "desc"}},
            "size": 1
        }
    )
    hits = response['hits']['hits']
    if not hits:
        return 0
    return hits[0]['_source']['invoice_id']


def main():
    """Index every SQLite row whose ``invoice_id`` is newer than the index."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        # Ask SQLite for the real highest ID; the row count is only equal to
        # it when IDs are dense and start at 1, and counting via fetchall()
        # loaded the whole table into memory.
        last_sqlite_id = conn.execute(
            'SELECT MAX(invoice_id) FROM data').fetchone()[0]
        print("Last SQLite ID from DB:  %s" % last_sqlite_id)

        # NOTE(review): credentials are hard-coded; consider reading them
        # from the environment or a config file.
        es = Elasticsearch(host='elk.myserver.sk', port=9200,
                           http_auth=('auth_user', 'auth_pass'))
        last_elastic_id = last_indexed_id(es)
        print("Last Elasticsearch ID is:  %s" % last_elastic_id)

        # One parameterised range query replaces the per-ID lookups: it
        # includes the newest row, copes with gaps in the ID sequence and
        # never builds SQL by string concatenation.  Ascending order keeps
        # the index's maximum ID a valid resume point if a run is aborted.
        pending = conn.execute(
            'SELECT * FROM data WHERE invoice_id > ? ORDER BY invoice_id',
            (last_elastic_id,))
        for row in pending:
            print("Next try to push Document to ELK with ID:  %s"
                  % row['invoice_id'])
            try:
                # The client serialises the dict itself (non-ASCII text is
                # preserved), so no manual JSON/ASCII juggling is needed.
                res = es.index(index=ES_INDEX, doc_type=ES_DOC_TYPE,
                               body=clean_document(row))
            except Exception as exc:
                # Best effort: report the failing row and carry on, but let
                # KeyboardInterrupt / SystemExit stop the script.
                print("Something is wrong with ELK or Data in Table: %s"
                      % exc)
            else:
                print("Result - Created :  %s OK\n" % res.get('created'))
    finally:
        conn.close()
    print("Finish")


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement