Advertisement
darkor

class BatchGetItem

Sep 19th, 2023
477
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.87 KB | None | 0 0
  1. class BatchGetItem:
  2.     # TODO: finish the class (make it pretty)
  3.     def __init__(
  4.         self, keys: List[Dict], db_connection, table_name,
  5.         step: int = 100, max_attempt_qty: int = 3, base_backoff_ms: int = 50
  6.     ):
  7.         self.db_connection = db_connection
  8.         self.table_name = table_name
  9.  
  10.         self.keys = keys
  11.         self.keys_qty = len(keys)
  12.         self.result = []
  13.  
  14.         self.step = step
  15.         self.max_attempt_qty = max_attempt_qty
  16.         self.base_backoff_ms = base_backoff_ms
  17.  
  18.     def get(self):
  19.         current_attempt = 0
  20.         for first_limit in range(0, self.keys_qty, self.step):
  21.             second_limit = first_limit + self.step
  22.             self._batch_get_item(self.keys[first_limit: second_limit])
  23.  
  24.             last_item = second_limit >= self.keys_qty
  25.             last_attempt = current_attempt == self.max_attempt_qty
  26.             if last_attempt or last_item:
  27.                 if last_attempt and not last_item:
  28.                     logger.error(f"Too many items for 'batch_get_item' {self.keys_qty}. Processed {second_limit}.")
  29.                     break
  30.  
  31.             time_sleep = self.calculate_sleep(current_attempt)
  32.             current_attempt += 1
  33.             time.sleep(time_sleep)
  34.  
  35.     def _batch_get_item(self, bunch_keys):
  36.         try:
  37.             response = self.db_connection.batch_get_item(
  38.                 RequestItems={self.table_name: {"Keys": bunch_keys}}
  39.             )
  40.         except ClientError as e:
  41.             raise db_exceptions.DbSelectException(str(e))
  42.  
  43.         self.result.extend(response["Responses"][self.table_name])
  44.  
  45.     def calculate_sleep(self, current_attempt: int) -> float:
  46.         sleep_time_ms = self.base_backoff_ms * 2 ** current_attempt  # calculate exponential backoff
  47.         sleep_time_secs = sleep_time_ms / 1000  # convert milliseconds to seconds
  48.         return sleep_time_secs
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement