Advertisement
Guest User

Untitled

a guest
Aug 5th, 2020
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.71 KB | None | 0 0
  1. """Bulk deletes messages from ES that match a certain search query.
  2. """
  3. import logging
  4. import json
  5.  
  6. from elasticsearch5 import Elasticsearch
  7. from elasticsearch5 import helpers
  8.  
  9. logging.basicConfig(level=logging.INFO)
  10.  
  11. ES_HOST         = 'xxx'
  12. ES_PORT         = 9200
  13. INDEX_PATTERN   = 'graylog*'
  14. REQUEST_TIMEOUT = 300
  15. BODY            = {
  16.     'size': 10000,
  17.     'query': {
  18.         'bool': {
  19.             'must': {
  20.                 'query_string': {
  21.                 'query': 'xxx'
  22.                 }
  23.             }
  24.         }
  25.     }
  26. }
  27.  
  28. def get_indices(es, pattern='*'):
  29.     """Returns all indices in es that match pattern.
  30.  
  31.    Args:
  32.        es (Elasticsearch): Elasticsearch object
  33.        pattern (str, optional): Pattern to match index names to. Defaults to '*'.
  34.  
  35.    Returns:
  36.        dict: Dictionary of indices.
  37.    """
  38.     return es.indices.get(pattern, request_timeout=REQUEST_TIMEOUT)
  39.  
  40. def get_documents(es, index, body):
  41.     """Returns result of search request in body
  42.  
  43.    Args:
  44.        es (Elasticsearch): Elasticsearch object
  45.        index (str): Index to search in
  46.        body (dict): ES search request
  47.  
  48.    Returns:
  49.        list: List of dict of results.
  50.    """
  51.     logging.info('Getting documents on index {}'.format(index))
  52.     output = es.search(body=body, index=index, request_timeout=REQUEST_TIMEOUT)
  53.     return output['hits']['hits']
  54.  
  55. def bulk_delete_documents_generator(documents):
  56.     """Generator that yields bulk api delete actions
  57.  
  58.    Args:
  59.        documents (list): ES document object
  60.  
  61.    Yields:
  62.        dict: ES bulk API action
  63.    """
  64.     for document in documents:
  65.         yield {
  66.             '_op_type': 'delete',
  67.             '_index': document['_index'],
  68.             '_id': document['_id'],
  69.             '_type': document['_type']
  70.         }
  71.  
  72. def delete_documents(es, index, documents):
  73.     """Bulk deletes documents from index
  74.  
  75.    Args:
  76.        es (Elasticsearch): Elasticserach object
  77.        index (str): Target index
  78.        documents (list): List of documents to delete.
  79.    """
  80.     logging.info('Deleting documents on index {}'.format(index))
  81.     index_settings = es.indices.get_settings(index=index, request_timeout=REQUEST_TIMEOUT)
  82.     if 'blocks' in index_settings[index]['settings']['index']:
  83.         if index_settings[index]['settings']['index']['blocks']['write']:
  84.             logging.info('Enabling index write')
  85.             es.indices.put_settings(body={'index': {'blocks': {'write': False}}}, index=index, request_timeout=REQUEST_TIMEOUT)
  86.     # we use bulk API to delete
  87.     logging.info('Sending bulk delete API request')
  88.     helpers.bulk(es, bulk_delete_documents_generator(documents), request_timeout=REQUEST_TIMEOUT)
  89.     if 'blocks' in index_settings[index]['settings']['index']:
  90.         if index_settings[index]['settings']['index']['blocks']['write']:
  91.             logging.info('Disabling index write')
  92.             es.indices.put_settings(body={ 'index': { 'blocks': { 'write': True }}}, index=index, request_timeout=REQUEST_TIMEOUT)
  93.  
  94. def main():
  95.     es = Elasticsearch([{
  96.         'host': ES_HOST,
  97.         'port': ES_PORT
  98.     }])
  99.     indices = get_indices(es, pattern=INDEX_PATTERN)
  100.     documents_by_index = {}
  101.     body = BODY
  102.     for index in indices.keys():
  103.         documents_by_index[index] = get_documents(es, index, body)
  104.         if documents_by_index[index]:
  105.             delete_documents(es, index, documents_by_index[index])
  106.     with open('search_results.json', 'a') as f:
  107.         f.write(json.dumps(documents_by_index, sort_keys=True, indent=2))
  108.     # with open('/home/haase/work/temp/elasticsearch_delete_documents/search_results.json', 'r') as f:
  109.     #     documents_by_index = json.loads(f.read())
  110.  
  111. if __name__ == "__main__":
  112.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement