Advertisement
Guest User

Untitled

a guest
Mar 24th, 2013
203
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 8.59 KB | None | 0 0
  1. /*
  2.  
  3.  * Copyright 2013 Jeanfrancois Arcand
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  6.  * use this file except in compliance with the License. You may obtain a copy of
  7.  * the License at
  8.  *
  9.  * http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13.  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14.  * License for the specific language governing permissions and limitations under
  15.  * the License.
  16.  */
  17. package org.atmosphere.coptermotion;
  18.  
  19. import static org.atmosphere.cpr.ApplicationConfig.PROPERTY_USE_STREAM;
  20.  
  21. import org.atmosphere.config.service.AtmosphereHandlerService;
  22. import org.atmosphere.cpr.AtmosphereRequest;
  23. import org.atmosphere.cpr.AtmosphereResource;
  24. import org.atmosphere.cpr.AtmosphereResourceEvent;
  25. import org.atmosphere.cpr.AtmosphereResponse;
  26. import org.atmosphere.cpr.Broadcaster;
  27. import org.atmosphere.cpr.MetaBroadcaster;
  28. import org.atmosphere.handler.AbstractReflectorAtmosphereHandler;
  29. import org.atmosphere.interceptor.AtmosphereResourceLifecycleInterceptor;
  30. import org.atmosphere.interceptor.BroadcastOnPostAtmosphereInterceptor;
  31. import org.json.simple.JSONArray;
  32. import org.json.simple.JSONObject;
  33. import org.json.simple.JSONValue;
  34. import org.slf4j.Logger;
  35. import org.slf4j.LoggerFactory;
  36. import org.atmosphere.interceptor.HeartbeatInterceptor;
  37.  
  38. import java.io.BufferedReader;
  39. import java.io.IOException;
  40. import java.util.ArrayList;
  41. import java.util.List;
  42. import java.util.concurrent.BlockingQueue;
  43. import java.util.concurrent.LinkedBlockingQueue;
  44.  
  45. import javax.servlet.ServletContextAttributeEvent;
  46. import javax.servlet.ServletContextAttributeListener;
  47.  
  48.  
  49. /**
  50.  * Simple AtmosphereHandler that implement the logic to build a Chat application.
  51.  *
  52.  * @author Jeanfrancois Arcand
  53.  */
  54.  
  55.  
  56. @AtmosphereHandlerService(path="/chat",
  57.         interceptors = {AtmosphereResourceLifecycleInterceptor.class,
  58.                         BroadcastOnPostAtmosphereInterceptor.class,
  59.                         HeartbeatInterceptor.class})
  60. public class ChatAtmosphereHandler extends AbstractReflectorAtmosphereHandler implements ServletContextAttributeListener {
  61.     BlockingQueue<ArrayList> readQueue = new LinkedBlockingQueue<ArrayList>();
  62.     private static BlockingQueue<ArrayList> writeQueue = new LinkedBlockingQueue<ArrayList>(); //had to make it static because onStateChange was making it's own version of queue.
  63.     private static final String SHUTDOWN_REQ = "SHUTDOWN";
  64.     Broadcaster broadcaster;
  65.     private static final Logger logger = LoggerFactory.getLogger(AbstractReflectorAtmosphereHandler.class);
  66.     private boolean queueInit = false;
  67.     private boolean queueWriteInit = false;
  68.    
  69.    
  70.     //is going to be called when both writeQueue and queue have been initialized & added to context
  71.     public void attributeAdded(ServletContextAttributeEvent event) {
  72.         ArrayList writeQueueList;
  73.         ArrayList queueList;
  74.         readQueue = (BlockingQueue<ArrayList>) event.getServletContext().getAttribute("readQueue");
  75.        
  76.         //take first element of writeQueue
  77.        
  78.             if (readQueue == null || queueInit == true){
  79.                 //System.out.println("queue is null");
  80.             } else {
  81.                 queueInit = true;
  82.                
  83.                 Thread thread = new Thread(new Runnable(){
  84.                     public void run(){
  85.                         ArrayList item;
  86.                         try {
  87.                             //while (!(item = queue.take()).equals(SHUTDOWN_REQ)) {
  88.                             while (true) {
  89.                                 item = readQueue.take();                           
  90.                                 JSONObject obj=new JSONObject();
  91.                                 String s = JSONValue.toJSONString(item);
  92.                                 obj.put("type", item.get(0));
  93.                                 obj.put("data", item); 
  94.                                 obj.put("source", "server");
  95.                                 MetaBroadcaster.getDefault().broadcastTo("/", obj.toJSONString());
  96.                             }
  97.                         } catch (InterruptedException e) {
  98.                             e.printStackTrace();
  99.                         }
  100.                     }
  101.                 });        
  102.                 thread.start();
  103.                
  104.                 ArrayList frame = new ArrayList();
  105.                 frame.add("init");
  106.                 writeQueue.add(frame);
  107.                 writeQueue.add(frame);
  108.                 event.getServletContext().setAttribute("writeQueue", writeQueue);
  109.             }
  110.                            
  111.             if (queueWriteInit == true || queueInit==false ){
  112.                 //System.out.println("writeQueue is null");
  113.             } else {
  114.                 queueWriteInit = true;
  115.                 //old code now moved to serialWriter.java
  116.                 //Thread thread = new Thread(new Runnable(){
  117.                 //  public void run(){
  118.                 //      ArrayList item;
  119.                 //      try {
  120.                 //          while (true) {
  121.                 //              item = writeQueue.take();          
  122.                 //              System.out.println("Spilling the guts");
  123.                 //              System.out.println(item.toString());
  124.                 //          }
  125.                 //      } catch (InterruptedException e) {
  126.                 //          e.printStackTrace();
  127.                 //      }
  128.                 //  }
  129.                 //});          
  130.                 //thread.start();  
  131.             }              
  132.     }
  133.     public void attributeRemoved(ServletContextAttributeEvent event) {
  134.         System.out.println("attribute removed");
  135.     }
  136.  
  137.     public void attributeReplaced(ServletContextAttributeEvent event) {
  138.         System.out.println("attribute replaced");
  139.     }
  140.  
  141.     public void onRequest(AtmosphereResource resource) throws IOException {
  142.         //Object message = resource.getAtmosphereResourceEvent().getMessage(); //is empty why?
  143.         //leave connection open
  144.         resource.suspend();
  145.        
  146.         BufferedReader reader = resource.getRequest().getReader();
  147.         Object message = reader.readLine();
  148.        
  149.         if (message !=null){
  150.             Object obj = JSONValue.parse(message.toString());
  151.             JSONObject jsonObject = (JSONObject) obj;
  152.             Object source = jsonObject.get("source");
  153.            
  154.             System.out.println("**onRequest: "+message.toString());
  155.             ArrayList frame = new ArrayList();
  156.             frame.add(jsonObject.get("type"));
  157.             frame.add(jsonObject.get("data"));
  158.             writeQueue.add(frame);
  159.         }
  160.     }
  161.  
  162.     public void destroy() {
  163.         System.out.println("destroy"); 
  164.     }
  165.    
  166.     public void onStateChange(AtmosphereResourceEvent event)
  167.             throws IOException {
  168.        
  169.         /** This method gets invoked when:
  170.                 (1) The remote connection gets closed, either by a browser or a proxy
  171.                 (2) The remote connection reach its maximum idle time (AtmosphereResource.suspend))
  172.                 (3) Everytime a broadcast operation is executed (broadcaster.broadcast)
  173.         **/
  174.        
  175.         Object message = event.getMessage();
  176.    
  177.         AtmosphereResponse r = event.getResource().getResponse();
  178.         if (message == null || event.isCancelled() || event.getResource().getRequest().destroyed()) return;
  179.         //message from server: broadcast
  180.    
  181.         Object obj = JSONValue.parse(message.toString());
  182.         JSONObject jsonObject = (JSONObject) obj;
  183.         Object source = jsonObject.get("source");
  184.        
  185.         if (event.getResource().getSerializer() != null) {
  186.             try {
  187.                 event.getResource().getSerializer().write(event.getResource().getResponse().getOutputStream(), message);
  188.             } catch (Throwable ex) {
  189.                 logger.warn("Serializer exception: message: " + message, ex);
  190.                 throw new IOException(ex);
  191.             }
  192.         } else {
  193.             boolean isUsingStream = true;
  194.             if (source.equals("client")){
  195.                 //do nothing
  196.             } else {
  197.                  if (event.getResource().getRequest().getAttribute(PROPERTY_USE_STREAM) != null) {
  198.                      isUsingStream = (Boolean) event.getResource().getRequest().getAttribute(PROPERTY_USE_STREAM);
  199.                  }
  200.  
  201.                  if (!isUsingStream) {
  202.                      try {
  203.                          r.getWriter();
  204.                      } catch (IllegalStateException e) {
  205.                          isUsingStream = true;
  206.                      }
  207.                  }
  208.  
  209.                  if (message instanceof List) {
  210.                      for (String s : (List<String>) message) {
  211.                          if (isUsingStream) {
  212.                             r.getOutputStream().write(s.getBytes(r.getCharacterEncoding()));
  213.                             r.getOutputStream().flush();
  214.                          } else {
  215.                             r.getWriter().write(s);
  216.                             r.getWriter().flush();
  217.                          }
  218.                      }
  219.                  } else {
  220.                      if (isUsingStream) {
  221.                         r.getOutputStream().write(message.toString().getBytes(r.getCharacterEncoding()));
  222.                         r.getOutputStream().flush();
  223.                      } else {
  224.                         r.getWriter().write(message.toString());
  225.                         r.getWriter().flush();
  226.                      }
  227.                  }
  228.             }
  229.            
  230.         }
  231.         postStateChange(event);
  232.     }  
  233. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement