Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package bulk.reserving;
- import java.io.IOException;
- import java.net.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import bulk.nodes.En_TrapLevel;
- import bulk.nodes.Tm_BulkTrap;
- import bulk.system.threading.Tm_SeparateThread;
- public class Tm_MasterThread extends Tm_SeparateThread
- {
- private static Logger m_Logger = LoggerFactory.getLogger(Tm_MasterThread.class);
- private int m_Port;
- private String m_Host;
- private int m_Timeout;
- private ThreadGroup m_Group;
- /** Create master thread
- */
- public Tm_MasterThread()
- {
- // Get reserving
- Tm_Reserving res = Tm_Reserving.getInstance();
- setID(res.getTopic());
- m_Port = res.getPort();
- m_Host = res.getBindAddress();
- m_Timeout = res.getTimeout();
- m_Group = new ThreadGroup(res.getID());
- }
- /**
- * @see bulk.concurrent.Tm_Thread#exception(java.lang.Throwable)
- */
- public void exception(Throwable ex) throws InterruptedException
- {
- m_Logger.warn("Caught unhandled exception: ", ex);
- // Send a critical trap
- Tm_BulkTrap trap = new Tm_BulkTrap(getID());
- trap.setMessage("Caught unhandled exception");
- trap.setLevel(En_TrapLevel.CRITICAL);
- trap.setCause(ex);
- Tm_Reserving.sendTrap(getID(), trap);
- }
- /** Accept socket connection with a timeout handling
- * @param server server socket
- * @return socket if connection estimated
- * @throws IOException
- * @throws InterruptedException
- */
- public Socket safeAccept(ServerSocket server) throws IOException, InterruptedException
- {
- while (true)
- {
- try
- {
- return server.accept();
- }
- catch (SocketTimeoutException ex)
- {
- // Check thread interrupt status
- if (isInterrupted())
- throw new InterruptedException();
- }
- }
- }
- /**
- * @see bulk.concurrent.Tm_Thread#execute()
- */
- public void execute() throws InterruptedException
- {
- ServerSocket server = null;
- try
- {
- server = new ServerSocket();
- SocketAddress addr = (m_Host!=null) ?
- new InetSocketAddress(m_Host, m_Port) :
- new InetSocketAddress(m_Port);
- // Set address re-using & bind socket
- server.setReuseAddress(true);
- server.setSoTimeout(5000); // 5 seconds for accept
- server.bind(addr);
- while (true)
- {
- // Check if thread is interrupted
- if (isInterrupted())
- {
- m_Logger.info("Interrupted");
- break;
- }
- // Accept a socket and set it's timeout
- Socket client = safeAccept(server);
- client.setSoTimeout(m_Timeout);
- // Start child thread
- synchronized (m_Group)
- {
- Thread thrd = new Tm_ClientThread(m_Group, getID(), client);
- thrd.setDaemon(true); // Fixing hangup
- thrd.start();
- }
- // Send a trap
- {
- m_Logger.info("Attached client from " +
- "address=tcp:" + client.getRemoteSocketAddress().toString());
- Tm_BulkTrap trap = new Tm_BulkTrap(getID());
- trap.setMessage("Attached client from " +
- "address=tcp:" + client.getRemoteSocketAddress().toString());
- trap.setLevel(En_TrapLevel.NORMAL);
- Tm_Reserving.sendTrap(getID(), trap);
- }
- }
- }
- catch (IOException ex)
- {
- m_Logger.warn("Caught I/O exception: ", ex);
- // Send a trap
- Tm_BulkTrap trap = new Tm_BulkTrap(getID());
- trap.setMessage("Caught I/O exception");
- trap.setLevel(En_TrapLevel.WARNING);
- trap.setCause(ex);
- Tm_Reserving.sendTrap(getID(), trap);
- }
- finally
- {
- safeCloseSocket(server);
- }
- exit();
- }
- /** Safe close socket
- * @param server socket to close
- */
- private static void safeCloseSocket(ServerSocket server)
- {
- try
- {
- if (server!=null)
- server.close();
- }
- catch (IOException ex)
- {
- /* nothing */
- }
- }
- /**
- * @see bulk.concurrent.Tm_Thread#exit()
- */
- public void exit()
- {
- synchronized (m_Group)
- {
- m_Logger.info("Interrupting all client threads");
- m_Group.interrupt();
- m_Logger.info("Interrupt OK");
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement