Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # :coding: utf-8
- # :copyright: Copyright (c) 2015 ftrack
- import getpass
- import sys
- import argparse
- import logging
- import threading
- import ftrack
- import os
- def async(fn):
- """Run *fn* asynchronously."""
- def wrapper(*args, **kwargs):
- thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
- thread.start()
- return wrapper
- class TransferComponentsAction(ftrack.Action):
- """Action to transfer components between locations."""
- #: Action identifier.
- identifier = 'transfer-components'
- #: Action label.
- label = 'Transfer component(s)'
- # Get current user from OS environment
- currentUser = getpass.getuser()
- def register(self):
- """
- This overloads the base register() function to only subscribe to receive
- certain events that have user information in them.
- Used to filter on users that initiated the action.
- """
- self.logger.debug('register() called from: {0}'.format(__name__))
- # only subscribe/launch if this action's username match.
- ftrack.EVENT_HUB.subscribe(
- subscription='topic=ftrack.action.discover and source.user.username={0}'
- .format(self.currentUser),
- callback=self.discover
- )
- ftrack.EVENT_HUB.subscribe(
- subscription='topic=ftrack.action.launch and data.actionIdentifier={0} '
- 'and source.user.username={1}'
- .format(self.identifier, self.currentUser),
- callback=self.launch
- )
- def validateSelection(self, selection):
- """
- Return if *selection* is valid.
- This ensures only Asset Versions, Task view and Show tab are valid selections
- """
- if (
- len(selection) >= 1 and
- any(
- True for item in selection
- if item.get('entityType') in ('assetversion', 'task', 'show')
- )
- ):
- self.logger.info('Selection is valid')
- return True
- else:
- self.logger.info('Selection is _not_ valid')
- return False
- def discover(self, event):
- """
- This method is invoked when the user brings up the
- Actions menu from the web UI.
- """
- selection = event['data'].get('selection', [])
- self.logger.info(u'Discovering action with selection: {0}'.format(selection))
- # Check that the selected entity is valid to run Action on
- if not self.validateSelection(selection):
- return
- return super(TransferComponentsAction, self).discover(event)
- def getVersionsInSelection(self, selection):
- """
- This method returns a list of the ``ftrack.AssetVersion`` objects that
- are associated with a *selection*.
- :param selection: ``list`` containing currently selected ftrack entities.
- :return: ``list`` containing ftrack entity objects.
- :rtype: ``list``
- """
- versions = []
- # Iterate over selection and get the available versions
- for item in selection:
- self.logger.info(
- 'Looking for versions on entity ({0}, {1})'
- .format(item['entityId'], item['entityType'])
- )
- if item['entityType'] == 'assetversion':
- versions.append(ftrack.AssetVersion(item['entityId']))
- continue
- # Get a valid entity type
- entity = None
- if item['entityType'] == 'show':
- entity = ftrack.Project(item['entityId'])
- elif item['entityType'] == 'task':
- entity = ftrack.Task(item['entityId'])
- if not entity:
- continue
- # Get all assetVersion containers on the entity to get Versions from
- assets = entity.getAssets(includeChildren=True)
- self.logger.info('Found {0} assets on entity'.format(len(assets)))
- # Grab each AssetVersion from each Asset
- for asset in assets:
- assetVersions = asset.getVersions()
- self.logger.info(
- 'Found {0} versions on asset {1}'.format(len(assetVersions), asset.getId())
- )
- versions.extend(assetVersions)
- self.logger.info('Found {0} versions in selection'.format(len(versions)))
- return versions
- def getComponentsInLocation(self, selection, location):
- """
- This method returns a ``list`` of components in *selection*.
- """
- versions = self.getVersionsInSelection(selection)
- components = []
- for version in versions:
- self.logger.info('Looking for components on version {0}'.format(version.getId()))
- try:
- components.extend(version.getComponents(location=location))
- # Possible to trigger if Component is uploaded to different location (ftrackreview)
- except ftrack.ComponentNotInLocationError:
- self.logger.warning('# Not all Components were able to be found in: {0}'
- .format(location))
- # Only add components that are available in the specified location instance
- # Get all components attached to this Version, this time letting ftrack choose most
- # appropriate Location to use
- componentList = version.getComponents(location='auto')
- # Now attempt to test if each Component exists in the specified Location
- # todo: handle components from different locations such as ftrack.review
- for component in componentList:
- try:
- component.switchLocation(location)
- components.append(component)
- except ftrack.ComponentNotInAnyLocationError:
- self.logger.warning('# {0} was skipped because it does not exist in any Location!'
- .format(component.getName()))
- except ftrack.LocationError:
- self.logger.warning('# {0} was skipped because it does not exist in {1}!'
- .format(component.getName(), location.getName()))
- self.logger.info('Found {0} components in selection'.format(len(components)))
- return components
- @async
- def transferComponents(
- self, selection, sourceLocation, targetLocation,
- userId=None,
- ignoreComponentNotInLocation=False,
- ignoreLocationErrors=False
- ):
- """
- Transfer components in *selection* from *sourceLocation* to *targetLocation*.
- if *ignoreComponentNotInLocation*, ignore components missing in source
- location. If *ignoreLocationErrors* is specified, ignore all locations-
- related errors.
- Reports progress back to *userId* using a job.
- """
- # Create ftrack API job visible in the UI
- job = ftrack.createJob('Transfer components (Gathering...)', 'running', user=userId)
- try:
- components = self.getComponentsInLocation(selection, sourceLocation)
- amount = len(components)
- self.logger.info('Transferring {0} components'.format(amount))
- # For each Component, transfer to target Location
- for index, component in enumerate(components, start=1):
- self.logger.info('Transferring component ({0} of {1})'.format(index, amount))
- job.setDescription('Transfer components ({0} of {1})'.format(index, amount))
- assert isinstance(component, ftrack.Component), \
- '### {0} is not a Component object!'.format(component)
- # get component id and resource identifier
- componentId = component.getId()
- componentResourceId = component.getResourceIdentifier()
- try:
- # add the Component to the target location and switch in the API
- targetLocation.addComponent(component)
- component.switchLocation(targetLocation)
- except ftrack.ComponentInLocationError:
- self.logger.warning('# Component: {0} already exists in target location!'
- .format(component))
- # Check if component does not exist in the filesystem
- accessor = targetLocation.getAccessor()
- # Check for correct type of Accessor
- if isinstance(accessor, ftrack.DiskAccessor):
- self.logger.info('Checking for resource on: {0}'.format(accessor))
- if not accessor.exists(componentResourceId):
- self.logger.info('Resource: {0} does not exist on disk, re-transferring Component...'
- .format(component.getName()))
- # Do not manage data because the data does not exist, just delete from database
- targetLocation.removeComponent(componentId, manageData=False)
- # now add component again
- targetLocation.addComponent(component)
- component.switchLocation(targetLocation)
- except ftrack.ComponentNotInLocationError:
- if ignoreComponentNotInLocation or ignoreLocationErrors:
- self.logger.exception('Failed to add component to location')
- else:
- raise
- except ftrack.LocationError:
- if ignoreLocationErrors:
- self.logger.exception('Failed to add component to location')
- else:
- raise
- job.setStatus('done')
- self.logger.info('Transfer complete ({0} components)'.format(amount))
- except Exception:
- self.logger.exception('Transfer failed!')
- job.setStatus('failed')
- def launch(self, event):
- """
- This method is invoked when the user executes the Action. This brings up
- a UI to choose where to transfer the components to.
- """
- selection = event['data'].get('selection', [])
- userId = event['source']['user']['id']
- self.logger.info(u'Launching action with selection: {0}'.format(selection))
- # Action has been executed
- if 'values' in event['data']:
- sourceLocation = ftrack.Location(event['data']['values']['from_location'])
- targetLocation = ftrack.Location(event['data']['values']['to_location'])
- if sourceLocation == targetLocation:
- return {
- 'success': False,
- 'message': 'Source and target locations are the same!'
- }
- self.logger.info(event['data']['values'])
- # Get user preferences for component transfer options
- ignoreComponentNotInLocation = (
- event['data']['values'].get('ignore_component_not_in_location') == 'true'
- )
- ignoreLocationErrors = (
- event['data']['values'].get('ignore_location_errors') == 'true'
- )
- self.logger.info(
- 'Transferring components from {0} to {1}'.format(sourceLocation, targetLocation)
- )
- self.transferComponents(
- selection,
- sourceLocation,
- targetLocation,
- userId=userId,
- ignoreComponentNotInLocation=ignoreComponentNotInLocation,
- ignoreLocationErrors=ignoreLocationErrors
- )
- return {
- 'success': True,
- 'message': 'Transferring components...'
- }
- allLocations = [
- {
- 'label': location.get('name'),
- 'value': location.get('id')
- }
- for location in ftrack.getLocations(excludeInaccessible=True)
- ]
- if len(allLocations) < 2:
- sourceLocation = ftrack.Location(event['data']['values']['from_location'])
- targetLocation = ftrack.Location(event['data']['values']['to_location'])
- if sourceLocation == targetLocation:
- return {
- 'success': False,
- 'message': 'Source and target locations are the same.'
- }
- self.transferComponents(selection, sourceLocation, targetLocation)
- return {
- 'success': False,
- 'message': 'Did not find two accessible locations'
- }
- # Format dict to be used in the Custom Actions UI
- return {
- 'items': [
- {
- 'value': 'Transfer components between locations',
- 'type': 'label'
- }, {
- 'label': 'Source location',
- 'type': 'enumerator',
- 'name': 'from_location',
- 'value': allLocations[0]['value'],
- 'data': allLocations
- }, {
- 'label': 'Target location',
- 'type': 'enumerator',
- 'name': 'to_location',
- 'value': allLocations[1]['value'],
- 'data': allLocations
- }, {
- 'value': '---',
- 'type': 'label'
- }, {
- 'label': 'Ignore missing',
- 'type': 'enumerator',
- 'name': 'ignore_component_not_in_location',
- 'value': 'false',
- 'data': [
- {'label': 'Yes', 'value': 'true'},
- {'label': 'No', 'value': 'false'}
- ]
- }, {
- 'label': 'Ignore errors',
- 'type': 'enumerator',
- 'name': 'ignore_location_errors',
- 'value': 'false',
- 'data': [
- {'label': 'Yes', 'value': 'true'},
- {'label': 'No', 'value': 'false'}
- ]
- }
- ]
- }
def register(registry, **kwargs):
    """
    Create the action and register it. Called when used as an event plugin.

    Nothing happens unless *registry* is an ``ftrack.Registry`` instance; this
    validates the plugin for the legacy API to prevent double registration.

    :param registry: registry object handed over by the plugin loader.
    :param kwargs: accepted for compatibility; not used.
    :return: ``None``
    """
    if isinstance(registry, ftrack.Registry):
        pluginDirectory = os.path.abspath(os.path.dirname(__file__))
        logging.getLogger(__name__).debug(
            'Registering action: {0}'.format(pluginDirectory)
        )

        TransferComponentsAction().register()
def main(arguments=None):
    """
    Standalone entry point: configure logging, register the action and then
    block waiting for events.

    :param arguments: ``list`` of command line arguments (without the program
        name); ``None`` is treated as an empty list.
    :return: ``None``
    """
    argumentList = [] if arguments is None else arguments

    # Map lower-cased level names ('debug', 'info', ...) to logging levels so
    # the verbosity can be chosen by name on the command line.
    knownLevels = (
        logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
        logging.ERROR, logging.CRITICAL
    )
    levelsByName = dict(
        (logging.getLevelName(level).lower(), level) for level in knownLevels
    )

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-v', '--verbosity',
        help='Set the logging output verbosity.',
        choices=levelsByName.keys(),
        default='info'
    )
    options = parser.parse_args(argumentList)

    # Set up basic logging
    logging.basicConfig(level=levelsByName[options.verbosity])

    # Subscribe to action.
    ftrack.setup()
    TransferComponentsAction().register()

    # Wait for events
    ftrack.EVENT_HUB.wait()
# Run as a standalone script: forward the command line arguments (without the
# program name) to main() and exit with its return value.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement