Advertisement
Guest User

Untitled

a guest
Mar 8th, 2019
143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.16 KB | None | 0 0
  1. #!/usr/bin/env python
  2. import sys
  3. import signal
  4. from ncclient import manager
  5. from lxml import etree
  6. from kafka import KafkaProducer
  7. import jxmlease
  8. import json
  9. import logging
  10. import time
  11. import requests
  12.  
  13. class TelemetryIOSXE():
  14. def __init__(self, host, username='cisco', password='cisco', port=830,
  15. verbose=False, delete_after=None):
  16. self.host = host
  17. self.username = username
  18. self.password = password
  19. self.port = port
  20. if verbose:
  21. self.logging()
  22. self.delete_after = delete_after
  23. self.connect()
  24. self.kafka_connect()
  25.  
  26. def sigint_handler(self, signal, frame):
  27. self.man.close_session()
  28. sys.exit(0)
  29.  
  30. def callback(self, notif):
  31. print('----------------------------------------------->>')
  32. print('Event time : %s' % notif.event_time)
  33. print('Subscription Id : %d' % notif.subscription_id)
  34. print('Type : %d' % notif.type)
  35. print('NETCONF XML Response :')
  36. print(etree.tostring(notif.datastore_ele, pretty_print=True).decode('UTF-8'))
  37. print('<<-----------------------------------------------')
  38.  
  39. self.kafka_callback(notif)
  40.  
  41.  
  42. def kafka_connect(self):
  43. self.producer = KafkaProducer(
  44. bootstrap_servers='localhost:9092',
  45. value_serializer=lambda v: json.dumps(v).encode('utf-8'))
  46.  
  47. def kafka_callback(self, notif):
  48. def time_converter(timestamp):
  49. from datetime import datetime
  50. utc_time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
  51. epoch_time = (utc_time - datetime(1970, 1, 1)).total_seconds()
  52. return epoch_time
  53.  
  54. def memory_load(memory, tags_master, metrics):
  55. tsdb = []
  56. tags = tags_master.copy()
  57. #import pdb; pdb.set_trace()
  58. for metric in memory.keys():
  59. metrics_copy = metrics.copy()
  60. if metric in ["free-memory", "used-memory"]:
  61. metrics_copy['metric'] = metric
  62. metrics_copy['value'] = memory[metric]
  63. metrics_copy['tags'] = tags
  64. tsdb.append(metrics_copy)
  65. if tsdb:
  66. return json.dumps(tsdb)
  67.  
  68. def cpu_load(cpu, tags_master, metrics):
  69. tsdb = []
  70. tags = tags_master.copy()
  71. for metric in cpu.keys():
  72. metrics_copy = metrics.copy()
  73. metrics_copy['metric'] = metric
  74. if metric == "five-seconds":
  75. metrics_copy['metric'] = "cpu-util"
  76. metrics_copy['value'] = cpu[metric]
  77. metrics_copy['tags'] = tags
  78. tsdb.append(metrics_copy)
  79. return json.dumps(tsdb)
  80.  
  81.  
  82. def interface_load(interface, tags_master, metrics):
  83. tsdb = []
  84. tags = tags_master.copy()
  85. if interface['name'] == 'Control Plane':
  86. interface['name'] = 'Control_Plane'
  87. tags['interface_name'] = interface['name']
  88. stats = interface['statistics']
  89. for metric in stats.keys():
  90. if not metric == 'discontinuity-time':
  91. metrics_copy = metrics.copy()
  92. metrics_copy['metric'] = metric
  93. metrics_copy['value'] = stats[metric]
  94. metrics_copy['tags'] = tags
  95. tsdb.append(metrics_copy)
  96. return json.dumps(tsdb)
  97.  
  98. def tsdb_api_put(data):
  99. if data:
  100. host = '127.0.0.1:4242'
  101. openTsdbUrl = 'http://' + host + '/api/put/details'
  102. request = requests.post(openTsdbUrl, json=data)
  103. if request.text:
  104. print(request.text)
  105.  
  106. def write_data_to_db(data):
  107. #import pdb; pdb.set_trace()
  108. timestamp = data['notification']['eventTime']
  109. epoch = time_converter(timestamp)
  110. content = data['notification']['push-update']
  111. tags_master = {
  112. 'NodeID' : 'csr1kv-1',
  113. 'Subscription' : content['subscription-id']
  114. }
  115. metrics = {
  116. "metric": 'metric',
  117. "timestamp": 'timestamp',
  118. "value": 'value',
  119. "tags": 'tags'
  120. }
  121. metrics['timestamp'] = epoch
  122. content_data = content['datastore-contents-xml']
  123. if 'interfaces-state' in content_data and content_data['interfaces-state']:
  124. interfaces = content_data['interfaces-state']
  125. for interface in interfaces['interface']:
  126. result = interface_load(interface, tags_master, metrics)
  127. tsdb_api_put(json.loads(result))
  128. if 'cpu-usage' in content_data and content_data['cpu-usage']:
  129. #import pdb; pdb.set_trace()
  130. cpu = content_data['cpu-usage']['cpu-utilization']
  131. result = cpu_load(cpu, tags_master, metrics)
  132. tsdb_api_put(json.loads(result))
  133. if 'memory-statistics' in content_data and content_data['memory-statistics']:
  134. memory = content_data['memory-statistics']['memory-statistic']
  135. result = memory_load(memory, tags_master, metrics)
  136. tsdb_api_put(json.loads(result))
  137.  
  138. data = json.loads(json.dumps(jxmlease.parse(notif.xml)))
  139. #print(data)
  140. print("Writing data to database")
  141. write_data_to_db(data)
  142. print("Finished writing data to database")
  143.  
  144. def errback(self, notif):
  145. pass
  146.  
  147. def unknown_host_cb(self, host, fingerprint):
  148. return True
  149.  
  150. def connect(self):
  151. self.man = manager.connect(host=self.host,
  152. port=self.port,
  153. username=self.username,
  154. password=self.password,
  155. allow_agent=False,
  156. look_for_keys=False,
  157. hostkey_verify=False,
  158. unknown_host_cb=self.unknown_host_cb)
  159.  
  160. def establish_sub(self, xpath, period=None, dampening_period=None):
  161. self.sub = self.man.establish_subscription(
  162. self.callback,
  163. self.errback,
  164. xpath=xpath,
  165. period=period,
  166. dampening_period=dampening_period)
  167. print('Subscription Result : %s' % self.sub.subscription_result)
  168. print('Subscription Id : %d' % self.sub.subscription_id)
  169.  
  170. def wait_for(self):
  171. if self.delete_after:
  172. time.sleep(self.delete_after)
  173. r = self.man.delete_subscription(self.sub.subscription_id)
  174. print('delete subscription result = %s' % r.subscription_result)
  175. else:
  176. while True:
  177. time.sleep(5)
  178.  
  179. def single_sub(self, xpath, **args):
  180. signal.signal(signal.SIGINT, self.sigint_handler)
  181. self.establish_sub(xpath, **args)
  182. self.wait_for()
  183.  
  184. def multi_sub(self, xpath_list):
  185. signal.signal(signal.SIGINT, self.sigint_handler)
  186. for item in xpath_list:
  187. self.establish_sub(**item)
  188. self.wait_for()
  189.  
  190. def logging(self):
  191. handler = logging.StreamHandler()
  192. for l in ['ncclient.transport.session', 'ncclient.operations.rpc']:
  193. logger = logging.getLogger(l)
  194. logger.addHandler(handler)
  195. logger.setLevel(logging.DEBUG)
  196.  
  197. if __name__ == '__main__':
  198. telem = TelemetryIOSXE(host='192.168.1.10', delete_after=300)
  199. xpath = '/if:interfaces-state/if:interface/if:statistics'
  200. xpaths_list = [
  201. {"xpath": '/process-cpu-ios-xe-oper:cpu-usage/process-cpu-ios-xe-oper:cpu-utilization/process-cpu-ios-xe-oper:five-seconds', "period": 100},
  202. {"xpath": '/if:interfaces-state/if:interface/if:statistics', "period": 100},
  203. {"xpath": '/memory-statistics/memory-statistic[name="Processor"]', "period": 100}]
  204. telem.multi_sub(xpaths_list)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement