Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class DataAccess(metaclass=Singleton):
    """MongoDB access layer offering connect, insert and query operations.

    Connection settings are read from the environment variables whose names
    are held in ``inventory_constants`` (DB_HOST, DB_PORT, DB_USER,
    DB_PASSWORD, DB_NAME). The class is built with the ``Singleton``
    metaclass, which is defined elsewhere.
    NOTE(review): presumably it makes every instantiation return the same
    object -- confirm against its definition.
    """

    def __init__(self):
        self.client = None  # pymongo.MongoClient, assigned by connect()
        self.db_client = None  # handle to the configured database
        self.connect()

    @auto_retry_decorator
    def connect(self):
        """Build the MongoDB client and select the configured database.

        Raises:
            DBConnectionFailure: if a required environment variable is unset,
                or if pymongo raises ``ConnectionFailure``.
        """
        # Local import: the file's import block is not visible from here.
        from urllib.parse import quote_plus

        variable_names = (
            inventory_constants.DB_HOST,
            inventory_constants.DB_PORT,
            inventory_constants.DB_USER,
            inventory_constants.DB_PASSWORD,
            inventory_constants.DB_NAME,
        )
        settings = {name: os.getenv(name) for name in variable_names}
        missing = [name for name, value in settings.items() if value is None]
        if missing:
            # Fail with a clear message instead of a TypeError raised by
            # concatenating None into the URI.
            message = "Missing database environment variables: {}".format(
                ", ".join(str(name) for name in missing)
            )
            logging.error(message)
            raise DBConnectionFailure(message)

        host, port, username, password, db_name = (
            settings[name] for name in variable_names
        )
        try:
            logging.info("Connecting to mongodb")
            # Reserved characters (':', '@', '/') in credentials must be
            # percent-escaped or the URI cannot be parsed. '%' and '+' are
            # kept as-is so values that are already escaped keep working.
            uri = "mongodb://{}:{}@{}:{}".format(
                quote_plus(username, safe="%+"),
                quote_plus(password, safe="%+"),
                host,
                port,
            )
            # connect=False: the client is created without connecting
            # immediately (see the pymongo MongoClient documentation).
            self.client = pymongo.MongoClient(uri, connect=False, appname="")
            self.db_client = self.client[db_name]
        except ConnectionFailure as ex:
            logging.error(ex)
            raise DBConnectionFailure(ex) from ex

    @auto_retry_decorator
    def create(self, collection_name, records):
        """Insert one document, or a list of documents, into a collection.

        Args:
            collection_name: name of the collection in the selected database.
            records: a single document, or a ``list`` of documents (a list
                is written with ``insert_many``).

        Returns:
            ``(True, inserted_ids)`` for a list, ``(True, inserted_id)``
            for a single document.

        Raises:
            DataBaseError: wrapping any exception raised during the insert.
        """
        try:
            logging.info("Creating the collection in db.")
            collection = self.db_client[collection_name]
            if isinstance(records, list):
                logging.info("Performing insert many operation.")
                result = collection.insert_many(records)
                return True, result.inserted_ids
            logging.info("Performing insert one operation")
            result = collection.insert_one(records)
            return True, result.inserted_id
        except Exception as ex:
            # Exception, not BaseException: KeyboardInterrupt and SystemExit
            # must propagate instead of being turned into DataBaseError.
            logging.exception("Error while performing insert operations.")
            raise DataBaseError(ex) from ex

    @auto_retry_decorator
    def get(self, collection_name, filter_dictionary=None, **kwargs):
        """Fetch the documents of a collection that match a filter.

        Args:
            collection_name: name of the collection to query.
            filter_dictionary: query filter as a ``dict``; ``None`` (the
                default) is treated as an empty filter.
            **kwargs: forwarded to the query helpers. ``sort_by`` and
                ``order_by``, when both truthy, select the ordered query;
                the limit/skip values are derived by ``get_limit_skip``.

        Returns:
            ``(count, records)``: the number of documents read and the
            documents themselves as a list.

        Raises:
            DataBaseError: wrapping any exception, including the
                ``TypeError`` raised when ``filter_dictionary`` is not a
                dict.
        """
        if filter_dictionary is None:
            # A fresh dict per call avoids sharing one mutable default.
            filter_dictionary = {}
        try:
            if not isinstance(filter_dictionary, dict):
                logging.warning("DB Get operation: filter_dict is not a instance of dict")
                raise TypeError(message_consts.INVALID_TYPE_DEFAULT)
            limit, skip = get_limit_skip(**kwargs)
            # NOTE(review): limit/skip are passed explicitly together with
            # **kwargs; if kwargs itself holds 'limit' or 'skip' keys this
            # call fails with a duplicate-keyword TypeError -- confirm which
            # keys get_limit_skip reads.
            if kwargs.get("sort_by") and kwargs.get("order_by"):
                cursor = get_document_order_by(self, collection_name, filter_dictionary,
                                               limit=limit, skip=skip, **kwargs)
            elif limit:
                cursor = get_document_limit(self, collection_name, filter_dictionary,
                                            limit=limit, skip=skip, **kwargs)
            else:
                cursor = get_document(self, collection_name, filter_dictionary, **kwargs)
            records = list(cursor)
            count = len(records)
            logging.info("DB GET operation: count %s of records. ", count)
            return count, records
        except Exception as ex:
            # Exception, not BaseException: KeyboardInterrupt and SystemExit
            # must propagate instead of being turned into DataBaseError.
            logging.exception("Error while performing db get operation")
            raise DataBaseError(ex) from ex
- Traceback (most recent call last):
Add Comment
Please, Sign In to add comment