Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Vector;
- import java.util.stream.Collectors;
- public class Join extends Algorithm {
- private final int m = 6; // Ring of identifiers has size 2^m
- private int SizeRing = exp(2,m);
- String result = ""; // Locations of searched-for keys will be stored here
- public Object run() {
- return findFingersAndKeys(getID());
- }
- // Each message sent by this algorithm has the form: flag, value, ID, where:
- // - if flag = "GET" then the message is a request to get the document with the given key
- // - if flag = "LOOKUP" then the message is request to forward the message to the successor of
- // this processor
- // - if flag = "FOUND" then the message contains the key and processor that stores it
- // - if flag = "NOT_FOUND" then the requested data is not in the system
- // - if flag = "END" the algorithm terminates
- /* Implements the simple algorithm to find a key in a peer-to peer system assuming that each
- processor only knows the address of its successor. The algorithm will return a String
- of the form: "k1:p1 k2:p2 ... kn:pn"m where k1, k2, ... kn are the keys that this processor
- needs to find and pi is the processor that stores the document with key ki for each
- i = 1, 2, ..., n */
- /* ----------------------------- */
- public Object findFingersAndKeys(String id) {
- /* ------------------------------ */
- try {
- Vector<Integer> searchKeys; // Keys that this processor needs to find in the P2P system
- Vector<Integer> localKeys; // Keys stored in this processor
- Vector<Integer> fingers;
- String[] fingerTable;
- int m; // The ring of identifiers has size 2^m
- int globalIndex =0;
- localKeys = new Vector<Integer>();
- fingers = new Vector<Integer>();
- searchKeys = keysToFind(); // Read information from configuration file
- if (searchKeys.size() > 0) {
- for (int i = 0; i < searchKeys.size();) {
- if (searchKeys.elementAt(i) < 0) { // Negative keys are the keys that must be stored locally
- localKeys.add(-searchKeys.elementAt(i));
- searchKeys.remove(i);
- }
- else if (searchKeys.elementAt(i) > 1000) {
- fingers.add(searchKeys.elementAt(i)-1000);
- searchKeys.remove(i);
- }
- else ++i; // Key that needs to be searched for
- }
- }
- if(fingers.stream().distinct().collect(Collectors.toList()).size()==1){
- for(int i = 0 ; i< fingers.size(); i++){
- searchKeys.add(Integer.parseInt(id) + (int)Math.pow(2,i));
- }
- } // if this is the collection we are searching for
- m = fingers.size();
- // Store the finger table in an array of Strings
- fingerTable = new String[m+1];
- for (int i = 0; i < m; ++i) fingerTable[i] = integerToString(fingers.elementAt(i));
- fingerTable[m] = id;
- ////////////////////// END OF CODE THAT POPULATES FINGER TABLES
- String succ = successor(); // Successor of this processor in the ring of identifiers
- Message mssg = null, message; // Message buffers
- int indexNextKey = 0;
- boolean keyProcessed = false; // This variable takes value true when the algorithm has
- // determined that the current key being searched either
- // is in the system or it is not in the system
- int keyValue; // Key that is being searched
- int hashID = hp(id); // Identifier for this processor
- int hashSucc = hp(succ); // Identifier for the successor
- String[] data;
- if (searchKeys.size() > 0) { // Get next key to find and check if it is stored locally
- keyValue = searchKeys.elementAt(0);
- searchKeys.remove(0); // Do not search for the same key twice
- if (localKeys.contains(keyValue)) {
- result = result + keyValue + ":" + id + " "; // Store location of key in the result
- keyProcessed = true;
- }
- else { // Key was not stored locally
- mssg = makeMessage(succ, pack("LOOKUP", keyValue, id));
- }
- }
- // Synchronous loop
- while (waitForNextRound()) {
- if (mssg != null) {
- send(mssg);
- data = unpack(mssg.data());
- if (data[0].equals("END") && searchKeys.size() == 0) return result;
- }
- mssg = null;
- message = receive();
- while (message != null) {
- data = unpack(message.data());
- if (data[0].equals("GET")) {
- // If this is the same GET message that this processor originally sent, then the
- // key is not in the system
- if (data[2].equals(id)) {
- result = result + data[1] + ":not found ";
- keyProcessed = true;
- }
- // This processor must contain the key, if it is in the system
- else {
- mssg = makeMessage(data[2], pack("FOUND", data[1], id));
- }
- }
- else if (data[0].equals("LOOKUP")) {
- // Forward the request
- keyValue = stringToInteger(data[1]);
- if (inSegment(hk(keyValue), hashID, hashSucc))
- mssg = makeMessage (succ,pack("GET",keyValue,data[2]));
- else{
- for(int i=0; i < m-1; i++){
- String currrentProc = fingerTable[i];
- String currentSucessor = fingerTable[i+1];
- if(inSegment(hk(keyValue), hp(currrentProc), hp(currentSucessor))){
- mssg = makeMessage(currrentProc, pack("LOOKUP", keyValue, data[2]));
- }
- }
- }
- }
- else if (data[0].equals("FOUND")) {
- result = result + data[1] + ":" + data[2] + " ";
- keyProcessed = true;
- }
- else if (data[0].equals("END"))
- if (searchKeys.size() > 0) return result;
- else mssg = makeMessage(succ,"END");
- message = receive();
- }
- if (keyProcessed) { // Search for the next key
- if (searchKeys.size() == 0) // There are no more keys to find
- mssg = makeMessage(succ,"END");
- else {
- keyValue = searchKeys.elementAt(0);
- searchKeys.remove(0); // DO not search for same key twice
- if (localKeys.contains(keyValue)) {
- result = result + keyValue + ":" + id + " "; // Store location of key in the result
- keyProcessed = true;
- }
- mssg = makeMessage(succ, pack("LOOKUP", keyValue, id));
- }
- if (mssg != null) keyProcessed = false;
- }
- }
- } catch(SimulatorException e){
- System.out.println("ERROR: " + e.toString());
- }
- /* At this point something likely went wrong. If you do not have a result you can return null */
- return null;
- }
- /* Determine the keys that need to be stored locally and the keys that the processor needs to find.
- Negative keys returned by the simulator's method keysToFind() are to be stored locally in this
- processor as positive numbers. */
- /* ---------------------------------------------------------------------------------------------------- */
- private void getKeys (Vector<Integer> searchKeys, Vector<Integer> localKeys) throws SimulatorException {
- /* ---------------------------------------------------------------------------------------------------- */
- String local = ""; // Keys to be stored locally in this processor
- if (numKeysToFind() > 0) {
- for (int i = 0; i < searchKeys.size();) {
- if (searchKeys.elementAt(i) < 0) { // Negative keys are the keys that must be stored locally
- localKeys.add(-searchKeys.elementAt(i));
- searchKeys.remove(i);
- }
- else if (searchKeys.elementAt(i) > 1000) searchKeys.remove(i);
- else ++i;
- }
- }
- for (int i = 0; i < localKeys.size(); ++i) local = local + localKeys.elementAt(i) + " ";
- showMessage(local); // Show in the simulator the keys stored in this processor
- }
- /* Determine whether hk(value) is in (hp(ID),hp(succ)] */
- /* ---------------------------------------------------------------- */
- private boolean inSegment(int hashValue, int hashID, int hashSucc) {
- /* ----------------------------------------------------------------- */
- if (hashID == hashSucc)
- if (hashValue == hashID) return true;
- else return false;
- else if (hashID < hashSucc)
- if ((hashValue > hashID) && (hashValue <= hashSucc)) return true;
- else return false;
- else
- if (((hashValue > hashID) && (hashValue < SizeRing)) ||
- ((0 <= hashValue) && (hashValue <= hashSucc))) return true;
- else return false;
- }
- /* Hash function for processors. We use the simple mod function */
- /* ------------------------------- */
- private int hp(String ID) throws SimulatorException{
- /* ------------------------------- */
- return stringToInteger(ID) % SizeRing;
- }
- /* Hash function for processors. We use the simple mod function */
- /* ------------------------------- */
- private int hk(int key) {
- /* ------------------------------- */
- return key % SizeRing;
- }
- /* Compute base^exponent ("base" to the power "exponent") */
- /* --------------------------------------- */
- private int exp(int base, int exponent) {
- /* --------------------------------------- */
- int i = 0;
- int result = 1;
- while (i < exponent) {
- result = result * base;
- ++i;
- }
- return result;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement