Advertisement
Mochinov

Untitled

Mar 25th, 2024
27
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.47 KB | None | 0 0
  1. import json
  2. import redis
  3.  
  4. from clickhouse_driver.dbapi.extras import DictCursor
  5.  
  6. from config.settings import REDIS_HOST, REDIS_PORT
  7. from forecasts.clickhouse.db import get_native_clickhouse_connection
  8. from tetris_optimizer.api.constants import TABLES_DESCRIPTION_FIELDS
  9.  
  10. import json
  11.  
  12. class MasterDataCache:
  13.  
  14. def __init__(self, db=9):
  15. """
  16. Initialize the MasterDataCache instance.
  17. """
  18. self.host = REDIS_HOST
  19. self.port = REDIS_PORT
  20. self.db = db
  21. self.redis_client = None
  22.  
  23. def __enter__(self):
  24. """
  25. Context manager entry point for safely opening Redis connection.
  26. """
  27. try:
  28. self.redis_client = redis.Redis(host=self.host, port=self.port, db=self.db)
  29. except redis.exceptions.RedisError as e:
  30. print(f"Error connecting to Redis: {e}")
  31. raise
  32.  
  33. return self
  34.  
  35. def __exit__(self, exc_type, exc_val, exc_tb):
  36. """
  37. Context manager exit point for safely closing Redis connection.
  38. """
  39. if self.redis_client:
  40. self.redis_client.close()
  41.  
  42. def serialize_data(self, data):
  43. """
  44. Serialize data into a string format.
  45.  
  46. Args:
  47. data (dict): The data to be serialized.
  48.  
  49. Returns:
  50. str: The serialized data.
  51. """
  52. serialized_data = json.dumps(data)
  53. return serialized_data
  54.  
  55. def deserialize_data(self, serialized_data):
  56. """
  57. Deserialize data from a string format.
  58.  
  59. Args:
  60. serialized_data (str): The serialized data.
  61.  
  62. Returns:
  63. dict: The deserialized data.
  64. """
  65. data = json.loads(serialized_data)
  66. return data
  67.  
  68. def get_master_data_from_source(self, tetris_scenario_id):
  69. """
  70. Retrieve master data for a given scenario ID from the cache.
  71.  
  72. Args:
  73. tetris_scenario_id (int): The ID of the scenario.
  74.  
  75. Returns:
  76. dict: The master data for the given scenario ID.
  77. """
  78. if not self.redis_client:
  79. raise ValueError("Redis client is not initialized.")
  80.  
  81. redis_key = f"description_scenario:{tetris_scenario_id}"
  82. cached_data = self.redis_client.get(redis_key)
  83.  
  84. if cached_data:
  85. return self.deserialize_data(cached_data)
  86. else:
  87. master_data = self.load_master_data_from_source(tetris_scenario_id)
  88. serialized_data = self.serialize_data(master_data)
  89. self.redis_client.set(redis_key, serialized_data)
  90.  
  91. return master_data
  92.  
  93. def load_master_data_from_source(self, tetris_scenario_id):
  94. """
  95. Load master data for a given scenario ID from the data source.
  96.  
  97. Args:
  98. tetris_scenario_id (int): The ID of the scenario.
  99.  
  100. Returns:
  101. dict: The master data for the given scenario ID.
  102. """
  103.  
  104. data = {}
  105. with get_native_clickhouse_connection() as conn:
  106. with conn.cursor(cursor_factory=DictCursor) as cursor:
  107. for table, fields in TABLES_DESCRIPTION_FIELDS.items():
  108. id_field, desc_field = list(fields.items())[0]
  109. query = (
  110. f'SELECT {id_field}, {desc_field} '
  111. f'FROM {table} '
  112. f'WHERE tetris_scenario_id == {tetris_scenario_id} '
  113. )
  114. cursor.execute(query)
  115. results = cursor.fetchall()
  116.  
  117. table_dict = {}
  118.  
  119. if results:
  120. for row in results:
  121. id_value = row[id_field]
  122. desc_value = row[desc_field]
  123. table_dict[str(id_value)] = desc_value
  124. else:
  125. table_dict = None
  126.  
  127. data[table] = table_dict
  128.  
  129. return data
  130.  
  131. def update_master_data(self, tetris_scenario_id):
  132. """
  133. Update master data for a given scenario ID.
  134.  
  135. Args:
  136. tetris_scenario_id (int): The ID of the scenario.
  137. """
  138. if not self.redis_client:
  139. raise ValueError("Redis client is not initialized.")
  140.  
  141. redis_key = f"description_scenario:{tetris_scenario_id}"
  142.  
  143. new_data = {}
  144. with get_native_clickhouse_connection() as conn:
  145. with conn.cursor(cursor_factory=DictCursor) as cursor:
  146. for table, fields in TABLES_DESCRIPTION_FIELDS.items():
  147. id_field, desc_field = list(fields.items())[0]
  148. query = (
  149. f'SELECT {id_field}, {desc_field} '
  150. f'FROM {table} '
  151. f'WHERE tetris_scenario_id == {tetris_scenario_id} '
  152. )
  153. cursor.execute(query)
  154. results = cursor.fetchall()
  155.  
  156. table_dict = {}
  157.  
  158. if results:
  159. for row in results:
  160. id_value = row[id_field]
  161. desc_value = row[desc_field]
  162. table_dict[str(id_value)] = desc_value
  163. else:
  164. table_dict = None
  165.  
  166. new_data[table] = table_dict
  167.  
  168. serialized_data = self.serialize_data(new_data)
  169. self.redis_client.set(redis_key, serialized_data)
  170.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement