Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * Networking code written to connect MIDAS Data Acquision System to a live data viewer
- *
- Written By: Daniel Underwood
- */
- #include <stdlib.h>
- #include <stdio.h>
- #include <string.h>
- #include <errno.h>
- #include <zmq.h>
- #include "mzl.h"
- // Function definitions
- // TODO: Send startup messages to clients
- MZLContext * mzl_initialize(const unsigned int reply_port, const unsigned int publisher_port)
- {
- // Allocate memory for context
- MZLContext * context = (MZLContext*)malloc(sizeof(MZLContext));
- // Assign ports
- context->reply_port = reply_port;
- context->publisher_port = publisher_port;
- // Create zmq context
- context->zmq_context = zmq_ctx_new();
- // Check to make sure zmq context is created
- if(!context->zmq_context)
- {
- fprintf(stderr,
- "MZL: Error Creating ZMQ Context, Errno %d: %s\n",
- errno, strerror(errno));
- return NULL;
- }
- // IP Strings
- // Use string of length 13 due to 8 characters in tcp://*: and maximum port number 65535
- char publisher_ip[13];
- sprintf(publisher_ip, "tcp://*:%d", context->publisher_port);
- // Create publisher socket
- context->publisher_socket = zmq_socket(context->zmq_context, ZMQ_PUB);
- // Set publisher linger
- int linger = 0;
- zmq_setsockopt(context->publisher_socket, ZMQ_LINGER, &linger, sizeof(int));
- // Check to make sure that socket was created and if not, print error
- // TODO: Improve error handling
- if(!context->publisher_socket)
- {
- fprintf(stderr,
- "MZL: Error Creating Publisher Socket, Errno %d: %s\n",
- errno, strerror(errno));
- return NULL;
- }
- // Bind publisher socket
- int publisher_status = zmq_bind(context->publisher_socket, publisher_ip);
- // Check to make sure that publisher socket binding was successful
- if(publisher_status)
- {
- fprintf(stderr,
- "MZL: Error Binding Publisher Socket to %s, Errno %d: %s\n",
- publisher_ip, errno, strerror(errno));
- return NULL;
- }
- // TODO: Setup inproc socker for messages to req/rep thread
- // Create thread communuication socket
- context->thread_comm_socket = zmq_socket(context->zmq_context, ZMQ_PAIR);
- // Set thread communication socket linger
- zmq_setsockopt(context->thread_comm_socket, ZMQ_LINGER, &linger, sizeof(int));
- // Check to make sure that socket was created
- if(!context->thread_comm_socket)
- {
- fprintf(stderr,
- "MZL: Error Creating Thread Communication Socket, Errno %d: %s\n",
- errno, strerror(errno));
- return NULL;
- }
- // Bind thread communication socket
- int comm_status = zmq_bind(context->thread_comm_socket, "inproc://threadcomm");
- // Check to make sure binding was successful
- if(comm_status)
- {
- fprintf(stderr,
- "MZL: Error Binding Thread Communication Socket to %s, Errno %d: %s\n",
- "inproc://threadcomm", errno, strerror(errno));
- return NULL;
- }
- // Create thread for request/reply process
- pthread_create(&(context->request_reply_thread), 0, mzl_request_reply_process, (void*)context);
- // Return the created context or error code
- return context;
- }
- // TODO: Send shutdown messages to clients
- void mzl_shutdown(MZLContext * context)
- {
- // Send stop message to request/reply thread
- zmq_send(context->thread_comm_socket, "Stop", 4, 0);
- // Join request/reply thread
- // TODO: Time this out if there is an issue
- pthread_join(context->request_reply_thread, 0);
- // Unbind ZMQ Ports
- int publisher_close_status = zmq_close(context->publisher_socket);
- if(publisher_close_status)
- {
- fprintf(stderr,
- "MZL Error: Could Not Close Publisher Socket, Errno %d: %s\n",
- errno, strerror(errno));
- }
- int thread_comm_close_status = zmq_close(context->thread_comm_socket);
- if(thread_comm_close_status)
- {
- fprintf(stderr,
- "MZL Error: Could Not Close Communication Thread Socket, Errno %d: %s\n",
- errno, strerror(errno));
- }
- printf("*** Shutdown Call ***\n");
- // Shut down zmq context
- int zmq_shutdown_status = zmq_ctx_shutdown(context->zmq_context);
- if(zmq_shutdown_status)
- {
- fprintf(stderr,
- "MZL Error: Error Shutting Down ZMQ Context, Errno %d: %s\n",
- errno, strerror(errno));
- }
- printf("*** Term Call ***\n");
- // Destroy zmq context
- zmq_ctx_term(context->zmq_context);
- printf("*** After Term Call ***\n");
- // Free context memory
- free(context);
- }
- void mzl_register_command_handler(MZLContext * context, CommandHandlerFunction handler)
- {
- context->command_handler = handler;
- }
- int mzl_publish_data(MZLContext * context, char * data)
- {
- return zmq_send(context->publisher_socket, data, strlen(data), ZMQ_DONTWAIT);
- }
- void * mzl_request_reply_process(void * context)
- {
- // Cast context
- MZLContext * ctx = (MZLContext*)context;
- // Create socket to communicate with main thread
- // Note: This is the endpoint of the socket; the socket in context is the main thread's socket
- void * thread_comm_endpoint = zmq_socket(ctx->zmq_context, ZMQ_PAIR);
- // Set endpoint linger
- int linger = 0;
- zmq_setsockopt(thread_comm_endpoint, ZMQ_LINGER, &linger, sizeof(int));
- // Check to make sure socket was created
- if(!thread_comm_endpoint)
- {
- fprintf(stderr,
- "MZL: Error Creating Thread Communication Endpoint, Errno %d: %s\n",
- errno, strerror(errno));
- return NULL;
- }
- int endpoint_status = zmq_connect(thread_comm_endpoint, "inproc://threadcomm");
- if(endpoint_status)
- {
- fprintf(stderr,
- "MZL: Error Creating Thread Communication Endpoint at %s, Errno %d: %s\n",
- "inproc://threadcomm", errno, strerror(errno));
- return NULL;
- }
- // Check for an initial message to cancel loop
- char message[128];
- int comm_message_code = zmq_recv(thread_comm_endpoint, message, 128, ZMQ_DONTWAIT);
- // Continue if no message received
- // TODO: Implement handling for different messages
- // TODO: Fix this to use ZMQ's errno
- int comm_message_status = comm_message_code == -1 ?
- errno : comm_message_code;
- int stopLoop = 0;
- switch(comm_message_status)
- {
- case EAGAIN:
- // No message, don't need to stop loop
- break;
- case ENOTSUP:
- // Socket doesn't support receive -- fall through to return
- case ETERM:
- // Socket terminated -- fall through to return
- case ENOTSOCK:
- // Invalid socket -- fall through to return
- case EINTR:
- // Receive interruped -- last case, so return
- return NULL;
- default:
- // Default is that a message is received -- stop loop
- stopLoop = 1;
- break;
- }
- // Create Reply Socket
- // TODO: Move all of this socket to this thread?
- char reply_ip[13];
- sprintf(reply_ip, "tcp://*:%d", ctx->reply_port);
- ctx->reply_socket = zmq_socket(ctx->zmq_context, ZMQ_REP);
- // Set reply socket linger
- zmq_setsockopt(ctx->reply_socket, ZMQ_LINGER, &linger, sizeof(int));
- // Check socket creation
- if(!ctx->reply_socket)
- {
- fprintf(stderr,
- "MZL: Error Creating Reply Socket, Errno %d: %s\n",
- errno, strerror(errno));
- return NULL;
- }
- int reply_status = zmq_bind(ctx->reply_socket, reply_ip);
- // Check binding socket
- if(reply_status)
- {
- fprintf(stderr,
- "MZL: Error Binding Reply Socket on Port %d, Errno %d: %s\n",
- ctx->reply_port, errno, strerror(errno));
- return NULL;
- }
- if(endpoint_status || reply_status) return NULL;
- // TODO: Signaling for stop
- // TODO: Look into zmq polling
- while(!stopLoop)
- {
- // Check for message from main thread
- comm_message_code = zmq_recv(thread_comm_endpoint, message, 128, ZMQ_DONTWAIT);
- comm_message_status = -1 == comm_message_code ?
- errno : comm_message_code;
- // TODO: Use common code for message handling
- switch(comm_message_status)
- {
- case EAGAIN:
- // No message, don't need to stop loop
- break;
- case ENOTSUP:
- // Socket doesn't support receive -- fall through to return
- case ETERM:
- // Socket terminated -- fall through to return
- case ENOTSOCK:
- // Invalid socket -- fall through to return
- case EINTR:
- // Receive interruped -- last case, so return
- return NULL;
- default:
- // Default is that a message is received -- stop loop
- stopLoop = 1;
- break;
- }
- // Check request/respond socket
- int reply_message_code = zmq_recv(ctx->reply_socket, message, 128, ZMQ_DONTWAIT);
- int reply_message_status = -1 == reply_message_code ?
- errno : reply_message_code;
- switch(reply_message_status)
- {
- case EAGAIN:
- // No message, don't need to do anything
- break;
- case ENOTSUP:
- // Socket doesn't support receive -- fall through to return
- case ETERM:
- // Socket terminated -- fall through to return
- case ENOTSOCK:
- // Invalid socket -- fall through to return
- case EINTR:
- // Receive interruped -- last case, so return
- return NULL;
- default:
- // Default is that a message is received -- process message with handler
- ctx->command_handler(ctx, message);
- break;
- }
- }
- // Close sockets
- int reply_close_status = zmq_close(ctx->reply_socket);
- if(reply_close_status)
- {
- fprintf(stderr,
- "MZL Error: Could Not Close Reply Socket, Errno %d: %s\n",
- errno, strerror(errno));
- }
- int thread_endpoint_close_status = zmq_close(thread_comm_endpoint);
- if(thread_endpoint_close_status)
- {
- fprintf(stderr,
- "MZL Error: Could Not Close Thread Endpoint Socket, Errno %d: %s\n",
- errno, strerror(errno));
- }
- return NULL;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement