Advertisement
Guest User

lab2 - gunnar

a guest
Nov 23rd, 2017
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 21.70 KB | None | 0 0
  1. # coding=utf-8
  2. #------------------------------------------------------------------------------------------------------
  3. # TDA596 Labs - Server Skeleton
  4. # server/server.py
  5. # Input: Node_ID total_number_of_ID
  6. # Student Group: G 27
  7. # Student names: Gunnar Gunnarsson & Pedram Talebi
  8. #------------------------------------------------------------------------------------------------------
  9. # We import various libraries
  10. from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler # Socket specifically designed to handle HTTP requests
  11. import sys # Retrieve arguments
  12. from urlparse import parse_qs # Parse POST data
  13. from httplib import HTTPConnection # Create a HTTP connection, as a client (for POST requests to the other vessels)
  14. from urllib import urlencode # Encode POST content into the HTTP header
  15. from codecs import open # Open a file
  16. from threading import  Thread # Thread Management
  17. import cgi # To escape characters for the HTML
  18. from random import randint # Random integer generation
  19. import time # To create delays
  20. import ast # To translate string to dictionary
  21. import operator # For determining highest value in a dict
  22. #------------------------------------------------------------------------------------------------------
  23.  
  24. # Global variables for HTML templates
  25. board_frontpage_footer_template = ""
  26. board_frontpage_header_template = ""
  27. boardcontents_template = ""
  28. entry_template = ""
  29.  
  30. #------------------------------------------------------------------------------------------------------
  31. # Static variables definitions
  32. PORT_NUMBER = 80
  33. #------------------------------------------------------------------------------------------------------
  34.  
  35.  
  36.  
  37.  
  38. #------------------------------------------------------------------------------------------------------
  39. #------------------------------------------------------------------------------------------------------
  40. class BlackboardServer(HTTPServer):
  41. #------------------------------------------------------------------------------------------------------
  42.     def __init__(self, server_address, handler, node_id, vessel_list):
  43.     # We call the super init
  44.         HTTPServer.__init__(self,server_address, handler)
  45.         # we create the dictionary of values
  46.         self.store = {}
  47.         # We keep a variable of the next id to insert
  48.         self.current_key = -1
  49.         # our own ID (IP is 10.1.0.ID)
  50.         self.vessel_id = vessel_id
  51.         # The list of other vessels
  52.         self.vessels = vessel_list
  53.         # The random election value of this vessel
  54.         self.election_value = randint(0,1000)
  55.         # The leader
  56.         self.leader_id = -1
  57.         self.leader_value = -1
  58.  
  59.         time.sleep(5)
  60.        
  61.         # Identify the leader
  62.         self.spawn_thread(self.send_incremented,(True, self.vessels.index("10.1.0.%s" % (str(vessel_id))), '', -1, None) )
  63. #------------------------------------------------------------------------------------------------------
  64.     # We add a value received to the store
  65.     def add_value_to_store(self, value):
  66.         # Increment the current key index
  67.         self.current_key += 1
  68.         # We add the value to the store
  69.         self.store[self.current_key] = value
  70. #------------------------------------------------------------------------------------------------------
  71.     # We modify a value received in the store
  72.     def modify_value_in_store(self,key,value):
  73.         # we modify a value in the store if it exists
  74.         if(int(key) in self.store.keys()):
  75.             self.store[int(key)] = value
  76.         else:
  77.             raise Exception('404, the key doesn\'t have a value')
  78. #------------------------------------------------------------------------------------------------------
  79.     # We delete a value received from the store
  80.     def delete_value_in_store(self,key):
  81.         # we delete a value in the store if it exists
  82.         if(int(key) in self.store.keys()):
  83.             self.store[int(key)] = ''
  84.         else:
  85.             raise Exception('404, the key doesn\'t have a value')
  86. #------------------------------------------------------------------------------------------------------
  87.     # We update the ID of the leader
  88.     def update_leader(self, leader):
  89.         # we update the leader_id and leader_value as the given values
  90.         self.leader_id = leader['id']
  91.         self.leader_value = leader['value']
  92. #------------------------------------------------------------------------------------------------------
  93. # Contact a specific vessel with a set of variables to transmit to it
  94.     def contact_vessel(self, vessel_ip, path, entry, delete, candidates, leader, sender):
  95.         # If no original sender is specified, the current entity is the sender of the message
  96.         if(sender == None):
  97.             sender = self.vessel_id
  98.         # the Boolean variable we will return
  99.         success = False
  100.         # The variables must be encoded in the URL format, through urllib.urlencode
  101.         post_content = urlencode({'entry': entry, 'delete': delete, 'retransmitted': True, 'candidates': candidates, 'leader': leader, 'sender': sender})
  102.         # the HTTP header must contain the type of data we are transmitting, here URL encoded
  103.         headers = {"Content-type": "application/x-www-form-urlencoded"}
  104.         # We should try to catch errors when contacting the vessel
  105.         try:
  106.             # We contact vessel:PORT_NUMBER since we all use the same port
  107.             # We can set a timeout, after which the connection fails if nothing happened
  108.             connection = HTTPConnection("%s:%d" % (vessel_ip, PORT_NUMBER), timeout = 30)
  109.             # We only use POST to send data (PUT and DELETE not supported)
  110.             action_type = "POST"
  111.             # We send the HTTP request
  112.             connection.request(action_type, path, post_content, headers)
  113.             # We retrieve the response
  114.             response = connection.getresponse()
  115.             # We want to check the status, the body should be empty
  116.             status = response.status
  117.             # If we receive a HTTP 200 - OK
  118.             if status == 200:
  119.                 success = True
  120.         # We catch every possible exceptions
  121.         except Exception as e:
  122.             print "Error while contacting %s" % vessel_ip
  123.             # printing the error given by Python
  124.             print(e)
  125.  
  126.         # we return if we succeeded or not
  127.         return success
  128.  
  129. #------------------------------------------------------------------------------------------------------
  130.     # We send a received value to all the other vessels of the system
  131.     def propagate_value_to_vessels(self, path, entry, delete, is_leader):
  132.         # If this entity is the leader, messages are propagated to slaves, otherwise the message is propagated to the leader only
  133.         if is_leader:
  134.             # We iterate through the vessel list
  135.             for vessel in self.vessels:
  136.                 # We should not send it to our own IP, or we would create an infinite loop of updates
  137.                 if vessel != ("10.1.0.%s" % self.vessel_id):
  138.                     success = False
  139.                     # Try again until the request succeeds
  140.                     while not success:
  141.                         success = self.contact_vessel(vessel, path, entry, delete, '', -1, None)
  142.         else:
  143.             success = False
  144.             # Try again until the request succeeds
  145.             while not success:
  146.                 # Send to leader
  147.                 success = self.contact_vessel("10.1.0.%s" % self.leader_id, path, entry, delete, '', -1, None)
  148.  
  149.     # We send the message incrementally to all messages to all connected vessels until we receive a success message
  150.     def send_incremented(self, is_election, next_index, candidates, leader, sender):
  151.         # Set the initial recipient
  152.         index = next_index
  153.         success = False
  154.         # Iterate until the message has been successfully sent
  155.         while not success:
  156.             # check if address has exceeded the address limit
  157.             if index >= len(self.vessels):
  158.                 index = 0
  159.             # Specify recipient address
  160.             address = self.vessels[index]
  161.             try:
  162.                 if is_election:
  163.                     path = '/election'
  164.                 else:
  165.                     path = '/coordination'
  166.                 # send the message to the specific address
  167.                 success = self.contact_vessel(address, path, '', '', candidates, leader, sender)
  168.             except Exception as e:
  169.                 # address wasn't reachable
  170.                 print(e)
  171.             # Increment to the next index in the list
  172.             index += 1
  173.  
  174.     def spawn_thread(self, input_target, input_args):
  175.         try:
  176.             #thread = Thread(target=self.server.send_incremented,args=(False, self.server.vessels.index("10.1.0.%s" % (str(self.server.vessel_id))) + 1, '', leader, sender) )
  177.             thread = Thread(target=input_target,args=input_args)
  178.             # We kill the process if we kill the server
  179.             thread.daemon = True
  180.             # We start the thread
  181.             thread.start()
  182.         except Exception as e:
  183.             # Catch all possible exceptions
  184.             print "Error when executing spawn_thread"
  185.             print(e)
  186. #------------------------------------------------------------------------------------------------------
  187.  
  188.  
  189.  
  190.  
  191.  
  192.  
  193.  
  194. #------------------------------------------------------------------------------------------------------
  195. #------------------------------------------------------------------------------------------------------
  196. # This class implements the logic when a server receives a GET or POST request
  197. # It can access the server data through self.server.*
  198. # i.e. the store is accessible through self.server.store
  199. # Attributes of the server are SHARED across all request-handling threads!
  200. class BlackboardRequestHandler(BaseHTTPRequestHandler):
  201. #------------------------------------------------------------------------------------------------------
  202.     # We fill the HTTP headers
  203.     def set_HTTP_headers(self, status_code = 200):
  204.          # We set the response status code (200 if OK, something else otherwise)
  205.         self.send_response(status_code)
  206.         # We set the content type to HTML
  207.         self.send_header("Content-type","text/html")
  208.         # No more important headers, we can close them
  209.         self.end_headers()
  210. #------------------------------------------------------------------------------------------------------
  211.     # a POST request must be parsed through urlparse.parse_QS, since the content is URL encoded
  212.     def parse_POST_request(self):
  213.         post_data = ""
  214.         # We need to parse the response, so we must know the length of the content
  215.         length = int(self.headers['Content-Length'])
  216.         # we can now parse the content using parse_qs
  217.         post_data = parse_qs(self.rfile.read(length), keep_blank_values=1)
  218.         # we return the data
  219.         return post_data
  220. #------------------------------------------------------------------------------------------------------
  221. #------------------------------------------------------------------------------------------------------
  222. # Request handling - GET
  223. #------------------------------------------------------------------------------------------------------
  224.     # This function contains the logic executed when this server receives a GET request
  225.     # This function is called AUTOMATICALLY upon reception and is executed as a thread!
  226.     def do_GET(self):
  227.         #print("Receiving a GET on path %s" % self.path)
  228.         try:
  229.             # We set the response status code to 200 (OK)
  230.             self.set_HTTP_headers(200)
  231.             response = ''
  232.             # We also parse the variables passed via request URL
  233.             path_params = self.path.split("/")
  234.             req_type = path_params[1]
  235.             try:
  236.                 entry_id = int(path_params[2])
  237.             except Exception as e:
  238.                 entry_id = -1
  239.             # Here, we check which path was requested and call the right logic based on it
  240.             if(req_type == ''):
  241.                 # Get the whole board
  242.                 response = self.do_GET_Index()
  243.             elif(req_type == 'board'):
  244.                 # This is executed if we get the base URL or the board
  245.                 response = self.do_GET_Board()
  246.             elif(req_type == 'entries'):
  247.                 # If there is an ID parameter defined, return a single entry
  248.                 if entry_id is not -1:
  249.                     response = self.do_GET_Entry(entry_id)
  250.                 # Otherwise return all entries
  251.                 else:
  252.                     response = self.do_GET_Entries()
  253.             elif(req_type == 'leader'):
  254.                 # The board requests the leader ID and leader Value
  255.                 response = "ID: %s, Value: %s" % (self.server.leader_id, self.server.leader_value)
  256.  
  257.             # return the response
  258.             self.wfile.write(response)
  259.         except Exception as e:
  260.             # Catch all possible exceptions
  261.             self.set_HTTP_headers(400)
  262.             print "Error when executing %s" % __name__
  263.             print(e)
  264.  
  265. #------------------------------------------------------------------------------------------------------
  266. # GET logic - specific path
  267. #------------------------------------------------------------------------------------------------------
  268.     def do_GET_Index(self):
  269.         try:
  270.             # header
  271.             header = board_frontpage_header_template.replace("leaderplaceholder",("ID: %s, Value: %s" % (self.server.leader_id, self.server.leader_value)))
  272.             # fetch content
  273.             content = self.do_GET_Board()
  274.             # footer
  275.             footer = board_frontpage_footer_template % (self.server.vessels)
  276.             # return combined board
  277.             return header + content + footer
  278.         except Exception as e:
  279.             # Catch all possible exceptions
  280.             print "Error when executing %s" % __name__
  281.             print(e)
  282.  
  283.     def do_GET_Board(self):
  284.         # Only get the board
  285.         try:
  286.             # fetch entries
  287.             entries = self.do_GET_Entries()
  288.             # create boardcontents and add entries
  289.             content = boardcontents_template % ("Board @ 10.1.0.%s:%s" % (self.server.vessel_id, PORT_NUMBER), entries)
  290.             # return board
  291.             return content
  292.         except Exception as e:
  293.             # Catch all possible exceptions
  294.             print "Error when executing %s" % __name__
  295.             print(e)
  296.  
  297.     def do_GET_Entries(self):
  298.         # Get all entries
  299.         try:
  300.             # Initialize entries
  301.             entries = ""
  302.             # Initialize entry counter
  303.             entry_counter = 0
  304.             # Iterate over all entries
  305.             while entry_counter < len(self.server.store):
  306.                 # Add entries to collection
  307.                 entries += self.do_GET_Entry(entry_counter)
  308.                 # Increment the entry counter
  309.                 entry_counter += 1
  310.             # return all entries
  311.             return entries
  312.         except Exception as e:
  313.             # Catch all possible exceptions
  314.             print "Error when executing %s" % __name__
  315.             print(e)
  316.  
  317.     def do_GET_Entry(self, key):
  318.         # Get a single entry
  319.         try:
  320.             # Initialize entry variable
  321.             entry = ''
  322.             # Check if entry is a non-empty string
  323.             if(self.server.store[key] != ''):
  324.                 # Get the specified entry
  325.                 entry_text = self.server.store[key][0]
  326.                 # Update entry variable
  327.                 entry += entry_template % ('entries/' + str(key), key, cgi.escape(self.server.store[key], quote=True))
  328.             # return the entry
  329.             return entry
  330.         except Exception as e:
  331.             # Catch all possible exceptions
  332.             print "Error when executing %s" % __name__
  333.             print(e)
  334. #------------------------------------------------------------------------------------------------------
  335.     # we might want some other functions
  336. #------------------------------------------------------------------------------------------------------
  337. #------------------------------------------------------------------------------------------------------
  338. # Request handling - POST
  339. #------------------------------------------------------------------------------------------------------
  340.     def do_POST(self):
  341.         #print("Receiving a POST on %s" % self.path)
  342.         try:
  343.             # We parse the data received
  344.             post_params = self.parse_POST_request()
  345.             # We also parse the variables passed via request URL
  346.             path_params = self.path.split("/")
  347.             # Filter out the request type
  348.             req_type = path_params[1]
  349.             # Check if this entity is the leader
  350.             is_leader = self.server.vessel_id == self.server.leader_id
  351.            
  352.             # Check if message has been propagated
  353.             try:
  354.                 # If it has been propagated, variable is set to True
  355.                 retransmitted = post_params['retransmitted'][0]
  356.             except Exception as e:
  357.                 # If we are unable to get the value for retransmittance, the default is False
  358.                 retransmitted = False
  359.  
  360.             # Set the headers for the client
  361.             self.set_HTTP_headers(200)
  362.  
  363.             # Here, we check which path was requested and call the right logic based on it
  364.             if(req_type == 'board' or req_type == 'entries'):
  365.                 # Get ID of the entry, if it doesn't exist we give it ID = -1
  366.                 try:
  367.                     entry_id = path_params[2]
  368.                 except Exception as e:
  369.                     entry_id = '-1'
  370.                 # Get the entry in text
  371.                 try:
  372.                     entry_param = post_params['entry'][0]
  373.                 except Exception as e:
  374.                     entry_param = ''
  375.                 # Get the delete flag, if it doesn't exist we give it the value 0
  376.                 try:
  377.                     delete_flag = post_params['delete'][0]
  378.                 except Exception as e:
  379.                     delete_flag = '0'
  380.                 # if I am the leader or if I just received a message
  381.                 #   A leader can only recieve a message from anybody and if the leader receieves a message, it needs to perform the action
  382.                 #   A slave can only recieve a message from its leader and if the slave recieves a message, it needs to perform the action
  383.                 if self.server.vessel_id == self.server.leader_id or retransmitted:
  384.                     self.do_POST_Entries(entry_id, entry_param, delete_flag)
  385.             if(req_type == 'election'):
  386.                 # An election is in progress, we handle the dictionary for candidates in a separate function
  387.                 self.do_POST_Election(post_params['candidates'][0])
  388.                 self.wfile.write("OK")
  389.             if(req_type == 'coordination'):
  390.                 # A leader has been elected and this entity is being notified of the new leader
  391.                 self.do_POST_Coordination(int(post_params['sender'][0]),post_params['leader'][0])
  392.                 self.wfile.write("OK")
  393.             if req_type == 'board' or req_type == 'entries':
  394.                 # If we want to retransmit what we received to the other vessels
  395.                 if not retransmitted or is_leader:
  396.                     # Set base path
  397.                     target_path = '/entries'
  398.                     # Check if an ID has been specified
  399.                     if entry_id is not '-1':
  400.                         # Add the ID to the path
  401.                         target_path += '/' + entry_id
  402.                     # Propagate the specific command to all other vessels
  403.                     self.server.spawn_thread(self.server.propagate_value_to_vessels,(target_path, entry_param, delete_flag, is_leader) )
  404.         except Exception as e:
  405.             # Catch all possible exceptions
  406.             self.set_HTTP_headers(400)
  407.             print "Error when executing %s" % __name__
  408.             print(e)
  409. #------------------------------------------------------------------------------------------------------
  410. # POST Logic
  411. #------------------------------------------------------------------------------------------------------
  412.     # We might want some functions here as well
  413. #------------------------------------------------------------------------------------------------------
  414.     def do_POST_Entries(self, id, entry, delete):
  415.         try:
  416.             # Check if an ID has been specified
  417.             if id == '-1':
  418.                 # An entry that does not have an ID is added to the store
  419.                 self.server.add_value_to_store(entry)
  420.             else:
  421.                 # Check if the delete flag is set
  422.                 if delete == '1':
  423.                     # Set the specified value to the store
  424.                     self.server.delete_value_in_store(id)
  425.                 else:
  426.                     # Modify the value at the given ID
  427.                     self.server.modify_value_in_store(id,entry)
  428.         except Exception as e:
  429.             # Catch all possible exceptions
  430.             print "Error when executing do_POST_Entries"
  431.             print(e)
  432.  
  433.     def do_POST_Election(self, input_candidates):
  434.         try:
  435.             candidates = {}
  436.             # Parse list of candidates into tuples
  437.             if (input_candidates != ''):
  438.                 candidates = ast.literal_eval(input_candidates)
  439.             # if my id is in the candidates list, then the process is over
  440.             if self.server.vessel_id in candidates:
  441.                 # I choose the leader
  442.                 leader_item = max(candidates.iteritems(), key=operator.itemgetter(1))
  443.                 leader = {'id': leader_item[0], 'value': leader_item[1]}
  444.                 # Update the leader locally
  445.                 self.server.update_leader(leader)
  446.                 # post a coordination message
  447.                 self.server.spawn_thread(self.server.send_incremented,(False, self.server.vessels.index("10.1.0.%s" % (str(vessel_id))) + 1, '', leader, self.server.vessel_id) )
  448.             else:
  449.                 # otherwise I add myself as candidate
  450.                 # create pseudo-unique election value
  451.                 is_unique = False
  452.                 unique_election_value = self.server.election_value
  453.                 while not is_unique:
  454.                     if unique_election_value in candidates.values():
  455.                         unique_election_value += 0.0001
  456.                     else:
  457.                         is_unique = True
  458.                 candidates[self.server.vessel_id] = unique_election_value
  459.                 # send an election message to an incremented id
  460.                 self.server.spawn_thread(self.server.send_incremented,(True, self.server.vessels.index("10.1.0.%s" % (str(vessel_id))) + 1, str(candidates), -1, None) )
  461.         except Exception as e:
  462.             # Catch all possible exceptions
  463.             print "Error when executing do_POST_Election"
  464.             print(e)
  465.  
  466.     def do_POST_Coordination(self, sender, leader):
  467.         try:
  468.             if(sender != self.server.vessel_id):
  469.                 # save leader locally
  470.                 self.server.update_leader(ast.literal_eval(leader))
  471.                 # redirecting the coordination message
  472.                 self.server.spawn_thread(self.server.send_incremented,(False, self.server.vessels.index("10.1.0.%s" % (str(vessel_id))) + 1, '', leader, sender))
  473.         except Exception as e:
  474.             # Catch all possible exceptions
  475.             print "Error when executing do_POST_Coordination"
  476.             print(e)
  477. #------------------------------------------------------------------------------------------------------
  478. #------------------------------------------------------------------------------------------------------
  479. # Execute the code
  480. if __name__ == '__main__':
  481.  
  482.     ## read the templates from the corresponding html files
  483.     board_frontpage_footer_template = open('/home/mininet/lab2/server/board_frontpage_footer_template.html').read()
  484.     board_frontpage_header_template = open('/home/mininet/lab2/server/board_frontpage_header_template.html').read()
  485.     boardcontents_template = open('/home/mininet/lab2/server/boardcontents_template.html').read()
  486.     entry_template = open('/home/mininet/lab2/server/entry_template.html').read()
  487.  
  488.     vessel_list = []
  489.     vessel_id = 0
  490.     # Checking the arguments
  491.     if len(sys.argv) != 3: # 2 args, the script and the vessel name
  492.         print("Arguments: vessel_ID number_of_vessels")
  493.     else:
  494.         # We need to know the vessel IP
  495.         vessel_id = int(sys.argv[1])
  496.         # We need to write the other vessels IP, based on the knowledge of their number
  497.         for i in range(1, int(sys.argv[2])+1):
  498.             vessel_list.append("10.1.0.%d" % i) # We can add ourselves, we have a test in the propagation
  499.  
  500.     # We launch a server
  501.     server = BlackboardServer(('', PORT_NUMBER), BlackboardRequestHandler, vessel_id, vessel_list)
  502.     print("Starting the server on port %d" % PORT_NUMBER)
  503.  
  504.     try:
  505.         server.serve_forever()
  506.  
  507.     except KeyboardInterrupt:
  508.         server.server_close()
  509.         print("Stopping Server")
  510. #------------------------------------------------------------------------------------------------------
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement