Advertisement
Panos512

Untitled

Jun 19th, 2018
221
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 15.60 KB | None | 0 0
  1. """
  2. Vofeed collector cron
  3. :author: Panos Paparrigopoulos
  4. :contact: panos.paparrigopoulos@cern.ch
  5. """
  6. from cric.apps.cms.models import CMSSite, StorageUnit
  7. from cric.apps.core.models import Queue, Service, ServiceType, Site
  8. from cric.apps.cric_crons.crons import ContentLoaderCron
  9. from cric.dataproviders.providers.vofeed_provider import VofeedProvider
  10. from cric.utils.merge import merge_dict_data
  11.  
  12. from django.conf import settings
  13.  
  14. from .base import BaseCron
  15. from .agis_serializer import AGISSerializerJSON    #### FIX ME LATER: DO isolate/reimplement
  16.  
  17. from ..models.job import track_job
  18.  
  19. import json
  20. import os
  21.  
  22.  
  23. class VofeedLoaderCron(BaseCron):
  24.     """
  25.        Cron loads downtimes from GOCDB source
  26.    """
  27.  
  28.     desc = {'createjson': 'create JSON dump with vofeed information collected from cmssst',
  29.             'job': 'Updated DB with glidein configuration entries collected from Github'}
  30.  
  31.  
  32.     @track_job
  33.     def job(self, *args, **kwargs):
  34.         """
  35.            kwargs param: createjson (boolean)
  36.        """
  37.  
  38.         createcmsjson = self.get_bool('createjson', kwargs)
  39.         if createcmsjson: # rebuild input json
  40.             print "INFO: SET createjson=%s .. forced to rebuild json data.." % createjson
  41.             self.build_cms_json(silence=True, **kwargs)
  42.        
  43.         createlhcbjson = self.get_bool('createjson', kwargs)
  44.         if createlhcbjson: # rebuild input json
  45.             print "INFO: SET createjson=%s .. forced to rebuild json data.." % createjson
  46.             self.build_lhcb_json(silence=True, **kwargs)
  47.        
  48.         createjson = self.get_bool('createjson', kwargs)
  49.         if createjson: # rebuild input json
  50.             print "INFO: SET createjson=%s .. forced to rebuild json data.." % createjson
  51.             self.build_json(silence=True, **kwargs)
  52.  
  53.         fname = self.get_jsonfile('vofeed', silence=False, **kwargs)
  54.  
  55.         data = AGISSerializerJSON.unserialize(AGISSerializerJSON.load(fname))
  56.         return self.process_data(data, **kwargs)
  57.  
  58.  
  59.     @track_job
  60.     def op__createjson(self, **kwargs):
  61.         """
  62.            build, merge input json file(s) into final one
  63.        """
  64.         return self.build_json(**kwargs)
  65.  
  66.     @track_job
  67.     def op__createlhcbjson(self, **kwargs):
  68.         """
  69.            build, merge input json file(s) into final one
  70.        """
  71.         return self.build_lhcb_json(**kwargs)
  72.  
  73.     @track_job
  74.     def op__createcmsjson(self, **kwargs):
  75.         """
  76.            build, merge input json file(s) into final one
  77.        """
  78.         return self.build_cms_json(**kwargs)
  79.  
  80.     def build_json(self, **kwargs):
  81.         cms_fname = self.get_jsonfile('cms_vofeed', silence=False, **kwargs)
  82.         lhcb_fname = self.get_jsonfile('lhcb_vofeed', silence=False, **kwargs)
  83.         res = merge_dict_data(cms_fname, lhcb_fname)
  84.         fname = self.get_jsonfile('vofeed', **kwargs)
  85.         AGISSerializerJSON.serialize(res, fname)
  86.  
  87.     def build_lhcb_json(self, **kwargs):
  88.         objs = LHCBVofeedProvider().get_vofeed(verbose=self.get_bool('verbose', kwargs, False),
  89.                                            cache=os.path.join(settings.TMPDIR, 'lhcbofvofeed.json'))
  90.        
  91.         ret = {
  92.             'ce_services': {},  # Computing Elements
  93.             'se_services': {},  # Storage Elements
  94.             'cms_data': {
  95.                     'ddmendndpoint': []
  96.             },
  97.             'lhcb_data': {
  98.  
  99.             },
  100.             'sites': {}
  101.         }
  102.         pass
  103.  
  104.     def build_cms_json(self, **kwargs):
  105.  
  106.         objs = VofeedProvider().get_vofeed(verbose=self.get_bool('verbose', kwargs, False),
  107.                                                    cache=os.path.join(settings.TMPDIR, 'vofeed.json'))
  108.        
  109.  
  110.         ret = {
  111.             'ce_services': {},  # Computing Elements
  112.             'se_services': {},  # Storage Elements
  113.             'cms_data': {
  114.                     'ddmendndpoint': []
  115.             },
  116.             'lhcb_data': {
  117.  
  118.             },
  119.             'sites': {}
  120.         }
  121.  
  122.         ce_flavours = ['HTCONDOR-CE', 'CREAM-CE', 'GLOBUS', 'ARC-CE']
  123.         se_flavours = ['SRM', 'XROOTD']
  124.  
  125.         storage_impls = settings.STORAGE_IMPLEMENTATION_PER_SITE
  126.  
  127.         rcsites = dict([e.id, e.name] for e in Site.objects.all())
  128.  
  129.         ceobjs = {}
  130.         for obj in Service.objects.filter(type=ServiceType.CE).all():
  131.             ceobjs.setdefault(obj.endpoint, []).append(obj)
  132.  
  133.         # Fetch the hardcoded sites data.
  134.         master_sites_data = settings.SITE_NAMES_MASTER_DATA
  135.  
  136.         core_queues = {}
  137.         for queue in Queue.objects.all():
  138.             core_queues.setdefault(queue.ce.name, []).append(queue.name)
  139.  
  140.         for entry, info in objs.iteritems():
  141.  
  142.             # Enforce correct rcsites data.
  143.             if info['site'] in master_sites_data:
  144.                 info['site'] = master_sites_data[info['site']]
  145.  
  146.             # In case there is an rcsite named 'None' (wrong data) get the correct value by resolving
  147.             # through the experiment site.
  148.             if info['site'] == 'None':
  149.                 info['site'] = CMSSite.objects.filter(name=info['cmssite']).first().site.name
  150.  
  151.             for service in info['services']:
  152.                 new_ce = None
  153.                 found = False
  154.  
  155.                 # Create CEs  and queues that are missing from CRIC.
  156.                 if service['flavour'] in ce_flavours:
  157.  
  158.                     # Try to find pre-existent ces that maybe match the one we are processing.
  159.                     if service['hostname'] in ceobjs:
  160.                         ces = ceobjs[service['hostname']]
  161.                     # The stored ce endpoint may contain port information.
  162.                     # Try to match without the port.
  163.                     # This part also covers the possibility that a ce was added while parsing the vofeed but it's not in the
  164.                     # ceobjs dictionary.
  165.                     else:
  166.                         ces = Service.objects.filter(endpoint__contains=service['hostname'], type=ServiceType.CE).all()
  167.  
  168.                     # If there are no ces matched we proceed by adding a new ce and the coresponding queue information.
  169.                     if not ces:
  170.                         print "Creating new %s CE in %s" % (service['flavour'], info['site'])
  171.                         new_queue = {
  172.                             'name': service['queue_name']
  173.                         }
  174.  
  175.                         new_ce = {
  176.                             'name': '%s-CE-%s-%s' % (info['site'], service['flavour'], service['hostname']),
  177.                             'endpoint': service['hostname'],
  178.                             'type': ServiceType.CE,
  179.                             'flavour': service['flavour'],
  180.                             'site': info['site'],
  181.                             'jobmanager': service['batch_system'],
  182.                             'queue': new_queue
  183.                         }
  184.  
  185.                     else:
  186.                         # If there is a ce matched we should check if a queue must be added
  187.                         for ce in ces:
  188.                             # See if the flavour is matching
  189.                             if str(ce.flavour) == service['flavour']:
  190.                                 new_ce = ce.__dict__
  191.  
  192.                                 if service['queue_name'] in core_queues.get(ce.name, []):
  193.                                     found = True
  194.                         # If there is no correctly flavoured ce found and there is a GLOBUS ce  without queues then this
  195.                         # should be changed to the current flavour and the queue must be added there.
  196.                         # This happens because GOCDB/OIM Provider uses `GLOBUS` as
  197.                         # default for services with unknown flavour
  198.                         # FIXME: The above should be fixed and the code in this part should be refactored accordingly.
  199.                         if not new_ce:
  200.                             for ce in ces:
  201.                                 if str(ce.flavour) == "GLOBUS" and not core_queues.get(ce.name, []):
  202.                                     new_ce = ce.__dict__
  203.                                     new_ce['flavour'] = service['flavour']
  204.  
  205.                         # That solves a corner case when there is a GLOBUS ce with same endpoint but it already has a queue.
  206.                         # This means that this ce is indeed GLOBUS and shouldn't be overwritten.
  207.                         # In that case we create a new ce.
  208.                         if not new_ce:
  209.                             new_ce = {
  210.                                 'name': '%s-CE-%s-%s' % (info['site'], service['flavour'], service['hostname']),
  211.                                 'endpoint': service['hostname'],
  212.                                 'type': ServiceType.CE,
  213.                                 'flavour': service['flavour'],
  214.                                 'site': info['site'],
  215.                                 'jobmanager': service['batch_system'],
  216.                                 'state': 'ACTIVE',
  217.                                 'state_comment': 'Set to active because CMS uses it in vofeed',
  218.                                 'queue': {}
  219.                             }
  220.  
  221.                         # If the queue is there and the ce is ACTIVE we skip processing it.
  222.                         if found and new_ce['state'] == 'ACTIVE':
  223.                             continue
  224.  
  225.                         # If the queue is not found in core data it must be added.
  226.                         if not found:
  227.                             new_ce['queue'] = {
  228.                                 'name': service['queue_name']
  229.                             }
  230.  
  231.                         if new_ce.get('site_id', ''):
  232.                             new_ce['site'] = rcsites[new_ce['site_id']]
  233.  
  234.                         if new_ce['state'] != 'ACTIVE':
  235.                             new_ce['state'] = 'ACTIVE'
  236.                             new_ce['state_comment'] = 'Set to active because CMS uses it in vofeed'
  237.  
  238.                     ret['ces'][new_ce['name']] = new_ce
  239.  
  240.                 if service['flavour'] in se_flavours:
  241.  
  242.                     # Creating one SE per rc site and assign to it all the available protocols.
  243.                     # TODO: We will later go 1-1 and check (by hand) if some ses must be splitted.
  244.                     # TODO: e.g. the CERN-PROD SE contains both the castor and eos protocols (it need to be splitted).
  245.                     if info['site'] not in ret['ses']:
  246.                         se = {
  247.                             'name': '%s_SE_CMS' % info['site'],
  248.                             'is_virtual': True,
  249.                             'type': "SE",
  250.                             'impl': storage_impls.get(info['cmssite'], ''),
  251.                             'protocols': [],
  252.                             'state': 'ACTIVE',
  253.                             'state_comment': 'Set to active as it is tested in CMS VOFeed',
  254.                             'resources': [{
  255.                                             'name': 'root',
  256.                                         }]
  257.                         }
  258.  
  259.                         ret['ses'].setdefault(info['site'], []).append(se)
  260.  
  261.                         ddme = {
  262.                             'name': '%s_root' % se['name'],
  263.                             'se': se['name'],
  264.                             'site': info['cmssite'],
  265.                             'resource': 'root'
  266.  
  267.                         }
  268.  
  269.                         ret['ddmendpoints'].setdefault(info['cmssite'], []).append(ddme)
  270.  
  271.  
  272.                         if info['cmssite'] in settings.DUAL_SE_CMSSITES:
  273.                             se = {
  274.                                 'name': '%s_SE_CMS_TAPE' % info['site'],
  275.                                 'is_virtual': True,
  276.                                 'type': "SE",
  277.                                 'impl': storage_impls.get(info['cmssite'], ''),
  278.                                 'protocols': [],
  279.                                 'state': 'ACTIVE',
  280.                                 'state_comment': 'Set to active as it is tested in CMS VOFeed',
  281.                                 'resources': [
  282.                                     {'name': 'buffer',},
  283.                                     {'name': 'tape',}
  284.                                 ]
  285.                             }
  286.                             ret['ses'].setdefault(info['site'], []).append(se)
  287.  
  288.                             ddme1 = {
  289.                                 'name': '%s_buffer' % se['name'],
  290.                                 'se': se['name'],
  291.                                 'site': info['cmssite'],
  292.                                 'resource': 'buffer'
  293.  
  294.                             }
  295.  
  296.                             ret['ddmendpoints'].setdefault(info['cmssite'], []).append(ddme1)
  297.  
  298.                             ddme2 = {
  299.                                 'name': '%s_tape' % se['name'],
  300.                                 'se': se['name'],
  301.                                 'site': info['cmssite'],
  302.                                 'resource': 'tape'
  303.  
  304.                             }
  305.  
  306.                             ret['ddmendpoints'].setdefault(info['cmssite'], []).append(ddme2)
  307.  
  308.                     protocol = {
  309.                         'name': info['site'] + '_' + service['flavour'].lower() + '_' + service['endpoint'],
  310.                         'flavour': service['flavour'].lower(),
  311.                         'endpoint': service['endpoint'],
  312.                         'production_status': service['production_status']
  313.  
  314.                     }
  315.                     if protocol not in ret['ses'][info['site']][0]['protocols']:
  316.                         ret['ses'][info['site']][0]['protocols'].append(protocol)
  317.  
  318.         fname = self.get_jsonfile('cms_vofeed', **kwargs)
  319.         AGISSerializerJSON.serialize(ret, fname)
  320.  
  321.     def process_ses(self, data, rcsites):
  322.         c = ContentLoaderCron()
  323.  
  324.         for site, ses in data.iteritems():
  325.             for se in ses:
  326.                 seobj, is_created = c.register_service2("SE", se, {'site': rcsites[site]},
  327.                                                         attributes=['name', 'is_virtual'])
  328.  
  329.                 for resource in se['resources']:
  330.                     resource['service'] = seobj
  331.                     c.register_service_resource(resource)
  332.  
  333.                 for protocol in se['protocols']:
  334.                     c.register_service_protocol(protocol, {'service': seobj})
  335.  
  336.     def process_ces(self, data, rcsites):
  337.         c = ContentLoaderCron()
  338.  
  339.         for name, ce in data.iteritems():
  340.             ceobj, is_valid = c.register_service2('CE', ce, {'site': rcsites[ce['site']]}, attributes=['name', 'endpoint', 'type', 'flavour', 'jobmanager', 'state_comment'], should_override=True)
  341.             if ce.get('queue', None):
  342.                 c.register_queue(ceobj, ce['queue'])
  343.  
  344.     def process_ddmendpoints(self, data, ses, sus):
  345.         c = ContentLoaderCron()
  346.  
  347.         for site, ddmendpoints in data.iteritems():
  348.             for ddmendpoint in ddmendpoints:
  349.                 resources = dict([e.name, e] for e in ses[ddmendpoint['se']].resource_set.all())
  350.                 su = sus[ddmendpoint['site']]
  351.                 c.register_ddmendpoint(ddmendpoint, {'su': su, 'resource': resources[ddmendpoint['resource']]})
  352.  
  353.     def process_data(self, data, **kwargs):
  354.  
  355.         rcsites = dict([e.name, e] for e in Site.objects.all())
  356.         sus = dict([e.site.name, e] for e in StorageUnit.objects.all())
  357.  
  358.         self.process_ces(data['ces'], rcsites)
  359.         self.process_ses(data['ses'], rcsites)
  360.  
  361.         ses = dict([e.name, e] for e in Service.objects.filter(type=ServiceType.SE).all())
  362.  
  363.         # self.process_ddmendpoints(data['ddmendpoints'], ses, sus)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement