Advertisement
Guest User

Nanomsg reqrep thread

a guest
Mar 7th, 2015
407
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 2.78 KB | None | 0 0
  1. server.c:
  2. #include <assert.h>
  3. #include <stdio.h>
  4. #include "nanomsg/nn.h"
  5. #include "nanomsg/reqrep.h"
  6. #include <nanomsg/utils/thread.h>
  7. #include <nanomsg/utils/thread.c>
  8. #include <nanomsg/utils/atomic.h>
  9. #include <nanomsg/utils/atomic.c>
  10. #include <nanomsg/utils/err.h>
  11. #include <nanomsg/utils/err.c>
  12.  
  13.  
  14. #define ADDRESS_REQ "tcp://127.0.0.1:4444"
  15. #define ADDRESS_REP "tcp://127.0.0.1:4445"
  16.  
  17. #define NO_THREADS 8
  18.  
  19. struct nn_thread tid[NO_THREADS];
  20. static int workers = 0;
  21.  
  22.  
  23. void device(void * arg)
  24. {  
  25.     int rc;
  26.     int device_req;
  27.     int device_rep;
  28.    
  29.     /* Initialise the device sockets */
  30.     device_req = nn_socket(AF_SP_RAW, NN_REQ);
  31.     nn_bind(device_req, ADDRESS_REQ);
  32.  
  33.     device_rep = nn_socket(AF_SP_RAW, NN_REP);
  34.     nn_bind(device_rep, ADDRESS_REP);
  35.  
  36.     /* Run the device */
  37.     rc = nn_device(device_req, device_rep);
  38.  
  39.     /* This means nn_device loop exited either by an error or by calling nn_term() */
  40.     ASSERT(rc < 0 && nn_errno() == ETERM);
  41.  
  42.     /* Clean up */
  43.     nn_close(device_req);
  44.     nn_close(device_rep);
  45. }
  46.  
  47.  
  48.  
  49. void worker(void * arg)
  50. {
  51.     int sock;
  52.     int bind;
  53.     int worker = workers++;
  54.  
  55.     sock = nn_socket(AF_SP, NN_REP);
  56.     bind = nn_bind(sock, ADDRESS_REP);
  57.     assert(bind >= 0);
  58.  
  59.     printf("Server: %u, server_loop\n", worker);
  60.     while(1) {
  61.        char * buf = NULL;
  62.        int bytes = nn_recv(sock, &buf, NN_MSG, 0);
  63.        
  64.        if (bytes <= 0) {
  65.           break;
  66.        }
  67.        printf("Server: %u, received: %s\n", worker, buf);
  68.        nn_send(sock, ADDRESS_REP, sizeof(ADDRESS_REP), 0);
  69.        nn_freemsg(buf);
  70.     }
  71.     nn_shutdown(sock, bind);
  72.     nn_close(sock);
  73. }
  74.  
  75.  
  76. int main(int argc, const char * argv[])
  77. {
  78.     int i = 1;
  79.  
  80.     nn_thread_init(&tid[0], &device, NULL);
  81.     for(i = 1; i < NO_THREADS; i++) {
  82.        nn_thread_init(&tid[i], &worker, NULL);
  83.     }
  84.  
  85.     while(1) {
  86.        Sleep(1000);
  87.     }
  88.  
  89.     return 0;
  90. }
  91.  
  92.  
  93. client.c:
  94. #include <assert.h>
  95. #include <stdio.h>
  96. #include <string.h>
  97. #include "nanomsg/nn.h"
  98. #include "nanomsg/reqrep.h"
  99.  
  100.  
  101. #define ADDRESS_REQ "tcp://127.0.0.1:4445"
  102.  
  103.  
  104. int main(int argc, const char * argv[])
  105. {
  106.     const char * name = NULL;
  107.     int sock;
  108.     int conn;
  109.  
  110.     if (argc < 2) {
  111.        printf("client [name]\n");
  112.        return 1;
  113.     }
  114.     name = *(argv + 1);
  115.    
  116.     sock = nn_socket(AF_SP, NN_REQ);
  117.     conn = nn_connect(sock, ADDRESS_REQ);
  118.  
  119.     printf("client %s\n", name);
  120.     nn_send(sock, name, strlen(name) + 1, 0);
  121.     while(1) {
  122.        char * buf = NULL;
  123.        int bytes = nn_recv(sock, &buf, NN_MSG, 0);
  124.        
  125.        if (bytes > 0) {
  126.           printf("received: %s\n", buf);
  127.           nn_freemsg(buf);
  128.           break;
  129.        }
  130.     }
  131.     nn_shutdown(sock, conn);
  132.     nn_close(sock);
  133.  
  134.     return 0;
  135. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement