Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package bmstu.khusnutdinov.lab7;
- import org.zeromq.*;
- import java.util.*;
- import static bmstu.khusnutdinov.lab7.ZeroMQLabConstants.*;
- public class Proxy {
- public static void main(String[] args) {
- try {
- ZContext context = new ZContext();
- ZMQ.Socket backend = context.createSocket(SocketType.ROUTER);
- ZMQ.Socket frontend = context.createSocket(SocketType.ROUTER);
- backend.setHWM(0);
- frontend.setHWM(0);
- backend.bind(BACKEND_SOCKET);
- frontend.bind(FRONTEND_SOCKET);
- ZMQ.Poller poller = context.createPoller(2);
- poller.register(frontend, ZMQ.Poller.POLLIN);
- poller.register(backend, ZMQ.Poller.POLLIN);
- Map<ZFrame, CacheCommutator> commutatorMap = new HashMap<>();
- while (!Thread.currentThread().isInterrupted()) {
- poller.poll(1);
- if (poller.pollin(FRONTEND_MSG)) {
- ZMsg msg = ZMsg.recvMsg(frontend);
- if (msg == null) {
- break;
- }
- System.out.println("GOT MSG ->" + msg);
- if (commutatorMap.isEmpty()) {
- ZMsg errMsg = new ZMsg();
- errMsg.add(msg.getFirst());
- errMsg.add(EMPTY_FRAME);
- errMsg.add("NO CURRENT CACHE");
- errMsg.send(frontend);
- } else {
- String[] data = msg.getLast().toString().split(DELIMITER);
- if (data[0].equals(GET_COMMAND)) {
- for (Map.Entry<ZFrame, CacheCommutator> map : commutatorMap.entrySet()) {
- if (map.getValue().isIntersect(data[1])) {
- ZFrame cacheFrame = map.getKey().duplicate();
- msg.addFirst(cacheFrame);
- msg.send(backend);
- }
- }
- } else {
- if (data[0].equals(PUT_COMMAND)) {
- for (Map.Entry<ZFrame, CacheCommutator> map : commutatorMap.entrySet()) {
- if (map.getValue().isIntersect(data[1])) {
- ZMsg tmp = msg;
- ZFrame cacheFrame = map.getKey();
- tmp.addFirst(cacheFrame);
- System.out.println("PUT MSG ->" + tmp);
- tmp.send(backend);
- }
- }
- } else {
- ZMsg errMsg = new ZMsg();
- errMsg.add(msg.getFirst());
- errMsg.add(EMPTY_FRAME);
- errMsg.add("ERROR MESSAGE");
- errMsg.send(frontend);
- }
- }
- }
- }
- if (poller.pollin(BACKEND_MSG)) {
- ZMsg msg = ZMsg.recvMsg(backend);
- if (msg == null) {
- break;
- }
- if (msg.getLast().toString().contains(HEARTBEAT_COMMAND)) {
- if (!commutatorMap.containsKey(msg.getFirst())) {
- ZFrame data = msg.getLast();
- String[] fields = data.toString().split(DELIMITER);
- CacheCommutator tmp = new CacheCommutator(
- fields[1],
- fields[2],
- System.currentTimeMillis()
- );
- commutatorMap.put(msg.getFirst().duplicate(), tmp);
- System.out.println("New cache -> " + msg.getFirst() + " " + tmp.getLeftBound() + " " + tmp.getRightBound());
- }else{
- commutatorMap.get(msg.getFirst().duplicate()).setTime(System.currentTimeMillis());
- }
- } else {
- System.out.println("NO HEARTHBEAT ->" + msg);
- msg.pop();
- msg.send(frontend);
- }
- }
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement