Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- # coding: utf-8
- import odoolib
- import sys
- import random
- import threading
- from datetime import datetime, date
def closing(username, pos_id=None):
    """Close the current POS session of one point of sale and print timing.

    Connects to the Odoo server as ``username`` (the login is also used as
    the password), searches ``pos.config`` by name, calls
    ``open_session_cb`` on the match, then calls
    ``action_pos_session_closing_control`` on the config's current session
    and prints how long that call took and how many orders the session has.

    Args:
        username: Odoo login; also sent as the password.
        pos_id: Fragment of the ``pos.config`` name to look for (``ilike``).
            When omitted it is recovered from ``username``, which the
            launcher builds as ``'sm<pos_id>@mydb.com'``.

    Raises:
        LookupError: If no ``pos.config`` matches ``pos_id``.
    """
    if pos_id is None:
        # Do not read the module-level ``pos_id``: the launcher loop rebinds
        # it for every thread it starts, so a worker that is already running
        # could pick up another worker's value. The login carries the same
        # information ('sm00005@mydb.com' -> '00005').
        pos_id = username.split('@', 1)[0][2:]

    connection = odoolib.get_connection(
        hostname="localhost",
        database="12hlc",
        login=username,
        password=username,
        port=80,
    )
    config_model = connection.get_model('pos.config')
    session_model = connection.get_model('pos.session')
    order_model = connection.get_model('pos.order')

    # Search config. Always keep a one-element list so the calls below see
    # the same argument shape whether one or several configs matched.
    config_ids = config_model.search([('name', 'ilike', pos_id)])
    if not config_ids:
        raise LookupError("No pos.config found with a name like %r" % pos_id)
    config_ids = config_ids[:1]

    # Open session. The call is issued twice exactly as before;
    # NOTE(review): presumably the first call creates the session and the
    # second one finishes opening it -- confirm before removing either.
    config_model.open_session_cb(config_ids)
    config_model.open_session_cb(config_ids)

    pos_config = config_model.read(
        config_ids, ['current_session_id', 'journal_ids', 'pricelist_id']
    )[0]
    # many2one values are read as [id, display_name]; keep the id only.
    session_id = pos_config['current_session_id'][0]
    order_ids = order_model.search([['session_id', '=', session_id]])

    t1 = datetime.now()
    session_model.action_pos_session_closing_control(session_id)
    t2 = datetime.now()

    # total_seconds() keeps sub-second precision and does not wrap after a
    # day, unlike the ``seconds`` attribute.
    elapsed = round((t2 - t1).total_seconds(), 3)
    print("%s closed session %s in %s seconds (%s orders)" % (username, session_id, elapsed, len(order_ids)))
def selling(username, size, pos_id=None):
    """Create ``size`` synthetic POS orders in one point of sale's session.

    Connects to the Odoo server as ``username`` (the login is also used as
    the password), searches ``pos.config`` by name, calls
    ``open_session_cb`` on the match and then pushes ``size`` single-line
    orders, one at a time, through ``pos.order.create_from_ui``. Each order
    uses a random POS product and a random open bank statement of the
    session as payment. The result of every call is printed.

    Args:
        username: Odoo login; also sent as the password.
        size: Number of orders to create.
        pos_id: Fragment of the ``pos.config`` name to look for (``ilike``).
            When omitted it is recovered from ``username``, which the
            launcher builds as ``'sm<pos_id>@mydb.com'``.

    Raises:
        LookupError: If no ``pos.config`` matches ``pos_id``, or if orders
            are requested but there is no sellable POS product or no open
            bank statement for the session.
    """
    if pos_id is None:
        # Do not read the module-level ``pos_id``: the launcher loop rebinds
        # it for every thread it starts, so a worker that is already running
        # could pick up another worker's value. The login carries the same
        # information ('sm00005@mydb.com' -> '00005').
        pos_id = username.split('@', 1)[0][2:]

    connection = odoolib.get_connection(
        hostname="localhost",
        database="12hlc",
        login=username,
        password=username,
        port=80,
    )
    config_model = connection.get_model('pos.config')
    order_model = connection.get_model('pos.order')
    product_model = connection.get_model("product.product")
    abs_model = connection.get_model('account.bank.statement')

    # Search config. Always keep a one-element list so the calls below see
    # the same argument shape whether one or several configs matched.
    config_ids = config_model.search([('name', 'ilike', pos_id)])
    if not config_ids:
        raise LookupError("No pos.config found with a name like %r" % pos_id)
    config_ids = config_ids[:1]

    # Open session. The call is issued twice exactly as before;
    # NOTE(review): presumably the first call creates the session and the
    # second one finishes opening it -- confirm before removing either.
    config_model.open_session_cb(config_ids)
    config_model.open_session_cb(config_ids)

    pos_config = config_model.read(
        config_ids, ['current_session_id', 'journal_ids', 'pricelist_id']
    )[0]
    # many2one values are read as [id, display_name]; keep the id only.
    session_id = pos_config['current_session_id'][0]
    pricelist_id = pos_config['pricelist_id'][0]

    product_ids = product_model.search([['sale_ok', '=', True], ['available_in_pos', '=', True]])
    abs_ids = abs_model.search_read(
        [['state', '=', 'open'], ['pos_session_id', '=', session_id]],
        ['account_id', 'journal_id'],
    )

    # Fail with a readable message instead of random.choice()'s IndexError.
    if size > 0:
        if not product_ids:
            raise LookupError("No product is both sale_ok and available_in_pos")
        if not abs_ids:
            raise LookupError("No open bank statement for pos.session %s" % session_id)

    for i in range(0, size):
        uid = '%s_%s' % (i, datetime.now().isoformat())
        payment_method = random.choice(abs_ids)
        order_data = {
            'amount_paid': 55000,
            'amount_return': 0,
            'amount_total': 55000,
            'amount_tax': 5000,
            'creation_date': datetime.now().isoformat(),
            'customer_count': 1,
            'fiscal_position_id': False,
            'floor': False,
            'floor_id': False,
            'loyalty_points': 0,
            'name': 'order_%s' % uid,
            'partner_id': False,
            'pos_session_id': session_id,
            'pricelist_id': pricelist_id,
            'sequence_number': i,
            'table_id': False,
            'uid': uid,
            'user_id': False,
            # [0, 0, vals] is the x2many "create a new line" command.
            'lines': [[0, 0, {
                'discount': 0,
                'id': 1,
                # 'note': "",
                'pack_lot_ids': [],
                'price_subtotal': 50000,
                'price_subtotal_incl': 55000,
                'price_unit': 55000,
                'product_id': random.choice(product_ids),
                'qty': 1,
                # 'sale_advisor': 1046,
                # [6, 0, ids] replaces the linked set; tax id 4 is
                # hard-coded and must exist in the target database.
                'tax_ids': [[6, 0, [4]]]
            }]],
            'statement_ids': [[0, 0, {
                'account_id': payment_method['account_id'][0],
                'amount': 55000,
                'journal_id': payment_method['journal_id'][0],
                'name': date.today().strftime("%Y-%m-%d"),
                'statement_id': payment_method['id'],
            }]]
        }
        res = order_model.create_from_ui([{
            'id': uid,
            'data': order_data,
            'to_invoice': False
        }])
        print(username, res)
# Command-line launcher: start ``concurrency`` worker threads, one per point
# of sale, each running either the "sell" or the "close" task, then wait for
# all of them to finish.
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage : lt_pos.py concurrency task")
        print("")
        print("Ex : lt_pos.py 3 sell")
        print(" lt_pos.py 4 close")
        print(" To choose the number of pos order to create:")
        print(" lt_pos.py 4 sell 1000")
    elif sys.argv[2] not in ("sell", "close"):
        # Previously any unrecognised task fell through to closing sessions,
        # so a typo such as "sel" would trigger the destructive path.
        sys.exit("Unknown task %r (expected 'sell' or 'close')" % sys.argv[2])
    else:
        concurrency = int(sys.argv[1])
        task = sys.argv[2]

        # Number of orders per worker; parsed once instead of per thread.
        sizing = 1000
        if task == "sell" and len(sys.argv) > 3:
            sizing = int(sys.argv[3])

        lst_thd = []
        # POS numbering starts at 5: workers cover 00005 .. 0000(4+concurrency).
        for i in range(5, concurrency + 5):
            # Zero-padded, five-character identifier (e.g. 7 -> "00007").
            # NOTE: ``pos_id`` is a module-level name rebound on every
            # iteration; anything reading it from another thread sees
            # whichever value is current at that moment.
            pos_id = ("00000%s" % i)[-5:]
            username = 'sm%s@mydb.com' % pos_id
            if task == "sell":
                thd = threading.Thread(target=selling, args=(username, sizing))
            else:
                thd = threading.Thread(target=closing, args=(username,))
            thd.start()
            lst_thd.append(thd)

        for thd in lst_thd:
            thd.join()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement