Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Vofeed collector cron
- :author: Panos Paparrigopoulos
- :contact: panos.paparrigopoulos@cern.ch
- """
- from cric.apps.cms.models import CMSSite, StorageUnit
- from cric.apps.core.models import Queue, Service, ServiceType, Site
- from cric.apps.cric_crons.crons import ContentLoaderCron
- from cric.dataproviders.providers.vofeed_provider import VofeedProvider
- from cric.utils.merge import merge_dict_data
- from django.conf import settings
- from .base import BaseCron
- from .agis_serializer import AGISSerializerJSON #### FIX ME LATER: DO isolate/reimplement
- from ..models.job import track_job
- import json
- import os
class VofeedLoaderCron(BaseCron):
    """
    Cron that loads VO-feed (site/service topology) information collected
    from cmssst into CRIC: CEs with their queues, SEs with protocols, and
    DDM endpoints.

    NOTE(review): the original docstring said "loads downtimes from GOCDB
    source", which looks like a copy-paste leftover -- nothing in this class
    touches downtimes or GOCDB; it processes the CMS/LHCb VO-feed.
    """
    # Human-readable descriptions of the cron's operations. The strings are
    # runtime data and are kept verbatim (the 'job' text also looks like a
    # copy-paste leftover from a glidein cron -- TODO confirm with owner).
    desc = {'createjson': 'create JSON dump with vofeed information collected from cmssst',
            'job': 'Updated DB with glidein configuration entries collected from Github'}
- @track_job
- def job(self, *args, **kwargs):
- """
- kwargs param: createjson (boolean)
- """
- createcmsjson = self.get_bool('createjson', kwargs)
- if createcmsjson: # rebuild input json
- print "INFO: SET createjson=%s .. forced to rebuild json data.." % createjson
- self.build_cms_json(silence=True, **kwargs)
- createlhcbjson = self.get_bool('createjson', kwargs)
- if createlhcbjson: # rebuild input json
- print "INFO: SET createjson=%s .. forced to rebuild json data.." % createjson
- self.build_lhcb_json(silence=True, **kwargs)
- createjson = self.get_bool('createjson', kwargs)
- if createjson: # rebuild input json
- print "INFO: SET createjson=%s .. forced to rebuild json data.." % createjson
- self.build_json(silence=True, **kwargs)
- fname = self.get_jsonfile('vofeed', silence=False, **kwargs)
- data = AGISSerializerJSON.unserialize(AGISSerializerJSON.load(fname))
- return self.process_data(data, **kwargs)
- @track_job
- def op__createjson(self, **kwargs):
- """
- build, merge input json file(s) into final one
- """
- return self.build_json(**kwargs)
- @track_job
- def op__createlhcbjson(self, **kwargs):
- """
- build, merge input json file(s) into final one
- """
- return self.build_lhcb_json(**kwargs)
- @track_job
- def op__createcmsjson(self, **kwargs):
- """
- build, merge input json file(s) into final one
- """
- return self.build_cms_json(**kwargs)
- def build_json(self, **kwargs):
- cms_fname = self.get_jsonfile('cms_vofeed', silence=False, **kwargs)
- lhcb_fname = self.get_jsonfile('lhcb_vofeed', silence=False, **kwargs)
- res = merge_dict_data(cms_fname, lhcb_fname)
- fname = self.get_jsonfile('vofeed', **kwargs)
- AGISSerializerJSON.serialize(res, fname)
    def build_lhcb_json(self, **kwargs):
        """
        Build the LHCb vofeed JSON dump.

        NOTE(review): this method is an unfinished stub. It fetches the feed
        and builds an empty skeleton dict, then falls through to ``pass`` --
        nothing is ever filled in or serialized, and the return value is
        ``None`` (so `build_json` relies on a 'lhcb_vofeed' file produced
        elsewhere, if at all -- TODO confirm).

        NOTE(review): ``LHCBVofeedProvider`` is not imported anywhere in this
        file, so calling this method raises NameError -- missing import to fix.
        """
        objs = LHCBVofeedProvider().get_vofeed(verbose=self.get_bool('verbose', kwargs, False),
                                               cache=os.path.join(settings.TMPDIR, 'lhcbofvofeed.json'))
        # Skeleton of the output structure; currently never populated or written.
        ret = {
            'ce_services': {},  # Computing Elements
            'se_services': {},  # Storage Elements
            'cms_data': {
                'ddmendndpoint': []
            },
            'lhcb_data': {
            },
            'sites': {}
        }
        pass
- def build_cms_json(self, **kwargs):
- objs = VofeedProvider().get_vofeed(verbose=self.get_bool('verbose', kwargs, False),
- cache=os.path.join(settings.TMPDIR, 'vofeed.json'))
- ret = {
- 'ce_services': {}, # Computing Elements
- 'se_services': {}, # Storage Elements
- 'cms_data': {
- 'ddmendndpoint': []
- },
- 'lhcb_data': {
- },
- 'sites': {}
- }
- ce_flavours = ['HTCONDOR-CE', 'CREAM-CE', 'GLOBUS', 'ARC-CE']
- se_flavours = ['SRM', 'XROOTD']
- storage_impls = settings.STORAGE_IMPLEMENTATION_PER_SITE
- rcsites = dict([e.id, e.name] for e in Site.objects.all())
- ceobjs = {}
- for obj in Service.objects.filter(type=ServiceType.CE).all():
- ceobjs.setdefault(obj.endpoint, []).append(obj)
- # Fetch the hardcoded sites data.
- master_sites_data = settings.SITE_NAMES_MASTER_DATA
- core_queues = {}
- for queue in Queue.objects.all():
- core_queues.setdefault(queue.ce.name, []).append(queue.name)
- for entry, info in objs.iteritems():
- # Enforce correct rcsites data.
- if info['site'] in master_sites_data:
- info['site'] = master_sites_data[info['site']]
- # In case there is an rcsite named 'None' (wrong data) get the correct value by resolving
- # through the experiment site.
- if info['site'] == 'None':
- info['site'] = CMSSite.objects.filter(name=info['cmssite']).first().site.name
- for service in info['services']:
- new_ce = None
- found = False
- # Create CEs and queues that are missing from CRIC.
- if service['flavour'] in ce_flavours:
- # Try to find pre-existent ces that maybe match the one we are processing.
- if service['hostname'] in ceobjs:
- ces = ceobjs[service['hostname']]
- # The stored ce endpoint may contain port information.
- # Try to match without the port.
- # This part also covers the possibility that a ce was added while parsing the vofeed but it's not in the
- # ceobjs dictionary.
- else:
- ces = Service.objects.filter(endpoint__contains=service['hostname'], type=ServiceType.CE).all()
- # If there are no ces matched we proceed by adding a new ce and the coresponding queue information.
- if not ces:
- print "Creating new %s CE in %s" % (service['flavour'], info['site'])
- new_queue = {
- 'name': service['queue_name']
- }
- new_ce = {
- 'name': '%s-CE-%s-%s' % (info['site'], service['flavour'], service['hostname']),
- 'endpoint': service['hostname'],
- 'type': ServiceType.CE,
- 'flavour': service['flavour'],
- 'site': info['site'],
- 'jobmanager': service['batch_system'],
- 'queue': new_queue
- }
- else:
- # If there is a ce matched we should check if a queue must be added
- for ce in ces:
- # See if the flavour is matching
- if str(ce.flavour) == service['flavour']:
- new_ce = ce.__dict__
- if service['queue_name'] in core_queues.get(ce.name, []):
- found = True
- # If there is no correctly flavoured ce found and there is a GLOBUS ce without queues then this
- # should be changed to the current flavour and the queue must be added there.
- # This happens because GOCDB/OIM Provider uses `GLOBUS` as
- # default for services with unknown flavour
- # FIXME: The above should be fixed and the code in this part should be refactored accordingly.
- if not new_ce:
- for ce in ces:
- if str(ce.flavour) == "GLOBUS" and not core_queues.get(ce.name, []):
- new_ce = ce.__dict__
- new_ce['flavour'] = service['flavour']
- # That solves a corner case when there is a GLOBUS ce with same endpoint but it already has a queue.
- # This means that this ce is indeed GLOBUS and shouldn't be overwritten.
- # In that case we create a new ce.
- if not new_ce:
- new_ce = {
- 'name': '%s-CE-%s-%s' % (info['site'], service['flavour'], service['hostname']),
- 'endpoint': service['hostname'],
- 'type': ServiceType.CE,
- 'flavour': service['flavour'],
- 'site': info['site'],
- 'jobmanager': service['batch_system'],
- 'state': 'ACTIVE',
- 'state_comment': 'Set to active because CMS uses it in vofeed',
- 'queue': {}
- }
- # If the queue is there and the ce is ACTIVE we skip processing it.
- if found and new_ce['state'] == 'ACTIVE':
- continue
- # If the queue is not found in core data it must be added.
- if not found:
- new_ce['queue'] = {
- 'name': service['queue_name']
- }
- if new_ce.get('site_id', ''):
- new_ce['site'] = rcsites[new_ce['site_id']]
- if new_ce['state'] != 'ACTIVE':
- new_ce['state'] = 'ACTIVE'
- new_ce['state_comment'] = 'Set to active because CMS uses it in vofeed'
- ret['ces'][new_ce['name']] = new_ce
- if service['flavour'] in se_flavours:
- # Creating one SE per rc site and assign to it all the available protocols.
- # TODO: We will later go 1-1 and check (by hand) if some ses must be splitted.
- # TODO: e.g. the CERN-PROD SE contains both the castor and eos protocols (it need to be splitted).
- if info['site'] not in ret['ses']:
- se = {
- 'name': '%s_SE_CMS' % info['site'],
- 'is_virtual': True,
- 'type': "SE",
- 'impl': storage_impls.get(info['cmssite'], ''),
- 'protocols': [],
- 'state': 'ACTIVE',
- 'state_comment': 'Set to active as it is tested in CMS VOFeed',
- 'resources': [{
- 'name': 'root',
- }]
- }
- ret['ses'].setdefault(info['site'], []).append(se)
- ddme = {
- 'name': '%s_root' % se['name'],
- 'se': se['name'],
- 'site': info['cmssite'],
- 'resource': 'root'
- }
- ret['ddmendpoints'].setdefault(info['cmssite'], []).append(ddme)
- if info['cmssite'] in settings.DUAL_SE_CMSSITES:
- se = {
- 'name': '%s_SE_CMS_TAPE' % info['site'],
- 'is_virtual': True,
- 'type': "SE",
- 'impl': storage_impls.get(info['cmssite'], ''),
- 'protocols': [],
- 'state': 'ACTIVE',
- 'state_comment': 'Set to active as it is tested in CMS VOFeed',
- 'resources': [
- {'name': 'buffer',},
- {'name': 'tape',}
- ]
- }
- ret['ses'].setdefault(info['site'], []).append(se)
- ddme1 = {
- 'name': '%s_buffer' % se['name'],
- 'se': se['name'],
- 'site': info['cmssite'],
- 'resource': 'buffer'
- }
- ret['ddmendpoints'].setdefault(info['cmssite'], []).append(ddme1)
- ddme2 = {
- 'name': '%s_tape' % se['name'],
- 'se': se['name'],
- 'site': info['cmssite'],
- 'resource': 'tape'
- }
- ret['ddmendpoints'].setdefault(info['cmssite'], []).append(ddme2)
- protocol = {
- 'name': info['site'] + '_' + service['flavour'].lower() + '_' + service['endpoint'],
- 'flavour': service['flavour'].lower(),
- 'endpoint': service['endpoint'],
- 'production_status': service['production_status']
- }
- if protocol not in ret['ses'][info['site']][0]['protocols']:
- ret['ses'][info['site']][0]['protocols'].append(protocol)
- fname = self.get_jsonfile('cms_vofeed', **kwargs)
- AGISSerializerJSON.serialize(ret, fname)
- def process_ses(self, data, rcsites):
- c = ContentLoaderCron()
- for site, ses in data.iteritems():
- for se in ses:
- seobj, is_created = c.register_service2("SE", se, {'site': rcsites[site]},
- attributes=['name', 'is_virtual'])
- for resource in se['resources']:
- resource['service'] = seobj
- c.register_service_resource(resource)
- for protocol in se['protocols']:
- c.register_service_protocol(protocol, {'service': seobj})
- def process_ces(self, data, rcsites):
- c = ContentLoaderCron()
- for name, ce in data.iteritems():
- ceobj, is_valid = c.register_service2('CE', ce, {'site': rcsites[ce['site']]}, attributes=['name', 'endpoint', 'type', 'flavour', 'jobmanager', 'state_comment'], should_override=True)
- if ce.get('queue', None):
- c.register_queue(ceobj, ce['queue'])
- def process_ddmendpoints(self, data, ses, sus):
- c = ContentLoaderCron()
- for site, ddmendpoints in data.iteritems():
- for ddmendpoint in ddmendpoints:
- resources = dict([e.name, e] for e in ses[ddmendpoint['se']].resource_set.all())
- su = sus[ddmendpoint['site']]
- c.register_ddmendpoint(ddmendpoint, {'su': su, 'resource': resources[ddmendpoint['resource']]})
- def process_data(self, data, **kwargs):
- rcsites = dict([e.name, e] for e in Site.objects.all())
- sus = dict([e.site.name, e] for e in StorageUnit.objects.all())
- self.process_ces(data['ces'], rcsites)
- self.process_ses(data['ses'], rcsites)
- ses = dict([e.name, e] for e in Service.objects.filter(type=ServiceType.SE).all())
- # self.process_ddmendpoints(data['ddmendpoints'], ses, sus)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement