Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import sys
- import signal
- from ncclient import manager
- from lxml import etree
- from kafka import KafkaProducer
- import jxmlease
- import json
- import logging
- import time
- import requests
- class TelemetryIOSXE():
- def __init__(self, host, username='cisco', password='cisco', port=830,
- verbose=False, delete_after=None):
- self.host = host
- self.username = username
- self.password = password
- self.port = port
- if verbose:
- self.logging()
- self.delete_after = delete_after
- self.connect()
- self.kafka_connect()
- def sigint_handler(self, signal, frame):
- self.man.close_session()
- sys.exit(0)
- def callback(self, notif):
- print('----------------------------------------------->>')
- print('Event time : %s' % notif.event_time)
- print('Subscription Id : %d' % notif.subscription_id)
- print('Type : %d' % notif.type)
- print('NETCONF XML Response :')
- print(etree.tostring(notif.datastore_ele, pretty_print=True).decode('UTF-8'))
- print('<<-----------------------------------------------')
- self.kafka_callback(notif)
- def kafka_connect(self):
- self.producer = KafkaProducer(
- bootstrap_servers='localhost:9092',
- value_serializer=lambda v: json.dumps(v).encode('utf-8'))
- def kafka_callback(self, notif):
- def time_converter(timestamp):
- from datetime import datetime
- utc_time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
- epoch_time = (utc_time - datetime(1970, 1, 1)).total_seconds()
- return epoch_time
- def memory_load(memory, tags_master, metrics):
- tsdb = []
- tags = tags_master.copy()
- #import pdb; pdb.set_trace()
- for metric in memory.keys():
- metrics_copy = metrics.copy()
- if metric in ["free-memory", "used-memory"]:
- metrics_copy['metric'] = metric
- metrics_copy['value'] = memory[metric]
- metrics_copy['tags'] = tags
- tsdb.append(metrics_copy)
- if tsdb:
- return json.dumps(tsdb)
- def cpu_load(cpu, tags_master, metrics):
- tsdb = []
- tags = tags_master.copy()
- for metric in cpu.keys():
- metrics_copy = metrics.copy()
- metrics_copy['metric'] = metric
- if metric == "five-seconds":
- metrics_copy['metric'] = "cpu-util"
- metrics_copy['value'] = cpu[metric]
- metrics_copy['tags'] = tags
- tsdb.append(metrics_copy)
- return json.dumps(tsdb)
- def interface_load(interface, tags_master, metrics):
- tsdb = []
- tags = tags_master.copy()
- if interface['name'] == 'Control Plane':
- interface['name'] = 'Control_Plane'
- tags['interface_name'] = interface['name']
- stats = interface['statistics']
- for metric in stats.keys():
- if not metric == 'discontinuity-time':
- metrics_copy = metrics.copy()
- metrics_copy['metric'] = metric
- metrics_copy['value'] = stats[metric]
- metrics_copy['tags'] = tags
- tsdb.append(metrics_copy)
- return json.dumps(tsdb)
- def tsdb_api_put(data):
- if data:
- host = '127.0.0.1:4242'
- openTsdbUrl = 'http://' + host + '/api/put/details'
- request = requests.post(openTsdbUrl, json=data)
- if request.text:
- print(request.text)
- def write_data_to_db(data):
- #import pdb; pdb.set_trace()
- timestamp = data['notification']['eventTime']
- epoch = time_converter(timestamp)
- content = data['notification']['push-update']
- tags_master = {
- 'NodeID' : 'csr1kv-1',
- 'Subscription' : content['subscription-id']
- }
- metrics = {
- "metric": 'metric',
- "timestamp": 'timestamp',
- "value": 'value',
- "tags": 'tags'
- }
- metrics['timestamp'] = epoch
- content_data = content['datastore-contents-xml']
- if 'interfaces-state' in content_data and content_data['interfaces-state']:
- interfaces = content_data['interfaces-state']
- for interface in interfaces['interface']:
- result = interface_load(interface, tags_master, metrics)
- tsdb_api_put(json.loads(result))
- if 'cpu-usage' in content_data and content_data['cpu-usage']:
- #import pdb; pdb.set_trace()
- cpu = content_data['cpu-usage']['cpu-utilization']
- result = cpu_load(cpu, tags_master, metrics)
- tsdb_api_put(json.loads(result))
- if 'memory-statistics' in content_data and content_data['memory-statistics']:
- memory = content_data['memory-statistics']['memory-statistic']
- result = memory_load(memory, tags_master, metrics)
- tsdb_api_put(json.loads(result))
- data = json.loads(json.dumps(jxmlease.parse(notif.xml)))
- #print(data)
- print("Writing data to database")
- write_data_to_db(data)
- print("Finished writing data to database")
- def errback(self, notif):
- pass
- def unknown_host_cb(self, host, fingerprint):
- return True
- def connect(self):
- self.man = manager.connect(host=self.host,
- port=self.port,
- username=self.username,
- password=self.password,
- allow_agent=False,
- look_for_keys=False,
- hostkey_verify=False,
- unknown_host_cb=self.unknown_host_cb)
- def establish_sub(self, xpath, period=None, dampening_period=None):
- self.sub = self.man.establish_subscription(
- self.callback,
- self.errback,
- xpath=xpath,
- period=period,
- dampening_period=dampening_period)
- print('Subscription Result : %s' % self.sub.subscription_result)
- print('Subscription Id : %d' % self.sub.subscription_id)
- def wait_for(self):
- if self.delete_after:
- time.sleep(self.delete_after)
- r = self.man.delete_subscription(self.sub.subscription_id)
- print('delete subscription result = %s' % r.subscription_result)
- else:
- while True:
- time.sleep(5)
- def single_sub(self, xpath, **args):
- signal.signal(signal.SIGINT, self.sigint_handler)
- self.establish_sub(xpath, **args)
- self.wait_for()
- def multi_sub(self, xpath_list):
- signal.signal(signal.SIGINT, self.sigint_handler)
- for item in xpath_list:
- self.establish_sub(**item)
- self.wait_for()
- def logging(self):
- handler = logging.StreamHandler()
- for l in ['ncclient.transport.session', 'ncclient.operations.rpc']:
- logger = logging.getLogger(l)
- logger.addHandler(handler)
- logger.setLevel(logging.DEBUG)
- if __name__ == '__main__':
- telem = TelemetryIOSXE(host='192.168.1.10', delete_after=300)
- xpath = '/if:interfaces-state/if:interface/if:statistics'
- xpaths_list = [
- {"xpath": '/process-cpu-ios-xe-oper:cpu-usage/process-cpu-ios-xe-oper:cpu-utilization/process-cpu-ios-xe-oper:five-seconds', "period": 100},
- {"xpath": '/if:interfaces-state/if:interface/if:statistics', "period": 100},
- {"xpath": '/memory-statistics/memory-statistic[name="Processor"]', "period": 100}]
- telem.multi_sub(xpaths_list)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement