Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- __author__ = "EmilClaeson"
- import sys
- import os
- import subprocess
- import re
- import time
- import datetime
- import os.path, time
# Patterns for the line-oriented output of the p4 command-line client.
# They are written as raw strings so that regex escapes such as \d and \S are
# never interpreted as Python string escapes; the patterns themselves are
# unchanged. Each comment lists the captured groups in order.

# file-info line ("p4 opened", "p4 files"): <FILENAME>, <REV>, <OP>, <CHANGELIST>, <TYPE>
Regex_FileInfo = re.compile(r'([^#]+)#(\d+) - (\S+) (\S+ \S+) \((.+)\)')
# changelist part of a file-info line: <DEFAULT>, <NUMBER> (exactly one of the two is set)
Regex_Change = re.compile(r'(default) change|change (\d+)')
# reply to "p4 change -i": <CHANGELIST>
# The trailing "." is an unescaped wildcard, kept as-is so the set of accepted lines does not change.
Regex_Created = re.compile(r'Change (\d+) created.')
# revision line of "p4 filelog -t" (after the leading "... "):
# <REV>, <CHANGELIST>, <OP>, <DATE>, <TIME>, <CLIENT>, <TYPE>, <DESC>
Regex_FileLog = re.compile(r"#(\d+) change (\d+) (\S+) on (\S+) (\S+) by (\S+) \((.+)\) '(.*)'")
# "p4 where" line: <DEPOT>, <LOCAL>; the client-syntax path in the middle is not captured.
# __ROOT__ is replaced with the client root before the pattern is compiled.
Regex_Where_Template = r'(//.+) //.+ (__ROOT__.+)$'
# "p4 have" line: <DEPOT>, <REV>, <LOCAL>
Regex_Have = re.compile(r'([^#]+)#(\d+) - (.+)')
# "p4 sizes" line: <DEPOT>, <SIZE>
Regex_Sizes = re.compile(r'([^#]+)#\d+ (\d+) bytes')
# "p4 sync" line: <DEPOT>, <REV>, <OP + LOCAL>
Regex_Sync = re.compile(r'([^#]+)#(\d+) - (.+)')
# header line of "p4 changes -l -t": <CHANGELIST>, <DATE>, <TIME>, <CLIENT>, <STATUS> (status is optional)
Regex_Changes = re.compile(r'Change (\d+) on (\S+) (\S+) by (\S+)\s*(\S+)?')
# file line of "p4 describe -s" (after the leading "... "): <DEPOT>, <REV>, <OP>
Regex_Describe = re.compile(r'([^#]+)#(\d+) (\S+)')
class LogEntry:
    """One revision record parsed from a "p4 filelog -t" line.

    Entry is the 8-item sequence captured by Regex_FileLog, in this order:
    revision, changelist, operation, date, time, client, file type,
    description. Revision and ChangeList are stored as ints, the other
    fields as given. TimeStamp is the date and time converted to seconds
    since the epoch through time.mktime (i.e. interpreted as local time).
    """

    def __init__(self, Entry):
        (Revision, ChangeList, Operation, Date, Time,
         Client, Type, Description) = Entry
        self.Revision = int(Revision)
        self.ChangeList = int(ChangeList)
        self.Operation = Operation
        self.Date = Date
        self.Time = Time
        self.Client = Client
        self.Type = Type
        self.Description = Description
        # The module-level helper is named ParseDatetime; the previous
        # spelling "parse_datetime" does not exist and raised NameError.
        DateTime = ParseDatetime('%s %s' % (Date, Time))
        self.TimeStamp = time.mktime(DateTime.timetuple())
def DebugLog(Message):
    """Write Message to stderr, prefixed with "[P4DEBUG]", when tracing is on.

    Tracing is enabled by setting the environment variable P4DEBUG to "1";
    with any other value the call does nothing. Message may be any object;
    it is formatted with %s.
    """
    if os.environ.get('P4DEBUG', '') == '1':
        # sys.stderr.write works on both Python 2 and 3 (the "print >>" form
        # is Python 2 only). Wrapping Message in a tuple keeps a tuple-valued
        # message from being taken as the formatting arguments.
        sys.stderr.write('[P4DEBUG] %s\n' % (Message,))
def RunCommand(Args, bInput = None, bEnvironment = None, bSilent = False):
    """Run the p4 command-line client and return what it wrote to stdout.

    Args         -- sequence of arguments placed after "p4".
    bInput       -- optional data written to the command's stdin (for example
                    a spec form or a password). With None, stdin is inherited
                    from the calling process.
    bEnvironment -- optional environment mapping for the child process. It is
                    copied, never modified. With None, os.environ is used.
    bSilent      -- when True, stderr is folded into the returned output
                    instead of being inherited from the calling process.

    Raises Exception when p4 cannot be started or exits with a non-zero status.
    """
    Command = ['p4'] + list(Args)
    DebugLog(' '.join(Command))

    # Work on a copy so the caller's mapping is left untouched, and so the
    # default bEnvironment = None no longer fails on item assignment.
    Environment = dict(os.environ if bEnvironment is None else bEnvironment)
    Environment['PWD'] = os.getcwd() # fix cygwin style working directory

    # NOTE(review): an earlier revision disabled stdin/stderr/environment
    # forwarding with the remark that it "breaks unreal python implementation".
    # The disabled call passed a "bEnvironment=" keyword, which Popen does not
    # accept (the keyword is "env"), so that may have been the real failure --
    # confirm inside that interpreter.
    # The argument list is executed directly (no shell=True): combined with a
    # list, shell=True makes POSIX run a bare "p4" and drop every argument.
    try:
        SubProcess = subprocess.Popen(
            Command,
            stdout = subprocess.PIPE,
            stdin = subprocess.PIPE if bInput is not None else None,
            stderr = subprocess.STDOUT if bSilent else None,
            env = Environment)
    except OSError as Error:
        raise Exception("ERROR: command '{}' could not be started in {}: {}".format(Command, os.getcwd(), Error))

    # bInput, not the builtin "input", is what has to reach the child.
    (Output, _) = SubProcess.communicate(bInput)
    DebugLog(SubProcess.returncode)
    if SubProcess.returncode != 0:
        raise Exception("ERROR: command '{}' running in {} exited with error status {}".format(Command, os.getcwd(), SubProcess.returncode))
    return Output
def ParseChangelist(Text):
    """Return the changelist named by Text.

    Text is the changelist part of a file-info line: "default change" yields
    the string 'default', "change 1234" yields the number as a string
    ('1234').

    Raises Exception when Text has neither form (previously this surfaced as
    an AttributeError on None).
    """
    Match = Regex_Change.match(Text)
    if not Match:
        raise Exception('ERROR: Could not parse changelist from p4 output! <<< %s >>>' % (Text,))
    (Default, Number) = Match.groups()
    return Default if Default else Number
# Walks over file-info lines as printed by commands like "p4 opened" and "p4 files".
def ParseFilelist(Lines):
    """Yield (filename, revision, operation, changelist, filetype) per line.

    Every field is a string; the changelist is reduced by ParseChangelist to
    'default' or the changelist number. This is a generator, so a line that
    does not look like file info raises Exception only when iteration
    reaches it.
    """
    for Entry in Lines:
        Parsed = Regex_FileInfo.match(Entry)
        if Parsed is None:
            raise Exception('ERROR: Could not parse output from p4 command! <<< %s >>>' % (Entry))
        DepotFile, Rev, Action, RawChange, FileType = Parsed.groups()
        yield (DepotFile, Rev, Action, ParseChangelist(RawChange), FileType)
# Converts a perforce "Y/m/d H:M:S" timestamp into a datetime object.
def ParseDatetime(PerforceDateTime):
    """Return the datetime.datetime described by PerforceDateTime.

    The text must have the form "YYYY/MM/DD HH:MM:SS"; anything else makes
    strptime raise ValueError.
    """
    PerforceFormat = '%Y/%m/%d %H:%M:%S'
    Parsed = datetime.datetime.strptime(PerforceDateTime, PerforceFormat)
    return Parsed
# returns (PerforceUser, PerforceClient)
def GetConnectionInfo():
    """Return (user name, client name) as reported by "p4 info".

    The values come from the 'User name' and 'Client name' entries of
    Perforce.GetInfo() for a connection built from the current environment.
    The module has no function named "info" (the previous call raised
    NameError); GetInfo is the method that runs and parses "p4 info".

    Raises KeyError when either entry is missing from the output, and
    whatever GetInfo raises when the command itself fails.
    """
    ConnectionInfo = Perforce().GetInfo()
    PerforceUser = ConnectionInfo['User name']
    PerforceClient = ConnectionInfo['Client name']
    return (PerforceUser, PerforceClient)
class Perforce:
    """Wrapper around the p4 command-line client for one connection.

    Every operation runs "p4 ..." through the module-level RunCommand with
    the environment held in self.Environment and parses the text output.

    Attributes set by the constructor:
        Environment     -- copy of os.environ plus any P4PORT / P4CLIENT /
                           P4USER overrides.
        ValidConnection -- True once "p4 info" has been run and parsed.
        Info            -- dict of the "p4 info" key/value pairs ({} on failure).
        Port, User, Client -- values reported by "p4 info"; on failure the
                           constructor arguments.
        ClientMapping   -- depot prefix cached by GetClientMapping ('' until then).
    """

    # Form piped to "p4 change -i" by CreateChangeList. The __NAME__ markers
    # are substituted before use. Field values are separated from their names
    # by a tab and the description body is tab-indented.
    _ChangeSpecTemplate = '''
# A Perforce Change Specification.
#
#  Change:      The change number. 'new' on a new changelist.
#  Date:        The date this specification was last modified.
#  Client:      The client on which the changelist was created. Read-only.
#  User:        The user who created the changelist.
#  Status:      Either 'pending' or 'submitted'. Read-only.
#  Type:        Either 'public' or 'restricted'. Default is 'public'.
#  Description: Comments about the changelist. Required.
#  Jobs:        What opened jobs are to be closed by this changelist.
#               You may delete jobs from this list. (New changelists only.)
#  Files:       What opened files from the default changelist are to be added
#               to this changelist. You may delete files from this list.
#               (New changelists only.)

Change:\tnew

Client:\t__P4CLIENT__

User:\t__P4USER__

Status:\tnew

Description:
\t__DESCRIPTION__

Files:
'''

    def __init__(self, Port = None, User = None, Client = None):
        """Prepare the environment and probe the server with "p4 info".

        Port, User and Client, when given, override P4PORT, P4USER and
        P4CLIENT for every command run through this object. A failed probe
        does not raise; check IsValid() afterwards.
        """
        self.Environment = dict(os.environ)
        if Port: self.Environment['P4PORT'] = str(Port)
        if Client: self.Environment['P4CLIENT'] = str(Client)
        if User: self.Environment['P4USER'] = str(User)
        # Give every attribute a value up front so that a failed probe leaves
        # a usable (if invalid) object instead of one with missing attributes.
        self.ValidConnection = False
        self.Info = {}
        self.Port = Port
        self.User = User
        self.Client = Client
        self.ClientMapping = ''
        try:
            self.Info = self.GetInfo()
            self.Port = self.Info['Server address']
            self.User = self.Info['User name']
            self.Client = self.Info['Client name']
        except Exception:
            # Deliberately best effort: the caller checks IsValid().
            self.ValidConnection = False

    def IsValid(self):
        """Return True when "p4 info" was run and parsed successfully."""
        return self.ValidConnection

    def CheckForAdd(self, FilePath):
        """Mark a new local file for add, or force-sync and check out a known one.

        Returns True when the file was marked for add (it exists locally, is
        writable and "p4 have" lists nothing for it), otherwise False after
        the force sync and checkout.
        """
        FilePath = os.path.normpath(FilePath)
        bWritableLocalFile = os.path.isfile(FilePath) and os.access(FilePath, os.W_OK)
        if bWritableLocalFile and not list(self.Have([FilePath])):
            self.MarkForAdd([FilePath])
            return True
        self.Sync([FilePath], bForce = True)
        self.CheckOut(FilePath)
        return False

    def CheckOutLatest(self, FilePath, bForceSync = True):
        """Sync and check out FilePath when it exists locally and "p4 have" lists it."""
        FilePath = os.path.normpath(FilePath)
        if os.path.isfile(FilePath) and list(self.Have([FilePath])):
            self.Sync([FilePath], bForce = bForceSync)
            self.CheckOut(FilePath)

    def _RunCommand(self, Args, bInput = None, bSilent = False):
        """Run "p4 <Args>" with this connection's environment; returns stdout."""
        return RunCommand(Args, bInput = bInput, bEnvironment = self.Environment, bSilent = bSilent)

    def GetInfo(self):
        """Run "p4 info" and return its "key: value" lines as a dict.

        Blank lines are skipped. A line without ':' is reported on stdout and
        skipped. ValidConnection becomes True only when at least one pair was
        read and no line failed to parse.
        """
        Result = {}
        bAllParsed = True
        Info = self._RunCommand(['info'])
        for Line in Info.splitlines():
            if not Line.strip():
                continue
            (Key, Separator, Value) = Line.partition(':')
            if not Separator:
                bAllParsed = False
                print("Failed to get key value pair for line %s" % Line)
                # Do not fall through: there is no key/value to store.
                continue
            Result[Key] = Value.strip()
        self.ValidConnection = bAllParsed and bool(Result)
        return Result

    # return login stat (true / false)
    def IsLoggedIn(self):
        """Return True when a dummy "p4 client -o" succeeds, else False."""
        try:
            self._RunCommand(['client', '-o'], bSilent = True) # run dummy command
            return True
        except Exception:
            return False

    # run login command. if no password is given, the script will stop and wait for user input.
    def Login(self, Password = None):
        """Run "p4 login", feeding Password on stdin when given; True on success."""
        try:
            # The keyword is bInput; "input" is not accepted by _RunCommand,
            # which used to make every login attempt report failure.
            self._RunCommand(['login'], bInput = Password)
            return True
        except Exception:
            return False

    # generates (filename, revision, operation, changelist, filetype) tuples.
    def Opened(self, Args = ''):
        """Yield the file-info tuples printed by "p4 opened <Args>"."""
        Output = self._RunCommand(['opened'] + list(Args))
        for FileInfo in ParseFilelist(Output.splitlines()):
            yield FileInfo

    # generates (filename, revision, operation, changelist, filetype) tuples.
    def Files(self, Args):
        """Yield the file-info tuples printed by "p4 files <Args>"."""
        Output = self._RunCommand(['files'] + list(Args))
        for FileInfo in ParseFilelist(Output.splitlines()):
            yield FileInfo

    def FindFilesInDirectory(self, Args):
        """Same as Files(), but returned as a list."""
        Output = self._RunCommand(['files'] + list(Args))
        return list(ParseFilelist(Output.splitlines()))

    def ReOpen(self, Args):
        """Run "p4 reopen <Args>" and return its output."""
        return self._RunCommand(['reopen'] + list(Args))

    def MarkForAdd(self, Args = '', bPreview = False):
        """Run "p4 add <Args>".

        With bPreview the command runs with -n and the result is True when
        the output contains 'currently opened for add', else False. Without
        it the raw command output is returned.
        """
        Argument = ['add']
        if bPreview: Argument.append('-n')
        # list() so the default '' (and tuples) can be appended to the list.
        Output = self._RunCommand(Argument + list(Args))
        if bPreview:
            return 'currently opened for add' in Output
        return Output

    def Move(self, SourcePath, TargetPath):
        """Check out SourcePath, then run "p4 move -f SourcePath TargetPath"."""
        self.CheckOut(SourcePath)
        return self._RunCommand(['move', '-f', SourcePath, TargetPath])

    def Integrate(self, SourcePath, TargetPath):
        """Run "p4 integrate SourcePath TargetPath" and return its output."""
        return self._RunCommand(['integrate', SourcePath, TargetPath])

    # generates list of (filepath, [LogEntry]) tuples.
    def FileLog(self, Files):
        """Yield (file line, [LogEntry, ...]) for each file of "p4 filelog -t".

        Lines starting with "... " are revision records; nested "... ... "
        detail lines are skipped; any other line starts the next file.
        Raises Exception for a revision line that cannot be parsed.
        """
        FileInfo = self._RunCommand(['filelog', '-t'] + list(Files))
        DepotFile, Entries = None, []
        for Line in FileInfo.splitlines():
            if Line[:8] == '... ... ':
                continue
            if Line[:4] == '... ':
                Match = Regex_FileLog.match(Line[4:])
                if not Match:
                    raise Exception("Could not parse filelog line: %s\n(filter: %s)\n(Files: %s)\n" % (Line, Regex_FileLog.pattern, ' '.join(Files)))
                Entries.append(LogEntry(Match.groups()))
                continue
            if DepotFile:
                yield (DepotFile, Entries)
            DepotFile, Entries = Line, []
        if DepotFile:
            yield (DepotFile, Entries)

    # generates list of (depot-file, revision, local-file) tuples.
    def Have(self, Args = ''):
        """Yield (depot file, revision, local file) strings from "p4 have <Args>"."""
        Output = self._RunCommand(['have'] + list(Args))
        for Line in Output.splitlines():
            Match = Regex_Have.match(Line)
            if not Match:
                raise Exception('ERROR: Could not parse output from p4 command! <<< %s >>>' % (Line))
            yield Match.groups()

    # generates list of (depot-file, size) tuples.
    def Sizes(self, Args = ''):
        """Yield (depot file, size in bytes as int) from "p4 sizes <Args>"."""
        Output = self._RunCommand(['sizes'] + list(Args))
        for Line in Output.splitlines():
            Match = Regex_Sizes.match(Line)
            if not Match:
                raise Exception('ERROR: Could not parse output from p4 command! <<< %s >>>' % (Line))
            yield (Match.group(1), int(Match.group(2)))

    # returns list of (depot-file, revision) tuples.
    def Sync(self, Args = '', bPreview = False, bForce = False, bSilent = True, Revision = None):
        """Run "p4 sync" and return [(depot file, revision as int), ...].

        bPreview adds -n, bForce adds -f. Revision (an int), when given, is
        appended to every path as "#<Revision>". With bSilent False each
        output line is also printed. Lines that do not look like sync
        results are ignored.
        """
        Argument = ['sync']
        if bPreview:
            Argument.append('-n')
        if bForce:
            Argument.append('-f')
        if Revision is not None:
            Args = ['%s#%d' % (x, Revision) for x in list(Args)]
        Argument += list(Args)
        Output = self._RunCommand(Argument, bSilent = bSilent)
        Result = []
        for Line in Output.splitlines():
            if not bSilent:
                print(Line)
            Match = Regex_Sync.match(Line)
            if Match:
                Result.append((Match.group(1), int(Match.group(2))))
        return Result

    # open file for edit
    def CheckOut(self, Filepath, Changelist = None, bPreview = False, setExclusive=False):
        """Run "p4 edit" on Filepath and return its output.

        Changelist adds "-c <Changelist>", setExclusive adds "-t +l" and
        bPreview adds -n.
        """
        Argument = ['edit']
        if Changelist:
            Argument += ['-c', str(Changelist)]
        if setExclusive:
            Argument += ['-t', '+l']
        if bPreview: Argument.append('-n')
        Argument += [Filepath]
        return self._RunCommand(Argument)

    # submit a changelist
    def Submit(self, Changelist):
        """Run "p4 submit -c <Changelist>" and return its output."""
        return self._RunCommand(['submit', '-c', str(Changelist)])

    # gets the contents of a file at specific revision
    def GetRevision(self, Filepath, Revision):
        """Return the output of "p4 print -q Filepath#Revision" (Revision is an int)."""
        Argument = '%s#%d' % (Filepath, Revision)
        return self._RunCommand(['print', '-q', Argument])

    def GetRevisionByDate(self, Filepath, Date):
        """Return the output of "p4 print -q Filepath@Date"."""
        Argument = '%s@%s' % (Filepath, Date)
        return self._RunCommand(['print', '-q', Argument])

    def Revert(self, Filepath, Changelist = None):
        """Run "p4 revert [-c Changelist] Filepath" and return its output."""
        Argument = ['revert']
        if Changelist:
            Argument += ['-c', str(Changelist)]
        Argument += [Filepath]
        return self._RunCommand(Argument)

    def Resolve(self, Filepath, bPreview = True):
        """Run "p4 resolve [-n] Filepath" (preview by default); returns the output."""
        Argument = ['resolve']
        if bPreview:
            Argument += ['-n']
        Argument += [Filepath]
        return self._RunCommand(Argument)

    def IsFileInDepot(self, FilePath):
        """Return True when MapFile can map FilePath, False on any failure."""
        try:
            return bool(self.MapFile(FilePath))
        except Exception:
            return False

    def IsFileMarkedForDelete(self, FilePath):
        """Return IsMarkedForDelete(FilePath), or False when that call fails."""
        try:
            return self.IsMarkedForDelete(FilePath)
        except Exception:
            return False

    def _EditPreviewContains(self, Status, Marker):
        """Return True when Marker occurs in the status text.

        Status is either the text itself or the path of an existing local
        file, in which case the text is the output of a preview checkout
        ("p4 edit -n") of that file.
        """
        if isinstance(Status, str) and os.path.isfile(Status):
            Status = self.CheckOut(Status, bPreview = True)
        if not Status:
            return False
        return isinstance(Status, str) and Marker in Status

    def IsCheckedOutByUser(self, Status):
        """Return True when the status text contains 'also opened by'.

        Status is a local file path (previewed with "p4 edit -n") or the
        output text of such a preview.
        """
        return self._EditPreviewContains(Status, 'also opened by')

    def IsCheckedOutByMe(self, Status):
        """Return True when the status text contains 'currently opened for edit'.

        Status is a local file path (previewed with "p4 edit -n") or the
        output text of such a preview.
        """
        return self._EditPreviewContains(Status, 'currently opened for edit')

    def _WhereRegex(self):
        """Compile the "p4 where" pattern for this client's root.

        The root is normalized to forward slashes and regex-escaped, so
        characters such as '.', '+' or '(' in the root are matched literally.
        Lines must be normalized to forward slashes before matching.
        """
        Root = self.Info['Client root'].replace('\\', '/')
        return re.compile(Regex_Where_Template.replace('__ROOT__', re.escape(Root)))

    # returns (depot_path, local_path) tuple
    def MapFile(self, FilePath):
        """Return (depot path, local path) from the first line of "p4 where FilePath".

        Backslashes in the line are converted to forward slashes and a
        leading '-' is dropped before parsing. Raises Exception when the
        command prints nothing or the line cannot be parsed.
        """
        Lines = self._RunCommand(['where', FilePath]).splitlines()
        if not Lines:
            raise Exception("could not parse: %s" % FilePath)
        Line = Lines[0].rstrip().replace('\\', '/')
        # where will for some computers return a leading '-'
        if Line.startswith('-'):
            Line = Line[1:]
        Match = self._WhereRegex().match(Line)
        if not Match:
            raise Exception("could not parse: %s" % Line)
        return Match.groups()

    # generates list of (depot_path, local_path) tuples
    def MapFiles(self, Files):
        """Yield (depot path, local path) for each line of "p4 where <Files>".

        Blank lines and lines starting with '-' are skipped. Lines are
        normalized the same way as in MapFile, so local paths come back
        with forward slashes. Raises Exception for an unparsable line.
        """
        Regex_Where = self._WhereRegex()
        Output = self._RunCommand(['where'] + list(Files))
        for Line in Output.splitlines():
            if not Line.strip() or Line.startswith('-'):
                continue
            Match = Regex_Where.match(Line.rstrip().replace('\\', '/'))
            if not Match:
                raise Exception("could not parse: %s" % Line)
            yield Match.groups()

    # returns (depot_path, local_path) tuple for directory
    def MapDirectory(self, DirectoryPath):
        """Map "<DirectoryPath>..." and return both paths with trailing dots removed."""
        Remote, Local = self.MapFile('%s...' % DirectoryPath)
        return Remote.rstrip('.'), Local.rstrip('.')

    def GetClientMapping(self, bForceUpdate = False):
        """Return the depot prefix for the client's current directory.

        The value is the depot path printed by "p4 where" for the 'Current
        directory' entry of "p4 info", with its last two '/'-separated
        components removed. It is cached in self.ClientMapping; pass
        bForceUpdate = True to recompute. Raises Exception when the output
        is empty or cannot be parsed.
        """
        if self.ClientMapping and not bForceUpdate:
            return self.ClientMapping
        FolderPath = self.Info['Current directory']
        Result = self._RunCommand(['where', FolderPath])
        Lines = [x for x in Result.splitlines() if x]
        if not Lines:
            raise Exception("could not parse: %s" % Result)
        # Prefer the first line that does not mention 'raw'; fall back to the
        # first line when every line does (or when there is only one line).
        Candidates = [x for x in Lines if x.find('raw') == -1]
        Mapping = (Candidates or Lines)[0].rstrip().replace('\\', '/')
        if Mapping.startswith('-'):
            Mapping = Mapping[1:]
        Match = self._WhereRegex().match(Mapping)
        if not Match:
            raise Exception("could not parse: %s" % Mapping)
        self.ClientMapping = Match.groups()[0].rsplit('/', 2)[0]
        return self.ClientMapping

    def Changes(self, Status = None, Since = None, Until = None, FilePath = None, Client = None):
        """Yield ((changelist, date, time, user@client, status), description).

        One item is produced per changelist printed by "p4 changes -l -t"
        that matches the given criteria:

          - Status:   "pending" or "submitted"
          - Since:    date or date/time
          - Until:    date or date/time
          - FilePath: path filter
          - Client:   name of workspace

        The description is the changelist's non-blank text lines, decoded
        as iso-8859-1 and joined with newlines.
        """
        Args = ['changes', '-l', '-t']
        if Status:
            Args += ['-s', Status]
        if Client:
            Args += ['-c', Client]
        Filter = []
        if FilePath:
            Filter.append(FilePath)
        if Since:
            Filter.append('@%s,%s' % (Since, Until if Until else 'now'))
        elif Until:
            Filter.append('@%s' % (Until))
        if Filter:
            Args += [''.join(Filter)]
        Output = self._RunCommand(Args).splitlines()
        Current, Description = (None, [])
        for Line in Output:
            Line = Line.strip()
            if not Line:
                continue
            Match = Regex_Changes.match(Line)
            if Match:
                if Current:
                    yield (Current, '\n'.join(Description))
                Current, Description = (Match.groups(), [])
            else:
                Description.append(unicode(Line, 'iso-8859-1'))
        if Current:
            yield (Current, '\n'.join(Description))

    # generates [ ((changelist, date, time, client, status), description) ] for all pending
    # changelists on current client with matching description.
    def GetPendingChangeLists(self, Description):
        """Yield the pending changelists of this client whose description contains Description."""
        # The loop variable must not reuse the parameter name: doing so made
        # the test compare each description with itself, which always passed.
        for Info, Text in self.Changes(Status = 'pending', Client = self.Client):
            if Description in Text:
                yield (Info, Text)

    # returns changelist number for created changelist.
    def CreateChangeList(self, Description):
        """Create a pending changelist with Description; return its number as int.

        The change form is piped to "p4 change -i". Raises Exception when the
        reply does not look like "Change <number> created".
        """
        Template = self._ChangeSpecTemplate.lstrip()
        Template = Template.replace('__P4CLIENT__', self.Client)
        Template = Template.replace('__P4USER__', self.User)
        # Continuation lines of the description must stay tab-indented.
        Template = Template.replace('__DESCRIPTION__', Description.replace('\n', '\n\t'))
        Output = self._RunCommand(['change', '-i'], Template)
        Match = Regex_Created.match(Output)
        if not Match:
            raise Exception("could not parse: %s" % Output)
        return int(Match.group(1))

    # generates list of files affected by a changelist [ (filepath, rev, op) ]
    def GetChangeListFiles(self, Changelist):
        """Yield (file path, revision, operation) from "p4 describe -s <Changelist>".

        Only lines starting with "... " are considered. The file path is
        decoded as iso-8859-1; revision and operation are left as printed.
        Raises Exception for such a line that cannot be parsed.
        """
        Output = self._RunCommand(['describe', '-s', str(Changelist)]).splitlines()
        for Line in Output:
            if Line[0:4] != '... ':
                continue
            Match = Regex_Describe.match(Line[4:])
            if not Match:
                raise Exception('ERROR: Could not parse output from p4 command! <<< %s >>>' % (Line))
            FileName, Revision, Operation = Match.groups()
            yield (unicode(FileName, 'iso-8859-1'), Revision, Operation)

    # returns info about a perforce user { key: value }
    def UserInfo(self, User):
        """Return the fields of "p4 user -o <User>" as {lower-cased key: value}.

        Blank lines, '#' comment lines and lines without ':' are skipped.
        Values are decoded as iso-8859-1.
        """
        Output = self._RunCommand(['user', '-o', User]).splitlines()
        Result = {}
        for Line in Output:
            Line = Line.strip()
            if not Line or Line[0] == '#':
                continue
            (Key, Separator, Value) = Line.partition(':')
            if not Separator:
                # e.g. continuation lines of a multi-line field
                continue
            Result[Key.lower()] = unicode(Value.strip(), 'iso-8859-1')
        return Result

    def CheckVersion(self, FileName = ''):
        """Print the output of "p4 changes <FileName>" and whether it was empty."""
        FileRevisions = self._RunCommand(['changes', FileName])
        if FileRevisions:
            print(FileRevisions)
            print('file Exists on Perforce')
        else:
            print('File does not exist on perforce')

    def IsMarkedForDelete(self, FileName = ''):
        """Report whether "p4 files <FileName>" shows the file as deleted.

        Returns:
          - a local path (backslash separated) taken from the first
            "... ... " line of "p4 filelog" when the output contains
            'move/delete'; None when no such line exists;
          - True when the output otherwise contains 'delete';
          - False in every other case.
        """
        Result = self._RunCommand(['files', FileName])
        if 'move/delete' in Result:
            DepotFile = self.MapFile(FileName)[0].replace('raw', '')
            FileInfo = self._RunCommand(['filelog', DepotFile])
            for Line in FileInfo.splitlines():
                if Line[:8] == '... ... ':
                    NewDirectory = Line.split(' ')[-1].partition('#')[0]
                    NewDirectory = NewDirectory.replace(self.GetClientMapping(), self.GetClientRoot()).replace('/', '\\')
                    return NewDirectory
            return None
        if 'delete' in Result:
            return True
        return False

    def GetClientRoot(self):
        """Return the client root without a trailing path separator.

        Falls back to the PROJECTDISC environment variable, then to 'Q:',
        when there is no valid connection info.
        """
        if self.ValidConnection and self.Info:
            # Strip only separators; slicing off the last character would cut
            # into the directory name when the root has no trailing separator.
            return self.Info['Client root'].rstrip('\\/')
        if "PROJECTDISC" in os.environ:
            return os.environ["PROJECTDISC"]
        return 'Q:'

    # TODO: Needs to be able to specify if we should include SubFolders
    def GetFilesInDirectory(self, FolderPath, Filter = 'fbx', bSplitBasePath = False):
        """List the local paths of non-deleted depot files under FolderPath.

        FolderPath     -- a depot path starting with '//' (used as given), or
                          a local path with a drive prefix such as 'Q:\\art',
                          which is turned into "<client mapping>/art/...".
        Filter         -- file extension to keep (without the dot), or a
                          list/tuple/set of extensions; None or '' keeps
                          every file. The extension must match exactly.
        bSplitBasePath -- when True, return (file names, directories) as two
                          parallel lists instead of one list of full paths.

        Paths are built as <client root> + <depot path below the client
        mapping>, using backslashes.
        """
        ClientMapping = self.GetClientMapping()
        ClientRoot = self.GetClientRoot()
        if not FolderPath.startswith('//'):
            FolderPath = '%s/%s/...' % (ClientMapping, FolderPath[FolderPath.find(':')+2:])
        FolderPath = FolderPath.replace('\\', '/')

        if not Filter:
            Allowed = None
        elif isinstance(Filter, (list, tuple, set, frozenset)):
            Allowed = set(x.lstrip('.') for x in Filter)
        else:
            Allowed = set([Filter.lstrip('.')])

        FileNames = []
        FileRoots = []
        for File in self.FindFilesInDirectory([FolderPath]):
            Operation = File[2]
            if Operation.find('delete') != -1:
                continue
            FileName = File[0].replace('%23', '#')
            NameSplit = FileName.rsplit('.', 1)
            FileType = NameSplit[1] if len(NameSplit) > 1 else ''
            # Exact extension match: a substring test would also accept
            # files without an extension and partial extensions.
            if Allowed is not None and FileType not in Allowed:
                continue
            # Keep the file's own extension rather than re-appending Filter.
            FilePath = '%s%s' % (ClientRoot, FileName.replace(ClientMapping, '').replace('/', '\\'))
            if bSplitBasePath:
                FileNames.append(os.path.basename(FilePath))
                FileRoots.append(os.path.dirname(FilePath))
            else:
                FileNames.append(FilePath)
        if bSplitBasePath:
            return FileNames, FileRoots
        return FileNames
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement