from elastic_stack.common import *        # expected to provide CONFIG_PATH and PRIMARY_KEY
from elastic_stack.tools.logger import *  # expected to provide logger
import configparser
import hashlib
from pathlib import Path

from elasticsearch import Elasticsearch, helpers

if not Path(CONFIG_PATH).is_file():
    raise FileNotFoundError(f"missing config: {CONFIG_PATH}")

config = configparser.ConfigParser()
config.read(CONFIG_PATH)

# Connect to Elastic Cloud using credentials from the config file.
es = Elasticsearch(
    cloud_id=config["ELASTIC"]["cloud_id"],
    basic_auth=(config["ELASTIC"]["user"], config["ELASTIC"]["password"]),
)
# Maps an MD5 hash of the selected key values to the list of ids of all
# documents sharing those values.
dict_of_duplicate_docs = {}

# Document fields whose combined value identifies a duplicate.
keys_to_include_in_hash = [PRIMARY_KEY]


def populate_dict_of_duplicate_docs(hits):
    """
    Populate dict_of_duplicate_docs with document ids grouped by a hash of
    the selected keys in each document.

    :param hits: a list of search hits returned by Elasticsearch; each item
        represents a document that matches the search query
    """
    for item in hits:
        # Join key values with a separator so that, when hashing several keys,
        # e.g. ("ab", "c") and ("a", "bc") do not collide.
        combined_key = "|".join(str(item["_source"][mykey]) for mykey in keys_to_include_in_hash)
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode("utf-8")).digest()
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
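
# For instance, if two documents share the same PRIMARY_KEY value, the map
# ends up looking like this (a hypothetical illustration):
#   {md5(b"42").digest(): ["doc-id-1", "doc-id-2"], ...}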

def scroll_over_all_docs(index, start, end):
    """
    Scroll over all documents in an Elasticsearch index within a given time
    range and populate the dictionary of duplicate documents.

    :param index: the name of the Elasticsearch index to search
    :param start: the earliest @timestamp of the documents to be searched
    :param end: the latest @timestamp of the documents to be searched
    """
    data = es.search(
        index=index,
        scroll="2m",
        query={"range": {"@timestamp": {"gte": start, "lte": end}}},
    )
    sid = data["_scroll_id"]
    scroll_size = len(data["hits"]["hits"])
    populate_dict_of_duplicate_docs(data["hits"]["hits"])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll="2m")
        populate_dict_of_duplicate_docs(data["hits"]["hits"])
        sid = data["_scroll_id"]
        scroll_size = len(data["hits"]["hits"])
    # Release the scroll context once the index has been fully paged through.
    es.clear_scroll(scroll_id=sid)
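
# A minimal alternative sketch: elasticsearch.helpers.scan wraps the manual
# scroll/clear_scroll bookkeeping above and yields hits lazily. Optional and
# not part of the original flow; it assumes the same @timestamp range query.
def scan_over_all_docs(index, start, end):
    hits = helpers.scan(
        es,
        index=index,
        query={"query": {"range": {"@timestamp": {"gte": start, "lte": end}}}},
    )
    populate_dict_of_duplicate_docs(hits)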

def loop_over_hashed_and_remove_duplicates(index):
    """
    Loop over the dictionary of hashed documents, keep the first occurrence
    of each duplicate group, and bulk-delete the remaining copies from
    Elasticsearch.

    :param index: the name of the Elasticsearch index where the documents are stored
    :return: the result of the bulk delete operation, a
        (number of successes, list of errors) tuple
    """
    list_actions = []
    for _, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            # Keep the first id and delete every other copy; deleting only
            # array_of_ids[0] would leave duplicates behind whenever a group
            # holds more than two documents.
            list_actions.extend(array_of_ids[1:])
    actions = ({"_op_type": "delete", "_index": index, "_id": item} for item in list_actions)
    return helpers.bulk(es, actions=actions)
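
# Optional sketch: passing raise_on_error=False makes helpers.bulk collect
# failed deletes instead of raising, which can help when another process may
# have already removed some of the ids. Illustration only, not original code:
#   successes, errors = helpers.bulk(es, actions=actions, raise_on_error=False)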

def clean_duplicate(index_name=None, start=None, end=None):
    """
    Remove duplicate documents from an Elasticsearch index within a given
    time range.

    :param index_name: the name of the Elasticsearch index to clean
    :param start: the start of the time range to scan for duplicates
    :param end: the end of the time range to scan for duplicates
    """
    if not index_name or not start or not end:
        # Fall back to command-line arguments when any parameter is missing.
        import sys

        index_name = sys.argv[1]
        start = sys.argv[2]
        end = sys.argv[3]
    logger.info(f"Getting data from {index_name} between {start} -> {end}")
    scroll_over_all_docs(index_name, start, end)
    resp = loop_over_hashed_and_remove_duplicates(index_name)
    logger.info(f"Deleted {resp[0]} docs OK")

if __name__ == "__main__":
    from sys import argv

    index_name = argv[1]
    start = argv[2]
    end = argv[3]
    clean_duplicate(index_name, start, end)
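
# Example invocation (hypothetical script name, index, and timestamps):
#   python clean_duplicate.py my-index "2024-01-01T00:00:00" "2024-01-02T00:00:00"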