Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from ryu.base import app_manager
- from ryu.ofproto import ofproto_v1_3
- from ryu.controller import ofp_event
- from ryu.controller.handler import set_ev_cls
- from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
- from ryu.topology import event
- from ryu.topology.switches import LLDPPacket
- from ryu.lib.packet import ipv4, arp
- from ryu.lib.packet import packet
- from ryu.lib.packet import lldp, ether_types
- from ryu.ofproto.ether import ETH_TYPE_LLDP, ETH_TYPE_ARP, ETH_TYPE_IPV6, ETH_TYPE_IP
- from ryu.lib.packet import packet, ethernet
- import array
- from ryu.topology.api import get_switch, get_link, get_host, get_all_switch, get_all_link, get_all_host
- import copy
- from ryu.controller import dpset
- from elasticsearch import Elasticsearch, TransportError, NotFoundError
- from ryu.lib import stplib
- import sys
- from ryu.cmd import manager
- import socket
- import threading
- from threading import Thread
- from ryu.controller.dpset import DPSet
- import logging.config
- import Queue
- import requests
- import thread
- import rabbitpy
- import datetime
- import uuid
- import json
- import logging
- import time
- import datetime
- from ryu.lib import hub
- import networkx as nx
- from operator import attrgetter
- logging.basicConfig(level=logging.DEBUG,
- format='(%(threadName)-9s) %(message)s',)
- HTTP_HOST = 'localhost'
- HTTP_PORT = 8081
- EXCHANGE = 'threading_example'
- my_ip = []
- traffic_all_all = []
- traffic_all_single = []
- check_first = False
- all_switch_checked = False
- checked_switches = []
- checked_switches_ports = []
- link_ports_all = []
- b = []
- time_sleep=20
- curr_time =0
- kkk = 0
- links=[]
- links_luk=[]
- tx_sum = 0
- dp_tables = []
- bool_send_statistic = False
- stats_prepared = False
- if __name__ == "__main__": # Stuff to set additional command line options
- from ryu import cfg
- CONF = cfg.CONF
- CONF.register_cli_opts([
- cfg.IntOpt('d', min=1, max=5),
- cfg.IntOpt('p', min=1000, max=9000)
- ])
- class ProjectController(app_manager.RyuApp):
- OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
- _CONTEXTS = {'dpset': DPSet}
- def __init__(self, *args, **kwargs):
- super(ProjectController, self).__init__(*args, **kwargs)
- super(ProjectController, self).__init__(*args, **kwargs)
- self.mac_to_port = {}
- self.topology_api_app = self
- self.net = nx.DiGraph()
- self.nodes = {}
- self.links = {}
- self.links_with_traffic = {}
- self.links_short = {}
- self.no_of_nodes = 0
- self.no_of_links = 0
- self.sw1 = 0
- self.sw2 = 0
- self.link_ports_all = {}
- self.received_stats = []
- self.time_check = 10
- self.check_request = []
- self.inquiry = 0
- self.first_iteration = True
- self.datapaths = {}
- self.time_once=False
- self.domain_number = self.CONF.d
- self.port = self.CONF.p
- self.dpset = kwargs['dpset']
- self.q = Queue.Queue()
- self.topology = TopologyStructure(self.domain_number, self.dpset)
- listener = MQListener(self.domain_number, self.q)
- listener.start()
- self.monitor_thread = hub.spawn(self.get_border_paths)
- self.monitor_thread2 = hub.spawn(self._monitor,bool_send_statistic)
- @set_ev_cls(ofp_event.EventOFPStateChange,
- [MAIN_DISPATCHER, DEAD_DISPATCHER])
- def _state_change_handler(self, ev):
- datapath = ev.datapath
- if ev.state == MAIN_DISPATCHER:
- if datapath.id not in self.datapaths:
- self.logger.debug('register datapath: %016x', datapath.id)
- self.datapaths[datapath.id] = datapath
- elif ev.state == DEAD_DISPATCHER:
- if datapath.id in self.datapaths:
- self.logger.debug('unregister datapath: %016x', datapath.id)
- del self.datapaths[datapath.id]
- @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
- def _switch_features_handler(self, ev):
- # print "switch_features_handler is called"
- datapath = ev.msg.datapath
- ofproto = datapath.ofproto
- parser = datapath.ofproto_parser
- match = parser.OFPMatch()
- actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, ofproto.OFPCML_NO_BUFFER)]
- self.add_flow(datapath, 0, match, actions)
- @staticmethod
- def add_flow(datapath, priority, match, actions, buffer_id=None):
- ofproto = datapath.ofproto
- parser = datapath.ofproto_parser
- inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]
- if buffer_id:
- mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,
- priority=priority, match=match,
- instructions=inst)
- else:
- mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
- match=match, instructions=inst)
- datapath.send_msg(mod)
- def delete_flow(self, datapath):
- ofproto = datapath.ofproto
- parser = datapath.ofproto_parser
- for dst in self.mac_to_port[datapath.id].keys():
- match = parser.OFPMatch(eth_dst=dst)
- mod = parser.OFPFlowMod(
- datapath, command=ofproto.OFPFC_DELETE,
- out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY,
- priority=1, match=match)
- datapath.send_msg(mod)
- # @set_ev_cls(event.EventSwitchEnter)
- # def _switch_enter_handler(self, ev):
- # # self.domain_number_allocation()
- # # self.topology.domain_number_allocation()
- # print "switch_entered_handler is called"
- # # print ev.switch
- # self.topology.raw_switches = copy.copy(get_switch(self, None))
- # self.topology.raw_links = copy.copy(get_link(self, None))
- # self.topology.convert_raw_links_to_list()
- """
- The change notification event (stplib.EventTopologyChange) of the network topology is received and the learned
- MAC address and registered flow entry are initialized.
- """
- @set_ev_cls(stplib.EventTopologyChange, MAIN_DISPATCHER)
- def _topology_change_handler(self, ev):
- print "topology_change_handler_is_called"
- def get_topology_data1(self):
- # Call get_switch() to get the list of objects Switch.
- self.topology.raw_switches = copy.copy(get_all_switch(self))
- # Call get_link() to get the list of objects Link.
- self.topology.raw_links = copy.copy(get_all_link(self))
- self.topology.convert_raw_links_to_list()
- self.topology.convert_raw_switches_to_list()
- # self.topology.print_links("get_topology_data")
- # self.topology.print_switches("get_topology_data")
- # @set_ev_cls(event.EventLinkAdd)
- # def link_add(self, ev):
- # print "link_add_handler_is_called"
- @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
- def _packet_in_handler(self, ev):
- global curr_time
- if ev.msg.msg_len < ev.msg.total_len:
- self.logger.debug("packet truncated: only %s of %s bytes",
- ev.msg.msg_len, ev.msg.total_len)
- if self.time_once == False:
- curr_time= int(round(time.time()))
- self.time_once=True
- msg = ev.msg
- datapath = msg.datapath
- in_port = msg.match['in_port']
- dpid = datapath.id
- # print "*****"
- # print datapath
- # print "*****"
- # print dpid
- # print "*****"
- # #print list(datapath)
- # print "*****"
- eth, pkt_type, pkt_data = ethernet.ethernet.parser(msg.data)
- dst = eth.dst
- src = eth.src
- pkt = packet.Packet(msg.data)
- dst_border_node = 0
- global time_sleep
- if eth.ethertype == ETH_TYPE_IPV6:
- # Packets with Destination MAC prefix: 33:33:xx:xx:xx:xx are IPv6 multicast packets
- return
- elif eth.ethertype == ETH_TYPE_LLDP:
- if self.topology.check_if_border_node(msg) is True:
- self.topology.add_border_node(msg)
- if self.topology.elasticsearch.check_if_domains_assigned():
- self.topology.elasticsearch.assign_domain(self.topology.border_nodes)
- return
- elif eth.ethertype == ETH_TYPE_ARP:
- print "ARP"
- self.topology.raw_hosts = copy.copy(get_host(self, None))
- check_host = self.topology.check_if_host()
- if check_host[0] is True:
- self.topology.add_host(check_host[1])
- arp_packet = pkt.get_protocol(arp.arp)
- # src_ip = arp_packet.src_ip
- dst_ip = arp_packet.dst_ip
- src_ip = arp_packet.src_ip
- self.topology.dst_src_list.extend([dst_ip, src_ip])
- # n=0
- # found = False
- # while found is False:
- # if dst_ip != self.topology.dst_src_list[n][0]:
- # if src_ip != self.topology.dst_src_list[n][1]:
- # if dst_ip in my_ip:
- # print "IP in my domain, just add flows"
- # else
- if dst_ip not in self.topology.dst_ip_list:
- if dst_ip in my_ip:
- print "IP in my domain, just add flows"
- else:
- self.topology.dst_ip_list.append(dst_ip)
- self.send_dst_ip(dst_ip, dpid)
- # dst_border_node = self.get_path(dst_ip)
- # print dst_border_node
- pass
- elif eth.ethertype == ETH_TYPE_IP:
- ipv4_packet = pkt.get_protocol(ipv4.ipv4)
- # src_ip = ipv4_packet.src
- dst_ip = ipv4_packet.dst
- pass
- self.get_topology_data1()
- # self.logger.info("\tpacket in dpid: %s src: %s dst: %s in port: %s %s", dpid, src, dst, in_port, pkt_type)
- def _monitor(self):
- global stats_prepared, bool_send_statistic
- while True:
- if bool_send_statistic==True:
- for dp in self.datapaths.values():
- #print "staty z hubow"
- self._request_stats2(dp)
- #if stats_prepared is True:
- #Stats.prepareStats(self.link_ports_all, sw1, sw2)
- hub.sleep(time_sleep)
- def _request_stats2(self, datapath):
- print "wchdze ze stats2"
- self.logger.debug('send stats request: %016x', datapath.id)
- ofproto = datapath.ofproto
- parser = datapath.ofproto_parser
- #req = parser.OFPFlowStatsRequest(datapath)
- #datapath.send_msg(req)
- req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
- datapath.send_msg(req)
- def send_dst_ip(self, dst_ip, dpid):
- print "Sending dst_ip"
- with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
- with conn.channel() as channel:
- exchange = rabbitpy.Exchange(channel, 'my_exchange')
- exchange.declare()
- # message = rabbitpy.Message(channel,
- # message,
- # {'content_type': 'text/plain',
- # 'delivery_mode': 1,
- # 'message_type': 'Register',
- # 'timestamp': datetime.datetime.now(),
- # 'message_id': uuid.uuid4()})
- cmdDict = {
- 'id': self.domain_number,
- 'dpid': dpid,
- 'ip' : dst_ip,
- 'name': "some_dummy_command"
- }
- cmdstr = json.dumps(cmdDict)
- message = rabbitpy.Message(channel, cmdstr)
- message.publish(exchange, routing_key='Destination')
- def get_border_paths(self):
- while True:
- if self.q.qsize() != 0:
- message = self.q.get()
- self.compute_my_paths(message)
- hub.sleep(5)
- def compute_my_paths(self, message):
- border_paths = message['Border_Paths']
- message_id = message['Message_id']
- self.run(border_paths)
- global b
- b =message_id
- # for border_path in border_paths:
- # print "Funkcja Michala dla %s" % border_path
- # paths.append([1, 2, 1, 100, 100])
- # paths.append([1, 3, 1, 200, 200])
- # self.send_paths(paths, message_id)
- def send_pathsd(self, paths):
- print "Sending my paths"
- with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
- with conn.channel() as channel:
- exchange = rabbitpy.Exchange(channel, 'my_exchange')
- exchange.declare()
- message_id = b
- cmdDict = {
- 'id': self.domain_number,
- 'paths': paths,
- 'name': "some_dummy_command",
- 'message_id': message_id
- }
- cmdstr = json.dumps(cmdDict)
- message = rabbitpy.Message(channel, cmdstr)
- message.publish(exchange, routing_key='Paths')
- @set_ev_cls(event.EventSwitchLeave, [MAIN_DISPATCHER, CONFIG_DISPATCHER, DEAD_DISPATCHER])
- def _handler_switch_leave(self, ev):
- self.logger.info("Not tracking Switches, switch leaved.")
- @set_ev_cls(dpset.EventPortModify, MAIN_DISPATCHER)
- def _port_modify_handler(self, ev):
- print "port_modify_handler_is_called"
- def run(self, check_request):
- global curr_time
- self.check_request = check_request
- for element in check_request:
- sw1 = element[0]
- sw2 = element[1]
- self.checkPaths(sw1, sw2)
- self._request_stats(dp_tables)
- #curr_time = datetime.datetime.now()
- def checkPaths(self, sw1, sw2):
- self.dupa()
- self.inquiry += 1
- global kkk
- print "Stats from switch " + str(sw1) + " and " + str(sw2)
- pathA = nx.all_simple_paths(self.net, sw1, sw2)
- print "All available paths between switch " + str(sw1) + " and " + str(sw2) + ":"
- listA = list(pathA) # lisha wszystkich sciezek jakie sa dla danego zapytania
- print listA
- new_lista = []
- lista_hopow = []
- print "a czy sa tu linki"
- for wpis in links_luk:
- print wpis
- nnn=0
- m=0
- lista_dla_lukasza=[]
- nr_part=1
- for part in listA:
- while nnn + 1 < len(part):
- if part[nnn] == links_luk[m][0] and part[nnn + 1] == links_luk[m][1]:
- lista_dla_lukasza.append(
- [{'path_number: ': nr_part}, part[nnn], links_luk[m][2], part[nnn + 1], links_luk[m][3]])
- nnn += 1
- m=0
- else:
- m+=1
- nnn=0
- nr_part +=1
- print "co dokladnie na koniec w liscie"
- for wpis in lista_dla_lukasza:
- print wpis
- print "Choosing the paths with the least number of hops"
- for elem in listA:
- liczb_hop = (len(elem) - 1)
- print "Path: " + str(elem) + "- number of hops: " + str(liczb_hop)
- if liczb_hop not in lista_hopow:
- lista_hopow.append(liczb_hop)
- lista_hopow.sort()
- print "Check number of hops"
- print lista_hopow
- nr = 0
- nr_intr_value = 2
- if len(lista_hopow)==1:
- for elem in listA:
- new_lista.append(elem)
- else:
- while nr < nr_intr_value:
- for elem in listA:
- if (len(elem) - 1) == lista_hopow[nr]:
- new_lista.append(elem)
- nr += 1
- print "Chosen paths: "
- print new_lista
- #kkk = 0
- link_ports_single = []
- for one_path in new_lista:
- for k in self.links:
- n = 0
- while n < (len(one_path) - 1):
- if k['src: '] == one_path[n] and k['dst: '] == one_path[n + 1]:
- if k not in link_ports_single:
- link_ports_single.append(k)
- else:
- n += 1
- continue
- n += 1
- for ii in link_ports_single:
- s = copy.deepcopy(ii)
- s.update({'path: ': kkk})
- s.update({'inquiry: ': self.inquiry})
- link_ports_all.append(s)
- kkk += 1
- link_ports_single = []
- # print "Connections in all interesting paths"
- for i in link_ports_all:
- i.update({'tx_bytes: ': 0})
- # print i
- global dp_tables
- while len(dp_tables) < (len(new_lista[0]) - 1):
- # print len(dp_tables)
- # print (len(new_lista[0]) - 1)
- for dp in self.datapaths.values():
- dp_tables.append(dp)
- # print "sprawdzenie LUK"
- # #print self.datapaths.id
- # print dp_tables
- #print "jakie dp" +str(dp)
- #print "ile lacznie dp"+str(len(dp_tables))
- return dp_tables
- # def req2(self):
- # print "znow w glownej"
- # self._request_stats(dp_tables)
- def _request_stats(self, dp_tables):
- print "kolejny raz w requeststats11111111"
- for i in dp_tables:
- parser = i.ofproto_parser
- # req = parser.OFPFlowStatsRequest(i)
- # i.send_msg(req)
- req = parser.OFPPortStatsRequest(i)
- i.send_msg(req)
- @set_ev_cls(event.EventSwitchEnter)
- def get_topology_data(self, ev):
- self.topology.raw_switches = copy.copy(get_switch(self, None))
- self.topology.raw_links = copy.copy(get_link(self, None))
- self.topology.convert_raw_links_to_list()
- switch_list = get_switch(self.topology_api_app, None)
- switches = [switch.dp.id for switch in switch_list]
- # print switches
- self.net.add_nodes_from(switches)
- traffic = 0
- links_list = get_link(self.topology_api_app, None)
- links_my = [({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no}) for link in
- links_list]
- links = [(link.src.dpid, link.dst.dpid, {'port': link.src.port_no}) for link in links_list]
- links_with_traffic = [
- ({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no, 'traffic:': traffic}) for
- link in links_list]
- links_short = [(link.src.dpid, link.src.port_no) for link in links_list]
- self.links = links_my
- self.links_with_traffic = links_with_traffic
- self.links_short = links_short
- self.net.add_edges_from(links)
- links = [(link.dst.dpid, link.src.dpid, {'port': link.dst.port_no}) for link in links_list]
- self.net.add_edges_from(links)
- # print links
- def dupa(self):
- global links
- global links_luk
- self.topology.raw_switches = copy.copy(get_switch(self, None))
- self.topology.raw_links = copy.copy(get_link(self, None))
- self.topology.convert_raw_links_to_list()
- switch_list = get_switch(self.topology_api_app, None)
- switches = [switch.dp.id for switch in switch_list]
- # print switches
- self.net.add_nodes_from(switches)
- traffic = 0
- links_list = get_link(self.topology_api_app, None)
- links_my = [({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no}) for link in
- links_list]
- links = [(link.src.dpid, link.dst.dpid, {'port_src': link.src.port_no}) for link in links_list]
- links_luk = [(link.src.dpid, link.dst.dpid, {'port_src': link.src.port_no}, {'port_dst': link.dst.port_no}) for link
- in links_list]
- print "#############################################################"
- print links_luk
- links_with_traffic = [
- ({'src: ': link.src.dpid, 'dst: ': link.dst.dpid, 'src_port: ': link.src.port_no, 'traffic:': traffic}) for
- link in links_list]
- links_short = [(link.src.dpid, link.src.port_no) for link in links_list]
- self.links = links_my
- self.links_with_traffic = links_with_traffic
- self.links_short = links_short
- self.net.add_edges_from(links, weight=0)
- links = [(link.dst.dpid, link.src.dpid, {'port': link.dst.port_no}) for link in links_list]
- print "##########"
- print links
- self.net.add_edges_from(links)
- print "1"
- print self.net.edges()
- print "2"
- print self.net.nodes()
- print "5"
- #print self.net[3][2]['weight']
- # print links
- # @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
- # def _flow_stats_reply_handler(self, ev):
- # body = ev.msg.body
- # sw_id=ev.msg.datapath.id
- # src = ev.msg.datapath
- @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
- def _port_stats_reply_handler(self, ev):
- body = ev.msg.body
- sw_id = ev.msg.datapath.id
- # print "test Lukk2"
- # listLuk=[]
- # listLuk.append(ev.msg.datapath)
- # print listLuk
- Stats(self.domain_number, self.net).checkStats2(body, sw_id, self.check_request)
- class TopologyStructure:
- def __init__(self, domain_number, dpset):
- global links
- self.dpset = dpset
- self.elasticsearch = ElasticSearchConnector(domain_number)
- self.raw_switches = []
- self.switches = []
- self.raw_links = []
- self.links = links
- self.raw_hosts = []
- self.hosts = []
- self.border_nodes = []
- self.dst_ip_list = []
- self.dst_src_list = []
- def print_links(self, func_str=None):
- # Convert the raw link to list so that it is printed easily
- print(" \t" + str(func_str) + ": Current Links:")
- for l in self.links:
- print (" \t\t" + str(l))
- def print_switches(self, func_str=None):
- print(" \t" + str(func_str) + ": Current Switches:")
- for s in self.raw_switches:
- print (" \t\t" + str(s))
- def count_switches(self):
- return len(self.raw_switches)
- def convert_raw_links_to_list(self):
- # Build a list with all the links [((srcNode,port), (dstNode, port))].
- # The list is easier for printing.
- self.links = [((link.src.dpid, link.src.port_no),
- (link.dst.dpid, link.dst.port_no))
- for link in self.raw_links]
- # self.links = [[link.src.dpid, link.src.port_no] for link in self.links]
- def convert_raw_switches_to_list(self):
- # Build a list with all the switches ([switches])
- self.switches = [switch.dp.id for switch in self.raw_switches]
- def bring_up_link(self, link):
- self.links.append(link)
- def bring_up_switch(self, switch):
- self.raw_switches.append(switch)
- """
- Check if a link with specific nodes exists.
- """
- def check_link(self, src_dpid, src_port, dst_dpid, dst_port):
- for link in self.links:
- if ((src_dpid, src_port), (dst_dpid, dst_port)) == (
- (link.src.dpid, link.src.port_no), (link.dst.dpid, link.dst.port_no)):
- return True
- return False
- def check_if_border_node(self, msg):
- msg = msg
- lldp_src_dpid, lldp_src_port_no = self._parse_lldp_packet(msg)
- known_switches_dpid = [switch.dp.id for switch in self.raw_switches]
- if lldp_src_dpid not in known_switches_dpid:
- dpid = msg.datapath.id
- port_no = msg.match['in_port']
- # Check if the port is occupied - if so, it is not a border node
- border_node_port = [dpid, port_no]
- if border_node_port not in self.links:
- return True
- return False
- @staticmethod
- def _parse_lldp_packet(msg):
- src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data)
- return src_dpid, src_port_no
- def add_border_node(self, msg):
- dpid = msg.datapath.id
- port_no = msg.match['in_port']
- pkt = packet.Packet(msg.data)
- eth = pkt.get_protocol(ethernet.ethernet)
- src_mac = eth.src
- lldp_src_dpid, lldp_src_port_no = self._parse_lldp_packet(msg)
- mac = self._get_hwaddr(dpid, port_no)
- border_node = [lldp_src_dpid, lldp_src_port_no, src_mac, dpid, port_no, mac]
- if border_node not in self.border_nodes:
- self.border_nodes.append(border_node)
- self.elasticsearch.es_put_border_nodes(lldp_src_dpid, lldp_src_port_no, src_mac, dpid, port_no, mac)
- def check_if_host(self):
- for host in self.raw_hosts:
- host_parameters = [host.port.dpid, host.port.port_no, host.mac]
- if not self._check_hosts_in_borders(host_parameters, self.border_nodes):
- return True, host
- return False
- def add_host(self, host):
- if host.ipv4:
- host_parameters = [host.port.dpid, host.port.port_no, host.mac, host.ipv4]
- if host_parameters not in self.hosts:
- my_ip.append(host.ipv4)
- self.hosts.append(host_parameters)
- self.elasticsearch.es_put_hosts(host.port.dpid, host.port.port_no, host.mac, host.ipv4)
- # if self.hosts and self.border_nodes:
- # self.delete_borders_from_hosts(self.hosts, self.border_nodes)
- @staticmethod
- def _check_hosts_in_borders(hosts, borders):
- for x in borders:
- y = set(hosts).issubset(x)
- if y is True:
- return True
- return False
- @staticmethod
- def _delete_borders_from_hosts(hosts, borders):
- for host in hosts:
- for border in borders:
- y = set(host).issubset(border)
- if y is True:
- hosts.remove(host)
- def _get_hwaddr(self, dpid, port_no):
- return self.dpset.get_port(dpid, port_no).hw_addr
- class ElasticSearchConnector:
- def __init__(self, domain_number):
- self.es = Elasticsearch([{'host': 'localhost', 'port': 9200}], maxsize=25)
- es_log = logging.getLogger("elasticsearch")
- es_log.setLevel(logging.CRITICAL)
- self.domain_number = domain_number
- def es_put_border_nodes(self, src_dpid, src_port_no, src_mac, dst_dpid, dst_port_no, dst_mac):
- e = {
- "domain": self.domain_number,
- "src_domain": 0,
- "src_dpid": src_dpid,
- "src_dpid": src_dpid,
- "src_port": src_port_no,
- "src_mac": src_mac,
- "domain_dpid": dst_dpid,
- "domain_port": dst_port_no,
- "domain_mac": dst_mac
- }
- self.es.index(index='border_nodes', doc_type='border_node', body=e)
- def es_put_hosts(self, border_dpid, border_port_no, host_mac, host_ip):
- e = {
- "domain": self.domain_number,
- "border_dpid": border_dpid,
- "border_port": border_port_no,
- "host_mac": host_mac,
- "host_ip": host_ip
- }
- self.es.index(index='hosts', doc_type='host', body=e)
- def check_if_domains_assigned(self):
- res = self.es.search(index="border_nodes", doc_type="border_node", body={"query": {"match_phrase": {"src_domain": 0}}})
- if res['hits']['total'] > 0:
- return True
- else:
- return False
- def assign_domain(self, border_nodes):
- for border in border_nodes:
- mac = border[5]
- q = {
- "query": {
- "match_phrase": {
- "src_mac": mac
- }
- },
- "script": {
- "source": "ctx._source.src_domain = params.num",
- "params": {
- "num": self.domain_number
- },
- "lang": "painless"
- }
- }
- try:
- self.es.indices.refresh(index="border_nodes")
- self.es.update_by_query(body=q, doc_type='border_node', index='border_nodes')
- except TransportError as e:
- # print(e.info)
- pass
- class MQListener(threading.Thread):
- def __init__(self, domain_number, q):
- super(MQListener, self).__init__()
- self.domain_number = domain_number
- self.q = q or Queue.Queue()
- self.queue = None
- self.conn = None
- self.channel = None
- self.exchange = None
- self.queue = None
- self.running = False
- def run_listener(self):
- with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as self.conn:
- # Open the channel to communicate with RabbitMQ
- with self.conn.channel() as self.channel:
- self.exchange = rabbitpy.TopicExchange(self.channel, 'response_exchange')
- self.exchange.declare()
- queue_name = "queue%s" %self.domain_number
- self.queue = rabbitpy.Queue(self.channel, name=queue_name)
- self.queue.declare()
- binding_key = "%s.*" % self.domain_number
- # print binding_key
- self.queue.bind(self.exchange, routing_key=binding_key)
- while 1:
- try:
- # Consume the message
- for message in self.queue:
- if message.routing_key == ("%s.PathSend" % self.domain_number):
- cmdDict = json.loads(message.body)
- dst_border_nodes = cmdDict['dst_border_nodes']
- message_id = cmdDict['message_id']
- print "Got path request: ", dst_border_nodes
- message.ack()
- self.q.put({'Border_Paths': dst_border_nodes, 'Message_id': str(message_id)})
- elif message.routing_key == ("%s.AddFlow" % self.domain_number):
- print "Add flow"
- else:
- print message.body
- except rabbitpy.exceptions.NotConsumingError as e:
- continue
- def start(self):
- self.thread = threading.Thread(target=self.run_listener)
- self.running = True
- print "Starting listener"
- self.thread.start()
- class Stats():
- def __init__(self, domain_number, net):
- self.domain_number = domain_number
- self.net = net
- self.time_check = 10
- self.received_stats = []
- self.first_iteration = True
- self.dp_tables = {}
- self.src_dst_tx_bytes = []
- global link_ports_all
- self.link_ports_all = link_ports_all
- def checkStats2(self, body, sw_id, check_request):
- # curr_time2 = datetime.now()
- curr_time2= int(round(time.time()))
- global logger_my_port, traffic_all_all, traffic_all_single, checked_switches_ports, time_sleep, curr_time
- checked_switches_ports.append(sw_id)
- time_last= curr_time2-curr_time
- # print "sprawdzam time_last"
- # print time_last
- for stat in sorted(body, key=attrgetter('port_no')):
- print ('datapath port '
- 'rx-pkts rx-bytes rx-error '
- 'tx-pkts tx-bytes tx-error')
- print('---------------- -------- '
- '-------- -------- -------- '
- '-------- -------- --------')
- print(sw_id, stat.port_no, stat.rx_packets, stat.rx_bytes, stat.rx_errors, stat.tx_packets, stat.tx_bytes,
- stat.tx_errors)
- # for i in self.link_ports_all:
- # if sw_id == i['src: '] and stat.port_no == i['src_port: ']:
- # i["tx_bytes: "] = stat.tx_bytes
- for i in self.link_ports_all:
- if sw_id == i['src: '] and stat.port_no == i['src_port: ']:
- if bool_send_statistic==False:
- i["tx_bytes: "] = stat.tx_bytes/time_last
- i["previous: "] = stat.tx_bytes
- else:
- i["tx_bytes: "] = ((stat.tx_bytes-i["previous: "])/time_sleep)
- i["previous: "] = stat.tx_bytes
- # print "Sprawdzam link"
- # print i
- #i["tx_bytes: "] = stat.tx_bytes
- # print "test linkow z metrykami"
- # for one in self.link_ports_all:
- # print one
- #print "JESTEM TUTAJ KOLEJNY RAZ?"
- if len(dp_tables) == len(checked_switches_ports):
- print "All switch checked"
- global all_switch_checked, tx_sum3, bool_send_statistic
- all_switch_checked = True
- # for i in self.link_ports_all:
- # print i
- path_element = 0
- for elem in check_request:
- print "co jest w check_request!!!!!!!!!!!!!!!"
- print elem
- path_element+=1
- sw1 = elem[0]
- sw2 = elem[1]
- self.prepareStats(self.link_ports_all, sw1, sw2, path_element)
- print "Selection of the current best available path for queries from GCL:"
- print traffic_all_all
- self.send_paths(traffic_all_all)
- bool_send_statistic=True
- checked_switches_ports = []
- traffic_all_all = []
- def prepareStats(self, link_ports_all, sw1, sw2, path_element):
- global tx_sum, all_switch_checked, traffic_all_single, links
- print "&&&&&&&&&&&&&&&&&&&&&&&&&&&&"
- print self.net
- print "@@@"
- self.path_element=path_element
- len_port=0
- for elo in link_ports_all:
- if elo['inquiry: '] ==path_element:
- len_port+=1
- print "co jest w elo"
- print elo
- print "ile jest teraz len_port: "+str(len_port)
- if all_switch_checked:
- nr_path = 0
- n = 0
- nn = 0 #ile linkow w danej sciezce
- list_tx_kbytes = []
- traffic_all_single = []
- while n < len_port:
- for one_link in link_ports_all:
- if one_link['path: '] == nr_path:
- nn += 1
- tx_sum += (one_link['tx_bytes: '])
- list_tx_kbytes.append((one_link['tx_bytes: ']))
- self.net[one_link['dst: ']][one_link['src: ']]['weight']=one_link['tx_bytes: ']
- n += 1
- s = []
- tx_max = max(list_tx_kbytes)
- print "Paths with metrics. Path_nr: "+str(nr_path)
- s.extend([sw1, sw2, nn, tx_max, tx_sum])
- print s
- if traffic_all_single == []:
- traffic_all_single.append(s)
- else:
- if s[4] < traffic_all_single[0][4]:
- traffic_all_single = []
- traffic_all_single.append(s)
- nr_path += 1
- nn = 0
- tx_sum = 0
- traffic_all_all.append(traffic_all_single[0])
- print "Chosen paths:"
- print traffic_all_all
- return traffic_all_all
- def send_paths(self, paths):
- print "Sending my paths"
- print "..."
- print "Got flows_add request from GCL"
- print "Flows added"
- with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
- with conn.channel() as channel:
- exchange = rabbitpy.Exchange(channel, 'my_exchange')
- exchange.declare()
- message_id = b
- cmdDict = {
- 'id': self.domain_number,
- 'paths': paths,
- 'name': "some_dummy_command",
- 'message_id': message_id
- }
- cmdstr = json.dumps(cmdDict)
- message = rabbitpy.Message(channel, cmdstr)
- message.publish(exchange, routing_key='Paths')
- # class Metrics(threading.Thread):
- # def __init__(self, dp_tables):
- # super(Metrics,self).__init__()
- # threading.Thread.__init__(self, name="Metrics_thread")
- # self.dp_tables= dp_tables
- # sw1 = raw_input("Provide Start for new metrics: ")
- # self.req2()
- #
- # #sw2 = raw_input("Destination switch: ")
- # #b=ProjectController()
- #
- # @staticmethod
- # def req2():
- # print "wchodze ze static"
- # ProjectController.req2()
- # #b=ProjectController()
- # #b.req2()
- # #ProjectController._request_stats(dp_tables)
- #
- # # def newStats(self, dp_tables):
- if __name__ == "__main__":
- manager.main(args=sys.argv)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement