Advertisement
olemis

Coursera mobilecloud-001 : ACE echo server

Sep 22nd, 2014
270
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.89 KB | None | 0 0
  1. //============================================================================
  2. // Name        : OA3_Reactive_Web_Server_ACE_C.cpp
  3. // Author      : Olemis Lang
  4. // Version     :
  5. // Copyright   : Copyright 2014 Olemis Lang
  6. // Description : Coursera mobilecloud-001 : ACE echo server in C++, Ansi-style
  7. //============================================================================
  8.  
  9. #include <iostream>
  10. using namespace std;
  11.  
  12. #include <ace/ACE.h>
  13. #include <ace/Acceptor.h>
  14. #include <ace/INET_Addr.h>
  15. #include <ace/Log_Msg.h>
  16. #include <ace/Message_Block.h>
  17. #include <ace/Reactor.h>
  18. #include <ace/SOCK_Acceptor.h>
  19. #include <ace/SOCK_Stream.h>
  20. #include <ace/Svc_Handler.h>
  21.  
  22. class Echo_Svc_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
  23. {
  24.  
  25.     typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> super;
  26.  
  27. public:
  28.     // Emit log message for connection event
  29.     virtual int open (void *p)
  30.     {
  31.         if (super::open (p) == -1)
  32.             return -1;
  33.  
  34.         // Unfortunately not working ... :'(
  35.         /*
  36.         ACE_TCHAR peer_name[MAXHOSTNAMELEN];
  37.         ACE_INET_Addr peer_addr();
  38.         if (this->peer ().get_remote_addr ((ACE_Addr)peer_addr) == 0 &&
  39.             peer_addr.addr_to_string (peer_name, MAXHOSTNAMELEN) == 0)
  40.         ACE_DEBUG ((LM_DEBUG,
  41.                     ACE_TEXT ("(%P|%t) Connection from %s\n"),
  42.                     peer_name));
  43.         */
  44.         return 0;
  45.     }
  46.  
  47.     virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE) {
  48.         // TODO: Retrieve stream bound to ACE_HANDLE
  49.         ACE_SOCK_Stream stream = this->peer();
  50.  
  51.         char msgbuf[BUFSIZ];
  52.  
  53.         // Read bytes sent by client
  54.         ssize_t bytes_read = stream.recv (msgbuf, sizeof msgbuf);
  55.         if (bytes_read <= 0)
  56.         {
  57.             ACE_DEBUG ((LM_DEBUG,
  58.                       ACE_TEXT ("(%P|%t) Connection closed unexpectedly\n")));
  59.             return -1;
  60.         }
  61.         // Sent them back
  62.         ssize_t bytes_sent = stream.send (msgbuf, bytes_read);
  63.  
  64.         // The whole buffer contents were sent , just return
  65.         if (bytes_sent == bytes_read)
  66.             return 0;
  67.         // Handle unrecoverable errors
  68.         if (bytes_sent == -1 && ACE_OS::last_error () != EWOULDBLOCK)
  69.             ACE_ERROR_RETURN ((LM_ERROR,
  70.                                ACE_TEXT ("(%P|%t) %p\n"),
  71.                                ACE_TEXT ("send")),
  72.                               0);
  73.         // Otherwise use message queue to defer subsequent write operations
  74.         // by relying upon message block (see handle_output below)
  75.         if (bytes_sent == -1)
  76.             bytes_sent = 0;
  77.         ACE_Message_Block *mb;
  78.         // Calculate remaining bytes
  79.         ssize_t remaining = (bytes_read - bytes_sent);
  80.         // Create future/premise (message block)
  81.         ACE_NEW_RETURN (mb, ACE_Message_Block (&msgbuf[bytes_sent], remaining), -1);
  82.  
  83.         // Queue is empty ?
  84.         int output_off = this->msg_queue ()->is_empty ();
  85.         ACE_Time_Value nowait (ACE_OS::gettimeofday ());
  86.  
  87.         // Log and recover from an error condition
  88.         if (this->putq (mb, &nowait) == -1)
  89.         {
  90.             ACE_ERROR ((LM_ERROR,
  91.                       ACE_TEXT ("(%P|%t) %p; discarding data\n"),
  92.                       ACE_TEXT ("enqueue failed")));
  93.             mb->release ();
  94.             return 0;
  95.         }
  96.         if (output_off)
  97.             return this->reactor ()->register_handler
  98.                     (this, ACE_Event_Handler::WRITE_MASK);
  99.         return 0;
  100.     }
  101.  
  102.     // Called when output is possible.
  103.     virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE)
  104.     {
  105.         // Target message block specifying future/promise async I/O operation
  106.         // see handle_input() above
  107.         ACE_Message_Block *mb;
  108.         ACE_Time_Value nowait (ACE_OS::gettimeofday ());
  109.  
  110.         // Acquire write lock
  111.         while (-1 != this->getq (mb, &nowait))
  112.         {
  113.             // Try to send pending bytes
  114.             ssize_t bytes_sent =
  115.                     this->peer ().send (mb->rd_ptr (), mb->length ());
  116.             // Log errors, if any
  117.             if (bytes_sent == -1)
  118.                 ACE_ERROR ((LM_ERROR,
  119.                             ACE_TEXT ("(%P|%t) %p\n"),
  120.                             ACE_TEXT ("send")));
  121.             else
  122.                 // Move I/O pointer bytes_sent bytes forward
  123.                 mb->rd_ptr (bytes_sent);
  124.             // Enqueue message block head for subsequent
  125.             // processing if not all bytes have been sent
  126.             if (mb->length () > 0)
  127.             {
  128.                 // Release write lock for a while
  129.                 this->ungetq (mb);
  130.                 break;
  131.             }
  132.             mb->release ();
  133.         }
  134.         return (this->msg_queue ()->is_empty ()) ? -1 : 0;
  135.     }
  136.  
  137.     // Called when this handler is removed from the ACE_Reactor.
  138.     virtual int handle_close (ACE_HANDLE handle,
  139.                             ACE_Reactor_Mask close_mask)
  140.     {
  141.         // Enable reentrant behavior of handle_output()
  142.         // by checking write mask set
  143.         if (close_mask == ACE_Event_Handler::WRITE_MASK)
  144.             return 0;
  145.         return super::handle_close (handle, close_mask);
  146.     }
  147. };
  148.  
  149. typedef ACE_Acceptor<Echo_Svc_Handler, ACE_SOCK_ACCEPTOR>
  150.     Echo_Acceptor;
  151.  
  152. int main(int argc, char *argv[]) {
  153.     u_int port = (argc > 1) ? atoi(argv[1]) : 9090;
  154.  
  155.     ACE_INET_Addr addr (port);
  156.  
  157.     // Create the acceptor and setup with address .
  158.     // Implements Acceptor role in Acceptor / Connector pattern
  159.     Echo_Acceptor acceptor;
  160.     // By default acceptor is registered with reactor
  161.     // singleton instance
  162.     if (acceptor.open(addr) == -1)
  163.         return 1;
  164.  
  165.     ACE_Reactor::instance ()->run_reactor_event_loop ();
  166.  
  167.     return (0);
  168. }
  169.  mobilecloud-001 : ACE echo server
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement