Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Database Processing Script
- This script:
- a - can drop a database
- b - can build a database
- c - can call plugins, mainly for the purpose of inserting data into database.
- Usage:
- # Drop the dev database
- python dbloader.py --drop-db DEV
- # Create and seed the dev database
- python dbloader.py --build DEV
- # Run the sample plugin
- python dbloader.py --plugin=sample DEV
- Staging Environments:
- Look in appconfig.py to see them:
- --------------------------------
- * LOCAL
- * DEV
- * STAGING
- * QA
- * PROD
- """
- import logging
- import random
- import uuid
- from datetime import datetime, timedelta
- from plumbum import cli, local
- from reahl.component.config import StoredConfiguration
- from reahl.component.context import ExecutionContext
- from reahl.domain.systemaccountmodel import EmailAndPasswordSystemAccount
- from reahl.sqlalchemysupport import Session, metadata
- import datatablebootstrap
- from appconfig import App
- from datatablebootstrap import Ability, Check, UserAbility
- APP = App()
- LOGGER = logging.getLogger(__name__)
- ability_labels = 'Inactive SearchChecks EditChecks CreateUser'.split()
- def new_account(email='admin@site.org'):
- password = 'topsecret'
- account = EmailAndPasswordSystemAccount(email=email)
- Session.add(account)
- account.set_new_password(account.email, password)
- account.activate()
- return account
- def new_check(bank_account_id, checkno):
- day = random.randint(1, 25)
- month = 11
- year = 2018
- issue_date = datetime(year, month, day)
- cleared_date = issue_date + timedelta(days=random.randint(5, 20))
- tid = str(random.randint(1545, 678678))
- def random_check_amount():
- return random.randint(5000, 200000)
- if random.randint(0, 100) < 50:
- amount = amount_cleared = random_check_amount()
- else:
- amount = random_check_amount()
- amount_cleared = random_check_amount()
- _ = Check(bank_account_id=bank_account_id, check_number=checkno,
- amount=amount, amount_cleared=amount_cleared,
- date_issued=issue_date, date_cleared=cleared_date,
- transaction_id=tid, status_id=random.randint(0, 9),
- issue_type_id=random.randint(0, 1),
- )
- Session.add(_)
- return _
- def dummy_data():
- ability = dict()
- for _ in ability_labels:
- ability[_] = datatablebootstrap.Ability(label=_)
- LOGGER.warning(f"Ability dict={ability}")
- account_ability = dict()
- # inactive
- # viewer
- # editor
- # security
- # admin
- account_ability['admin'] = 'EditChecks CreateUser'.split()
- account_ability['security'] = ['CreateUser']
- account_ability['editor'] = ['EditChecks']
- account_ability['viewer'] = ['SearchChecks']
- account_ability['batchuser'] = ['Inactive']
- for account, abilities in account_ability.items():
- acct = new_account(email=f"{account}@ofiglobal.com")
- for ability in abilities:
- _ability = Session.query(Ability).filter_by(label=ability).one()
- Session.add(UserAbility(account_id=acct.id, ability_id=_ability.id))
- import random
- amount = 10
- check_numbers = random.sample(range(10, 50), amount)
- for check_number in check_numbers:
- bank_account = random_table_row(datatablebootstrap.BankAccount)
- new_check(bank_account.id, check_number)
- def random_table_row(t):
- import random
- rand = random.randrange(0, Session.query(t).count())
- row = Session.query(t)[rand]
- return row
- def id_label_table_to_dict(t, label_column_name='label'):
- _ = dict()
- for row in Session.query(t).all():
- i = getattr(row, label_column_name)
- _[i] = row.id
- return _
- def populate(table, values, label_column_name='label'):
- for i, value in enumerate(values.splitlines()):
- value = value.strip()
- row_object = table(id=i)
- setattr(row_object, label_column_name, value)
- Session.add(row_object)
- Session.commit()
- def populate_check_issue_types():
- data = """System-Generated
- Manual
- Paid
- """
- populate(datatablebootstrap.CheckIssueType, data, label_column_name='issue_type')
- def populate_plan_types():
- data = """529
- M52
- OP
- N9
- I29
- T29
- """
- populate(datatablebootstrap.CheckPlan, data, label_column_name='plan_type')
- def populate_bank_accounts(hide_account_number=False):
- data = """PC529
- NM529
- """
- plan_id = id_label_table_to_dict(datatablebootstrap.CheckPlan, label_column_name='plan_type')
- data = """NM 111 SHRHOLDR
- NM 111 SHRHOLDR
- NM 111 12B1
- NM 111 COMSN
- PC 111 SHRHOLDR
- PC 111 SHRHOLDR
- """
- for i, line in enumerate(data.splitlines()):
- LOGGER.warning(f"Processing {line}")
- plan_label, account_number, account_type = line.split()
- plan_label += '529'
- account_number = account_number
- if hide_account_number:
- account_number = account_number[0:len(account_number)-1]
- Session.add(datatablebootstrap.BankAccount(
- plan_id=plan_id[plan_label],
- account_number=account_number,
- account_type=account_type))
- Session.commit()
- def populate_check_statuses():
- data = """OUTSTANDING
- GIRL YOU KNOCK ME OUT
- """
- populate(datatablebootstrap.CheckStatus, data, label_column_name='status')
- def populate_abilities():
- description = {
- 'Inactive': "not enabled for any actions.",
- 'SearchChecks': "are able to search for and view checks.",
- 'EditChecks': "are able to search for and edit checks.",
- 'CreateUser': "are able to create new users and set permission levels."
- }
- for ability in ability_labels:
- Session.add(datatablebootstrap.Ability(label=ability, description=description[ability]))
- class MyApp(cli.Application):
- # PROGNAME = "db-setup"
- VERSION = "7.3"
- plugin = cli.SwitchAttr(
- "--plugin", str, default=None,
- help="load a plugin file from db/plugin/ to execute against a built database. Ex: python dbloader.py --plugin=sample"
- )
- build = cli.Flag('build', help='Create the database and populate with seed data.')
- drop_db = cli.Flag('drop-db', help="Drop the database. BE CAREFUL!")
- create_db = cli.Flag('create-db', help="Create the database.")
- list_stages = cli.Flag('list-stages', help="Show the stages.")
- create_tables = cli.Flag('create-tables', help="Create the database tables.")
- build_lookups = cli.Flag('build-lookups', help="Populate the lookup tables.")
- dummy_data = cli.Flag('dummy-data', help="Add dummy data.")
- def _drop_db(self):
- from sqlalchemy_utils.functions import drop_database
- drop_database(self.db_uri)
- def _create_db(self):
- from sqlalchemy_utils.functions import create_database
- create_database(self.db_uri)
- def _list_stages(self):
- for name, member in APP.stages:
- LOGGER.warning(f"{name, member}")
- def _create_tables(self):
- reahl = local["reahl"]
- reahl["createdbtables", APP.stage.path]()
- def _build_lookups(self, sensitive=False):
- LOGGER.warning("Building lookups.")
- populate_check_issue_types()
- populate_check_statuses()
- populate_abilities()
- populate_plan_types()
- populate_bank_accounts(sensitive)
- def main(self, stage_str):
- # Boilerplate to get SQLAlchemy URL and Reahl convenience to SQLAlchemy
- APP.set_stage(stage_str)
- print("I will now read {0}".format(APP.stage))
- ExecutionContext().install()
- config = StoredConfiguration(APP.stage.path)
- config.configure()
- self.db_uri = config.reahlsystem.connection_uri
- print("DB URI = {}".format(self.db_uri))
- metadata.bind = self.db_uri
- # End Boilerplate
- APP.protect_production()
- if self.list_stages:
- self._list_stages()
- if self.drop_db:
- self._drop_db()
- if self.create_db or self.build:
- self._create_db()
- if self.create_tables or self.build:
- self._create_tables()
- if self.build_lookups or self.build:
- self._build_lookups(sensitive=APP.sensitive)
- Session.commit()
- #LOGGER.warning("Inspect database and then hit return.")
- #input("Inspect database and then hit return.")
- if self.dummy_data or self.build:
- dummy_data()
- if self.plugin:
- import importlib
- module = importlib.import_module(f"db.plugin.{self.plugin}", '.')
- plugin = module.Plugin(APP)
- return_code = plugin.run()
- if not return_code:
- Session.commit()
- return 0
- else:
- LOGGER.warning(f"Plugin {self.plugin} failed with code {return_code}.")
- return return_code
- Session.commit()
- if __name__ == "__main__":
- MyApp.run()
Add Comment
Please, Sign In to add comment