from datetime import datetime
from typing import List, Optional

from loguru import logger
from sqlalchemy import func, literal
from sqlalchemy.orm import Session

from ..models.base import Inventory, BasPart, InventorySub, InventoryExtraDefine
from ..models.product import Product, ProductPackage

# Classification-code translation applied to prefixed inventory records.
# Codes missing from this table fall back to '0101' (see _build_inventory).
cls_code_d = {
    '0302': '0102',
    '0301': '0101',
}


def validate(text: str):
    """Return *text* with every ASCII apostrophe replaced by ``’`` (U+2019).

    ``None`` is passed through unchanged so that products with a missing
    name do not abort the whole sync with an ``AttributeError``.
    """
    if text is None:
        return None
    return text.replace('\'', '’')


def _build_inventory(product: Product, inv_code: str, cat_no_prefix: str) -> Inventory:
    """Build the transient ``Inventory`` row describing *product*.

    The common columns are filled from the product; the remaining columns
    depend on whether a catalogue-number prefix is in use.
    """
    inventory = Inventory(
        cInvCode=inv_code,
        cInvName=validate(product.cn_name),
        cEnglishName=validate(product.en_name),
        dSDate=datetime.today(),
        dModifyDate=datetime.now(),
        iMassDate=product.mass_day,
        cInvDefine4=product.mf,
        cInvDefine5=product.mw,
        cInvDefine6=product.cas,
        cInvDefine8=product.purity,
    )
    if not cat_no_prefix:
        inventory.cInvCCode = product.cls_code
        # cInvDefine1=product.cls_name  # website category name
        inventory.cInvDefine10 = f'M-{product.cat_no}'
        inventory.bSelf = 1
        inventory.bFree1 = 1
        inventory.iGroupType = 0
        inventory.cGroupCode = '01'
        inventory.cComUnitCode = '01'
        # PictureGUID=?
        inventory.cProductUnit = None
        inventory.cMassUnit = 1  # TODO hardcode
        inventory.bConfigFree1 = 1
    else:
        inventory.cInvCCode = cls_code_d.get(product.cls_code, '0101')
        inventory.cInvDefine10 = inv_code
        inventory.bSelf = 0
        inventory.bFree1 = 0
        inventory.iGroupType = 2
        inventory.cGroupCode = '03'
        inventory.cComUnitCode = '0301'
        inventory.cAssComUnitCode = '0302'
        inventory.cSAComUnitCode = '0302'
        inventory.cPUComUnitCode = '0302'
        inventory.cSTComUnitCode = '0302'
        inventory.cCAComUnitCode = '0302'
        inventory.cProductUnit = '0301'
        inventory.cMassUnit = 1  # TODO hardcode
        inventory.bConfigFree1 = 0
        inventory.iPlanDefault = 3
        inventory.cShopUnit = '0301'
    return inventory


def upsert_inventory(db: Session, product: Product,
                     packages: Optional[List[ProductPackage]] = None,
                     cat_no_prefix=''):
    """Insert or update the ``Inventory`` row for *product*, then its parts.

    The inventory code is ``cat_no_prefix + product.cat_no``.  When no row
    with that code exists, an ``Inventory``, an ``InventorySub`` and an
    ``InventoryExtraDefine`` row are added.  Otherwise only the columns whose
    values differ from the stored row are updated (``dSDate`` and
    ``dModifyDate`` are ignored in that comparison).  The session is
    committed and ``upsert_packages`` is called afterwards.

    :param db: SQLAlchemy session used for all reads and writes.
    :param product: source product record.
    :param packages: package list forwarded to ``upsert_packages``.
    :param cat_no_prefix: optional prefix prepended to ``product.cat_no``.
    """
    inv_code = f'{cat_no_prefix}{product.cat_no}'
    inventory = _build_inventory(product, inv_code, cat_no_prefix)

    origin_inventory = db.query(
        Inventory
    ).with_hint(Inventory, 'WITH(NOLOCK)').filter(
        Inventory.cInvCode == inv_code
    ).limit(1).first()

    if not origin_inventory:
        # add
        db.add(inventory)
        db.add(InventorySub(cInvSubCode=inv_code))
        db.add(InventoryExtraDefine(
            cInvCode=inv_code,
            cidefine1=product.smiles,
            cidefine2=product.ghs_icon01,
            cidefine3=product.ghs_icon02,
            cidefine4=product.ghs_icon03,
            cidefine5=product.remark,
            cidefine6=product.handler,
            # TODO packages(cidefine12)?
        ))
        logger.debug(f'inserting inventory {inventory.__dict__}')
    else:
        # Compare only the columns that were explicitly set on the freshly
        # built object; SQLAlchemy bookkeeping entries start with '_sa_'.
        diff = {
            field: getattr(inventory, field)
            for field in inventory.__dict__
            if not field.startswith('_sa_')
            and field not in {'dSDate', 'dModifyDate'}
            and getattr(origin_inventory, field) != getattr(inventory, field)
        }
        if diff:
            db.query(
                Inventory
            ).filter(
                Inventory.cInvCode == inv_code
            ).update(diff)
            logger.debug(f'updating inventory {inv_code} from {origin_inventory} to {diff}')
        # TODO update inventory extra define
    db.commit()
    upsert_packages(db, product, cat_no_prefix, packages)


def _next_part_id(db: Session) -> int:
    """Return ``MAX(BasPart.PartId) + 1``, or ``1`` when the table is empty.

    ``MAX`` over an empty table yields ``None``; treating that as ``0``
    avoids a ``TypeError`` on the very first insert.
    NOTE(review): MAX+1 is not safe against concurrent writers.
    """
    max_id = db.query(func.max(BasPart.PartId)).scalar()
    return (max_id or 0) + 1


def upsert_packages(db: Session, product: Product, cat_no_prefix='',
                    packages: Optional[List[ProductPackage]] = None):
    """Make sure the ``BasPart`` rows needed for *product* exist.

    * With a ``cat_no_prefix``: add one ``BasPart`` row (``bVirtual=1``,
      ``LLC=1``) for the prefixed inventory code unless one already exists.
    * Without a prefix and with ``packages`` given: add one ``BasPart`` row
      (``bVirtual=0``) per package whose ``package`` value is not yet stored
      as ``Free1`` for ``product.cat_no``, then make sure a row with
      ``Free1 == 'ETS'`` exists as well.
    * Without a prefix and with ``packages is None``: do nothing.

    :param db: SQLAlchemy session used for all reads and writes.
    :param product: product whose ``cat_no`` identifies the inventory.
    :param cat_no_prefix: optional prefix prepended to ``product.cat_no``.
    :param packages: packages to register; ``None`` skips package handling.
    """
    if cat_no_prefix:
        inv_code = f'{cat_no_prefix}{product.cat_no}'
        bas_part = db.query(
            BasPart
        ).with_hint(BasPart, 'WITH(NOLOCK)').filter(
            BasPart.InvCode == inv_code
        ).limit(1).first()
        if bas_part:
            return
        db.add(BasPart(
            PartId=_next_part_id(db),
            InvCode=inv_code,
            bVirtual=1,
            LLC=1
        ))
        db.commit()
    elif packages is not None:
        missing = {
            package.package: package
            for package in packages
        }
        if missing:  # skip the IN () query entirely for an empty package list
            already_stored = db.query(
                BasPart
            ).filter(
                BasPart.InvCode == product.cat_no,
                BasPart.Free1.in_(missing.keys())
            ).all()
            for part in already_stored:
                # Tolerate duplicate rows or values that matched only under
                # the database collation instead of raising KeyError.
                missing.pop(part.Free1, None)
        for package in missing.values():
            db.add(BasPart(
                PartId=_next_part_id(db),
                InvCode=product.cat_no,
                Free1=package.package,
                bVirtual=0
            ))
            # Commit per row so the next MAX(PartId) lookup sees this insert.
            db.commit()
        # NOTE(review): the source formatting was lost; this check is assumed
        # to belong to the packages branch -- confirm against history.
        ets_exists = db.query(literal(True)).filter(
            db.query(BasPart.PartId).filter(
                BasPart.InvCode == product.cat_no,
                BasPart.Free1 == 'ETS'
            ).exists()
        ).scalar()
        if not ets_exists:
            db.add(BasPart(
                PartId=_next_part_id(db),
                InvCode=product.cat_no,
                Free1='ETS',
                bVirtual=0
            ))
            db.commit()