Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3.6
- # -*- coding: utf-8 -*-
- """
- Payments processor: money entrypoint. Interactive CLI application.
- """
- import sys
- import re
- import datetime
- import os
- import traceback
- from typing import Callable, Any, Union
- DIR = os.path.dirname(os.path.abspath(__file__))
- TOP_DIR = os.path.dirname(os.path.dirname(DIR))
- sys.path.insert(0, TOP_DIR)
- from payments import get_core, processor_func, Request, Context, Income, exception_notify
- from payments.core.errors import ProgrammingError
- from payments.models.Acct import PaymentType, ServiceType
- from payments.models.User import User
- from payments.protocols.RinetAPI2 import RinetAPI, RinetAPIException
# Charset used for everything read from / written to the console.
# Can be overridden through the MONEY_CHARSET environment variable.
out_charset = os.environ.get("MONEY_CHARSET", "koi8-r")

# Generic "invalid input" text shown when a value fails conversion.
error_message = "неверный ввод"

# printf-style layout of a single income line (filled in by income_row()).
L_FORMAT = "%-16s%.02f %s %s %04d%02d%02d%02d%02d %-14.02f %.04f %s %s"

# Rejection reasons (as found in request.reason) mapped to the text shown
# to the operator; looked up by request_error().
validation_errors = {
    "Internet-cards are forbidden": "Интернет-карты запрещены",
    "Uralsib is forbidden": "Банк Уралсиб запрещен",
    "Outdated service type": "Устаревший тип сервиса",
    "Unsupported payment type": "Тип платежа не поддерживается",
    "Unsupported service type": "Тип сервиса не поддерживается",
}

# Payment types for which a bill must be selected instead of a plain amount.
PTYPES_BILL = ('O',)

# Application core and the "acct" database session used by dict_options().
core = get_core()
acct_sess = core.db.session("acct")
- """ >>>>>>> ARGV PARSE <<<<<<< """
# Required args: the first argument must be --user=<operator>, where the
# operator name is at least two characters long (9 characters in total).
user = None
if not (len(sys.argv) >= 2
        and len(sys.argv[1]) >= 9
        and sys.argv[1].startswith("--user=")):
    print("First arg must be --user=user!")
    sys.exit(1)
user = sys.argv[1][len("--user="):]
# Index of the next positional argument that has not been consumed yet.
curr_argv = 2
- # Arg functions
def has_argv():
    """Return True while at least one command-line argument is unconsumed."""
    return len(sys.argv) > curr_argv
def get_argv():
    """Consume and return the next command-line argument.

    Raises IndexError when no arguments are left; callers are expected to
    check has_argv() first.
    """
    global curr_argv
    # The right-hand side is evaluated first, so the cursor only advances
    # when the lookup succeeded.
    token, curr_argv = sys.argv[curr_argv], curr_argv + 1
    return token
def rest_argv():
    """Consume all remaining command-line arguments.

    Returns them joined with single spaces (an empty string when nothing is
    left) and marks every argument as consumed.
    """
    global curr_argv
    leftovers = sys.argv[curr_argv:]
    curr_argv = len(sys.argv)
    return " ".join(leftovers)
# Optional args: an explicit --verbose / --noverbose right after --user
# overrides the default, which follows the core's DEVEL flag.
if has_argv() and sys.argv[curr_argv] in ("--verbose", "--noverbose"):
    verbosearg = sys.argv[curr_argv] == "--verbose"
    curr_argv += 1
else:
    verbosearg = core.DEVEL
- """ >>>>>>> IO HELPERS <<<<<<< """
def c(s: str):
    """Encode ``s`` to bytes in the console charset (``out_charset``)."""
    encoded = s.encode(out_charset)
    return encoded
def cprint(s: str, nonl=False, file=sys.stdout):
    """Write ``s`` to ``file`` encoded in the console charset.

    :param s: value to print (converted with ``str()``)
    :param nonl: when true, no trailing newline is written (used for prompts)
    :param file: text stream whose underlying ``buffer`` receives the bytes

    The stream is flushed after every call. If the reader has gone away
    (broken pipe) the process terminates with exit code 255.
    """
    try:
        payload = c(str(s))
        if not nonl:
            payload += c("\n")
        file.buffer.write(payload)
        file.flush()
    except BrokenPipeError:
        # Best-effort note on stderr; it may be closed as well.
        try:
            print("Broken pipe", file=sys.stderr)
        except Exception:
            pass
        sys.exit(255)
def read_val(message: str, converter: Callable = str, argv: bool = True) -> Any:
    """
    Read one value and convert it with ``converter``.

    :param message: prompt shown when the value is read interactively
    :param converter: callable turning the raw string into the result;
        it must raise ValueError to reject the input
    :param argv: when true, an unconsumed command-line argument is used
        instead of prompting
    :return: whatever ``converter`` returns
    :raises ValueError: when the value came from the command line and the
        converter rejected it (interactive input is re-prompted instead)

    The process exits with code 0 on Ctrl+C or when stdin is exhausted.
    """
    while True:
        from_argv = argv and has_argv()
        if from_argv:
            raw = get_argv()
        else:
            cprint(message, nonl=True)
            try:
                raw = sys.stdin.buffer.readline()
            except (KeyboardInterrupt, EOFError):
                cprint("Exit")
                sys.exit(0)
            if not raw:
                # readline() reports end of input as an empty result, not as
                # EOFError; without this check a closed stdin would make the
                # prompt loop spin forever.
                cprint("Exit")
                sys.exit(0)
        try:
            if not from_argv:
                # Decoding lives inside the retry block: UnicodeDecodeError
                # is a ValueError, so undecodable input is simply re-asked.
                raw = raw.decode(out_charset)
                # Drop the line terminator only if there is one, so a final
                # line without a newline keeps its last character.
                if raw.endswith("\n"):
                    raw = raw[:-1]
            return converter(raw)
        except ValueError as err:
            if from_argv:
                raise
            cprint("%s (%s)" % (error_message, err))
def list_select(header: str, message: str, options: list, argv: bool=True):
    """
    Let the user pick one entry from ``options`` (or pick it by argument).

    Each option is a list ``[value, alias0, ..., aliasN, comment]``; the
    aliases are matched case-insensitively and ``value`` is returned.

    A single option whose last column is the dict ``{"auto": "auto"}`` is
    selected automatically: the column before the marker is echoed after
    ``message`` and the option's value is returned without asking.

    :param header: title printed above the option list in interactive mode
    :param message: prompt for the choice
    :param options: list of options as described above
    :param argv: when true, an unconsumed command-line argument is used
    :raises ValueError: when the choice came from the command line and
        matches no alias
    """
    if len(options) == 1:
        marker = options[0][-1]
        if isinstance(marker, dict) and marker.get("auto") == "auto":
            cprint("%s%s" % (message, options[0][-2]))
            return options[0][0]

    from_argv = bool(argv and has_argv())
    if from_argv:
        val = get_argv().lower()
    else:
        val = None
        cprint(header)

    # alias (lower-cased) -> option value
    index = {}
    for option in options:
        aliases = option[1:-1]
        for alias in aliases:
            index[alias.lower()] = option[0]
        if not from_argv:
            cprint("%s: %s" % (". ".join(aliases), option[-1]))

    while True:
        if not from_argv:
            val = read_val(message, str, argv).lower()
        if val in index:
            return index[val]
        if from_argv:
            raise ValueError("Value %s not found" % val)
        cprint(error_message)
def confirm(message: str, default: bool = None):
    """Ask a yes/no question and return the answer as a bool.

    :param message: question text; a ``[y/n]`` style hint is appended
    :param default: answer assumed for empty input; with ``None`` the
        question is repeated until ``y``/``Y`` or ``n``/``N`` is entered
    """
    if default is None:
        choices = "[y/n]"
    else:
        choices = "[Y/n]" if default else "[y/N]"
    prompt = "%s %s " % (message, choices)

    while True:
        answer = read_val(prompt)
        if not answer:
            if default is not None:
                return default
            continue
        if answer in ("y", "Y"):
            return True
        if answer in ("n", "N"):
            return False
def request_error(request):
    """Print why ``request`` was rejected and exit with code 2.

    ``request.reason`` is translated through ``validation_errors``. When the
    whole reason is unknown, the part before the last ``:`` is tried and the
    ``:...`` tail is kept as is. An empty reason is reported as unknown.
    """
    raw = request.reason
    if not raw:
        reason = "неизвестная причина"
    elif raw in validation_errors:
        reason = validation_errors[raw]
    else:
        cut = raw.rfind(":")
        prefix = raw[:cut] if cut > -1 else None
        if prefix is not None and prefix in validation_errors:
            reason = validation_errors[prefix] + raw[cut:]
        else:
            reason = raw
    cprint("Платеж запрещен: %s" % reason)
    sys.exit(2)
- """ >>>>>>> TYPE HELPERS <<<<<<< """
# One or more digits and nothing else.
RE_NUM = re.compile(r'^\d+$')
def word_in(val):
    """Type helper for a single word.

    Converts ``val`` to ``str``, trims surrounding whitespace and rejects
    values that still contain a space.

    :raises ValueError: when the trimmed value contains a space
    """
    word = str(val).strip()
    if " " in word:
        raise ValueError("Пробелы запрещены")
    return word
def inn_in(val):
    """Type helper for an INN (taxpayer number).

    Accepts exactly 10 or 12 ASCII digits and returns them as ``str``.

    :param val: raw value; converted with ``str()`` before any check so that
        non-string input is validated instead of failing on ``len()``
    :raises ValueError: when the value is not 10 or 12 digits
    """
    val = str(val)
    # fullmatch with an explicit [0-9] class: unlike ``^\d+$`` it accepts
    # neither a trailing newline nor non-ASCII digit characters.
    if not re.fullmatch(r"[0-9]{10}|[0-9]{12}", val):
        raise ValueError("ИНН должен состоять из 10 или 12 цифр")
    return val
def billnum_in(val):
    """Type helper for a bill number.

    Accepts exactly 8 ASCII digits and returns them as ``str``.

    :param val: raw value; converted with ``str()`` before any check so that
        non-string input is validated instead of failing on ``len()``
    :raises ValueError: when the value is not 8 digits
    """
    val = str(val)
    # fullmatch with an explicit [0-9] class: unlike ``^\d+$`` it accepts
    # neither a trailing newline nor non-ASCII digit characters.
    if not re.fullmatch(r"[0-9]{8}", val):
        raise ValueError("Номер счёта должен состоять из 8 цифр")
    return val
# YYYYMMDD split into year / month / day groups.
RE_DATE = re.compile(r"^(\d{4})(\d{2})(\d{2})$")


def date_in(empty_for_today: bool):
    """Build a type helper that parses ``YYYYMMDD`` dates.

    :param empty_for_today: when true, an empty value or ``-`` yields the
        current date and time instead of an error
    :return: converter taking the raw value and returning a
        ``datetime.datetime`` at midnight of the given day; it raises
        ValueError for a malformed value or an impossible calendar date
    """
    def inner(val):
        text = str(val).strip()
        if empty_for_today and text in ("", "-"):
            return datetime.datetime.now()
        found = RE_DATE.match(text)
        if found is None:
            raise ValueError("Wrong date format")
        year, month, day = (int(part) for part in found.groups())
        return datetime.datetime(year, month, day, 0, 0, 0)
    return inner
- """ >>>>>>> OPTIONS HELPERS <<<<<<< """
def dict_options(cls) -> list:
    """Load a dictionary table as an options list for list_select().

    Queries all ``cls`` rows from the ``acct`` session ordered by ``id`` and
    returns one ``[name, str(id), name, descr]`` entry per row, so a row can
    be chosen either by its id or by its name.
    """
    rows = acct_sess.query(cls).order_by(cls.id)
    return [[row.name, str(row.id), row.name, row.descr] for row in rows]
# RinetAPI client; created on first use by get_bills().
rinet_api = None

# Login starting with "8P": group 1 is the prefix, group 2 is the rest.
RE_PHONELOGIN = re.compile("^(8P)(.+)$")

# Menu: which parameter is used to look a bill up.
get_bills_parameter = {
    "name": "Параметр: ",
    "header": "== Выберите параметр для поиска счёта ==",
    "list": [
        ["login", "1", "Логин"],
        ["inn", "2", "ИНН"],
        ["bill", "3", "Ввести номер счёта вручную"],
    ],
}

# Menu texts for choosing the contract type (its option list is built at
# run time inside get_bills()).
get_bills_contract_type = {
    "name": "Тип договора: ",
    "header": "== Выберите тип договора для поиска счёта ==",
}
def get_bills(in_options):
    """Get unpaid bills for the client as an options list for list_select().

    Asks the operator how to look the bill up (by login, by INN or by a bill
    number typed in manually), calls the ``c_kassa_getbills`` API method
    (up to three attempts) and turns every bill with a positive unpaid
    remainder into an option ``[{"amount": ..., "bill": ...}, number, text]``.

    When the bill number was typed in manually and exactly one unpaid bill
    was found, the option carries the ``{"auto": "auto"}`` marker so that
    list_select() picks it without asking.

    :param in_options: options read so far; must contain ``"login"``
    :return: non-empty list of options
    :raises ProgrammingError: when no login has been read yet

    Exits with code 1 when the API keeps failing and with code 4 when no
    unpaid bills are found.
    """
    global rinet_api
    if rinet_api is None:
        rinet_api = RinetAPI(core.config.rinet_api_token)
        rinet_api.onError("throw")

    # dict.get() never raises KeyError, so the missing login has to be
    # detected explicitly (otherwise None would reach the regex below).
    login = in_options.get("login")
    if login is None:
        raise ProgrammingError("Trying to get unpaid bills but no login set")

    # Ask for a parameter to search bill
    reply = list_select(
        get_bills_parameter["header"],
        get_bills_parameter["name"],
        get_bills_parameter["list"],
        argv=False
    )
    auto = False
    if reply == "login":
        matches = RE_PHONELOGIN.match(login)
        if matches:
            # Phone login: let user select the contract type
            foreign_login = "%s#%s" % (matches.group(1), matches.group(2))
            option_list = [
                [login, "1", login, "Внутренний"],
                [foreign_login, "2", foreign_login, "Межгород"]
            ]
            param = {"login": list_select(
                get_bills_contract_type["header"],
                get_bills_contract_type["name"],
                option_list,
                argv=False
            )}
        else:
            # Usual login, search by it
            param = {"login": login}
    elif reply == "inn":
        param = {"inn": read_val("ИНН: ", inn_in, False)}
    elif reply == "bill":
        param = {"bill_num": read_val("Номер счёта: ", billnum_in, False)}
        auto = True
    else:
        raise ProgrammingError("Wrong bill parameter reply \"%s\"" % reply)

    # Get bills, retrying on API errors
    max_tries = 3
    for attempt in range(1, max_tries + 1):
        try:
            rinet_api.call("c_kassa_getbills", param)
            break
        except RinetAPIException as err:
            cprint("Ошибка получения неоплаченных счетов: %s" % (err,))
            if attempt == max_tries:
                cprint("Не удалось получить неоплаченные счета")
                sys.exit(1)

    # Make options list
    res = []
    for n, bill in enumerate(rinet_api.result):
        amount = bill["bill_sum"] - bill["bill_pay"]
        if amount <= 0:
            # Fully paid bill: nothing to pay for
            continue
        name = "Счёт %s" % bill["bill_num"]
        for bill_row in bill["row_set"]:
            name += "\n - %s: %d" % (bill_row["row_descr"], bill_row["row_sum"])
        name += "\n Итого: %d" % bill["bill_sum"]
        if amount != bill["bill_sum"]:
            name = "%s (не оплачено: %d руб)" % (name, amount)
        res.append([{"amount": amount, "bill": bill}, str(n + 1), name])

    # Check if user got unpaid bills
    if not res:
        cprint("Счетов не найдено")
        sys.exit(4)

    # list_select() only honours the marker for a single option; adding it
    # to several rows would turn the bill text into an alias and print the
    # marker dict as the option comment.
    if auto and len(res) == 1:
        res[0].append({"auto": "auto"})
    return res
# Main payment options to read from stdin or argv.
# The insertion order matters: it is the order in which main() asks for
# the values and the numbering used in the edit menu below.
payment_options = {
    "login": {"name": "Пользователь", "format": word_in},
    "ptype": {
        "name": "Тип платежа",
        "header": "============ Типы платежа ===========",
        "list": dict_options(PaymentType),
    },
    "stype": {
        "name": "Тип сервиса",
        "header": "============ Типы сервиса ===========",
        "list": dict_options(ServiceType),
    },
    "date": {
        "name": "Дата платежа [YYYYMMDD, Enter - текущая]",
        "format": date_in(True),
    },
    "amount": {"name": "Сумма (RUR)", "format": float},
    "comment": {"name": "Примечания"},
}

# Payment option to replace "amount" for bill-payments
payment_option_bill = {
    "name": "Счёт",
    "header": "============ Неоплаченные счета ===========",
    "generator": get_bills,
}

# Edit menu: one numbered entry per payment option plus the two exits.
edit_header = "============ Что требует замены ==========="
edit_list = [
    [key, str(number), spec["name"]]
    for number, (key, spec) in enumerate(payment_options.items())
]
edit_list += [
    ["!QUIT", "q", "Выход из редактирования"],
    ["!EXIT", "x", "Выход из программы"],
]
def fake_user(context: Context, name: str, login: str, dlogin: Union[int,None],
              service_name: str, ltype_id: int, organization_id: int = 1):
    """Build a placeholder ``User`` object that is not backed by a real record.

    The client type and organization are looked up by the given ids through
    ``context.manager('user')``; the payment type is always looked up with
    id 1 (NOTE(review): meaning of that id is not visible here). ``login``
    is used for both ``ulogin`` and ``blogin``; ids are zero, name parts are
    empty and the connection date is today.
    """
    ltype = context.manager('user').getClientTypeByID(ltype_id)
    ptype = context.manager('user').getPaymentTypeByID(1)
    organization = context.manager('user').getOrganizationByID(organization_id)

    fields = dict(
        id=0,
        remote_id=0,
        name=name,
        first_name="",
        middle_name="",
        last_name="",
        ulogin=login,
        blogin=login,
        dlogin=dlogin,
        service_name=service_name,
        private=False,
        privcorp=False,
        connection_date=datetime.date.today(),
        active=True,
        cancelled=False,
        status_id=10,
        ltype=ltype,
        payment_type=ptype,
        organization=organization,
    )
    return User(**fields)
def make_request(context, in_options):
    """Build a payment ``Request`` from the values entered by the operator.

    For bill payment types (``PTYPES_BILL``) ``in_options["amount"]`` is the
    bill option ``{"amount": ..., "bill": ...}``: the amount is taken from
    it, the bill number is appended to the comment and the bill itself is
    stored under ``request.data["bill"]``. Otherwise amount and comment are
    used as entered.

    :param context: not used by this function
    :param in_options: dict with login, ptype, stype, date, amount, comment
    """
    is_bill_payment = in_options["ptype"] in PTYPES_BILL
    if is_bill_payment:
        chosen = in_options["amount"]
        amount = chosen["amount"]
        comment = "p/o %s s4et %s" % (in_options["comment"],
                                      chosen["bill"]["bill_num"])
        bill = chosen["bill"]
    else:
        amount, comment, bill = in_options["amount"], in_options["comment"], None

    data = {
        "payment_type": in_options["ptype"],
        "service_type": in_options["stype"],
        "comment": comment,
        "operator": user
    }
    request = Request(
        entrypoint_type="cli",
        entrypoint_name="money",
        raw_acct=in_options["login"],
        amount=amount,
        time=in_options["date"],
        data=data
    )
    if bill is not None:
        request.data["bill"] = bill
    return request
def fmt_log_request(request: Request):
    """Render a one-line summary of ``request`` for the log.

    Contains account, payment type, service type, amount and date; when the
    request data has a ``noreceipt`` key, `` noreceipt`` or `` receipt`` is
    appended accordingly.
    """
    data = request.data
    summary = "acct: %s, ptype: %s, stype: %s, sum: %s, date: %s" % (
        request.raw_acct,
        data['payment_type'],
        data['service_type'],
        request.amount,
        request.time.strftime('%Y.%m.%d')
    )
    if "noreceipt" not in data:
        return summary
    prefix = "no" if data["noreceipt"] else ""
    return "%s %sreceipt" % (summary, prefix)
def read_opt(option: dict, in_options: dict, argv=True):
    """Read one option as described by its configuration dict.

    The first matching key of ``option`` decides how the value is read:

    * ``generator`` - the callable builds the option list from
      ``in_options`` and the user picks from it;
    * ``list`` - the user picks from the fixed list;
    * ``format`` - a plain value converted by the given callable;
    * none of these - a plain string.

    :param option: configuration with at least ``name`` (and ``header`` for
        list-like options)
    :param in_options: values read so far, passed to generators
    :param argv: whether command-line arguments may supply the value
    """
    message = "%s: " % option["name"]
    if "generator" in option:
        choices = option["generator"](in_options)
        return list_select(option["header"], message, choices, argv)
    if "list" in option:
        return list_select(option["header"], message, option["list"], argv)
    if "format" in option:
        return read_val(message, option["format"], argv=argv)
    return read_val(message, argv=argv)
def income_row(income: Income):
    """Format ``income`` as one text row laid out by ``L_FORMAT``.

    When the payment-system date differs from the creation date, the
    payment-system date (``YYYYMMDD``) is prepended to the comment.
    """
    account = income.getAccountName()
    generic_amount = income.getGenericAmount()
    ptype_name = income.payment_type.name
    stype_name = income.service_type.name
    created = income.creation_time
    amount = income.getAmount()
    # NOTE(review): the rate column is a fixed constant here - confirm
    # whether it is still meaningful to readers of the row.
    rate = 27.0
    operator = income.operator
    paid_at = income.payment_system_time
    note = income.comment
    if paid_at.date() != created.date():
        stamp = "%04d%02d%02d" % (paid_at.year, paid_at.month, paid_at.day)
        note = "%s %s" % (stamp, note)
    values = (
        account, generic_amount, ptype_name, stype_name,
        created.year, created.month, created.day, created.hour, created.minute,
        amount, rate, operator, note,
    )
    return L_FORMAT % values
# Create the "money" logger; console output (stderr first, then stdout)
# follows the verbosity chosen on the command line.
logger = core.getLogger("money")
logger.stderr_enabled = logger.stdout_enabled = verbosearg
logger.setNamedMark("_money")
def main():
    """Interactive payment flow.

    1. Read every entry of ``payment_options`` (from argv while arguments
       last, then interactively). For a bill payment type the amount prompt
       is replaced by bill selection. Left-over arguments are appended to
       the comment.
    2. Validate the request in check mode, show the resulting rows and ask
       for confirmation; until confirmed, let the operator edit single
       options and validate again.
    3. Ask about the receipt if the validator requires one, then perform
       the payment and print the stored rows.

    Exit codes: 1 - an argument was rejected, 2 - the request was refused
    (see request_error()), 3 - the operator left through the edit menu.
    """
    # Read options
    in_options = {}
    for idx, option in payment_options.items():
        try:
            in_options[idx] = read_opt(option, in_options)
        except ValueError as err:
            cprint("%s: %s (%s)" % (option["name"], error_message, str(err)),
                   file=sys.stderr)
            sys.exit(1)
        # Replace reading an amount with reading a bill for O ptype
        if idx == "ptype" and in_options[idx] in PTYPES_BILL:
            payment_options["amount"] = payment_option_bill
            payment_options["comment"]["name"] = "Номер кассового ордера"

    # Read comment from rest argv if it's left
    rest_argv_v = rest_argv()
    if rest_argv_v:
        in_options["comment"] = "%s %s" % (in_options["comment"], rest_argv_v)

    # Create context and bind logger
    context = Context()
    context.setLogger(logger)
    logger.setNamedMark("U", user)

    # Create payment request and validate until it's confirmed by user
    edit_mode = False
    payment_request = None
    while True:
        payment_request = make_request(context, in_options)
        context.setRequest(payment_request)

        logger.info("Validating %s" % fmt_log_request(payment_request))
        processor_res = processor_func(context, checkmode=True)
        if not processor_res:
            request_error(payment_request)  # exits the program

        for income in payment_request.incomes:
            cprint(income_row(income))

        if not edit_mode:
            # Ask if everything's OK and leave the loop if so
            if confirm("Все ли в порядке?", default=False):
                break

        # Let user edit data
        edit_mode = True
        idx = list_select(edit_header, "?: ", edit_list, False)
        if idx == "!QUIT":
            # Leave edit mode: the next pass validates and asks again
            edit_mode = False
        elif idx == "!EXIT":
            sys.exit(3)
        else:
            ptype = in_options["ptype"]
            if idx == "ptype" and ptype in PTYPES_BILL:
                cprint("Нельзя изменить тип платежа %s. Начните заново." % ptype)
            elif idx == "login" and ptype in PTYPES_BILL:
                # The guard must look at the payment type, not at the login
                # value: the selected bill was found for the current login.
                cprint(("Нельзя изменить логин, если тип платежа %s." % ptype)
                       + " Начните заново.")
            else:
                new_val = read_opt(payment_options[idx], in_options)
                if idx == "ptype" and new_val in PTYPES_BILL:
                    # Switching to a bill payment would leave a plain number
                    # in "amount" where make_request() expects a selected
                    # bill, so the change is refused.
                    cprint("Нельзя изменить тип платежа на %s. Начните заново."
                           % new_val)
                else:
                    in_options[idx] = new_val

    # Ask whether to print a receipt if receipt marked as needed by validator
    if payment_request.receipt_needed:
        context.request.data["noreceipt"] = not confirm("Печатать чек?")
        if context.request.data["noreceipt"]:
            context.request.incomes[0].comment += " noreceipt"

    # Make payment
    logger.info("Paying %s" % fmt_log_request(payment_request))
    processor_res = processor_func(context)

    if processor_res:
        # Payment ok, show rows prefixed with their database ids
        for income in context.request.incomes:
            row = income_row(income)
            cprint("%d %s" % (income.dbid, row))
    else:
        # Payment failed
        request_error(context.request)
if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        # Global exception handler: log, notify exception and re-raise.
        # Failures of the reporting itself go to stderr and must never
        # replace the original exception.
        try:
            logger.exception(err)
        except Exception as err2:
            # format_exception() returns a list of lines; join it so that a
            # readable traceback is printed instead of a list repr.
            trb = "".join(
                traceback.format_exception(type(err2), err2, err2.__traceback__))
            print("Exception in exception logger: %s" % trb, file=sys.stderr)
        try:
            exception_notify(err, "money")
        except Exception as err2:
            trb = "".join(
                traceback.format_exception(type(err2), err2, err2.__traceback__))
            print("Exception in exception notification: %s" % trb, file=sys.stderr)
            try:
                logger.exception(err2)
            except Exception:
                # Best effort: the failure is already on stderr, and an error
                # raised here would mask the original one re-raised below.
                pass
        raise
Add Comment
Please, Sign In to add comment