Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //============================================================================
- // Name : OA3_Reactive_Web_Server_ACE_C.cpp
- // Author : Olemis Lang
- // Version :
- // Copyright : Copyright 2014 Olemis Lang
- // Description : Coursera mobilecloud-001 : ACE echo server in C++, Ansi-style
- //============================================================================
- #include <iostream>
- using namespace std;
- #include <ace/ACE.h>
- #include <ace/Acceptor.h>
- #include <ace/INET_Addr.h>
- #include <ace/Log_Msg.h>
- #include <ace/Message_Block.h>
- #include <ace/Reactor.h>
- #include <ace/SOCK_Acceptor.h>
- #include <ace/SOCK_Stream.h>
- #include <ace/Svc_Handler.h>
- class Echo_Svc_Handler : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
- {
- typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> super;
- public:
- // Emit log message for connection event
- virtual int open (void *p)
- {
- if (super::open (p) == -1)
- return -1;
- // Unfortunately not working ... :'(
- /*
- ACE_TCHAR peer_name[MAXHOSTNAMELEN];
- ACE_INET_Addr peer_addr();
- if (this->peer ().get_remote_addr ((ACE_Addr)peer_addr) == 0 &&
- peer_addr.addr_to_string (peer_name, MAXHOSTNAMELEN) == 0)
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Connection from %s\n"),
- peer_name));
- */
- return 0;
- }
- virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE) {
- // TODO: Retrieve stream bound to ACE_HANDLE
- ACE_SOCK_Stream stream = this->peer();
- char msgbuf[BUFSIZ];
- // Read bytes sent by client
- ssize_t bytes_read = stream.recv (msgbuf, sizeof msgbuf);
- if (bytes_read <= 0)
- {
- ACE_DEBUG ((LM_DEBUG,
- ACE_TEXT ("(%P|%t) Connection closed unexpectedly\n")));
- return -1;
- }
- // Sent them back
- ssize_t bytes_sent = stream.send (msgbuf, bytes_read);
- // The whole buffer contents were sent , just return
- if (bytes_sent == bytes_read)
- return 0;
- // Handle unrecoverable errors
- if (bytes_sent == -1 && ACE_OS::last_error () != EWOULDBLOCK)
- ACE_ERROR_RETURN ((LM_ERROR,
- ACE_TEXT ("(%P|%t) %p\n"),
- ACE_TEXT ("send")),
- 0);
- // Otherwise use message queue to defer subsequent write operations
- // by relying upon message block (see handle_output below)
- if (bytes_sent == -1)
- bytes_sent = 0;
- ACE_Message_Block *mb;
- // Calculate remaining bytes
- ssize_t remaining = (bytes_read - bytes_sent);
- // Create future/premise (message block)
- ACE_NEW_RETURN (mb, ACE_Message_Block (&msgbuf[bytes_sent], remaining), -1);
- // Queue is empty ?
- int output_off = this->msg_queue ()->is_empty ();
- ACE_Time_Value nowait (ACE_OS::gettimeofday ());
- // Log and recover from an error condition
- if (this->putq (mb, &nowait) == -1)
- {
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P|%t) %p; discarding data\n"),
- ACE_TEXT ("enqueue failed")));
- mb->release ();
- return 0;
- }
- if (output_off)
- return this->reactor ()->register_handler
- (this, ACE_Event_Handler::WRITE_MASK);
- return 0;
- }
- // Called when output is possible.
- virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE)
- {
- // Target message block specifying future/promise async I/O operation
- // see handle_input() above
- ACE_Message_Block *mb;
- ACE_Time_Value nowait (ACE_OS::gettimeofday ());
- // Acquire write lock
- while (-1 != this->getq (mb, &nowait))
- {
- // Try to send pending bytes
- ssize_t bytes_sent =
- this->peer ().send (mb->rd_ptr (), mb->length ());
- // Log errors, if any
- if (bytes_sent == -1)
- ACE_ERROR ((LM_ERROR,
- ACE_TEXT ("(%P|%t) %p\n"),
- ACE_TEXT ("send")));
- else
- // Move I/O pointer bytes_sent bytes forward
- mb->rd_ptr (bytes_sent);
- // Enqueue message block head for subsequent
- // processing if not all bytes have been sent
- if (mb->length () > 0)
- {
- // Release write lock for a while
- this->ungetq (mb);
- break;
- }
- mb->release ();
- }
- return (this->msg_queue ()->is_empty ()) ? -1 : 0;
- }
- // Called when this handler is removed from the ACE_Reactor.
- virtual int handle_close (ACE_HANDLE handle,
- ACE_Reactor_Mask close_mask)
- {
- // Enable reentrant behavior of handle_output()
- // by checking write mask set
- if (close_mask == ACE_Event_Handler::WRITE_MASK)
- return 0;
- return super::handle_close (handle, close_mask);
- }
- };
- typedef ACE_Acceptor<Echo_Svc_Handler, ACE_SOCK_ACCEPTOR>
- Echo_Acceptor;
- int main(int argc, char *argv[]) {
- u_int port = (argc > 1) ? atoi(argv[1]) : 9090;
- ACE_INET_Addr addr (port);
- // Create the acceptor and setup with address .
- // Implements Acceptor role in Acceptor / Connector pattern
- Echo_Acceptor acceptor;
- // By default acceptor is registered with reactor
- // singleton instance
- if (acceptor.open(addr) == -1)
- return 1;
- ACE_Reactor::instance ()->run_reactor_event_loop ();
- return (0);
- }
- mobilecloud-001 : ACE echo server
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement