Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- *
- */
- package bulk.reserving;
- import java.io.*;
- import java.net.*;
- import java.util.Collection;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import bulk.nodes.*;
- import bulk.reserving.helpers.*;
- /** This is a thread that handles one standby client connection
- * @author Vladimir Sadovnikov
- *
- */
- public class Tm_ClientThread extends Thread
- {
- private static Logger m_Logger = LoggerFactory.getLogger(Tm_ClientThread.class);
- private static final long WAIT_DELAY= 5000; // 5 seconds
- private Socket m_Socket = null;
- private Tm_ReservingJournal m_Journal = null;
- private long m_LastWait= 0;
- /** Create client thread
- * @param grp thread group of this thread
- * @param topic topic identifier used to name thread
- * @param sock connected client socket to serve requests
- */
- public Tm_ClientThread(ThreadGroup grp, String topic, Socket sock)
- {
- super(grp, topic);
- m_Socket = sock;
- }
- /** Read next frame from master thread.
- * @param sock frame socket
- * @param readerID identifier of topic reader
- * @return null if there is no data to read
- * @throws IOException
- * @throws InterruptedException
- */
- private Collection<Serializable> readNextFrame(Tm_FrameSocket sock, String readerID) throws IOException, InterruptedException
- {
- while (true)
- {
- Collection<Serializable> list = m_Journal.read(readerID);
- if (list.size() > 0)
- {
- // Check if result is null
- if (Tm_Reserving.getInstance().getDebug() && m_Logger.isTraceEnabled())
- {
- m_Logger.trace("jrd_read_events: ");
- for (Serializable res: list)
- m_Logger.trace(" " + res.toString());
- }
- return list;
- }
- // Check primitive type
- long time = System.currentTimeMillis();
- if (m_LastWait <= time)
- {
- if (Tm_Reserving.getInstance().getDebug() && m_Logger.isTraceEnabled())
- m_Logger.trace("send WAIT");
- // Write 'WAIT' frame to keep connection alive
- sock.writeFrame(new Tm_Frame(En_FrameType.WAIT));
- m_LastWait = time + WAIT_DELAY; // 5 seconds
- }
- }
- }
- private void safeSendTrap(Tm_BulkTrap trap)
- {
- try
- {
- // Check if debug ON
- if (Tm_Reserving.getInstance().getDebug())
- m_Logger.debug(trap.toString());
- Tm_Reserving.sendTrap(getName(), trap);
- }
- catch (InterruptedException ex)
- {
- m_Logger.warn("Interrupted");
- }
- }
- /**
- * @see java.lang.Thread#run()
- */
- public void run()
- {
- Tm_FrameSocket sock = null;
- m_Journal = Tm_Reserving.getInstance().getJournal();
- try
- {
- sock = new Tm_FrameSocket(m_Socket);
- String reader = null;
- // Main server logic
- while (true)
- {
- if (isInterrupted())
- {
- m_Logger.info("Client thread was interrupted");
- break;
- }
- // First poll client for it's ID
- Tm_Frame frm = sock.readFrame();
- if (frm==null)
- {
- m_Logger.warn("Read null frame, exiting");
- return;
- }
- // Check frame type
- switch (frm.getType())
- {
- case IDENTIFIER: // Identity frame
- if (Tm_Reserving.getInstance().getDebug())
- m_Logger.debug("recv IDENTIFIER, object=" + String.valueOf(frm.getObject()));
- if (frm.getObject() instanceof String)
- {
- if (reader==null)
- {
- // Remember reader ID
- reader = (String) frm.getObject();
- if (!m_Journal.validSubscriber(reader))
- {
- m_Logger.warn("Client id=" + reader + " is not registered on reserving subsystem");
- return;
- }
- setName(reader);
- m_Journal.reconnect(reader);
- if (Tm_Reserving.getInstance().getDebug())
- m_Logger.debug("send IDENTIFIER");
- sock.writeFrame(frm);
- break;
- }
- else
- {
- m_Logger.warn("Client already registered, id=" + reader);
- return;
- }
- }
- else
- {
- m_Logger.warn("Failed validating client identifier");
- return;
- }
- case READ: // Read frame
- if (Tm_Reserving.getInstance().getDebug())
- m_Logger.debug("recv READ, object=" + frm.getObject());
- if (reader==null)
- {
- m_Logger.warn("No client identifier!");
- return;
- }
- // Check if acknowledge flag is set
- if (frm.getObject() instanceof Boolean)
- {
- Boolean b = (Boolean)frm.getObject();
- if (b.booleanValue())
- m_Journal.acceptPosition(reader);
- }
- // Read object from topic & send it
- Collection<Serializable> s = readNextFrame(sock, reader);
- if (Tm_Reserving.getInstance().getDebug() && m_Logger.isTraceEnabled())
- m_Logger.trace("jrd_send READ, object=" + s);
- sock.writeFrame(new Tm_Frame(En_FrameType.READ, s));
- m_LastWait = System.currentTimeMillis() + WAIT_DELAY;
- break;
- case WAIT: // This is correct message to keep connection alive
- m_Logger.debug("jrd_recv WAIT");
- break;
- default: // Unknown frame
- m_Logger.warn("bad frame: " + frm.getType());
- return;
- }
- } // WHILE
- }
- catch (EOFException ex)
- {
- // Send a trap
- Tm_BulkTrap trap = new Tm_BulkTrap(getName());
- trap.setMessage("Client has closed a connection. Client address=tcp:" +
- m_Socket.getRemoteSocketAddress().toString());
- trap.setLevel(En_TrapLevel.WARNING);
- trap.setCause(ex);
- safeSendTrap(trap);
- }
- catch (IOException ex)
- {
- // Send trap
- Tm_BulkTrap trap = new Tm_BulkTrap(getName());
- trap.setMessage("Caught I/O exception while working with socket. Client address=tcp:" +
- m_Socket.getRemoteSocketAddress().toString());
- trap.setLevel(En_TrapLevel.WARNING);
- trap.setCause(ex);
- safeSendTrap(trap);
- }
- catch (Throwable ex)
- {
- // Send trap of unhandled exception
- Tm_BulkTrap trap = new Tm_BulkTrap(getName());
- trap.setMessage("Caught unhandled exception");
- trap.setLevel(En_TrapLevel.CRITICAL);
- trap.setCause(ex);
- safeSendTrap(trap);
- }
- finally
- {
- if (sock!=null)
- sock.close();
- }
- }
- }
Add Comment
Please, Sign In to add comment