Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # coding=utf-8
- #------------------------------------------------------------------------------------------------------
- # TDA596 Labs - Server Skeleton
- # server/server.py
- # Input: Node_ID total_number_of_ID
- # Student Group: G 27
- # Student names: Gunnar Gunnarsson & Pedram Talebi
- #------------------------------------------------------------------------------------------------------
- # We import various libraries
- from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler # Socket specifically designed to handle HTTP requests
- import sys # Retrieve arguments
- from urlparse import parse_qs # Parse POST data
- from httplib import HTTPConnection # Create a HTTP connection, as a client (for POST requests to the other vessels)
- from urllib import urlencode # Encode POST content into the HTTP header
- from codecs import open # Open a file
- from threading import Thread # Thread Management
- import cgi # To escape characters for the HTML
- from random import randint # Random integer generation
- import time # To create delays
- import ast # To translate string to dictionary
- import operator # For determining highest value in a dict
- #------------------------------------------------------------------------------------------------------
- # Global variables for HTML templates
- board_frontpage_footer_template = ""
- board_frontpage_header_template = ""
- boardcontents_template = ""
- entry_template = ""
- #------------------------------------------------------------------------------------------------------
- # Static variables definitions
- PORT_NUMBER = 80
- #------------------------------------------------------------------------------------------------------
- #------------------------------------------------------------------------------------------------------
- #------------------------------------------------------------------------------------------------------
- class BlackboardServer(HTTPServer):
- #------------------------------------------------------------------------------------------------------
- def __init__(self, server_address, handler, node_id, vessel_list):
- # We call the super init
- HTTPServer.__init__(self,server_address, handler)
- # we create the dictionary of values
- self.store = {}
- # We keep a variable of the next id to insert
- self.current_key = -1
- # our own ID (IP is 10.1.0.ID)
- self.vessel_id = vessel_id
- # The list of other vessels
- self.vessels = vessel_list
- # The random election value of this vessel
- self.election_value = randint(0,1000)
- # The leader
- self.leader_id = -1
- self.leader_value = -1
- time.sleep(5)
- # Identify the leader
- self.spawn_thread(self.send_incremented,(True, self.vessels.index("10.1.0.%s" % (str(vessel_id))), '', -1, None) )
- #------------------------------------------------------------------------------------------------------
- # We add a value received to the store
- def add_value_to_store(self, value):
- # Increment the current key index
- self.current_key += 1
- # We add the value to the store
- self.store[self.current_key] = value
- #------------------------------------------------------------------------------------------------------
- # We modify a value received in the store
- def modify_value_in_store(self,key,value):
- # we modify a value in the store if it exists
- if(int(key) in self.store.keys()):
- self.store[int(key)] = value
- else:
- raise Exception('404, the key doesn\'t have a value')
- #------------------------------------------------------------------------------------------------------
- # We delete a value received from the store
- def delete_value_in_store(self,key):
- # we delete a value in the store if it exists
- if(int(key) in self.store.keys()):
- self.store[int(key)] = ''
- else:
- raise Exception('404, the key doesn\'t have a value')
- #------------------------------------------------------------------------------------------------------
- # We update the ID of the leader
- def update_leader(self, leader):
- # we update the leader_id and leader_value as the given values
- self.leader_id = leader['id']
- self.leader_value = leader['value']
- #------------------------------------------------------------------------------------------------------
- # Contact a specific vessel with a set of variables to transmit to it
- def contact_vessel(self, vessel_ip, path, entry, delete, candidates, leader, sender):
- # If no original sender is specified, the current entity is the sender of the message
- if(sender == None):
- sender = self.vessel_id
- # the Boolean variable we will return
- success = False
- # The variables must be encoded in the URL format, through urllib.urlencode
- post_content = urlencode({'entry': entry, 'delete': delete, 'retransmitted': True, 'candidates': candidates, 'leader': leader, 'sender': sender})
- # the HTTP header must contain the type of data we are transmitting, here URL encoded
- headers = {"Content-type": "application/x-www-form-urlencoded"}
- # We should try to catch errors when contacting the vessel
- try:
- # We contact vessel:PORT_NUMBER since we all use the same port
- # We can set a timeout, after which the connection fails if nothing happened
- connection = HTTPConnection("%s:%d" % (vessel_ip, PORT_NUMBER), timeout = 30)
- # We only use POST to send data (PUT and DELETE not supported)
- action_type = "POST"
- # We send the HTTP request
- connection.request(action_type, path, post_content, headers)
- # We retrieve the response
- response = connection.getresponse()
- # We want to check the status, the body should be empty
- status = response.status
- # If we receive a HTTP 200 - OK
- if status == 200:
- success = True
- # We catch every possible exceptions
- except Exception as e:
- print "Error while contacting %s" % vessel_ip
- # printing the error given by Python
- print(e)
- # we return if we succeeded or not
- return success
- #------------------------------------------------------------------------------------------------------
- # We send a received value to all the other vessels of the system
- def propagate_value_to_vessels(self, path, entry, delete, is_leader):
- # If this entity is the leader, messages are propagated to slaves, otherwise the message is propagated to the leader only
- if is_leader:
- # We iterate through the vessel list
- for vessel in self.vessels:
- # We should not send it to our own IP, or we would create an infinite loop of updates
- if vessel != ("10.1.0.%s" % self.vessel_id):
- success = False
- # Try again until the request succeeds
- while not success:
- success = self.contact_vessel(vessel, path, entry, delete, '', -1, None)
- else:
- success = False
- # Try again until the request succeeds
- while not success:
- # Send to leader
- success = self.contact_vessel("10.1.0.%s" % self.leader_id, path, entry, delete, '', -1, None)
- # We send the message incrementally to all messages to all connected vessels until we receive a success message
- def send_incremented(self, is_election, next_index, candidates, leader, sender):
- # Set the initial recipient
- index = next_index
- success = False
- # Iterate until the message has been successfully sent
- while not success:
- # check if address has exceeded the address limit
- if index >= len(self.vessels):
- index = 0
- # Specify recipient address
- address = self.vessels[index]
- try:
- if is_election:
- path = '/election'
- else:
- path = '/coordination'
- # send the message to the specific address
- success = self.contact_vessel(address, path, '', '', candidates, leader, sender)
- except Exception as e:
- # address wasn't reachable
- print(e)
- # Increment to the next index in the list
- index += 1
- def spawn_thread(self, input_target, input_args):
- try:
- #thread = Thread(target=self.server.send_incremented,args=(False, self.server.vessels.index("10.1.0.%s" % (str(self.server.vessel_id))) + 1, '', leader, sender) )
- thread = Thread(target=input_target,args=input_args)
- # We kill the process if we kill the server
- thread.daemon = True
- # We start the thread
- thread.start()
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing spawn_thread"
- print(e)
- #------------------------------------------------------------------------------------------------------
- #------------------------------------------------------------------------------------------------------
- #------------------------------------------------------------------------------------------------------
- # This class implements the logic when a server receives a GET or POST request
- # It can access to the server data through self.server.*
- # i.e. the store is accessible through self.server.store
- # Attributes of the server are SHARED accross all request hqndling/ threads!
- class BlackboardRequestHandler(BaseHTTPRequestHandler):
- #------------------------------------------------------------------------------------------------------
- # We fill the HTTP headers
- def set_HTTP_headers(self, status_code = 200):
- # We set the response status code (200 if OK, something else otherwise)
- self.send_response(status_code)
- # We set the content type to HTML
- self.send_header("Content-type","text/html")
- # No more important headers, we can close them
- self.end_headers()
- #------------------------------------------------------------------------------------------------------
- # a POST request must be parsed through urlparse.parse_QS, since the content is URL encoded
- def parse_POST_request(self):
- post_data = ""
- # We need to parse the response, so we must know the length of the content
- length = int(self.headers['Content-Length'])
- # we can now parse the content using parse_qs
- post_data = parse_qs(self.rfile.read(length), keep_blank_values=1)
- # we return the data
- return post_data
- #------------------------------------------------------------------------------------------------------
- #------------------------------------------------------------------------------------------------------
- # Request handling - GET
- #------------------------------------------------------------------------------------------------------
- # This function contains the logic executed when this server receives a GET request
- # This function is called AUTOMATICALLY upon reception and is executed as a thread!
- def do_GET(self):
- #print("Receiving a GET on path %s" % self.path)
- try:
- # We set the response status code to 200 (OK)
- self.set_HTTP_headers(200)
- response = ''
- # We also parse the variables passed via request URL
- path_params = self.path.split("/")
- req_type = path_params[1]
- try:
- entry_id = int(path_params[2])
- except Exception as e:
- entry_id = -1
- # Here, we check which path was requested and call the right logic based on it
- if(req_type == ''):
- # Get the whole board
- response = self.do_GET_Index()
- elif(req_type == 'board'):
- # This is executed if we get the base URL or the board
- response = self.do_GET_Board()
- elif(req_type == 'entries'):
- # If there is an ID parameter defined, return a single entry
- if entry_id is not -1:
- response = self.do_GET_Entry(entry_id)
- # Otherwise return all entries
- else:
- response = self.do_GET_Entries()
- elif(req_type == 'leader'):
- # The board requests the leader ID and leader Value
- response = "ID: %s, Value: %s" % (self.server.leader_id, self.server.leader_value)
- # return the response
- self.wfile.write(response)
- except Exception as e:
- # Catch all possible exceptions
- self.set_HTTP_headers(400)
- print "Error when executing %s" % __name__
- print(e)
- #------------------------------------------------------------------------------------------------------
- # GET logic - specific path
- #------------------------------------------------------------------------------------------------------
- def do_GET_Index(self):
- try:
- # header
- header = board_frontpage_header_template.replace("leaderplaceholder",("ID: %s, Value: %s" % (self.server.leader_id, self.server.leader_value)))
- # fetch content
- content = self.do_GET_Board()
- # footer
- footer = board_frontpage_footer_template % (self.server.vessels)
- # return combined board
- return header + content + footer
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing %s" % __name__
- print(e)
- def do_GET_Board(self):
- # Only get the board
- try:
- # fetch entries
- entries = self.do_GET_Entries()
- # create boardcontents and add entries
- content = boardcontents_template % ("Board @ 10.1.0.%s:%s" % (self.server.vessel_id, PORT_NUMBER), entries)
- # return board
- return content
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing %s" % __name__
- print(e)
- def do_GET_Entries(self):
- # Get all entries
- try:
- # Initialize entries
- entries = ""
- # Initialize entry counter
- entry_counter = 0
- # Iterate over all entries
- while entry_counter < len(self.server.store):
- # Add entries to collection
- entries += self.do_GET_Entry(entry_counter)
- # Increment the entry counter
- entry_counter += 1
- # return all entries
- return entries
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing %s" % __name__
- print(e)
- def do_GET_Entry(self, key):
- # Get a single entry
- try:
- # Initialize entry variable
- entry = ''
- # Check if entry is a non-empty string
- if(self.server.store[key] != ''):
- # Get the specified entry
- entry_text = self.server.store[key][0]
- # Update entry variable
- entry += entry_template % ('entries/' + str(key), key, cgi.escape(self.server.store[key], quote=True))
- # return the entry
- return entry
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing %s" % __name__
- print(e)
- #------------------------------------------------------------------------------------------------------
- # we might want some other functions
- #------------------------------------------------------------------------------------------------------
- #------------------------------------------------------------------------------------------------------
- # Request handling - POST
- #------------------------------------------------------------------------------------------------------
- def do_POST(self):
- #print("Receiving a POST on %s" % self.path)
- try:
- # We parse the data received
- post_params = self.parse_POST_request()
- # We also parse the variables passed via request URL
- path_params = self.path.split("/")
- # Filter out the request type
- req_type = path_params[1]
- # Check if this entity is the leader
- is_leader = self.server.vessel_id == self.server.leader_id
- # Check if message has been propagated
- try:
- # If it has been propagated, variable is set to True
- retransmitted = post_params['retransmitted'][0]
- except Exception as e:
- # If we are unable to get the value for retransmittance, the default is False
- retransmitted = False
- # Set the headers for the client
- self.set_HTTP_headers(200)
- # Here, we check which path was requested and call the right logic based on it
- if(req_type == 'board' or req_type == 'entries'):
- # Get ID of the entry, if it doesn't exist we give it ID = -1
- try:
- entry_id = path_params[2]
- except Exception as e:
- entry_id = '-1'
- # Get the entry in text
- try:
- entry_param = post_params['entry'][0]
- except Exception as e:
- entry_param = ''
- # Get the delete flag, if it doesn't exist we give it the value 0
- try:
- delete_flag = post_params['delete'][0]
- except Exception as e:
- delete_flag = '0'
- # if I am the leader or if I just received a message
- # A leader can only recieve a message from anybody and if the leader receieves a message, it needs to perform the action
- # A slave can only recieve a message from its leader and if the slave recieves a message, it needs to perform the action
- if self.server.vessel_id == self.server.leader_id or retransmitted:
- self.do_POST_Entries(entry_id, entry_param, delete_flag)
- if(req_type == 'election'):
- # An election is in progress, we handle the dictionary for candidates in a separate function
- self.do_POST_Election(post_params['candidates'][0])
- self.wfile.write("OK")
- if(req_type == 'coordination'):
- # A leader has been elected and this entity is being notified of the new leader
- self.do_POST_Coordination(int(post_params['sender'][0]),post_params['leader'][0])
- self.wfile.write("OK")
- if req_type == 'board' or req_type == 'entries':
- # If we want to retransmit what we received to the other vessels
- if not retransmitted or is_leader:
- # Set base path
- target_path = '/entries'
- # Check if an ID has been specified
- if entry_id is not '-1':
- # Add the ID to the path
- target_path += '/' + entry_id
- # Propagate the specific command to all other vessels
- self.server.spawn_thread(self.server.propagate_value_to_vessels,(target_path, entry_param, delete_flag, is_leader) )
- except Exception as e:
- # Catch all possible exceptions
- self.set_HTTP_headers(400)
- print "Error when executing %s" % __name__
- print(e)
- #------------------------------------------------------------------------------------------------------
- # POST Logic
- #------------------------------------------------------------------------------------------------------
- # We might want some functions here as well
- #------------------------------------------------------------------------------------------------------
- def do_POST_Entries(self, id, entry, delete):
- try:
- # Check if an ID has been specified
- if id == '-1':
- # An entry that does not have an ID is added to the store
- self.server.add_value_to_store(entry)
- else:
- # Check if the delete flag is set
- if delete == '1':
- # Set the specified value to the store
- self.server.delete_value_in_store(id)
- else:
- # Modify the value at the given ID
- self.server.modify_value_in_store(id,entry)
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing do_POST_Entries"
- print(e)
- def do_POST_Election(self, input_candidates):
- try:
- candidates = {}
- # Parse list of candidates into tuples
- if (input_candidates != ''):
- candidates = ast.literal_eval(input_candidates)
- # if my id is in the candidates list, then the process is over
- if self.server.vessel_id in candidates:
- # I choose the leader
- leader_item = max(candidates.iteritems(), key=operator.itemgetter(1))
- leader = {'id': leader_item[0], 'value': leader_item[1]}
- # Update the leader locally
- self.server.update_leader(leader)
- # post a coordination message
- self.server.spawn_thread(self.server.send_incremented,(False, self.server.vessels.index("10.1.0.%s" % (str(vessel_id))) + 1, '', leader, self.server.vessel_id) )
- else:
- # otherwise I add myself as candidate
- # create pseudo-unique election value
- is_unique = False
- unique_election_value = self.server.election_value
- while not is_unique:
- if unique_election_value in candidates.values():
- unique_election_value += 0.0001
- else:
- is_unique = True
- candidates[self.server.vessel_id] = unique_election_value
- # send an election message to an incremented id
- self.server.spawn_thread(self.server.send_incremented,(True, self.server.vessels.index("10.1.0.%s" % (str(vessel_id))) + 1, str(candidates), -1, None) )
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing do_POST_Election"
- print(e)
- def do_POST_Coordination(self, sender, leader):
- try:
- if(sender != self.server.vessel_id):
- # save leader locally
- self.server.update_leader(ast.literal_eval(leader))
- # redirecting the coordination message
- self.server.spawn_thread(self.server.send_incremented,(False, self.server.vessels.index("10.1.0.%s" % (str(vessel_id))) + 1, '', leader, sender))
- except Exception as e:
- # Catch all possible exceptions
- print "Error when executing do_POST_Coordination"
- print(e)
- #------------------------------------------------------------------------------------------------------
- #------------------------------------------------------------------------------------------------------
- # Execute the code
- if __name__ == '__main__':
- ## read the templates from the corresponding html files
- board_frontpage_footer_template = open('/home/mininet/lab2/server/board_frontpage_footer_template.html').read()
- board_frontpage_header_template = open('/home/mininet/lab2/server/board_frontpage_header_template.html').read()
- boardcontents_template = open('/home/mininet/lab2/server/boardcontents_template.html').read()
- entry_template = open('/home/mininet/lab2/server/entry_template.html').read()
- vessel_list = []
- vessel_id = 0
- # Checking the arguments
- if len(sys.argv) != 3: # 2 args, the script and the vessel name
- print("Arguments: vessel_ID number_of_vessels")
- else:
- # We need to know the vessel IP
- vessel_id = int(sys.argv[1])
- # We need to write the other vessels IP, based on the knowledge of their number
- for i in range(1, int(sys.argv[2])+1):
- vessel_list.append("10.1.0.%d" % i) # We can add ourselves, we have a test in the propagation
- # We launch a server
- server = BlackboardServer(('', PORT_NUMBER), BlackboardRequestHandler, vessel_id, vessel_list)
- print("Starting the server on port %d" % PORT_NUMBER)
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- server.server_close()
- print("Stopping Server")
- #------------------------------------------------------------------------------------------------------
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement