Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class BatchGetItem:
- # TODO: finish the class (make it pretty)
- def __init__(
- self, keys: List[Dict], db_connection, table_name,
- step: int = 100, max_attempt_qty: int = 3, base_backoff_ms: int = 50
- ):
- self.db_connection = db_connection
- self.table_name = table_name
- self.keys = keys
- self.keys_qty = len(keys)
- self.result = []
- self.step = step
- self.max_attempt_qty = max_attempt_qty
- self.base_backoff_ms = base_backoff_ms
- def get(self):
- current_attempt = 0
- for first_limit in range(0, self.keys_qty, self.step):
- second_limit = first_limit + self.step
- self._batch_get_item(self.keys[first_limit: second_limit])
- last_item = second_limit >= self.keys_qty
- last_attempt = current_attempt == self.max_attempt_qty
- if last_attempt or last_item:
- if last_attempt and not last_item:
- logger.error(f"Too many items for 'batch_get_item' {self.keys_qty}. Processed {second_limit}.")
- break
- time_sleep = self.calculate_sleep(current_attempt)
- current_attempt += 1
- time.sleep(time_sleep)
- def _batch_get_item(self, bunch_keys):
- try:
- response = self.db_connection.batch_get_item(
- RequestItems={self.table_name: {"Keys": bunch_keys}}
- )
- except ClientError as e:
- raise db_exceptions.DbSelectException(str(e))
- self.result.extend(response["Responses"][self.table_name])
- def calculate_sleep(self, current_attempt: int) -> float:
- sleep_time_ms = self.base_backoff_ms * 2 ** current_attempt # calculate exponential backoff
- sleep_time_secs = sleep_time_ms / 1000 # convert milliseconds to seconds
- return sleep_time_secs
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement