Advertisement
Guest User

Libevent _ Why no workie

a guest
Nov 18th, 2013
191
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include <assert.h>
  2. #include <ws2tcpip.h>
  3. #include <event2/listener.h>
  4. #include <event2/bufferevent.h>
  5. #include <event2/buffer.h>
  6.  
  7. #include <string.h>
  8. #include <stdlib.h>
  9. #include <stdio.h>
  10. #include <errno.h>
  11.  
  12. #include <vector>
  13. #include <iostream>
  14. using namespace std;
  15.  
  16. //#include "c:/projects/library/thread.h"
  17.  
  18. struct Connection
  19. {
  20.    Connection(): bev(NULL), isBusy( false ), needsService( false ) {}
  21.    bufferevent* bev;
  22.    char buffer[MAX_PATH];
  23.    bool isBusy;// should be a thread atomic, but whatever
  24.    bool needsService;
  25.  
  26.    void  SendBuffer()
  27.    {
  28.       assert ( isBusy == false );
  29.       isBusy = true;
  30.       struct evbuffer* outputBuffer = bufferevent_get_output( bev );
  31.       int length = strlen( buffer );
  32.       evbuffer_add( outputBuffer, buffer, length );
  33.  
  34.       needsService = false;
  35.       isBusy = false;
  36.    }
  37. };
  38.  
  39. //*************************************************
  40. //*************************************************
  41.  
  42. vector< Connection* > connections;  /// global... testing only... needs threading protections
  43.  
  44. //*************************************************
  45. //*************************************************
  46.  
  47. class MY_THREAD : public THREAD_CLASS
  48. {
  49.     int Num;
  50. protected:
  51.     void MainThreadLoop ()
  52.     {
  53.       // major threading issue here.. needs protections
  54.       vector< Connection* >::iterator it = connections.begin();
  55.       while( it != connections.end() )
  56.       {
  57.          Connection* conn = *it;
  58.          vector< Connection* >::iterator current = it++;
  59.          if( conn == NULL )
  60.          {
  61.             connections.erase( current );
  62.             continue;
  63.          }
  64.          if( conn->isBusy == false )
  65.          {
  66.             if( conn->needsService== true )
  67.             {
  68.                conn->SendBuffer();
  69.             }
  70.          }
  71.       }
  72.     }
  73. public:
  74.     MY_THREAD (bool CreateSuspended = true) : THREAD_CLASS (CreateSuspended)
  75.     {
  76.         Num = 0;
  77.     }  
  78. };
  79.  
  80. //-------------------------------------------------------------------
  81.  
  82. static void
  83. echo_read_cb(struct bufferevent *bufferEventObj, void *ctx)
  84. {
  85.    Connection* conn = static_cast< Connection * >( ctx );
  86.    assert ( conn->isBusy == false );
  87.  
  88.    vector< Connection* >::iterator it =  connections.begin();
  89.    bool found = false;
  90.    while( it != connections.end() )
  91.    {
  92.       if( conn == *it )
  93.       {
  94.          found = true;
  95.          break;
  96.       }
  97.       it++;
  98.    }
  99.    if( found == false )
  100.       assert( 0 );
  101.    
  102.    conn->isBusy = true;
  103.    int numBytesReceived = bufferevent_read( bufferEventObj, conn->buffer, sizeof( conn->buffer ) );
  104.    if( numBytesReceived > 0 )
  105.       conn->buffer[ numBytesReceived ] = 0;
  106.    conn->isBusy = false;
  107.    conn->needsService = true;
  108. }
  109.  
  110. static void
  111. echo_event_cb(struct bufferevent *bev, short events, void *ctx)
  112. {
  113.         if (events & BEV_EVENT_ERROR)
  114.                 perror("Error from bufferevent");
  115.         if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
  116.                 bufferevent_free(bev);
  117.             Connection* conn = static_cast< Connection * >( ctx );
  118.             vector< Connection* >::iterator it = connections.begin();
  119.             while( it != connections.end() )
  120.             {
  121.                if( *it == conn )
  122.                {
  123.                   *it = NULL;
  124.                }
  125.                it++;
  126.             }
  127.  
  128.         }
  129. }
  130.  
  131. static void
  132. accept_conn_cb(struct evconnlistener *listener,
  133.     evutil_socket_t fd, struct sockaddr *address, int socklen,
  134.     void *ctx)
  135. {
  136.         /* We got a new connection! Set up a bufferevent for it. */
  137.         struct event_base *base = evconnlistener_get_base(listener);
  138.         struct bufferevent *bev = bufferevent_socket_new(
  139.                 base, fd, BEV_OPT_CLOSE_ON_FREE);
  140.  
  141.         Connection* conn = new Connection;
  142.         conn->bev = bev;
  143.        
  144.         strcpy( conn->buffer, "ready" );
  145.         conn->needsService = true;
  146.         connections.push_back( conn );
  147.  
  148.         bufferevent_setcb( bev, echo_read_cb, NULL, echo_event_cb, conn );
  149.  
  150.         bufferevent_enable(bev, EV_READ|EV_WRITE);
  151. }
  152.  
  153. static void
  154. accept_error_cb(struct evconnlistener *listener, void *ctx)
  155. {
  156.         struct event_base *base = evconnlistener_get_base(listener);
  157.         int err = EVUTIL_SOCKET_ERROR();
  158.         fprintf(stderr, "Got an error %d (%s) on the listener. "
  159.                 "Shutting down.\n", err, evutil_socket_error_to_string(err));
  160.  
  161.         event_base_loopexit(base, NULL);
  162. }
  163.  
  164. int
  165. main(int argc, char **argv)
  166. {
  167.    WSADATA WsaData;
  168.     WSAStartup( MAKEWORD(2,2), &WsaData );
  169.         struct event_base *base;
  170.         struct evconnlistener *listener;
  171.         struct sockaddr_in sin;
  172.  
  173.         int port = 5555;
  174.  
  175.         if (argc > 1) {
  176.                 port = atoi(argv[1]);
  177.         }
  178.         if (port<=0 || port>65535) {
  179.                 puts("Invalid port");
  180.                 return 1;
  181.         }
  182.  
  183.         base = event_base_new();
  184.         if (!base) {
  185.                 puts("Couldn't open event base");
  186.                 return 1;
  187.         }
  188.  
  189.         /* Clear the sockaddr before using it, in case there are extra
  190.          * platform-specific fields that can mess us up. */
  191.         memset(&sin, 0, sizeof(sin));/* This is an INET address */
  192.         sin.sin_family = AF_INET;/* Listen on 0.0.0.0 */
  193.         sin.sin_addr.s_addr = htonl(0);/* Listen on the given port. */
  194.         sin.sin_port = htons(port);
  195.  
  196.         listener = evconnlistener_new_bind(base, accept_conn_cb, NULL,
  197.             LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
  198.             (struct sockaddr*)&sin, sizeof(sin));
  199.         if (!listener) {
  200.                 perror("Couldn't create listener");
  201.                 return 1;
  202.         }
  203.         evconnlistener_set_error_cb(listener, accept_error_cb);
  204.  
  205.         MY_THREAD mainThread;
  206.         mainThread.SetAlarm( 30 );// fire every 30 millisecs
  207.  
  208.         mainThread.StartThread();
  209.  
  210.         event_base_dispatch(base);
  211.         return 0;
  212. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement