Advertisement
Guest User

Untitled

a guest
Jun 25th, 2019
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 37.94 KB | None | 0 0
  1. from ryu.base import app_manager
  2. from ryu.ofproto import ofproto_v1_3
  3. from ryu.controller import ofp_event
  4. from ryu.controller.handler import set_ev_cls
  5. from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
  6. from ryu.topology import event
  7. from ryu.topology.switches import LLDPPacket
  8. from ryu.lib.packet import ipv4, arp
  9. from ryu.lib.packet import packet
  10. from ryu.lib.packet import lldp, ether_types
  11. from ryu.ofproto.ether import ETH_TYPE_LLDP, ETH_TYPE_ARP, ETH_TYPE_IPV6, ETH_TYPE_IP
  12. from ryu.lib.packet import packet, ethernet
  13. import array
  14. from ryu.topology.api import get_switch, get_link, get_host, get_all_switch, get_all_link, get_all_host
  15. import copy
  16. from ryu.controller import dpset
  17. from elasticsearch import Elasticsearch, TransportError, NotFoundError
  18. from ryu.lib import stplib
  19. import sys
  20. from ryu.cmd import manager
  21. import socket
  22. import threading
  23. from threading import Thread
  24. from ryu.controller.dpset import DPSet
  25. import logging.config
  26. import Queue
  27. import requests
  28. import thread
  29. import rabbitpy
  30. import datetime
  31. import uuid
  32. import json
  33. import logging
  34. import time
  35. import datetime
  36. from ryu.lib import hub
  37. import networkx as nx
  38. from operator import attrgetter
  39.  
  40. logging.basicConfig(level=logging.DEBUG,
  41. format='(%(threadName)-9s) %(message)s',)
  42.  
  43. HTTP_HOST = 'localhost'
  44. HTTP_PORT = 8081
  45. EXCHANGE = 'threading_example'
  46. my_ip = []
  47.  
  48. traffic_all_all = []
  49. traffic_all_single = []
  50. check_first = False
  51. all_switch_checked = False
  52. checked_switches = []
  53. checked_switches_ports = []
  54. link_ports_all = []
  55. b = []
  56. time_sleep=20
  57. curr_time =0
  58. kkk = 0
  59.  
  60. links=[]
  61. links_luk=[]
  62.  
  63. tx_sum = 0
  64. dp_tables = []
  65. bool_send_statistic = False
  66.  
  67. stats_prepared = False
  68.  
  69. if __name__ == "__main__": # Stuff to set additional command line options
  70. from ryu import cfg
  71. CONF = cfg.CONF
  72. CONF.register_cli_opts([
  73. cfg.IntOpt('d', min=1, max=5),
  74. cfg.IntOpt('p', min=1000, max=9000)
  75. ])
  76.  
  77.  
  78. class ProjectController(app_manager.RyuApp):
  79. OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
  80.  
  81. _CONTEXTS = {'dpset': DPSet}
  82.  
  83. def __init__(self, *args, **kwargs):
  84. super(ProjectController, self).__init__(*args, **kwargs)
  85. super(ProjectController, self).__init__(*args, **kwargs)
  86. self.mac_to_port = {}
  87. self.topology_api_app = self
  88. self.net = nx.DiGraph()
  89. self.nodes = {}
  90. self.links = {}
  91. self.links_with_traffic = {}
  92. self.links_short = {}
  93. self.no_of_nodes = 0
  94. self.no_of_links = 0
  95. self.sw1 = 0
  96. self.sw2 = 0
  97. self.link_ports_all = {}
  98. self.received_stats = []
  99. self.time_check = 10
  100. self.check_request = []
  101. self.inquiry = 0
  102. self.first_iteration = True
  103. self.datapaths = {}
  104. self.time_once=False
  105.  
  106. self.domain_number = self.CONF.d
  107. self.port = self.CONF.p
  108. self.dpset = kwargs['dpset']
  109. self.q = Queue.Queue()
  110. self.topology = TopologyStructure(self.domain_number, self.dpset)
  111. listener = MQListener(self.domain_number, self.q)
  112. listener.start()
  113. self.monitor_thread = hub.spawn(self.get_border_paths)
  114. self.monitor_thread2 = hub.spawn(self._monitor)
  115.  
  116. @set_ev_cls(ofp_event.EventOFPStateChange,
  117. [MAIN_DISPATCHER, DEAD_DISPATCHER])
  118. def _state_change_handler(self, ev):
  119. datapath = ev.datapath
  120. if ev.state == MAIN_DISPATCHER:
  121. if datapath.id not in self.datapaths:
  122. self.logger.debug('register datapath: %016x', datapath.id)
  123. self.datapaths[datapath.id] = datapath
  124. elif ev.state == DEAD_DISPATCHER:
  125. if datapath.id in self.datapaths:
  126. self.logger.debug('unregister datapath: %016x', datapath.id)
  127. del self.datapaths[datapath.id]
  128.  
  129. @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
  130. def _switch_features_handler(self, ev):
  131. # print "switch_features_handler is called"
  132. datapath = ev.msg.datapath
  133. ofproto = datapath.ofproto
  134. parser = datapath.ofproto_parser
  135. match = parser.OFPMatch()
  136. actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, ofproto.OFPCML_NO_BUFFER)]
  137. self.add_flow(datapath, 0, match, actions)
  138.  
  139. @staticmethod
  140. def add_flow(datapath, priority, match, actions, buffer_id=None):
  141. ofproto = datapath.ofproto
  142. parser = datapath.ofproto_parser
  143. inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]
  144.  
  145. if buffer_id:
  146. mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,
  147. priority=priority, match=match,
  148. instructions=inst)
  149. else:
  150. mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
  151. match=match, instructions=inst)
  152. datapath.send_msg(mod)
  153.  
  154. def delete_flow(self, datapath):
  155. ofproto = datapath.ofproto
  156. parser = datapath.ofproto_parser
  157.  
  158. for dst in self.mac_to_port[datapath.id].keys():
  159. match = parser.OFPMatch(eth_dst=dst)
  160. mod = parser.OFPFlowMod(
  161. datapath, command=ofproto.OFPFC_DELETE,
  162. out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY,
  163. priority=1, match=match)
  164. datapath.send_msg(mod)
  165.  
  166. # @set_ev_cls(event.EventSwitchEnter)
  167. # def _switch_enter_handler(self, ev):
  168. # # self.domain_number_allocation()
  169. # # self.topology.domain_number_allocation()
  170. # print "switch_entered_handler is called"
  171. # # print ev.switch
  172. # self.topology.raw_switches = copy.copy(get_switch(self, None))
  173. # self.topology.raw_links = copy.copy(get_link(self, None))
  174. # self.topology.convert_raw_links_to_list()
  175.  
  176. """
  177. The change notification event (stplib.EventTopologyChange) of the network topology is received and the learned
  178. MAC address and registered flow entry are initialized.
  179. """
  180.  
  181. @set_ev_cls(stplib.EventTopologyChange, MAIN_DISPATCHER)
  182. def _topology_change_handler(self, ev):
  183. print "topology_change_handler_is_called"
  184.  
  185. def get_topology_data1(self):
  186. # Call get_switch() to get the list of objects Switch.
  187. self.topology.raw_switches = copy.copy(get_all_switch(self))
  188. # Call get_link() to get the list of objects Link.
  189. self.topology.raw_links = copy.copy(get_all_link(self))
  190. self.topology.convert_raw_links_to_list()
  191. self.topology.convert_raw_switches_to_list()
  192. # self.topology.print_links("get_topology_data")
  193. # self.topology.print_switches("get_topology_data")
  194.  
  195. # @set_ev_cls(event.EventLinkAdd)
  196. # def link_add(self, ev):
  197. # print "link_add_handler_is_called"
  198.  
  199. @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
  200. def _packet_in_handler(self, ev):
  201. global curr_time
  202. if ev.msg.msg_len < ev.msg.total_len:
  203. self.logger.debug("packet truncated: only %s of %s bytes",
  204. ev.msg.msg_len, ev.msg.total_len)
  205.  
  206. if self.time_once == False:
  207. curr_time= int(round(time.time()))
  208. self.time_once=True
  209. msg = ev.msg
  210. datapath = msg.datapath
  211. in_port = msg.match['in_port']
  212. dpid = datapath.id
  213. # print "*****"
  214. # print datapath
  215. # print "*****"
  216. # print dpid
  217. # print "*****"
  218. # #print list(datapath)
  219. # print "*****"
  220. eth, pkt_type, pkt_data = ethernet.ethernet.parser(msg.data)
  221. dst = eth.dst
  222. src = eth.src
  223. pkt = packet.Packet(msg.data)
  224. dst_border_node = 0
  225. global time_sleep
  226.  
  227. if eth.ethertype == ETH_TYPE_IPV6:
  228. # Packets with Destination MAC prefix: 33:33:xx:xx:xx:xx are IPv6 multicast packets
  229. return
  230. elif eth.ethertype == ETH_TYPE_LLDP:
  231. if self.topology.check_if_border_node(msg) is True:
  232. self.topology.add_border_node(msg)
  233. if self.topology.elasticsearch.check_if_domains_assigned():
  234. self.topology.elasticsearch.assign_domain(self.topology.border_nodes)
  235. return
  236. elif eth.ethertype == ETH_TYPE_ARP:
  237. print "ARP"
  238. self.topology.raw_hosts = copy.copy(get_host(self, None))
  239. check_host = self.topology.check_if_host()
  240. if check_host[0] is True:
  241. self.topology.add_host(check_host[1])
  242. arp_packet = pkt.get_protocol(arp.arp)
  243. # src_ip = arp_packet.src_ip
  244. dst_ip = arp_packet.dst_ip
  245. src_ip = arp_packet.src_ip
  246. self.topology.dst_src_list.extend([dst_ip, src_ip])
  247. # n=0
  248. # found = False
  249. # while found is False:
  250. # if dst_ip != self.topology.dst_src_list[n][0]:
  251. # if src_ip != self.topology.dst_src_list[n][1]:
  252. # if dst_ip in my_ip:
  253. # print "IP in my domain, just add flows"
  254. # else
  255.  
  256. if dst_ip not in self.topology.dst_ip_list:
  257. if dst_ip in my_ip:
  258. print "IP in my domain, just add flows"
  259.  
  260. else:
  261. self.topology.dst_ip_list.append(dst_ip)
  262. self.send_dst_ip(dst_ip, dpid)
  263.  
  264. # dst_border_node = self.get_path(dst_ip)
  265. # print dst_border_node
  266. pass
  267. elif eth.ethertype == ETH_TYPE_IP:
  268. ipv4_packet = pkt.get_protocol(ipv4.ipv4)
  269. # src_ip = ipv4_packet.src
  270. dst_ip = ipv4_packet.dst
  271. pass
  272.  
  273. self.get_topology_data1()
  274.  
  275. # self.logger.info("\tpacket in dpid: %s src: %s dst: %s in port: %s %s", dpid, src, dst, in_port, pkt_type)
  276.  
  277. def _monitor(self):
  278. global stats_prepared, bool_send_statistic
  279. while True:
  280. if bool_send_statistic==True:
  281. for dp in self.datapaths.values():
  282. #print "staty z hubow"
  283. self._request_stats2(dp)
  284. #if stats_prepared is True:
  285. #Stats.prepareStats(self.link_ports_all, sw1, sw2)
  286. hub.sleep(time_sleep)
  287.  
  288. def _request_stats2(self, datapath):
  289. print "wchdze ze stats2"
  290. self.logger.debug('send stats request: %016x', datapath.id)
  291. ofproto = datapath.ofproto
  292. parser = datapath.ofproto_parser
  293. #req = parser.OFPFlowStatsRequest(datapath)
  294. #datapath.send_msg(req)
  295. req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
  296. datapath.send_msg(req)
  297.  
  298. def send_dst_ip(self, dst_ip, dpid):
  299. print "Sending dst_ip"
  300. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
  301. with conn.channel() as channel:
  302. exchange = rabbitpy.Exchange(channel, 'my_exchange')
  303. exchange.declare()
  304. # message = rabbitpy.Message(channel,
  305. # message,
  306. # {'content_type': 'text/plain',
  307. # 'delivery_mode': 1,
  308. # 'message_type': 'Register',
  309. # 'timestamp': datetime.datetime.now(),
  310. # 'message_id': uuid.uuid4()})
  311.  
  312. cmdDict = {
  313. 'id': self.domain_number,
  314. 'dpid': dpid,
  315. 'ip' : dst_ip,
  316. 'name': "some_dummy_command"
  317. }
  318. cmdstr = json.dumps(cmdDict)
  319. message = rabbitpy.Message(channel, cmdstr)
  320. message.publish(exchange, routing_key='Destination')
  321.  
  322. def get_border_paths(self):
  323. while True:
  324. if self.q.qsize() != 0:
  325. message = self.q.get()
  326. self.compute_my_paths(message)
  327. hub.sleep(5)
  328.  
  329. def compute_my_paths(self, message):
  330. border_paths = message['Border_Paths']
  331. message_id = message['Message_id']
  332. self.run(border_paths)
  333. global b
  334. b =message_id
  335. # for border_path in border_paths:
  336. # print "Funkcja Michala dla %s" % border_path
  337. # paths.append([1, 2, 1, 100, 100])
  338. # paths.append([1, 3, 1, 200, 200])
  339. # self.send_paths(paths, message_id)
  340.  
  341. def send_pathsd(self, paths):
  342. print "Sending my paths"
  343. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
  344. with conn.channel() as channel:
  345. exchange = rabbitpy.Exchange(channel, 'my_exchange')
  346. exchange.declare()
  347.  
  348. message_id = b
  349.  
  350. cmdDict = {
  351. 'id': self.domain_number,
  352. 'paths': paths,
  353. 'name': "some_dummy_command",
  354. 'message_id': message_id
  355. }
  356. cmdstr = json.dumps(cmdDict)
  357. message = rabbitpy.Message(channel, cmdstr)
  358. message.publish(exchange, routing_key='Paths')
  359.  
  360. @set_ev_cls(event.EventSwitchLeave, [MAIN_DISPATCHER, CONFIG_DISPATCHER, DEAD_DISPATCHER])
  361. def _handler_switch_leave(self, ev):
  362. self.logger.info("Not tracking Switches, switch leaved.")
  363.  
  364. @set_ev_cls(dpset.EventPortModify, MAIN_DISPATCHER)
  365. def _port_modify_handler(self, ev):
  366. print "port_modify_handler_is_called"
  367.  
  368. def run(self, check_request):
  369. global curr_time
  370. self.check_request = check_request
  371. for element in check_request:
  372. sw1 = element[0]
  373. sw2 = element[1]
  374. self.checkPaths(sw1, sw2)
  375. self._request_stats(dp_tables)
  376. #curr_time = datetime.datetime.now()
  377.  
  378.  
  379. def checkPaths(self, sw1, sw2):
  380. self.dupa()
  381. self.inquiry += 1
  382. global kkk
  383. print "Stats from switch " + str(sw1) + " and " + str(sw2)
  384. pathA = nx.all_simple_paths(self.net, sw1, sw2)
  385. print "All available paths between switch " + str(sw1) + " and " + str(sw2) + ":"
  386. listA = list(pathA) # lisha wszystkich sciezek jakie sa dla danego zapytania
  387. print listA
  388. new_lista = []
  389. lista_hopow = []
  390. print "a czy sa tu linki"
  391. for wpis in links_luk:
  392. print wpis
  393. #print links_luk
  394. #print "?????"
  395. nnn=0
  396. lista_dla_lukasza=[]
  397. nr_part=1
  398.  
  399. for part in listA:
  400. # print "jaki part"
  401. # print nr_part
  402. for part2 in links_luk:
  403. # print "jaki part2"
  404. #while nnn<len(part):
  405. if part[nnn]==links_luk[nnn][0] and part[nnn+1]==part2[1]:
  406. print "za ifem"
  407. while nnn+1<len(part):
  408. print "za whilem"
  409. print part[nnn]
  410. print part[nnn+1]
  411. print part2[2]
  412. print "jaki teraz part nnn" +str(part[nnn])+str(part[nnn+1])
  413. print "co chce wpisac"
  414. print part2
  415. lista_dla_lukasza.append([{'path_number: ': nr_part}, part[nnn], part2[2], part[nnn+1], part2[3]])
  416. nnn+=1
  417. continue
  418.  
  419. nnn=0
  420. nr_part+=1
  421. print "co dokladnie na koniec w liscie"
  422. for wpis in lista_dla_lukasza:
  423. print wpis
  424. #print lista_dla_lukasza
  425.  
  426.  
  427.  
  428.  
  429. # for part in listA:
  430. # for part2 in links:
  431. # print "jakie len(part)"+str(len(part))
  432. #
  433. # #while nnn<=(len(part)-1):
  434. # if part[nnn]==part2[nnn] and part[nnn+1]==part[nnn+1]:
  435. # print "mam dobra parke czyli"
  436. # print part[0]
  437. # print part[1]
  438. # print part2[2]
  439. # print "1234567890000000000"
  440. # print "a jakie nnn: "+str(nnn)
  441. # # nnn+=1
  442. # # print "jestem tutaj"
  443. # # print "tutaj2"
  444. # # print "tutaj3"
  445.  
  446.  
  447. print "Choosing the paths with the least number of hops"
  448. for elem in listA:
  449. liczb_hop = (len(elem) - 1)
  450. print "Path: " + str(elem) + "- number of hops: " + str(liczb_hop)
  451. if liczb_hop not in lista_hopow:
  452. lista_hopow.append(liczb_hop)
  453. lista_hopow.sort()
  454. print "Check number of hops"
  455. print lista_hopow
  456. nr = 0
  457. nr_intr_value = 2
  458. if len(lista_hopow)==1:
  459. for elem in listA:
  460. new_lista.append(elem)
  461. else:
  462. while nr < nr_intr_value:
  463. for elem in listA:
  464. if (len(elem) - 1) == lista_hopow[nr]:
  465. new_lista.append(elem)
  466. nr += 1
  467.  
  468. print "Chosen paths: "
  469. print new_lista
  470. #kkk = 0
  471.  
  472. link_ports_single = []
  473.  
  474.  
  475. for one_path in new_lista:
  476. for k in self.links:
  477. n = 0
  478. while n < (len(one_path) - 1):
  479. if k['src: '] == one_path[n] and k['dst: '] == one_path[n + 1]:
  480.  
  481. if k not in link_ports_single:
  482. link_ports_single.append(k)
  483. else:
  484. n += 1
  485. continue
  486. n += 1
  487.  
  488. for ii in link_ports_single:
  489. s = copy.deepcopy(ii)
  490. s.update({'path: ': kkk})
  491. s.update({'inquiry: ': self.inquiry})
  492. link_ports_all.append(s)
  493. kkk += 1
  494. link_ports_single = []
  495.  
  496. # print "Connections in all interesting paths"
  497. for i in link_ports_all:
  498. i.update({'tx_bytes: ': 0})
  499. # print i
  500.  
  501. global dp_tables
  502. while len(dp_tables) < (len(new_lista[0]) - 1):
  503. # print len(dp_tables)
  504. # print (len(new_lista[0]) - 1)
  505. for dp in self.datapaths.values():
  506. dp_tables.append(dp)
  507. # print "sprawdzenie LUK"
  508. # #print self.datapaths.id
  509. # print dp_tables
  510.  
  511. #print "jakie dp" +str(dp)
  512. #print "ile lacznie dp"+str(len(dp_tables))
  513. return dp_tables
  514. # def req2(self):
  515. # print "znow w glownej"
  516. # self._request_stats(dp_tables)
  517.  
  518. def _request_stats(self, dp_tables):
  519. print "kolejny raz w requeststats11111111"
  520. for i in dp_tables:
  521. parser = i.ofproto_parser
  522. # req = parser.OFPFlowStatsRequest(i)
  523. # i.send_msg(req)
  524. req = parser.OFPPortStatsRequest(i)
  525. i.send_msg(req)
  526.  
  527. @set_ev_cls(event.EventSwitchEnter)
  528. def get_topology_data(self, ev):
  529. self.topology.raw_switches = copy.copy(get_switch(self, None))
  530. self.topology.raw_links = copy.copy(get_link(self, None))
  531. self.topology.convert_raw_links_to_list()
  532. switch_list = get_switch(self.topology_api_app, None)
  533. switches = [switch.dp.id for switch in switch_list]
  534. # print switches
  535. self.net.add_nodes_from(switches)
  536. traffic = 0
  537. links_list = get_link(self.topology_api_app, None)
  538.  
  539. links_my = [({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no}) for link in
  540. links_list]
  541. links = [(link.src.dpid, link.dst.dpid, {'port': link.src.port_no}) for link in links_list]
  542. links_with_traffic = [
  543. ({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no, 'traffic:': traffic}) for
  544. link in links_list]
  545. links_short = [(link.src.dpid, link.src.port_no) for link in links_list]
  546. self.links = links_my
  547. self.links_with_traffic = links_with_traffic
  548. self.links_short = links_short
  549.  
  550. self.net.add_edges_from(links)
  551. links = [(link.dst.dpid, link.src.dpid, {'port': link.dst.port_no}) for link in links_list]
  552.  
  553. self.net.add_edges_from(links)
  554. # print links
  555.  
  556. def dupa(self):
  557. global links
  558. global links_luk
  559. self.topology.raw_switches = copy.copy(get_switch(self, None))
  560. self.topology.raw_links = copy.copy(get_link(self, None))
  561. self.topology.convert_raw_links_to_list()
  562. switch_list = get_switch(self.topology_api_app, None)
  563. switches = [switch.dp.id for switch in switch_list]
  564. # print switches
  565. self.net.add_nodes_from(switches)
  566. traffic = 0
  567. links_list = get_link(self.topology_api_app, None)
  568. links_my = [({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no}) for link in
  569. links_list]
  570. links = [(link.src.dpid, link.dst.dpid, {'port_src': link.src.port_no}) for link in links_list]
  571. links_luk = [(link.src.dpid, link.dst.dpid, {'port_src': link.src.port_no}, {'port_dst': link.dst.port_no}) for link
  572. in links_list]
  573. print "#############################################################"
  574. print links_luk
  575. links_with_traffic = [
  576. ({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no, 'traffic:': traffic}) for
  577. link in links_list]
  578. links_short = [(link.src.dpid, link.src.port_no) for link in links_list]
  579. self.links = links_my
  580. self.links_with_traffic = links_with_traffic
  581. self.links_short = links_short
  582.  
  583. self.net.add_edges_from(links, weight=0)
  584. links = [(link.dst.dpid, link.src.dpid, {'port': link.dst.port_no}) for link in links_list]
  585. print "##########"
  586. print links
  587. self.net.add_edges_from(links)
  588. print "1"
  589. print self.net.edges()
  590. print "2"
  591. print self.net.nodes()
  592.  
  593.  
  594. print "5"
  595. #print self.net[3][2]['weight']
  596.  
  597. # print links
  598.  
  599. # @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
  600. # def _flow_stats_reply_handler(self, ev):
  601. # body = ev.msg.body
  602. # sw_id=ev.msg.datapath.id
  603. # src = ev.msg.datapath
  604.  
  605. @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
  606. def _port_stats_reply_handler(self, ev):
  607.  
  608. body = ev.msg.body
  609. sw_id = ev.msg.datapath.id
  610. # print "test Lukk2"
  611. # listLuk=[]
  612. # listLuk.append(ev.msg.datapath)
  613. # print listLuk
  614. Stats(self.domain_number, self.net).checkStats2(body, sw_id, self.check_request)
  615.  
  616.  
  617. class TopologyStructure:
  618.  
  619. def __init__(self, domain_number, dpset):
  620. global links
  621. self.dpset = dpset
  622. self.elasticsearch = ElasticSearchConnector(domain_number)
  623. self.raw_switches = []
  624. self.switches = []
  625. self.raw_links = []
  626. self.links = links
  627. self.raw_hosts = []
  628. self.hosts = []
  629. self.border_nodes = []
  630. self.dst_ip_list = []
  631. self.dst_src_list = []
  632.  
  633. def print_links(self, func_str=None):
  634. # Convert the raw link to list so that it is printed easily
  635. print(" \t" + str(func_str) + ": Current Links:")
  636. for l in self.links:
  637. print (" \t\t" + str(l))
  638.  
  639. def print_switches(self, func_str=None):
  640. print(" \t" + str(func_str) + ": Current Switches:")
  641. for s in self.raw_switches:
  642. print (" \t\t" + str(s))
  643.  
  644. def count_switches(self):
  645. return len(self.raw_switches)
  646.  
  647. def convert_raw_links_to_list(self):
  648. # Build a list with all the links [((srcNode,port), (dstNode, port))].
  649. # The list is easier for printing.
  650. self.links = [((link.src.dpid, link.src.port_no),
  651. (link.dst.dpid, link.dst.port_no))
  652. for link in self.raw_links]
  653. # self.links = [[link.src.dpid, link.src.port_no] for link in self.links]
  654.  
  655. def convert_raw_switches_to_list(self):
  656. # Build a list with all the switches ([switches])
  657. self.switches = [switch.dp.id for switch in self.raw_switches]
  658.  
  659. def bring_up_link(self, link):
  660. self.links.append(link)
  661.  
  662. def bring_up_switch(self, switch):
  663. self.raw_switches.append(switch)
  664.  
  665. """
  666. Check if a link with specific nodes exists.
  667. """
  668.  
  669. def check_link(self, src_dpid, src_port, dst_dpid, dst_port):
  670. for link in self.links:
  671. if ((src_dpid, src_port), (dst_dpid, dst_port)) == (
  672. (link.src.dpid, link.src.port_no), (link.dst.dpid, link.dst.port_no)):
  673. return True
  674. return False
  675.  
  676. def check_if_border_node(self, msg):
  677. msg = msg
  678. lldp_src_dpid, lldp_src_port_no = self._parse_lldp_packet(msg)
  679. known_switches_dpid = [switch.dp.id for switch in self.raw_switches]
  680. if lldp_src_dpid not in known_switches_dpid:
  681. dpid = msg.datapath.id
  682. port_no = msg.match['in_port']
  683. # Check if the port is occupied - if so, it is not a border node
  684. border_node_port = [dpid, port_no]
  685. if border_node_port not in self.links:
  686. return True
  687. return False
  688.  
  689. @staticmethod
  690. def _parse_lldp_packet(msg):
  691. src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
  692. return src_dpid, src_port_no
  693.  
  694. def add_border_node(self, msg):
  695. dpid = msg.datapath.id
  696. port_no = msg.match['in_port']
  697. pkt = packet.Packet(msg.data)
  698. eth = pkt.get_protocol(ethernet.ethernet)
  699. src_mac = eth.src
  700. lldp_src_dpid, lldp_src_port_no = self._parse_lldp_packet(msg)
  701. mac = self._get_hwaddr(dpid, port_no)
  702. border_node = [lldp_src_dpid, lldp_src_port_no, src_mac, dpid, port_no, mac]
  703. if border_node not in self.border_nodes:
  704. self.border_nodes.append(border_node)
  705. self.elasticsearch.es_put_border_nodes(lldp_src_dpid, lldp_src_port_no, src_mac, dpid, port_no, mac)
  706.  
  707. def check_if_host(self):
  708. for host in self.raw_hosts:
  709. host_parameters = [host.port.dpid, host.port.port_no, host.mac]
  710. if not self._check_hosts_in_borders(host_parameters, self.border_nodes):
  711. return True, host
  712. return False
  713.  
  714. def add_host(self, host):
  715. if host.ipv4:
  716. host_parameters = [host.port.dpid, host.port.port_no, host.mac, host.ipv4]
  717. if host_parameters not in self.hosts:
  718. my_ip.append(host.ipv4)
  719. self.hosts.append(host_parameters)
  720. self.elasticsearch.es_put_hosts(host.port.dpid, host.port.port_no, host.mac, host.ipv4)
  721. # if self.hosts and self.border_nodes:
  722. # self.delete_borders_from_hosts(self.hosts, self.border_nodes)
  723.  
  724. @staticmethod
  725. def _check_hosts_in_borders(hosts, borders):
  726. for x in borders:
  727. y = set(hosts).issubset(x)
  728. if y is True:
  729. return True
  730. return False
  731.  
  732. @staticmethod
  733. def _delete_borders_from_hosts(hosts, borders):
  734. for host in hosts:
  735. for border in borders:
  736. y = set(host).issubset(border)
  737. if y is True:
  738. hosts.remove(host)
  739.  
  740. def _get_hwaddr(self, dpid, port_no):
  741. return self.dpset.get_port(dpid, port_no).hw_addr
  742.  
  743.  
  744. class ElasticSearchConnector:
  745.  
  746. def __init__(self, domain_number):
  747. self.es = Elasticsearch([{'host': 'localhost', 'port': 9200}], maxsize=25)
  748. es_log = logging.getLogger("elasticsearch")
  749. es_log.setLevel(logging.CRITICAL)
  750. self.domain_number = domain_number
  751.  
  752. def es_put_border_nodes(self, src_dpid, src_port_no, src_mac, dst_dpid, dst_port_no, dst_mac):
  753.  
  754. e = {
  755. "domain": self.domain_number,
  756. "src_domain": 0,
  757. "src_dpid": src_dpid,
  758. "src_dpid": src_dpid,
  759. "src_port": src_port_no,
  760. "src_mac": src_mac,
  761. "domain_dpid": dst_dpid,
  762. "domain_port": dst_port_no,
  763. "domain_mac": dst_mac
  764. }
  765. self.es.index(index='border_nodes', doc_type='border_node', body=e)
  766.  
  767. def es_put_hosts(self, border_dpid, border_port_no, host_mac, host_ip):
  768. e = {
  769. "domain": self.domain_number,
  770. "border_dpid": border_dpid,
  771. "border_port": border_port_no,
  772. "host_mac": host_mac,
  773. "host_ip": host_ip
  774. }
  775. self.es.index(index='hosts', doc_type='host', body=e)
  776.  
  777. def check_if_domains_assigned(self):
  778. res = self.es.search(index="border_nodes", doc_type="border_node", body={"query": {"match_phrase": {"src_domain": 0}}})
  779. if res['hits']['total'] > 0:
  780. return True
  781. else:
  782. return False
  783.  
  784. def assign_domain(self, border_nodes):
  785.  
  786. for border in border_nodes:
  787. mac = border[5]
  788. q = {
  789. "query": {
  790. "match_phrase": {
  791. "src_mac": mac
  792. }
  793. },
  794. "script": {
  795. "source": "ctx._source.src_domain = params.num",
  796. "params": {
  797. "num": self.domain_number
  798. },
  799. "lang": "painless"
  800. }
  801. }
  802.  
  803. try:
  804. self.es.indices.refresh(index="border_nodes")
  805. self.es.update_by_query(body=q, doc_type='border_node', index='border_nodes')
  806. except TransportError as e:
  807. # print(e.info)
  808. pass
  809.  
  810.  
  811. class MQListener(threading.Thread):
  812.  
  813. def __init__(self, domain_number, q):
  814. super(MQListener, self).__init__()
  815. self.domain_number = domain_number
  816. self.q = q or Queue.Queue()
  817. self.queue = None
  818. self.conn = None
  819. self.channel = None
  820. self.exchange = None
  821. self.queue = None
  822. self.running = False
  823.  
  824. def run_listener(self):
  825. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as self.conn:
  826. # Open the channel to communicate with RabbitMQ
  827. with self.conn.channel() as self.channel:
  828. self.exchange = rabbitpy.TopicExchange(self.channel, 'response_exchange')
  829. self.exchange.declare()
  830.  
  831. queue_name = "queue%s" %self.domain_number
  832. self.queue = rabbitpy.Queue(self.channel, name=queue_name)
  833. self.queue.declare()
  834.  
  835. binding_key = "%s.*" % self.domain_number
  836. # print binding_key
  837. self.queue.bind(self.exchange, routing_key=binding_key)
  838.  
  839. while 1:
  840. try:
  841. # Consume the message
  842. for message in self.queue:
  843. if message.routing_key == ("%s.PathSend" % self.domain_number):
  844. cmdDict = json.loads(message.body)
  845. dst_border_nodes = cmdDict['dst_border_nodes']
  846. message_id = cmdDict['message_id']
  847. print "Got path request: ", dst_border_nodes
  848. message.ack()
  849. self.q.put({'Border_Paths': dst_border_nodes, 'Message_id': str(message_id)})
  850. elif message.routing_key == ("%s.AddFlow" % self.domain_number):
  851. print "Add flow"
  852. else:
  853. print message.body
  854.  
  855. except rabbitpy.exceptions.NotConsumingError as e:
  856. continue
  857.  
  858. def start(self):
  859. self.thread = threading.Thread(target=self.run_listener)
  860. self.running = True
  861. print "Starting listener"
  862. self.thread.start()
  863.  
  864.  
  865. class Stats():
  866.  
  867. def __init__(self, domain_number, net):
  868.  
  869. self.domain_number = domain_number
  870. self.net = net
  871. self.time_check = 10
  872. self.received_stats = []
  873. self.first_iteration = True
  874. self.dp_tables = {}
  875. self.src_dst_tx_bytes = []
  876. global link_ports_all
  877. self.link_ports_all = link_ports_all
  878.  
  879. def checkStats2(self, body, sw_id, check_request):
  880. # curr_time2 = datetime.now()
  881. curr_time2= int(round(time.time()))
  882. global logger_my_port, traffic_all_all, traffic_all_single, checked_switches_ports, time_sleep, curr_time
  883. checked_switches_ports.append(sw_id)
  884. time_last= curr_time2-curr_time
  885. # print "sprawdzam time_last"
  886. # print time_last
  887.  
  888. for stat in sorted(body, key=attrgetter('port_no')):
  889. print ('datapath port '
  890. 'rx-pkts rx-bytes rx-error '
  891. 'tx-pkts tx-bytes tx-error')
  892. print('---------------- -------- '
  893. '-------- -------- -------- '
  894. '-------- -------- --------')
  895. print(sw_id, stat.port_no, stat.rx_packets, stat.rx_bytes, stat.rx_errors, stat.tx_packets, stat.tx_bytes,
  896. stat.tx_errors)
  897.  
  898. # for i in self.link_ports_all:
  899. # if sw_id == i['src: '] and stat.port_no == i['src_port: ']:
  900. # i["tx_bytes: "] = stat.tx_bytes
  901.  
  902. for i in self.link_ports_all:
  903. if sw_id == i['src: '] and stat.port_no == i['src_port: ']:
  904. if bool_send_statistic==False:
  905. i["tx_bytes: "] = stat.tx_bytes/time_last
  906. i["previous: "] = stat.tx_bytes
  907. else:
  908. i["tx_bytes: "] = ((stat.tx_bytes-i["previous: "])/time_sleep)
  909. i["previous: "] = stat.tx_bytes
  910. # print "Sprawdzam link"
  911. # print i
  912. #i["tx_bytes: "] = stat.tx_bytes
  913. #print "JESTEM TUTAJ KOLEJNY RAZ?"
  914.  
  915. if len(dp_tables) == len(checked_switches_ports):
  916. print "All switch checked"
  917. global all_switch_checked, tx_sum3, bool_send_statistic
  918. all_switch_checked = True
  919. # for i in self.link_ports_all:
  920. # print i
  921. for elem in check_request:
  922. sw1 = elem[0]
  923. sw2 = elem[1]
  924. self.prepareStats(self.link_ports_all, sw1, sw2)
  925. print "Selection of the current best available path for tw queries from GCL:"
  926. print traffic_all_all
  927. self.send_paths(traffic_all_all)
  928. bool_send_statistic=True
  929. checked_switches_ports=[]
  930. traffic_all_all=[]
  931.  
  932. def prepareStats(self, link_ports_all, sw1, sw2):
  933. global tx_sum, all_switch_checked, traffic_all_single, links
  934. print "&&&&&&&&&&&&&&&&&&&&&&&&&&&&"
  935. print self.net
  936. print "@@@"
  937.  
  938. for elo in link_ports_all:
  939. print elo
  940. if all_switch_checked:
  941. nr_path = 0
  942. n = 0
  943. nn = 0
  944. list_tx_kbytes = []
  945. traffic_all_single = []
  946.  
  947. while n < len(link_ports_all):
  948.  
  949. for one_link in link_ports_all:
  950. if one_link['path: '] == nr_path:
  951. nn += 1
  952. tx_sum += (one_link['tx_bytes: '])
  953. list_tx_kbytes.append((one_link['tx_bytes: ']))
  954. self.net[one_link['dst: ']][one_link['src: ']]['weight']=one_link['tx_bytes: ']
  955. n += 1
  956. s = []
  957. tx_max = max(list_tx_kbytes)
  958. print "Paths with metrics:"
  959. s.extend([sw1, sw2, nn, tx_max, tx_sum / nn])
  960. print s
  961. if traffic_all_single == []:
  962. traffic_all_single.append(s)
  963. else:
  964. if s[4] < traffic_all_single[0][4]:
  965. traffic_all_single = []
  966. traffic_all_single.append(s)
  967. nr_path += 1
  968. nn = 0
  969. tx_sum = 0
  970. traffic_all_all.append(traffic_all_single[0])
  971. return traffic_all_all
  972.  
  973. def send_paths(self, paths):
  974. print "Sending my paths"
  975. print "..."
  976. print "Got flows_add request from GCL"
  977. print "Flows added"
  978. with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
  979. with conn.channel() as channel:
  980. exchange = rabbitpy.Exchange(channel, 'my_exchange')
  981. exchange.declare()
  982.  
  983. message_id = b
  984.  
  985. cmdDict = {
  986. 'id': self.domain_number,
  987. 'paths': paths,
  988. 'name': "some_dummy_command",
  989. 'message_id': message_id
  990. }
  991. cmdstr = json.dumps(cmdDict)
  992. message = rabbitpy.Message(channel, cmdstr)
  993. message.publish(exchange, routing_key='Paths')
  994.  
  995.  
  996. # class Metrics(threading.Thread):
  997. # def __init__(self, dp_tables):
  998. # super(Metrics,self).__init__()
  999. # threading.Thread.__init__(self, name="Metrics_thread")
  1000. # self.dp_tables= dp_tables
  1001. # sw1 = raw_input("Provide Start for new metrics: ")
  1002. # self.req2()
  1003. #
  1004. # #sw2 = raw_input("Destination switch: ")
  1005. # #b=ProjectController()
  1006. #
  1007. # @staticmethod
  1008. # def req2():
  1009. # print "wchodze ze static"
  1010. # ProjectController.req2()
  1011. # #b=ProjectController()
  1012. # #b.req2()
  1013. # #ProjectController._request_stats(dp_tables)
  1014. #
  1015. # # def newStats(self, dp_tables):
  1016.  
  1017. if __name__ == "__main__":
  1018. manager.main(args=sys.argv)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement