Advertisement
danielu13

mzl.c

May 21st, 2015
136
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 9.19 KB | None | 0 0
  1. /**
  2.  * Networking code written to connect MIDAS Data Acquision System to a live data viewer
  3.  *
  4.  Written By: Daniel Underwood
  5.  */
  6. #include <stdlib.h>
  7. #include <stdio.h>
  8. #include <string.h>
  9. #include <errno.h>
  10. #include <zmq.h>
  11. #include "mzl.h"
  12.  
  13. // Function definitions
  14.  
  15. // TODO: Send startup messages to clients
  16. MZLContext *  mzl_initialize(const unsigned int reply_port, const unsigned int publisher_port)
  17. {
  18.  
  19.     // Allocate memory for context
  20.     MZLContext * context = (MZLContext*)malloc(sizeof(MZLContext));
  21.  
  22.     // Assign ports
  23.     context->reply_port = reply_port;
  24.     context->publisher_port = publisher_port;
  25.  
  26.     // Create zmq  context
  27.     context->zmq_context = zmq_ctx_new();
  28.  
  29.     // Check to make sure zmq context is created
  30.     if(!context->zmq_context)
  31.     {
  32.         fprintf(stderr,
  33.             "MZL: Error Creating ZMQ Context, Errno %d: %s\n",
  34.             errno, strerror(errno));
  35.         return NULL;
  36.     }
  37.  
  38.     // IP Strings
  39.     // Use string of length 13 due to 8 characters in tcp://*: and maximum port number 65535
  40.     char publisher_ip[13];
  41.  
  42.     sprintf(publisher_ip, "tcp://*:%d", context->publisher_port);
  43.  
  44.     // Create publisher socket
  45.     context->publisher_socket = zmq_socket(context->zmq_context, ZMQ_PUB);
  46.  
  47.     // Set publisher linger
  48.     int linger = 0;
  49.     zmq_setsockopt(context->publisher_socket, ZMQ_LINGER, &linger, sizeof(int));
  50.  
  51.     // Check to make sure that socket was created and if not, print error
  52.     // TODO: Improve error handling
  53.     if(!context->publisher_socket)
  54.     {
  55.         fprintf(stderr,
  56.             "MZL: Error Creating Publisher Socket, Errno %d: %s\n",
  57.             errno, strerror(errno));
  58.         return NULL;
  59.     }
  60.  
  61.  
  62.     // Bind publisher socket
  63.     int publisher_status = zmq_bind(context->publisher_socket, publisher_ip);
  64.  
  65.     // Check to make sure that publisher socket binding was successful
  66.     if(publisher_status)
  67.     {
  68.         fprintf(stderr,
  69.             "MZL: Error Binding Publisher Socket to %s, Errno %d: %s\n",
  70.             publisher_ip, errno, strerror(errno));
  71.         return NULL;
  72.     }
  73.  
  74.     // TODO: Setup inproc socker for messages to req/rep thread
  75.     // Create thread communuication socket
  76.     context->thread_comm_socket = zmq_socket(context->zmq_context, ZMQ_PAIR);
  77.  
  78.     // Set thread communication socket linger
  79.     zmq_setsockopt(context->thread_comm_socket, ZMQ_LINGER, &linger, sizeof(int));
  80.     // Check to make sure that socket was created
  81.     if(!context->thread_comm_socket)
  82.     {
  83.         fprintf(stderr,
  84.             "MZL: Error Creating Thread Communication Socket, Errno %d: %s\n",
  85.             errno, strerror(errno));
  86.         return NULL;
  87.     }
  88.  
  89.     // Bind thread communication socket
  90.     int comm_status = zmq_bind(context->thread_comm_socket, "inproc://threadcomm");
  91.     // Check to make sure binding was successful
  92.     if(comm_status)
  93.     {
  94.         fprintf(stderr,
  95.             "MZL: Error Binding Thread Communication Socket to %s, Errno %d: %s\n",
  96.             "inproc://threadcomm", errno, strerror(errno));
  97.         return NULL;
  98.     }
  99.  
  100.     // Create thread for request/reply process
  101.     pthread_create(&(context->request_reply_thread), 0, mzl_request_reply_process, (void*)context);
  102.  
  103.     // Return the created context or error code
  104.     return context;
  105. }
  106.  
  107. // TODO: Send shutdown messages to clients
  108. void mzl_shutdown(MZLContext * context)
  109. {
  110.     // Send stop message to request/reply thread
  111.     zmq_send(context->thread_comm_socket, "Stop", 4, 0);
  112.  
  113.     // Join request/reply thread
  114.     // TODO: Time this out if there is an issue
  115.     pthread_join(context->request_reply_thread, 0);
  116.  
  117.     // Unbind ZMQ Ports
  118.     int publisher_close_status = zmq_close(context->publisher_socket);
  119.     if(publisher_close_status)
  120.     {
  121.         fprintf(stderr,
  122.             "MZL Error: Could Not Close Publisher Socket, Errno %d: %s\n",
  123.             errno, strerror(errno));
  124.     }
  125.  
  126.     int thread_comm_close_status = zmq_close(context->thread_comm_socket);
  127.     if(thread_comm_close_status)
  128.     {
  129.         fprintf(stderr,
  130.             "MZL Error: Could Not Close Communication Thread Socket, Errno %d: %s\n",
  131.             errno, strerror(errno));
  132.     }
  133.  
  134.     printf("*** Shutdown Call ***\n");
  135.     // Shut down zmq context
  136.     int zmq_shutdown_status = zmq_ctx_shutdown(context->zmq_context);
  137.     if(zmq_shutdown_status)
  138.     {
  139.         fprintf(stderr,
  140.             "MZL Error: Error Shutting Down ZMQ Context, Errno %d: %s\n",
  141.             errno, strerror(errno));
  142.     }
  143.  
  144.     printf("*** Term Call ***\n");
  145.     // Destroy zmq context
  146.     zmq_ctx_term(context->zmq_context);
  147.  
  148.     printf("*** After Term Call ***\n");
  149.     // Free context memory
  150.     free(context);
  151. }
  152.  
  153. void mzl_register_command_handler(MZLContext * context, CommandHandlerFunction handler)
  154. {
  155.     context->command_handler = handler;
  156. }
  157.  
  158. int mzl_publish_data(MZLContext * context, char * data)
  159. {
  160.     return zmq_send(context->publisher_socket, data, strlen(data), ZMQ_DONTWAIT);
  161. }
  162.  
  163. void * mzl_request_reply_process(void * context)
  164. {
  165.     // Cast context
  166.     MZLContext * ctx = (MZLContext*)context;
  167.  
  168.     // Create socket to communicate with main thread
  169.     // Note: This is the endpoint of the socket; the socket in context is the main thread's socket
  170.     void * thread_comm_endpoint = zmq_socket(ctx->zmq_context, ZMQ_PAIR);
  171.  
  172.     // Set endpoint linger
  173.     int linger = 0;
  174.     zmq_setsockopt(thread_comm_endpoint, ZMQ_LINGER, &linger, sizeof(int));
  175.  
  176.     // Check to make sure socket was created
  177.     if(!thread_comm_endpoint)
  178.     {
  179.         fprintf(stderr,
  180.             "MZL: Error Creating Thread Communication Endpoint, Errno %d: %s\n",
  181.             errno, strerror(errno));
  182.         return NULL;
  183.     }
  184.  
  185.         int endpoint_status = zmq_connect(thread_comm_endpoint, "inproc://threadcomm");
  186.  
  187.     if(endpoint_status)
  188.     {
  189.         fprintf(stderr,
  190.             "MZL: Error Creating Thread Communication Endpoint at %s, Errno %d: %s\n",
  191.             "inproc://threadcomm", errno, strerror(errno));
  192.         return NULL;
  193.     }
  194.  
  195.     // Check for an initial message to cancel loop
  196.     char message[128];
  197.     int comm_message_code = zmq_recv(thread_comm_endpoint, message, 128, ZMQ_DONTWAIT);
  198.  
  199.     // Continue if no message received
  200.     // TODO: Implement handling for different messages
  201.     // TODO: Fix this to use ZMQ's errno
  202.     int comm_message_status = comm_message_code == -1 ?
  203.         errno : comm_message_code;
  204.     int stopLoop = 0;
  205.     switch(comm_message_status)
  206.     {
  207.         case EAGAIN:
  208.             // No message, don't need to stop loop
  209.             break;
  210.         case ENOTSUP:
  211.             // Socket doesn't support receive -- fall through to return
  212.         case ETERM:
  213.             // Socket terminated -- fall through to return
  214.         case ENOTSOCK:
  215.             // Invalid socket -- fall through to return
  216.         case EINTR:
  217.             // Receive interruped -- last case, so return
  218.             return NULL;
  219.         default:
  220.             // Default is that a message is received -- stop loop
  221.             stopLoop = 1;
  222.             break;
  223.     }
  224.  
  225.     // Create Reply Socket
  226.     // TODO: Move all of this socket to this thread?
  227.     char reply_ip[13];
  228.     sprintf(reply_ip, "tcp://*:%d", ctx->reply_port);
  229.     ctx->reply_socket = zmq_socket(ctx->zmq_context, ZMQ_REP);
  230.  
  231.  
  232.     // Set reply socket linger
  233.     zmq_setsockopt(ctx->reply_socket, ZMQ_LINGER, &linger, sizeof(int));
  234.    
  235.     // Check socket creation
  236.     if(!ctx->reply_socket)
  237.     {
  238.         fprintf(stderr,
  239.             "MZL: Error Creating Reply Socket, Errno %d: %s\n",
  240.             errno, strerror(errno));
  241.         return NULL;
  242.     }
  243.  
  244.     int reply_status = zmq_bind(ctx->reply_socket, reply_ip);
  245.  
  246.     // Check binding socket
  247.     if(reply_status)
  248.     {
  249.         fprintf(stderr,
  250.             "MZL: Error Binding Reply Socket on Port %d, Errno %d: %s\n",
  251.             ctx->reply_port, errno, strerror(errno));
  252.         return NULL;
  253.     }
  254.  
  255.     if(endpoint_status || reply_status) return NULL;
  256.    
  257.     // TODO: Signaling for stop
  258.     // TODO: Look into zmq polling
  259.     while(!stopLoop)
  260.     {
  261.         // Check for message from main thread
  262.         comm_message_code = zmq_recv(thread_comm_endpoint, message, 128, ZMQ_DONTWAIT);
  263.         comm_message_status = -1 == comm_message_code ?
  264.             errno : comm_message_code;
  265.         // TODO: Use common code for message handling
  266.         switch(comm_message_status)
  267.         {
  268.             case EAGAIN:
  269.                 // No message, don't need to stop loop
  270.                 break;
  271.             case ENOTSUP:
  272.                 // Socket doesn't support receive -- fall through to return
  273.             case ETERM:
  274.                 // Socket terminated -- fall through to return
  275.             case ENOTSOCK:
  276.                 // Invalid socket -- fall through to return
  277.             case EINTR:
  278.                 // Receive interruped -- last case, so return
  279.                 return NULL;
  280.             default:
  281.                 // Default is that a message is received -- stop loop
  282.                 stopLoop = 1;
  283.                 break;
  284.         }
  285.  
  286.         // Check request/respond socket
  287.         int reply_message_code = zmq_recv(ctx->reply_socket, message, 128, ZMQ_DONTWAIT);
  288.         int reply_message_status = -1 == reply_message_code ?
  289.             errno : reply_message_code;
  290.         switch(reply_message_status)
  291.         {
  292.             case EAGAIN:
  293.                 // No message, don't need to do anything
  294.                 break;
  295.             case ENOTSUP:
  296.                 // Socket doesn't support receive -- fall through to return
  297.             case ETERM:
  298.                 // Socket terminated -- fall through to return
  299.             case ENOTSOCK:
  300.                 // Invalid socket -- fall through to return
  301.             case EINTR:
  302.                 // Receive interruped -- last case, so return
  303.                 return NULL;
  304.             default:
  305.                 // Default is that a message is received -- process message with handler
  306.                 ctx->command_handler(ctx, message);
  307.                 break;
  308.         }
  309.        
  310.     }
  311.  
  312.     // Close sockets
  313.     int reply_close_status = zmq_close(ctx->reply_socket);
  314.     if(reply_close_status)
  315.     {
  316.         fprintf(stderr,
  317.             "MZL Error: Could Not Close Reply Socket, Errno %d: %s\n",
  318.             errno, strerror(errno));
  319.     }
  320.  
  321.     int thread_endpoint_close_status = zmq_close(thread_comm_endpoint);
  322.     if(thread_endpoint_close_status)
  323.     {
  324.         fprintf(stderr,
  325.             "MZL Error: Could Not Close Thread Endpoint Socket, Errno %d: %s\n",
  326.             errno, strerror(errno));
  327.     }
  328.     return NULL;
  329. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement