Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Bulk deletes messages from ES that match a certain search query.
- """
- import logging
- import json
- from elasticsearch5 import Elasticsearch
- from elasticsearch5 import helpers
logging.basicConfig(level=logging.INFO)

# Target cluster connection.
# NOTE(review): 'xxx' looks like a redacted placeholder -- set a real host before running.
ES_HOST = 'xxx'
ES_PORT = 9200

# Wildcard expression selecting which indices are searched and purged.
INDEX_PATTERN = 'graylog*'

# request_timeout passed to every Elasticsearch call (elasticsearch-py treats it as seconds).
REQUEST_TIMEOUT = 300

# Search request selecting the documents to delete. 'size' caps how many hits
# one search returns.
# NOTE(review): the query string 'xxx' also looks like a redacted placeholder.
_QUERY_STRING_CLAUSE = {'query': 'xxx'}
BODY = {
    'size': 10000,
    'query': {'bool': {'must': {'query_string': _QUERY_STRING_CLAUSE}}},
}
def get_indices(es, pattern='*'):
    """Look up every index whose name matches ``pattern``.

    Args:
        es (Elasticsearch): Client used for the lookup.
        pattern (str, optional): Index name or wildcard expression to match.
            Defaults to '*'.

    Returns:
        dict: Result of ``es.indices.get``; callers iterate its keys as index names.
    """
    timeout = REQUEST_TIMEOUT
    matching = es.indices.get(pattern, request_timeout=timeout)
    return matching
def get_documents(es, index, body):
    """Run the search request ``body`` against ``index`` and return its hits.

    Only a single search page is fetched, so at most ``body['size']`` hits
    are returned. If the server reports more matches than came back, a
    warning is logged so the operator knows another run is needed to handle
    the remainder.

    Args:
        es (Elasticsearch): Elasticsearch client.
        index (str): Index to search in.
        body (dict): ES search request.

    Returns:
        list: Hit dicts from ``response['hits']['hits']``.
    """
    logging.info('Getting documents on index %s', index)
    output = es.search(body=body, index=index, request_timeout=REQUEST_TIMEOUT)
    hits_section = output['hits']
    hits = hits_section['hits']

    # ES 5 reports the total as an int; later versions use {'value': n, ...}.
    total = hits_section.get('total')
    if isinstance(total, dict):
        total = total.get('value')
    if isinstance(total, int) and total > len(hits):
        # Without this, matches beyond the first page are silently skipped.
        logging.warning(
            'Index %s has %d matching documents but only %d were returned; '
            'run again to process the rest', index, total, len(hits))
    return hits
def bulk_delete_documents_generator(documents):
    """Turn search hits into bulk-API delete actions, one action per hit.

    Args:
        documents (iterable): Search hits; each must provide ``_index``,
            ``_id`` and ``_type`` keys.

    Yields:
        dict: A ``delete`` action addressing the same index, id and type as the hit.
    """
    copied_fields = ('_index', '_id', '_type')
    for hit in documents:
        action = {'_op_type': 'delete'}
        action.update((field, hit[field]) for field in copied_fields)
        yield action
def delete_documents(es, index, documents):
    """Bulk-delete ``documents`` from ``index``, temporarily lifting a write block.

    If the index has ``index.blocks.write`` set to true, the block is removed
    for the duration of the bulk request and put back afterwards. The block is
    restored even when the bulk request raises. Indices without a write block
    are never given one.

    Args:
        es (Elasticsearch): Elasticsearch client.
        index (str): Target index.
        documents (list): Search hits to delete; each needs ``_index``,
            ``_id`` and ``_type`` keys.
    """
    logging.info('Deleting documents on index %s', index)
    index_settings = es.indices.get_settings(index=index, request_timeout=REQUEST_TIMEOUT)
    blocks = index_settings[index]['settings']['index'].get('blocks', {})
    # Elasticsearch returns setting values as strings ("true"/"false"), so a
    # plain truthiness test would treat "false" as blocked. A 'blocks' section
    # may also exist without a 'write' key (e.g. only read_only is set).
    write_blocked = str(blocks.get('write', 'false')).lower() == 'true'

    if write_blocked:
        logging.info('Enabling index write')
        es.indices.put_settings(body={'index': {'blocks': {'write': False}}}, index=index, request_timeout=REQUEST_TIMEOUT)
    try:
        # we use bulk API to delete
        logging.info('Sending bulk delete API request')
        helpers.bulk(es, bulk_delete_documents_generator(documents), request_timeout=REQUEST_TIMEOUT)
    finally:
        # Restore the original block even if the bulk request failed, so the
        # index is never left writable by accident.
        if write_blocked:
            logging.info('Disabling index write')
            es.indices.put_settings(body={'index': {'blocks': {'write': True}}}, index=index, request_timeout=REQUEST_TIMEOUT)
def main():
    """Delete every document matching ``BODY`` in indices matching ``INDEX_PATTERN``.

    Matched hits are collected per index and appended as JSON to
    ``search_results.json`` as a record of what was deleted. The record is
    written even if a search or deletion raises partway through, so hits that
    were already removed are not lost. The exception still propagates.
    Successive runs append one JSON object each, separated by a newline.
    """
    es = Elasticsearch([{
        'host': ES_HOST,
        'port': ES_PORT
    }])
    indices = get_indices(es, pattern=INDEX_PATTERN)
    documents_by_index = {}
    try:
        for index in indices:
            documents = get_documents(es, index, BODY)
            documents_by_index[index] = documents
            if documents:
                delete_documents(es, index, documents)
    finally:
        # Append mode keeps records from earlier runs; the trailing newline
        # keeps each run's JSON object from running into the next one.
        with open('search_results.json', 'a') as f:
            f.write(json.dumps(documents_by_index, sort_keys=True, indent=2))
            f.write('\n')


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement