Advertisement
Guest User

PROXY

a guest
Dec 13th, 2019
165
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.56 KB | None | 0 0
  1. package bmstu.khusnutdinov.lab7;
  2.  
  3. import org.zeromq.*;
  4.  
  5. import java.util.*;
  6.  
  7. import static bmstu.khusnutdinov.lab7.ZeroMQLabConstants.*;
  8.  
  9. public class Proxy {
  10.  
  11.  
  12.     public static void main(String[] args) {
  13.         try {
  14.             ZContext context = new ZContext();
  15.             ZMQ.Socket backend = context.createSocket(SocketType.ROUTER);
  16.             ZMQ.Socket frontend = context.createSocket(SocketType.ROUTER);
  17.             backend.setHWM(0);
  18.             frontend.setHWM(0);
  19.             backend.bind(BACKEND_SOCKET);
  20.             frontend.bind(FRONTEND_SOCKET);
  21.             ZMQ.Poller poller = context.createPoller(2);
  22.             poller.register(frontend, ZMQ.Poller.POLLIN);
  23.             poller.register(backend, ZMQ.Poller.POLLIN);
  24.             Map<ZFrame, CacheCommutator> commutatorMap = new HashMap<>();
  25.             while (!Thread.currentThread().isInterrupted()) {
  26.                 poller.poll(1);
  27.                 if (poller.pollin(FRONTEND_MSG)) {
  28.                     ZMsg msg = ZMsg.recvMsg(frontend);
  29.                     if (msg == null) {
  30.                         break;
  31.                     }
  32.                     System.out.println("GOT MSG ->" + msg);
  33.                     if (commutatorMap.isEmpty()) {
  34.                         ZMsg errMsg = new ZMsg();
  35.                         errMsg.add(msg.getFirst());
  36.                         errMsg.add(EMPTY_FRAME);
  37.                         errMsg.add("NO CURRENT CACHE");
  38.                         errMsg.send(frontend);
  39.                     } else {
  40.                         String[] data = msg.getLast().toString().split(DELIMITER);
  41.                         if (data[0].equals(GET_COMMAND)) {
  42.                             for (Map.Entry<ZFrame, CacheCommutator> map : commutatorMap.entrySet()) {
  43.                                 if (map.getValue().isIntersect(data[1])) {
  44.                                     ZFrame cacheFrame = map.getKey().duplicate();
  45.                                     msg.addFirst(cacheFrame);
  46.                                     msg.send(backend);
  47.                                 }
  48.                             }
  49.                         } else {
  50.                             if (data[0].equals(PUT_COMMAND)) {
  51.                                 for (Map.Entry<ZFrame, CacheCommutator> map : commutatorMap.entrySet()) {
  52.                                     if (map.getValue().isIntersect(data[1])) {
  53.                                         ZMsg tmp = msg;
  54.                                         ZFrame cacheFrame = map.getKey();
  55.                                         tmp.addFirst(cacheFrame);
  56.                                         System.out.println("PUT MSG ->" + tmp);
  57.                                         tmp.send(backend);
  58.                                     }
  59.                                 }
  60.                             } else {
  61.  
  62.                                 ZMsg errMsg = new ZMsg();
  63.                                 errMsg.add(msg.getFirst());
  64.                                 errMsg.add(EMPTY_FRAME);
  65.                                 errMsg.add("ERROR MESSAGE");
  66.                                 errMsg.send(frontend);
  67.                             }
  68.                         }
  69.                     }
  70.                 }
  71.  
  72.                 if (poller.pollin(BACKEND_MSG)) {
  73.                     ZMsg msg = ZMsg.recvMsg(backend);
  74.                     if (msg == null) {
  75.                         break;
  76.                     }
  77.                     if (msg.getLast().toString().contains(HEARTBEAT_COMMAND)) {
  78.                         if (!commutatorMap.containsKey(msg.getFirst())) {
  79.                             ZFrame data = msg.getLast();
  80.                             String[] fields = data.toString().split(DELIMITER);
  81.                             CacheCommutator tmp = new CacheCommutator(
  82.                                     fields[1],
  83.                                     fields[2],
  84.                                     System.currentTimeMillis()
  85.                             );
  86.                             commutatorMap.put(msg.getFirst().duplicate(), tmp);
  87.                             System.out.println("New cache -> " + msg.getFirst() + " " + tmp.getLeftBound() + " " + tmp.getRightBound());
  88.                         }else{
  89.                             commutatorMap.get(msg.getFirst().duplicate()).setTime(System.currentTimeMillis());
  90.                         }
  91.                     } else {
  92.                         System.out.println("NO HEARTHBEAT ->" + msg);
  93.                         msg.pop();
  94.                         msg.send(frontend);
  95.                     }
  96.                 }
  97.             }
  98.         }
  99.     }
  100. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement