Advertisement
Guest User

Untitled

a guest
Jun 27th, 2019
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 37.41 KB | None | 0 0
  1. from ryu.base import app_manager
  2. from ryu.ofproto import ofproto_v1_3
  3. from ryu.controller import ofp_event
  4. from ryu.controller.handler import set_ev_cls
  5. from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
  6. from ryu.topology import event
  7. from ryu.topology.switches import LLDPPacket
  8. from ryu.lib.packet import ipv4, arp
  9. from ryu.lib.packet import packet
  10. from ryu.lib.packet import lldp, ether_types
  11. from ryu.ofproto.ether import ETH_TYPE_LLDP, ETH_TYPE_ARP, ETH_TYPE_IPV6, ETH_TYPE_IP
  12. from ryu.lib.packet import packet, ethernet
  13. import array
  14. from ryu.topology.api import get_switch, get_link, get_host, get_all_switch, get_all_link, get_all_host
  15. import copy
  16. from ryu.controller import dpset
  17. from elasticsearch import Elasticsearch, TransportError, NotFoundError
  18. from ryu.lib import stplib
  19. import sys
  20. from ryu.cmd import manager
  21. import socket
  22. import threading
  23. from threading import Thread
  24. from ryu.controller.dpset import DPSet
  25. import logging.config
  26. import Queue
  27. import requests
  28. import thread
  29. import rabbitpy
  30. import datetime
  31. import uuid
  32. import json
  33. import logging
  34. import time
  35. import datetime
  36. from ryu.lib import hub
  37. import networkx as nx
  38. from operator import attrgetter
  39.  
  40. logging.basicConfig(level=logging.DEBUG,
  41. format='(%(threadName)-9s) %(message)s',)
  42.  
  43. HTTP_HOST = 'localhost'
  44. HTTP_PORT = 8081
  45. EXCHANGE = 'threading_example'
  46. my_ip = []
  47.  
  48. traffic_all_all = []
  49. traffic_all_single = []
  50. check_first = False
  51. all_switch_checked = False
  52. checked_switches = []
  53. checked_switches_ports = []
  54. link_ports_all = []
  55. b = []
  56. time_sleep=20
  57. curr_time =0
  58. kkk = 0
  59.  
  60. links=[]
  61. links_luk=[]
  62.  
  63. tx_sum = 0
  64. dp_tables = []
  65. bool_send_statistic = False
  66.  
  67. stats_prepared = False
  68.  
  69. if __name__ == "__main__": # Stuff to set additional command line options
  70. from ryu import cfg
  71. CONF = cfg.CONF
  72. CONF.register_cli_opts([
  73. cfg.IntOpt('d', min=1, max=5),
  74. cfg.IntOpt('p', min=1000, max=9000)
  75. ])
  76.  
  77.  
  78. class ProjectController(app_manager.RyuApp):
  79. OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
  80.  
  81. _CONTEXTS = {'dpset': DPSet}
  82.  
  83. def __init__(self, *args, **kwargs):
  84. super(ProjectController, self).__init__(*args, **kwargs)
  85. super(ProjectController, self).__init__(*args, **kwargs)
  86. self.mac_to_port = {}
  87. self.topology_api_app = self
  88. self.net = nx.DiGraph()
  89. self.nodes = {}
  90. self.links = {}
  91. self.links_with_traffic = {}
  92. self.links_short = {}
  93. self.no_of_nodes = 0
  94. self.no_of_links = 0
  95. self.sw1 = 0
  96. self.sw2 = 0
  97. self.link_ports_all = {}
  98. self.received_stats = []
  99. self.time_check = 10
  100. self.check_request = []
  101. self.inquiry = 0
  102. self.first_iteration = True
  103. self.datapaths = {}
  104. self.time_once=False
  105.  
  106. self.domain_number = self.CONF.d
  107. self.port = self.CONF.p
  108. self.dpset = kwargs['dpset']
  109. self.q = Queue.Queue()
  110. self.topology = TopologyStructure(self.domain_number, self.dpset)
  111. listener = MQListener(self.domain_number, self.q)
  112. listener.start()
  113. self.monitor_thread = hub.spawn(self.get_border_paths)
  114. self.monitor_thread2 = hub.spawn(self._monitor,bool_send_statistic)
  115.  
  116. @set_ev_cls(ofp_event.EventOFPStateChange,
  117. [MAIN_DISPATCHER, DEAD_DISPATCHER])
  118. def _state_change_handler(self, ev):
  119. datapath = ev.datapath
  120. if ev.state == MAIN_DISPATCHER:
  121. if datapath.id not in self.datapaths:
  122. self.logger.debug('register datapath: %016x', datapath.id)
  123. self.datapaths[datapath.id] = datapath
  124. elif ev.state == DEAD_DISPATCHER:
  125. if datapath.id in self.datapaths:
  126. self.logger.debug('unregister datapath: %016x', datapath.id)
  127. del self.datapaths[datapath.id]
  128.  
  129. @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
  130. def _switch_features_handler(self, ev):
  131. # print "switch_features_handler is called"
  132. datapath = ev.msg.datapath
  133. ofproto = datapath.ofproto
  134. parser = datapath.ofproto_parser
  135. match = parser.OFPMatch()
  136. actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, ofproto.OFPCML_NO_BUFFER)]
  137. self.add_flow(datapath, 0, match, actions)
  138.  
  139. @staticmethod
  140. def add_flow(datapath, priority, match, actions, buffer_id=None):
  141. ofproto = datapath.ofproto
  142. parser = datapath.ofproto_parser
  143. inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]
  144.  
  145. if buffer_id:
  146. mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,
  147. priority=priority, match=match,
  148. instructions=inst)
  149. else:
  150. mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
  151. match=match, instructions=inst)
  152. datapath.send_msg(mod)
  153.  
  154. def delete_flow(self, datapath):
  155. ofproto = datapath.ofproto
  156. parser = datapath.ofproto_parser
  157.  
  158. for dst in self.mac_to_port[datapath.id].keys():
  159. match = parser.OFPMatch(eth_dst=dst)
  160. mod = parser.OFPFlowMod(
  161. datapath, command=ofproto.OFPFC_DELETE,
  162. out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY,
  163. priority=1, match=match)
  164. datapath.send_msg(mod)
  165.  
  166. # @set_ev_cls(event.EventSwitchEnter)
  167. # def _switch_enter_handler(self, ev):
  168. # # self.domain_number_allocation()
  169. # # self.topology.domain_number_allocation()
  170. # print "switch_entered_handler is called"
  171. # # print ev.switch
  172. # self.topology.raw_switches = copy.copy(get_switch(self, None))
  173. # self.topology.raw_links = copy.copy(get_link(self, None))
  174. # self.topology.convert_raw_links_to_list()
  175.  
  176. """
  177. The change notification event (stplib.EventTopologyChange) of the network topology is received and the learned
  178. MAC address and registered flow entry are initialized.
  179. """
  180.  
  181. @set_ev_cls(stplib.EventTopologyChange, MAIN_DISPATCHER)
  182. def _topology_change_handler(self, ev):
  183. print "topology_change_handler_is_called"
  184.  
  185. def get_topology_data1(self):
  186. # Call get_switch() to get the list of objects Switch.
  187. self.topology.raw_switches = copy.copy(get_all_switch(self))
  188. # Call get_link() to get the list of objects Link.
  189. self.topology.raw_links = copy.copy(get_all_link(self))
  190. self.topology.convert_raw_links_to_list()
  191. self.topology.convert_raw_switches_to_list()
  192. # self.topology.print_links("get_topology_data")
  193. # self.topology.print_switches("get_topology_data")
  194.  
  195. # @set_ev_cls(event.EventLinkAdd)
  196. # def link_add(self, ev):
  197. # print "link_add_handler_is_called"
  198.  
  199. @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
  200. def _packet_in_handler(self, ev):
  201. global curr_time
  202. if ev.msg.msg_len < ev.msg.total_len:
  203. self.logger.debug("packet truncated: only %s of %s bytes",
  204. ev.msg.msg_len, ev.msg.total_len)
  205.  
  206. if self.time_once == False:
  207. curr_time= int(round(time.time()))
  208. self.time_once=True
  209. msg = ev.msg
  210. datapath = msg.datapath
  211. in_port = msg.match['in_port']
  212. dpid = datapath.id
  213. # print "*****"
  214. # print datapath
  215. # print "*****"
  216. # print dpid
  217. # print "*****"
  218. # #print list(datapath)
  219. # print "*****"
  220. eth, pkt_type, pkt_data = ethernet.ethernet.parser(msg.data)
  221. dst = eth.dst
  222. src = eth.src
  223. pkt = packet.Packet(msg.data)
  224. dst_border_node = 0
  225. global time_sleep
  226.  
  227. if eth.ethertype == ETH_TYPE_IPV6:
  228. # Packets with Destination MAC prefix: 33:33:xx:xx:xx:xx are IPv6 multicast packets
  229. return
  230. elif eth.ethertype == ETH_TYPE_LLDP:
  231. if self.topology.check_if_border_node(msg) is True:
  232. self.topology.add_border_node(msg)
  233. if self.topology.elasticsearch.check_if_domains_assigned():
  234. self.topology.elasticsearch.assign_domain(self.topology.border_nodes)
  235. return
  236. elif eth.ethertype == ETH_TYPE_ARP:
  237. print "ARP"
  238. self.topology.raw_hosts = copy.copy(get_host(self, None))
  239. check_host = self.topology.check_if_host()
  240. if check_host[0] is True:
  241. self.topology.add_host(check_host[1])
  242. arp_packet = pkt.get_protocol(arp.arp)
  243. # src_ip = arp_packet.src_ip
  244. dst_ip = arp_packet.dst_ip
  245. src_ip = arp_packet.src_ip
  246. self.topology.dst_src_list.extend([dst_ip, src_ip])
  247. # n=0
  248. # found = False
  249. # while found is False:
  250. # if dst_ip != self.topology.dst_src_list[n][0]:
  251. # if src_ip != self.topology.dst_src_list[n][1]:
  252. # if dst_ip in my_ip:
  253. # print "IP in my domain, just add flows"
  254. # else
  255.  
  256. if dst_ip not in self.topology.dst_ip_list:
  257. if dst_ip in my_ip:
  258. print "IP in my domain, just add flows"
  259.  
  260. else:
  261. self.topology.dst_ip_list.append(dst_ip)
  262. self.send_dst_ip(dst_ip, dpid)
  263.  
  264. # dst_border_node = self.get_path(dst_ip)
  265. # print dst_border_node
  266. pass
  267. elif eth.ethertype == ETH_TYPE_IP:
  268. ipv4_packet = pkt.get_protocol(ipv4.ipv4)
  269. # src_ip = ipv4_packet.src
  270. dst_ip = ipv4_packet.dst
  271. pass
  272.  
  273. self.get_topology_data1()
  274.  
  275. # self.logger.info("\tpacket in dpid: %s src: %s dst: %s in port: %s %s", dpid, src, dst, in_port, pkt_type)
  276.  
  277. def _monitor(self):
  278. global stats_prepared, bool_send_statistic
  279. while True:
  280. if bool_send_statistic==True:
  281. for dp in self.datapaths.values():
  282. #print "staty z hubow"
  283. self._request_stats2(dp)
  284. #if stats_prepared is True:
  285. #Stats.prepareStats(self.link_ports_all, sw1, sw2)
  286. hub.sleep(time_sleep)
  287.  
  288. def _request_stats2(self, datapath):
  289. print "wchdze ze stats2"
  290. self.logger.debug('send stats request: %016x', datapath.id)
  291. ofproto = datapath.ofproto
  292. parser = datapath.ofproto_parser
  293. #req = parser.OFPFlowStatsRequest(datapath)
  294. #datapath.send_msg(req)
  295. req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
  296. datapath.send_msg(req)
  297.  
  298. def send_dst_ip(self, dst_ip, dpid):
  299. print "Sending dst_ip"
  300. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
  301. with conn.channel() as channel:
  302. exchange = rabbitpy.Exchange(channel, 'my_exchange')
  303. exchange.declare()
  304. # message = rabbitpy.Message(channel,
  305. # message,
  306. # {'content_type': 'text/plain',
  307. # 'delivery_mode': 1,
  308. # 'message_type': 'Register',
  309. # 'timestamp': datetime.datetime.now(),
  310. # 'message_id': uuid.uuid4()})
  311.  
  312. cmdDict = {
  313. 'id': self.domain_number,
  314. 'dpid': dpid,
  315. 'ip' : dst_ip,
  316. 'name': "some_dummy_command"
  317. }
  318. cmdstr = json.dumps(cmdDict)
  319. message = rabbitpy.Message(channel, cmdstr)
  320. message.publish(exchange, routing_key='Destination')
  321.  
  322. def get_border_paths(self):
  323. while True:
  324. if self.q.qsize() != 0:
  325. message = self.q.get()
  326. self.compute_my_paths(message)
  327. hub.sleep(5)
  328.  
  329. def compute_my_paths(self, message):
  330. border_paths = message['Border_Paths']
  331. message_id = message['Message_id']
  332. self.run(border_paths)
  333. global b
  334. b =message_id
  335. # for border_path in border_paths:
  336. # print "Funkcja Michala dla %s" % border_path
  337. # paths.append([1, 2, 1, 100, 100])
  338. # paths.append([1, 3, 1, 200, 200])
  339. # self.send_paths(paths, message_id)
  340.  
  341. def send_pathsd(self, paths):
  342. print "Sending my paths"
  343. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
  344. with conn.channel() as channel:
  345. exchange = rabbitpy.Exchange(channel, 'my_exchange')
  346. exchange.declare()
  347.  
  348. message_id = b
  349.  
  350. cmdDict = {
  351. 'id': self.domain_number,
  352. 'paths': paths,
  353. 'name': "some_dummy_command",
  354. 'message_id': message_id
  355. }
  356. cmdstr = json.dumps(cmdDict)
  357. message = rabbitpy.Message(channel, cmdstr)
  358. message.publish(exchange, routing_key='Paths')
  359.  
  360. @set_ev_cls(event.EventSwitchLeave, [MAIN_DISPATCHER, CONFIG_DISPATCHER, DEAD_DISPATCHER])
  361. def _handler_switch_leave(self, ev):
  362. self.logger.info("Not tracking Switches, switch leaved.")
  363.  
  364. @set_ev_cls(dpset.EventPortModify, MAIN_DISPATCHER)
  365. def _port_modify_handler(self, ev):
  366. print "port_modify_handler_is_called"
  367.  
  368. def run(self, check_request):
  369. global curr_time
  370. self.check_request = check_request
  371. for element in check_request:
  372. sw1 = element[0]
  373. sw2 = element[1]
  374. self.checkPaths(sw1, sw2)
  375. self._request_stats(dp_tables)
  376. #curr_time = datetime.datetime.now()
  377.  
  378.  
  379. def checkPaths(self, sw1, sw2):
  380. self.dupa()
  381. self.inquiry += 1
  382. global kkk
  383. print "Stats from switch " + str(sw1) + " and " + str(sw2)
  384. pathA = nx.all_simple_paths(self.net, sw1, sw2)
  385. print "All available paths between switch " + str(sw1) + " and " + str(sw2) + ":"
  386. listA = list(pathA) # lisha wszystkich sciezek jakie sa dla danego zapytania
  387. print listA
  388. new_lista = []
  389. lista_hopow = []
  390. print "a czy sa tu linki"
  391. for wpis in links_luk:
  392. print wpis
  393.  
  394. nnn=0
  395. m=0
  396. lista_dla_lukasza=[]
  397. nr_part=1
  398. for part in listA:
  399. while nnn + 1 < len(part):
  400. if part[nnn] == links_luk[m][0] and part[nnn + 1] == links_luk[m][1]:
  401. lista_dla_lukasza.append(
  402. [{'path_number: ': nr_part}, part[nnn], links_luk[m][2], part[nnn + 1], links_luk[m][3]])
  403. nnn += 1
  404. m=0
  405. else:
  406. m+=1
  407. nnn=0
  408. nr_part +=1
  409.  
  410. print "co dokladnie na koniec w liscie"
  411. for wpis in lista_dla_lukasza:
  412. print wpis
  413.  
  414. print "Choosing the paths with the least number of hops"
  415. for elem in listA:
  416. liczb_hop = (len(elem) - 1)
  417. print "Path: " + str(elem) + "- number of hops: " + str(liczb_hop)
  418. if liczb_hop not in lista_hopow:
  419. lista_hopow.append(liczb_hop)
  420. lista_hopow.sort()
  421. print "Check number of hops"
  422. print lista_hopow
  423. nr = 0
  424. nr_intr_value = 2
  425. if len(lista_hopow)==1:
  426. for elem in listA:
  427. new_lista.append(elem)
  428. else:
  429. while nr < nr_intr_value:
  430. for elem in listA:
  431. if (len(elem) - 1) == lista_hopow[nr]:
  432. new_lista.append(elem)
  433. nr += 1
  434.  
  435. print "Chosen paths: "
  436. print new_lista
  437. #kkk = 0
  438.  
  439. link_ports_single = []
  440.  
  441.  
  442. for one_path in new_lista:
  443. for k in self.links:
  444. n = 0
  445. while n < (len(one_path) - 1):
  446. if k['src: '] == one_path[n] and k['dst: '] == one_path[n + 1]:
  447.  
  448. if k not in link_ports_single:
  449. link_ports_single.append(k)
  450. else:
  451. n += 1
  452. continue
  453. n += 1
  454.  
  455. for ii in link_ports_single:
  456. s = copy.deepcopy(ii)
  457. s.update({'path: ': kkk})
  458. s.update({'inquiry: ': self.inquiry})
  459. link_ports_all.append(s)
  460. kkk += 1
  461. link_ports_single = []
  462.  
  463. # print "Connections in all interesting paths"
  464. for i in link_ports_all:
  465. i.update({'tx_bytes: ': 0})
  466. # print i
  467.  
  468. global dp_tables
  469. while len(dp_tables) < (len(new_lista[0]) - 1):
  470. # print len(dp_tables)
  471. # print (len(new_lista[0]) - 1)
  472. for dp in self.datapaths.values():
  473. dp_tables.append(dp)
  474. # print "sprawdzenie LUK"
  475. # #print self.datapaths.id
  476. # print dp_tables
  477.  
  478. #print "jakie dp" +str(dp)
  479. #print "ile lacznie dp"+str(len(dp_tables))
  480. return dp_tables
  481. # def req2(self):
  482. # print "znow w glownej"
  483. # self._request_stats(dp_tables)
  484.  
  485. def _request_stats(self, dp_tables):
  486. print "kolejny raz w requeststats11111111"
  487. for i in dp_tables:
  488. parser = i.ofproto_parser
  489. # req = parser.OFPFlowStatsRequest(i)
  490. # i.send_msg(req)
  491. req = parser.OFPPortStatsRequest(i)
  492. i.send_msg(req)
  493.  
  494. @set_ev_cls(event.EventSwitchEnter)
  495. def get_topology_data(self, ev):
  496. self.topology.raw_switches = copy.copy(get_switch(self, None))
  497. self.topology.raw_links = copy.copy(get_link(self, None))
  498. self.topology.convert_raw_links_to_list()
  499. switch_list = get_switch(self.topology_api_app, None)
  500. switches = [switch.dp.id for switch in switch_list]
  501. # print switches
  502. self.net.add_nodes_from(switches)
  503. traffic = 0
  504. links_list = get_link(self.topology_api_app, None)
  505.  
  506. links_my = [({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no}) for link in
  507. links_list]
  508. links = [(link.src.dpid, link.dst.dpid, {'port': link.src.port_no}) for link in links_list]
  509. links_with_traffic = [
  510. ({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no, 'traffic:': traffic}) for
  511. link in links_list]
  512. links_short = [(link.src.dpid, link.src.port_no) for link in links_list]
  513. self.links = links_my
  514. self.links_with_traffic = links_with_traffic
  515. self.links_short = links_short
  516.  
  517. self.net.add_edges_from(links)
  518. links = [(link.dst.dpid, link.src.dpid, {'port': link.dst.port_no}) for link in links_list]
  519.  
  520. self.net.add_edges_from(links)
  521. # print links
  522.  
  523. def dupa(self):
  524. global links
  525. global links_luk
  526. self.topology.raw_switches = copy.copy(get_switch(self, None))
  527. self.topology.raw_links = copy.copy(get_link(self, None))
  528. self.topology.convert_raw_links_to_list()
  529. switch_list = get_switch(self.topology_api_app, None)
  530. switches = [switch.dp.id for switch in switch_list]
  531. # print switches
  532. self.net.add_nodes_from(switches)
  533. traffic = 0
  534. links_list = get_link(self.topology_api_app, None)
  535. links_my = [({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no}) for link in
  536. links_list]
  537. links = [(link.src.dpid, link.dst.dpid, {'port_src': link.src.port_no}) for link in links_list]
  538. links_luk = [(link.src.dpid, link.dst.dpid, {'port_src': link.src.port_no}, {'port_dst': link.dst.port_no}) for link
  539. in links_list]
  540. print "#############################################################"
  541. print links_luk
  542. links_with_traffic = [
  543. ({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no, 'traffic:': traffic}) for
  544. link in links_list]
  545. links_short = [(link.src.dpid, link.src.port_no) for link in links_list]
  546. self.links = links_my
  547. self.links_with_traffic = links_with_traffic
  548. self.links_short = links_short
  549.  
  550. self.net.add_edges_from(links, weight=0)
  551. links = [(link.dst.dpid, link.src.dpid, {'port': link.dst.port_no}) for link in links_list]
  552. print "##########"
  553. print links
  554. self.net.add_edges_from(links)
  555. print "1"
  556. print self.net.edges()
  557. print "2"
  558. print self.net.nodes()
  559.  
  560.  
  561. print "5"
  562. #print self.net[3][2]['weight']
  563.  
  564. # print links
  565.  
  566. # @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
  567. # def _flow_stats_reply_handler(self, ev):
  568. # body = ev.msg.body
  569. # sw_id=ev.msg.datapath.id
  570. # src = ev.msg.datapath
  571.  
  572. @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
  573. def _port_stats_reply_handler(self, ev):
  574.  
  575. body = ev.msg.body
  576. sw_id = ev.msg.datapath.id
  577. # print "test Lukk2"
  578. # listLuk=[]
  579. # listLuk.append(ev.msg.datapath)
  580. # print listLuk
  581. Stats(self.domain_number, self.net).checkStats2(body, sw_id, self.check_request)
  582.  
  583.  
  584. class TopologyStructure:
  585.  
  586. def __init__(self, domain_number, dpset):
  587. global links
  588. self.dpset = dpset
  589. self.elasticsearch = ElasticSearchConnector(domain_number)
  590. self.raw_switches = []
  591. self.switches = []
  592. self.raw_links = []
  593. self.links = links
  594. self.raw_hosts = []
  595. self.hosts = []
  596. self.border_nodes = []
  597. self.dst_ip_list = []
  598. self.dst_src_list = []
  599.  
  600. def print_links(self, func_str=None):
  601. # Convert the raw link to list so that it is printed easily
  602. print(" \t" + str(func_str) + ": Current Links:")
  603. for l in self.links:
  604. print (" \t\t" + str(l))
  605.  
  606. def print_switches(self, func_str=None):
  607. print(" \t" + str(func_str) + ": Current Switches:")
  608. for s in self.raw_switches:
  609. print (" \t\t" + str(s))
  610.  
  611. def count_switches(self):
  612. return len(self.raw_switches)
  613.  
  614. def convert_raw_links_to_list(self):
  615. # Build a list with all the links [((srcNode,port), (dstNode, port))].
  616. # The list is easier for printing.
  617. self.links = [((link.src.dpid, link.src.port_no),
  618. (link.dst.dpid, link.dst.port_no))
  619. for link in self.raw_links]
  620. # self.links = [[link.src.dpid, link.src.port_no] for link in self.links]
  621.  
  622. def convert_raw_switches_to_list(self):
  623. # Build a list with all the switches ([switches])
  624. self.switches = [switch.dp.id for switch in self.raw_switches]
  625.  
  626. def bring_up_link(self, link):
  627. self.links.append(link)
  628.  
  629. def bring_up_switch(self, switch):
  630. self.raw_switches.append(switch)
  631.  
  632. """
  633. Check if a link with specific nodes exists.
  634. """
  635.  
  636. def check_link(self, src_dpid, src_port, dst_dpid, dst_port):
  637. for link in self.links:
  638. if ((src_dpid, src_port), (dst_dpid, dst_port)) == (
  639. (link.src.dpid, link.src.port_no), (link.dst.dpid, link.dst.port_no)):
  640. return True
  641. return False
  642.  
  643. def check_if_border_node(self, msg):
  644. msg = msg
  645. lldp_src_dpid, lldp_src_port_no = self._parse_lldp_packet(msg)
  646. known_switches_dpid = [switch.dp.id for switch in self.raw_switches]
  647. if lldp_src_dpid not in known_switches_dpid:
  648. dpid = msg.datapath.id
  649. port_no = msg.match['in_port']
  650. # Check if the port is occupied - if so, it is not a border node
  651. border_node_port = [dpid, port_no]
  652. if border_node_port not in self.links:
  653. return True
  654. return False
  655.  
  656. @staticmethod
  657. def _parse_lldp_packet(msg):
  658. src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
  659. return src_dpid, src_port_no
  660.  
  661. def add_border_node(self, msg):
  662. dpid = msg.datapath.id
  663. port_no = msg.match['in_port']
  664. pkt = packet.Packet(msg.data)
  665. eth = pkt.get_protocol(ethernet.ethernet)
  666. src_mac = eth.src
  667. lldp_src_dpid, lldp_src_port_no = self._parse_lldp_packet(msg)
  668. mac = self._get_hwaddr(dpid, port_no)
  669. border_node = [lldp_src_dpid, lldp_src_port_no, src_mac, dpid, port_no, mac]
  670. if border_node not in self.border_nodes:
  671. self.border_nodes.append(border_node)
  672. self.elasticsearch.es_put_border_nodes(lldp_src_dpid, lldp_src_port_no, src_mac, dpid, port_no, mac)
  673.  
  674. def check_if_host(self):
  675. for host in self.raw_hosts:
  676. host_parameters = [host.port.dpid, host.port.port_no, host.mac]
  677. if not self._check_hosts_in_borders(host_parameters, self.border_nodes):
  678. return True, host
  679. return False
  680.  
  681. def add_host(self, host):
  682. if host.ipv4:
  683. host_parameters = [host.port.dpid, host.port.port_no, host.mac, host.ipv4]
  684. if host_parameters not in self.hosts:
  685. my_ip.append(host.ipv4)
  686. self.hosts.append(host_parameters)
  687. self.elasticsearch.es_put_hosts(host.port.dpid, host.port.port_no, host.mac, host.ipv4)
  688. # if self.hosts and self.border_nodes:
  689. # self.delete_borders_from_hosts(self.hosts, self.border_nodes)
  690.  
  691. @staticmethod
  692. def _check_hosts_in_borders(hosts, borders):
  693. for x in borders:
  694. y = set(hosts).issubset(x)
  695. if y is True:
  696. return True
  697. return False
  698.  
  699. @staticmethod
  700. def _delete_borders_from_hosts(hosts, borders):
  701. for host in hosts:
  702. for border in borders:
  703. y = set(host).issubset(border)
  704. if y is True:
  705. hosts.remove(host)
  706.  
  707. def _get_hwaddr(self, dpid, port_no):
  708. return self.dpset.get_port(dpid, port_no).hw_addr
  709.  
  710.  
  711. class ElasticSearchConnector:
  712.  
  713. def __init__(self, domain_number):
  714. self.es = Elasticsearch([{'host': 'localhost', 'port': 9200}], maxsize=25)
  715. es_log = logging.getLogger("elasticsearch")
  716. es_log.setLevel(logging.CRITICAL)
  717. self.domain_number = domain_number
  718.  
  719. def es_put_border_nodes(self, src_dpid, src_port_no, src_mac, dst_dpid, dst_port_no, dst_mac):
  720.  
  721. e = {
  722. "domain": self.domain_number,
  723. "src_domain": 0,
  724. "src_dpid": src_dpid,
  725. "src_dpid": src_dpid,
  726. "src_port": src_port_no,
  727. "src_mac": src_mac,
  728. "domain_dpid": dst_dpid,
  729. "domain_port": dst_port_no,
  730. "domain_mac": dst_mac
  731. }
  732. self.es.index(index='border_nodes', doc_type='border_node', body=e)
  733.  
  734. def es_put_hosts(self, border_dpid, border_port_no, host_mac, host_ip):
  735. e = {
  736. "domain": self.domain_number,
  737. "border_dpid": border_dpid,
  738. "border_port": border_port_no,
  739. "host_mac": host_mac,
  740. "host_ip": host_ip
  741. }
  742. self.es.index(index='hosts', doc_type='host', body=e)
  743.  
  744. def check_if_domains_assigned(self):
  745. res = self.es.search(index="border_nodes", doc_type="border_node", body={"query": {"match_phrase": {"src_domain": 0}}})
  746. if res['hits']['total'] > 0:
  747. return True
  748. else:
  749. return False
  750.  
  751. def assign_domain(self, border_nodes):
  752.  
  753. for border in border_nodes:
  754. mac = border[5]
  755. q = {
  756. "query": {
  757. "match_phrase": {
  758. "src_mac": mac
  759. }
  760. },
  761. "script": {
  762. "source": "ctx._source.src_domain = params.num",
  763. "params": {
  764. "num": self.domain_number
  765. },
  766. "lang": "painless"
  767. }
  768. }
  769.  
  770. try:
  771. self.es.indices.refresh(index="border_nodes")
  772. self.es.update_by_query(body=q, doc_type='border_node', index='border_nodes')
  773. except TransportError as e:
  774. # print(e.info)
  775. pass
  776.  
  777.  
  778. class MQListener(threading.Thread):
  779.  
  780. def __init__(self, domain_number, q):
  781. super(MQListener, self).__init__()
  782. self.domain_number = domain_number
  783. self.q = q or Queue.Queue()
  784. self.queue = None
  785. self.conn = None
  786. self.channel = None
  787. self.exchange = None
  788. self.queue = None
  789. self.running = False
  790.  
  791. def run_listener(self):
  792. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as self.conn:
  793. # Open the channel to communicate with RabbitMQ
  794. with self.conn.channel() as self.channel:
  795. self.exchange = rabbitpy.TopicExchange(self.channel, 'response_exchange')
  796. self.exchange.declare()
  797.  
  798. queue_name = "queue%s" %self.domain_number
  799. self.queue = rabbitpy.Queue(self.channel, name=queue_name)
  800. self.queue.declare()
  801.  
  802. binding_key = "%s.*" % self.domain_number
  803. # print binding_key
  804. self.queue.bind(self.exchange, routing_key=binding_key)
  805.  
  806. while 1:
  807. try:
  808. # Consume the message
  809. for message in self.queue:
  810. if message.routing_key == ("%s.PathSend" % self.domain_number):
  811. cmdDict = json.loads(message.body)
  812. dst_border_nodes = cmdDict['dst_border_nodes']
  813. message_id = cmdDict['message_id']
  814. print "Got path request: ", dst_border_nodes
  815. message.ack()
  816. self.q.put({'Border_Paths': dst_border_nodes, 'Message_id': str(message_id)})
  817. elif message.routing_key == ("%s.AddFlow" % self.domain_number):
  818. print "Add flow"
  819. else:
  820. print message.body
  821.  
  822. except rabbitpy.exceptions.NotConsumingError as e:
  823. continue
  824.  
  825. def start(self):
  826. self.thread = threading.Thread(target=self.run_listener)
  827. self.running = True
  828. print "Starting listener"
  829. self.thread.start()
  830.  
  831.  
  832. class Stats():
  833.  
  834. def __init__(self, domain_number, net):
  835.  
  836. self.domain_number = domain_number
  837. self.net = net
  838. self.time_check = 10
  839. self.received_stats = []
  840. self.first_iteration = True
  841. self.dp_tables = {}
  842. self.src_dst_tx_bytes = []
  843. global link_ports_all
  844. self.link_ports_all = link_ports_all
  845.  
  846. def checkStats2(self, body, sw_id, check_request):
  847. # curr_time2 = datetime.now()
  848. curr_time2= int(round(time.time()))
  849. global logger_my_port, traffic_all_all, traffic_all_single, checked_switches_ports, time_sleep, curr_time
  850. checked_switches_ports.append(sw_id)
  851. time_last= curr_time2-curr_time
  852. # print "sprawdzam time_last"
  853. # print time_last
  854.  
  855. for stat in sorted(body, key=attrgetter('port_no')):
  856. print ('datapath port '
  857. 'rx-pkts rx-bytes rx-error '
  858. 'tx-pkts tx-bytes tx-error')
  859. print('---------------- -------- '
  860. '-------- -------- -------- '
  861. '-------- -------- --------')
  862. print(sw_id, stat.port_no, stat.rx_packets, stat.rx_bytes, stat.rx_errors, stat.tx_packets, stat.tx_bytes,
  863. stat.tx_errors)
  864.  
  865. # for i in self.link_ports_all:
  866. # if sw_id == i['src: '] and stat.port_no == i['src_port: ']:
  867. # i["tx_bytes: "] = stat.tx_bytes
  868.  
  869. for i in self.link_ports_all:
  870. if sw_id == i['src: '] and stat.port_no == i['src_port: ']:
  871. if bool_send_statistic==False:
  872. i["tx_bytes: "] = stat.tx_bytes/time_last
  873. i["previous: "] = stat.tx_bytes
  874. else:
  875. i["tx_bytes: "] = ((stat.tx_bytes-i["previous: "])/time_sleep)
  876. i["previous: "] = stat.tx_bytes
  877. # print "Sprawdzam link"
  878. # print i
  879. #i["tx_bytes: "] = stat.tx_bytes
  880. # print "test linkow z metrykami"
  881. # for one in self.link_ports_all:
  882. # print one
  883. #print "JESTEM TUTAJ KOLEJNY RAZ?"
  884.  
  885. if len(dp_tables) == len(checked_switches_ports):
  886. print "All switch checked"
  887. global all_switch_checked, tx_sum3, bool_send_statistic
  888. all_switch_checked = True
  889. # for i in self.link_ports_all:
  890. # print i
  891. path_element = 0
  892. for elem in check_request:
  893. print "co jest w check_request!!!!!!!!!!!!!!!"
  894. print elem
  895. path_element+=1
  896. sw1 = elem[0]
  897. sw2 = elem[1]
  898. self.prepareStats(self.link_ports_all, sw1, sw2, path_element)
  899. print "Selection of the current best available path for queries from GCL:"
  900. print traffic_all_all
  901. self.send_paths(traffic_all_all)
  902. bool_send_statistic=True
  903. checked_switches_ports = []
  904. traffic_all_all = []
  905.  
  906. def prepareStats(self, link_ports_all, sw1, sw2, path_element):
  907. global tx_sum, all_switch_checked, traffic_all_single, links
  908. print "&&&&&&&&&&&&&&&&&&&&&&&&&&&&"
  909. print self.net
  910. print "@@@"
  911. self.path_element=path_element
  912. len_port=0
  913. for elo in link_ports_all:
  914. if elo['inquiry: '] ==path_element:
  915. len_port+=1
  916. print "co jest w elo"
  917. print elo
  918. print "ile jest teraz len_port: "+str(len_port)
  919. if all_switch_checked:
  920. nr_path = 0
  921. n = 0
  922. nn = 0 #ile linkow w danej sciezce
  923. list_tx_kbytes = []
  924. traffic_all_single = []
  925.  
  926. while n < len_port:
  927.  
  928. for one_link in link_ports_all:
  929. if one_link['path: '] == nr_path:
  930. nn += 1
  931. tx_sum += (one_link['tx_bytes: '])
  932. list_tx_kbytes.append((one_link['tx_bytes: ']))
  933. self.net[one_link['dst: ']][one_link['src: ']]['weight']=one_link['tx_bytes: ']
  934. n += 1
  935. s = []
  936. tx_max = max(list_tx_kbytes)
  937. print "Paths with metrics. Path_nr: "+str(nr_path)
  938. s.extend([sw1, sw2, nn, tx_max, tx_sum])
  939. print s
  940. if traffic_all_single == []:
  941. traffic_all_single.append(s)
  942. else:
  943. if s[4] < traffic_all_single[0][4]:
  944. traffic_all_single = []
  945. traffic_all_single.append(s)
  946. nr_path += 1
  947. nn = 0
  948. tx_sum = 0
  949. traffic_all_all.append(traffic_all_single[0])
  950. print "Chosen paths:"
  951. print traffic_all_all
  952. return traffic_all_all
  953.  
  954. def send_paths(self, paths):
  955. print "Sending my paths"
  956. print "..."
  957. print "Got flows_add request from GCL"
  958. print "Flows added"
  959. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
  960. with conn.channel() as channel:
  961. exchange = rabbitpy.Exchange(channel, 'my_exchange')
  962. exchange.declare()
  963.  
  964. message_id = b
  965.  
  966. cmdDict = {
  967. 'id': self.domain_number,
  968. 'paths': paths,
  969. 'name': "some_dummy_command",
  970. 'message_id': message_id
  971. }
  972. cmdstr = json.dumps(cmdDict)
  973. message = rabbitpy.Message(channel, cmdstr)
  974. message.publish(exchange, routing_key='Paths')
  975.  
  976.  
  977. # class Metrics(threading.Thread):
  978. # def __init__(self, dp_tables):
  979. # super(Metrics,self).__init__()
  980. # threading.Thread.__init__(self, name="Metrics_thread")
  981. # self.dp_tables= dp_tables
  982. # sw1 = raw_input("Provide Start for new metrics: ")
  983. # self.req2()
  984. #
  985. # #sw2 = raw_input("Destination switch: ")
  986. # #b=ProjectController()
  987. #
  988. # @staticmethod
  989. # def req2():
  990. # print "wchodze ze static"
  991. # ProjectController.req2()
  992. # #b=ProjectController()
  993. # #b.req2()
  994. # #ProjectController._request_stats(dp_tables)
  995. #
  996. # # def newStats(self, dp_tables):
  997.  
  998. if __name__ == "__main__":
  999. manager.main(args=sys.argv)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement