Deduplicate code for Elastic

Dec 17th, 2024

from elastic_stack.common import *
from elastic_stack.tools.logger import *

import configparser
from elasticsearch import Elasticsearch, helpers
from pathlib import Path

import hashlib

# CONFIG_PATH and PRIMARY_KEY come from elastic_stack.common, logger from
# elastic_stack.tools.logger (the star imports above).
if not Path(CONFIG_PATH).is_file():
    raise FileNotFoundError("missing config")
config = configparser.ConfigParser()
config.read(CONFIG_PATH)

es = Elasticsearch(
    cloud_id=config["ELASTIC"]["cloud_id"],
    basic_auth=(config["ELASTIC"]["user"], config["ELASTIC"]["password"]),
)

# Maps a hash of the selected field values to the list of document ids sharing it.
dict_of_duplicate_docs = {}

# Fields whose combined value defines a duplicate document.
keys_to_include_in_hash = [PRIMARY_KEY]


def populate_dict_of_duplicate_docs(hits):
    """
    Populate the dictionary of potential duplicates, keyed by a hash value
    generated from selected keys of each document.

    :param hits: a list of search hits returned by Elasticsearch; each item
        represents a document that matched the search query
    """
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item["_source"][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode("utf-8")).digest()
        # Documents whose selected keys hash to the same value are grouped
        # under the same dictionary entry.
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)


def scroll_over_all_docs(index, start, end):
    """
    Scroll over all documents in an Elasticsearch index within a given time
    range and populate the dictionary of duplicate documents.

    :param index: the name of the Elasticsearch index to search
    :param start: the earliest @timestamp to include in the range query
    :param end: the latest @timestamp to include in the range query
    """
    data = es.search(index=index, scroll="1m", query={"range": {"@timestamp": {"gte": start, "lte": end}}})
    sid = data["_scroll_id"]
    scroll_size = len(data["hits"]["hits"])
    populate_dict_of_duplicate_docs(data["hits"]["hits"])
    # Keep fetching scroll pages until one comes back empty.
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll="2m")
        populate_dict_of_duplicate_docs(data["hits"]["hits"])
        sid = data["_scroll_id"]
        scroll_size = len(data["hits"]["hits"])


def loop_over_hashed_and_remove_duplicates(index):
    """
    Loop over the dictionary of duplicate documents, keep the first copy of
    each duplicated document, and bulk-delete the remaining copies from
    Elasticsearch.

    :param index: the name of the Elasticsearch index where the documents are stored
    :return: the result of the bulk delete operation (successful action count, errors)
    """
    list_actions = []
    for _, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            # Keep the first document id and schedule every other copy for deletion.
            list_actions.extend(array_of_ids[1:])
    actions = ({"_op_type": "delete", "_index": index, "_id": item} for item in list_actions)
    res = helpers.bulk(es, index=index, actions=actions)
    return res


def clean_duplicate(index_name=None, start=None, end=None):
    """
    Remove duplicate documents from the specified Elasticsearch index within a
    given time range.

    :param index_name: the name of the Elasticsearch index to be cleaned of duplicates
    :param start: the start of the time range to scan for duplicates
    :param end: the end of the time range to scan for duplicates
    """
    if not index_name or not start or not end:
        # Fall back to command-line arguments when any parameter is missing.
        import sys

        index_name = sys.argv[1]
        start = sys.argv[2]
        end = sys.argv[3]
    logger.info(f"Getting data from {index_name} between {start} -> {end}")
    scroll_over_all_docs(index_name, start, end)
    resp = loop_over_hashed_and_remove_duplicates(index_name)
    logger.info(f"Deleted {resp[0]} docs OK")


  105. if __name__ == "__main__":
  106. from sys import argv
  107.  
  108. index_name = argv[1]
  109. start = argv[2]
  110. end = argv[3]
  111. clean_duplicate(index_name, start, end)
  112.  
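
When run as a script, the index name and the start and end timestamps are taken from the command line in that order. A typical invocation might look like the following (the file name, index name and timestamps are just placeholders):

    python deduplicate_elastic.py my-index "2024-12-01T00:00:00" "2024-12-17T00:00:00"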
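
To double-check the result, one option is a terms aggregation with min_doc_count set to 2 on the primary-key field, which lists any key values that still occur more than once. A minimal sketch, assuming the es client and PRIMARY_KEY from the script above, a placeholder index name, and that the field is a keyword or numeric type:

    resp = es.search(
        index="my-index",  # placeholder index name
        size=0,
        aggs={
            "still_duplicated": {
                "terms": {"field": PRIMARY_KEY, "min_doc_count": 2, "size": 100}
            }
        },
    )
    # An empty bucket list means no primary-key value occurs more than once.
    print(resp["aggregations"]["still_duplicated"]["buckets"])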