Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #-------------------------------------------------------------------------------
- # Name: hazard
- # Purpose:
- #
- # Author: 40011
- #
- # Created: 04/30/2019
- # Copyright: (c) 40011 2019
- # Licence: <your licence>
- #-------------------------------------------------------------------------------
- import os
- import time
- import shutil
- import glob
- import logging as Star_logger
- import subprocess
- import re
- import pwd
- if __name__ == "__main__":
- import sys
- sys.path.append("/home/qa/sandisk/executable")
- from Star_FW.StarUtil.Core import Core
- class hazard(Core):
- def __init__(self, dictJson):
- Core.__init__(self, globals())
- #pdb.set_trace()
- Star_logger.info("initializing variables")
- self.currentDir = os.getcwd()
- self.setJsonDict(dictJson)
- self.timeInMin = str(self.getValue_userDefine("timeInMinutes"))
- self.timeInSec = str(int(self.timeInMin) * 60)
- self.loopIntervalTime = str(self.getValue_userDefine("loopTimeInMinutes"))
- self.appDir = str(self.getValue_userDefine("applicationDir"))
- self.testInstanceId = str(self.dictJson[0][0]['testinstanceid'])
- self.testLogsDir = os.path.join(self.appDir, "logs")
- self.networkTestResultDir = str(self.dictJson[0][0]['networkresultdir'])
- self.machinePassword = str(self.getValue_userDefine("machinePassword"))
- self.machineUser = str(self.getValue_userDefine("machineUser"))
- self.mountDir = "/mnt/datastore"
- self.networkMountFullPath = ""
- self.networkWinPathToMount = ""
- def mountDatastore(self):
- re1='(.)' # Any Single Character 1
- re2='(.)' # Any Single Character 2
- re3='((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))(?![\\d])' # IPv4 IP Address 1
- re4='(.)' # Any Single Character 3
- re5='((?:[a-z][a-z]*[0-9]+[a-z0-9]*))' # Alphanum 1
- rg = re.compile(re1+re2+re3+re4+re5,re.IGNORECASE|re.DOTALL)
- match = rg.search(self.networkTestResultDir)
- Star_logger.info("extracting datastore path from network path")
- self.networkWinPathToMount = match.group(0)
- Star_logger.info("Path to mount is "+self.networkWinPathToMount)
- mountAppendPath = self.networkTestResultDir.replace(self.networkWinPathToMount,'')
- mountAppendPath = mountAppendPath.replace('\\', '/')
- self.networkMountFullPath = self.mountDir + mountAppendPath
- Star_logger.info("Linux based mount path is " + self.networkMountFullPath)
- self.networkWinPathToMount = self.networkWinPathToMount.replace('\\','/')
- uid = str(pwd.getpwnam(self.machineUser).pw_uid)
- gid = str(pwd.getpwnam(self.machineUser).pw_gid)
- if not(os.path.exists(self.mountDir)):
- os.system('echo %s|sudo -S %s' % (self.machinePassword, "mkdir " + self.mountDir))
- if (os.path.ismount(self.mountDir)):
- cmdUnmount = "umount -l " + self.mountDir
- os.system('echo %s|sudo -S %s' % (self.machinePassword, cmdUnmount))
- cmdMount = "mount -t cifs -o soft " + self.networkWinPathToMount + " " + self.mountDir + " -o username=pssd,password=Pw4p1x12,uid="+uid+",gid="+gid
- Star_logger.info("mount command is " + cmdMount)
- os.system('echo %s|sudo -S %s' % (self.machinePassword, cmdMount))
- Star_logger.info("datastore path is mounted")
- def copyLogs(self):
- if (os.path.exists(self.testLogsDir)):
- shutil.copytree(self.testLogsDir, self.networkMountFullPath)
- def checkFileOutput(self):
- if os.path.exists("/tmp/errorFilename.txt"):
- os.system('echo %s|sudo -S %s' % (self.machinePassword, r"rm /tmp/errorFilename.txt"))
- ## os.remove("/tmp/errorFilename.txt")
- if not os.path.exists(self.testLogsDir):
- Star_logger.error("Failed, test not launched properly or logs not created")
- assert False, "Failed, test not launched properly or logs not created"
- os.chdir(self.testLogsDir)
- fileNameArray = glob.glob('mgerror*')
- errorFileName = fileNameArray[0]
- cmd = "cp " + errorFileName + " /tmp/errorFilename.txt"
- os.system('echo %s|sudo -S %s' % (self.machinePassword, cmd))
- ## os.system(cmd)
- cmd = "dos2unix " + "/tmp/errorFilename.txt"
- os.system('echo %s|sudo -S %s' % (self.machinePassword, cmd))
- ## os.system(cmd)
- #if 'Error Log End Time' in open(errorFileName).read():
- if 'Error Log End Time' in open("/tmp/errorFilename.txt").read():
- os.chdir(self.currentDir)
- return True
- else:
- os.chdir(self.currentDir)
- return False
- def ToolExecution(self):
- Star_logger.info("starting network mount")
- self.mountDatastore()
- #remove the log folders of previous runs starting with name 'HAZARD'
- if os.path.exists(self.appDir):
- Star_logger.info("remove test logs if already existing")
- os.system('echo %s|sudo -S %s' % (self.machinePassword, "rm -fR "+self.appDir+"/HAZARD*"))
- cmd_makeExecutable = "chmod +x -R " + self.appDir
- os.system(cmd_makeExecutable)
- cmd_runHazard = self.appDir + "/hazard -Cidsys -Csysstalled -HsystemCrits=50 -ocat3=2 -l16 -t"+self.testTimeInHrs+" -ossd="+self.numOfSSDs+" -d HAZARD_"+self.modelName+"_"+self.numOfSSDs+"SSD_"+self.monthDate+" "+self.machineIP
- Star_logger.info("command to launch hazard tool is " + cmd_runHazard)
- pidApp = os.system('echo %s|sudo -S %s' % (self.machinePassword, cmd_runHazard))
- Star_logger.info("Hazard Test started")
- Star_logger.info("sleeping for 5 seconds")
- time.sleep(5)
- timeout = time.time() + int(self.timeInSec)
- loopingTime = int(self.loopIntervalTime) * 60
- Star_logger.info("loop monitoring time in seconds is "+str(loopingTime))
- #loopingTime = 30
- if (loopingTime >= int(self.timeInSec)):
- Star_logger.error("loop interval time cannot be greater than total run time")
- assert False, "Loop interval time should be less than total run time"
- time.sleep(5)
- Star_logger.info("hazard started")
- processPid = subprocess.check_output(["ps -eaf | grep -i 'hazard' | grep -i 'Csysstalled' | awk '{print $2}'"], shell=True)
- processPid = processPid.strip()
- Star_logger.info("PID of the hazard tool is "+ processPid)
- #processPid = os.system("ps -eaf | grep -i 'nodrivers' | grep -i 'ld-2.12.so' | awk '{print $2}'")
- if (processPid == None) or (processPid=="") or (processPid == 0):
- Star_logger.error("Hazard PID not found, tool not running")
- assert False, "Hazard app not started from command line"
- Star_logger.info("monitoring starts here")
- poll = ""
- while True:
- #apply the logic for monitoring here-------------------------------------
- try:
- Star_logger.info("checking for process pid")
- poll = (subprocess.check_output(["ps -eaf | grep -i 'hazard' | grep -i 'Csysstalled' | awk '{print $2}'"], shell=True)).strip()
- except Exception as e:
- poll = ""
- if (poll == ""):
- Star_logger.info("check output error file if no pid found")
- if(self.checkFileOutput() == False):
- Star_logger.error("failed as process closed before proper termination")
- assert False, "Process closed before proper termination"
- if time.time() > timeout:
- Star_logger.info("coming out of monitoring loop when time is exceeded")
- break
- time.sleep(loopingTime)
- time.sleep(60)
- Star_logger.info("finally checking for output logs")
- if(self.checkFileOutput() == False):
- Star_logger.error("failed logs not proper after test completion")
- assert False, "logs not generated properly after completion"
- else:
- if os.path.exists("/tmp/errorFilename.txt"):
- os.system('echo %s|sudo -S %s' % (self.machinePassword, r"rm /tmp/errorFilename.txt"))
- ## os.remove("/tmp/errorFilename.txt")
- os.chdir(self.testLogsDir)
- fileNameArray = glob.glob('mgerror*')
- errorFileName = fileNameArray[0]
- cmd = "cp " + errorFileName + " /tmp/errorFilename.txt"
- os.system('echo %s|sudo -S %s' % (self.machinePassword, cmd))
- ## os.system(cmd)
- cmd = "dos2unix " + "/tmp/errorFilename.txt"
- os.system('echo %s|sudo -S %s' % (self.machinePassword, cmd))
- ## os.system(cmd)
- if 'Total Errors = 0' in open("/tmp/errorFilename.txt").read():
- Star_logger.info("Test passed")
- else:
- Star_logger.error("failed because of errors in log file")
- assert False, "errors encountered in test log file"
- Star_logger.info("Copying logs to datastore")
- self.copyLogs()
- Star_logger.info("Unmounting datastore path")
- os.system('echo %s|sudo -S %s' % (self.machinePassword, "umount " + self.mountDir)) #unmount the datastore at end of test
- def run(self):
- self.ToolExecution()
- return 0, "Pass"
Add Comment
Please, Sign In to add comment