Guest User

Untitled

a guest
Dec 13th, 2017
163
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 6.14 KB | None | 0 0
  1. /**
  2.  *
  3.  */
  4. package bulk.reserving;
  5.  
  6. import java.io.*;
  7. import java.net.*;
  8. import java.util.Collection;
  9.  
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12.  
  13. import bulk.nodes.*;
  14. import bulk.reserving.helpers.*;
  15.  
  16. /** This is a thread that handles one standby client connection
  17.  * @author Vladimir Sadovnikov
  18.  *
  19.  */
  20. public class Tm_ClientThread extends Thread
  21. {
  22.     private static Logger       m_Logger  = LoggerFactory.getLogger(Tm_ClientThread.class);
  23.    
  24.     private static final long   WAIT_DELAY= 5000; // 5 seconds
  25.    
  26.     private Socket              m_Socket  = null;
  27.     private Tm_ReservingJournal m_Journal = null;
  28.     private long                m_LastWait= 0;
  29.    
  30.     /** Create client thread
  31.      * @param grp thread group of this thread
  32.      * @param topic topic identifier used to name thread
  33.      * @param sock connected client socket to serve requests
  34.      */
  35.     public Tm_ClientThread(ThreadGroup grp, String topic, Socket sock)
  36.     {
  37.         super(grp, topic);
  38.         m_Socket = sock;
  39.     }
  40.    
  41.     /** Read next frame from master thread.
  42.      * @param sock frame socket
  43.      * @param readerID identifier of topic reader
  44.      * @return null if there is no data to read
  45.      * @throws IOException
  46.      * @throws InterruptedException
  47.      */
  48.     private Collection<Serializable> readNextFrame(Tm_FrameSocket sock, String readerID) throws IOException, InterruptedException
  49.     {
  50.         while (true)
  51.         {
  52.             Collection<Serializable> list = m_Journal.read(readerID);
  53.            
  54.             if (list.size() > 0)
  55.             {
  56.                 // Check if result is null
  57.                 if (Tm_Reserving.getInstance().getDebug() && m_Logger.isTraceEnabled())
  58.                 {
  59.                     m_Logger.trace("jrd_read_events: ");
  60.                    
  61.                     for (Serializable res: list)
  62.                         m_Logger.trace("  " + res.toString());
  63.                 }
  64.                
  65.                 return list;
  66.             }
  67.            
  68.             // Check primitive type
  69.             long time = System.currentTimeMillis();
  70.             if (m_LastWait <= time)
  71.             {
  72.                 if (Tm_Reserving.getInstance().getDebug() && m_Logger.isTraceEnabled())
  73.                     m_Logger.trace("send WAIT");
  74.  
  75.                 // Write 'WAIT' frame to keep connection alive
  76.                 sock.writeFrame(new Tm_Frame(En_FrameType.WAIT));
  77.                
  78.                 m_LastWait = time + WAIT_DELAY; // 5 seconds
  79.             }
  80.         }
  81.     }
  82.    
  83.     private void safeSendTrap(Tm_BulkTrap trap)
  84.     {
  85.         try
  86.         {
  87.             // Check if debug ON
  88.             if (Tm_Reserving.getInstance().getDebug())
  89.                 m_Logger.debug(trap.toString());
  90.            
  91.             Tm_Reserving.sendTrap(getName(), trap);
  92.         }
  93.         catch (InterruptedException ex)
  94.         {
  95.             m_Logger.warn("Interrupted");
  96.         }
  97.     }
  98.    
  99.     /**
  100.      * @see java.lang.Thread#run()
  101.      */
  102.     public void run()
  103.     {
  104.         Tm_FrameSocket sock = null;
  105.         m_Journal = Tm_Reserving.getInstance().getJournal();
  106.        
  107.         try
  108.         {
  109.             sock          = new Tm_FrameSocket(m_Socket);
  110.             String reader = null;
  111.  
  112.             // Main server logic
  113.             while (true)
  114.             {
  115.                 if (isInterrupted())
  116.                 {
  117.                     m_Logger.info("Client thread was interrupted");
  118.                     break;
  119.                 }
  120.                
  121.                 // First poll client for it's ID
  122.                 Tm_Frame frm = sock.readFrame();
  123.                
  124.                 if (frm==null)
  125.                 {
  126.                     m_Logger.warn("Read null frame, exiting");
  127.                     return;
  128.                 }
  129.                
  130.                 // Check frame type
  131.                 switch (frm.getType())
  132.                 {
  133.                     case IDENTIFIER: // Identity frame
  134.                         if (Tm_Reserving.getInstance().getDebug())
  135.                             m_Logger.debug("recv IDENTIFIER, object=" + String.valueOf(frm.getObject()));
  136.                         if (frm.getObject() instanceof String)
  137.                         {
  138.                             if (reader==null)
  139.                             {
  140.                                 // Remember reader ID
  141.                                 reader = (String) frm.getObject();
  142.                                 if (!m_Journal.validSubscriber(reader))
  143.                                 {
  144.                                     m_Logger.warn("Client id=" + reader + " is not registered on reserving subsystem");
  145.                                     return;
  146.                                 }
  147.                                
  148.                                 setName(reader);
  149.                                 m_Journal.reconnect(reader);
  150.                                
  151.                                 if (Tm_Reserving.getInstance().getDebug())
  152.                                     m_Logger.debug("send IDENTIFIER");
  153.                                 sock.writeFrame(frm);
  154.                                 break;
  155.                             }
  156.                             else
  157.                             {
  158.                                 m_Logger.warn("Client already registered, id=" + reader);
  159.                                 return;
  160.                             }
  161.                         }
  162.                         else
  163.                         {
  164.                             m_Logger.warn("Failed validating client identifier");
  165.                             return;
  166.                         }
  167.                        
  168.                     case READ: // Read frame
  169.                         if (Tm_Reserving.getInstance().getDebug())
  170.                             m_Logger.debug("recv READ, object=" + frm.getObject());
  171.                         if (reader==null)
  172.                         {
  173.                             m_Logger.warn("No client identifier!");
  174.                             return;
  175.                         }
  176.  
  177.                         // Check if acknowledge flag is set
  178.                         if (frm.getObject() instanceof Boolean)
  179.                         {
  180.                             Boolean b = (Boolean)frm.getObject();
  181.                             if (b.booleanValue())
  182.                                 m_Journal.acceptPosition(reader);
  183.                         }
  184.                        
  185.                         // Read object from topic & send it
  186.                         Collection<Serializable> s = readNextFrame(sock, reader);
  187.                         if (Tm_Reserving.getInstance().getDebug() && m_Logger.isTraceEnabled())
  188.                             m_Logger.trace("jrd_send READ, object=" + s);
  189.                         sock.writeFrame(new Tm_Frame(En_FrameType.READ, s));
  190.                         m_LastWait = System.currentTimeMillis() + WAIT_DELAY;
  191.                         break;
  192.                    
  193.                     case WAIT: // This is correct message to keep connection alive
  194.                         m_Logger.debug("jrd_recv WAIT");
  195.                         break;
  196.                        
  197.                     default: // Unknown frame
  198.                         m_Logger.warn("bad frame: " + frm.getType());
  199.                         return;
  200.                 }
  201.             } // WHILE
  202.         }
  203.         catch (EOFException ex)
  204.         {
  205.             // Send a trap
  206.             Tm_BulkTrap trap = new Tm_BulkTrap(getName());
  207.             trap.setMessage("Client has closed a connection. Client address=tcp:" +
  208.                         m_Socket.getRemoteSocketAddress().toString());
  209.             trap.setLevel(En_TrapLevel.WARNING);
  210.             trap.setCause(ex);
  211.  
  212.             safeSendTrap(trap);
  213.         }
  214.         catch (IOException ex)
  215.         {
  216.             // Send trap
  217.             Tm_BulkTrap trap = new Tm_BulkTrap(getName());
  218.             trap.setMessage("Caught I/O exception while working with socket. Client address=tcp:" +
  219.                         m_Socket.getRemoteSocketAddress().toString());
  220.             trap.setLevel(En_TrapLevel.WARNING);
  221.             trap.setCause(ex);
  222.            
  223.             safeSendTrap(trap);
  224.         }
  225.         catch (Throwable ex)
  226.         {
  227.             // Send trap of unhandled exception
  228.             Tm_BulkTrap trap = new Tm_BulkTrap(getName());
  229.             trap.setMessage("Caught unhandled exception");
  230.             trap.setLevel(En_TrapLevel.CRITICAL);
  231.             trap.setCause(ex);
  232.            
  233.             safeSendTrap(trap);
  234.         }
  235.         finally
  236.         {
  237.             if (sock!=null)
  238.                 sock.close();
  239.         }
  240.     }
  241. }
Add Comment
Please, Sign In to add comment