Advertisement
Guest User

Untitled

a guest
May 4th, 2016
79
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 11.51 KB | None | 0 0
  1. /**
  2.  * Machine Problem: Wearables
  3.  * CS 241 - Spring 2016
  4.  */
  5.  
  6. #include <fcntl.h>
  7. #include <netdb.h>
  8. #include <signal.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/socket.h>
  13. #include <sys/types.h>
  14. #include <unistd.h>
  15.  
  16. #include <pthread.h>
  17.  
  18. #include "queue.h"
  19.  
  20. #define MAX_CLIENTS 15
  21. #define MSG_SIZE  64
  22.  
  23. const char *TYPE1 = "heart_beat";
  24. const char *TYPE2 = "blood_sugar";
  25. const char *TYPE3 = "body_temp";
  26.  
  27. struct addrinfo hints, *result;
  28.  
  29. int wearable_clients[MAX_CLIENTS];
  30.  
  31. // The wearable server socket, which all wearables connect to.
  32. int wearable_server_fd;
  33.  
  34. // A lock for your queue.
  35. pthread_mutex_t queue_lock_;
  36.  
  37. // Lock to keep track of connected wearables
  38. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  39.  
  40. // Lock to keep track of connected wearables
  41. pthread_cond_t  type1_cond = PTHREAD_COND_INITIALIZER;
  42. pthread_cond_t  type2_cond = PTHREAD_COND_INITIALIZER;
  43. pthread_cond_t  type3_cond = PTHREAD_COND_INITIALIZER;
  44.  
  45. int wearablesConnected = 0;
  46.  
  47. // A queue for all received data.
  48. queue_t receieved_data_;
  49.  
  50. typedef struct SampleData {
  51.  
  52.   char type_[50];
  53.   int data_;
  54.  
  55. } SampleData;
  56.  
  57. int compare(const void *a, const void *b) { return (*(int *)a - *(int *)b); }
  58.  
  59. int select1(void *data) {
  60.   return strcmp(((SampleData *)data)->type_, TYPE1) == 0;
  61. }
  62.  
  63. int select2(void *data) {
  64.   return strcmp(((SampleData *)data)->type_, TYPE2) == 0;
  65. }
  66.  
  67. int select3(void *data) {
  68.   return strcmp(((SampleData *)data)->type_, TYPE3) == 0;
  69. }
  70.  
  71. int selectorall(void *__attribute__((unused)) val) { return 1; }
  72.  
  73. typedef struct {
  74.   pthread_t thread;
  75.   int fd;
  76.   long timestamp;
  77.   // TODO you might want to put more things here
  78. } thread_data;
  79.  
  80. thread_data **wearable_threads;
  81. int wearable_threads_size = 0;
  82.  
  83. /**
  84.  * Used to write out the statistics of a given results set (of
  85.  * timestamp_entry's).  To generate the result set see queue_gather(). fd is the
  86.  * file descriptor to which the information is sent out. The type is the type of
  87.  * data that is written out (TYPE1, TYPE2, TYPE3). results is the array of
  88.  * timestamp_entrys, and size is the size of that array. NOTE: that you should
  89.  * call method for every type (TYPE1, TYPE2, TYPE3), and then write out the
  90.  * infomration "\r\n" to signify that you have finished sending out the results.
  91.  */
  92. void write_results(int fd, const char *type, timestamp_entry *results,
  93.                    int size) {
  94.   long avg = 0;
  95.   int i;
  96.  
  97.   char buffer[1024];
  98.   int temp_array[size];
  99.   sprintf(buffer, "Results for %s:\n", type);
  100.   sprintf(buffer + strlen(buffer), "Size:%i\n", size);
  101.   for (i = 0; i < size; i++) {
  102.     temp_array[i] = ((SampleData *)(results[i].data_))->data_;
  103.     avg += ((SampleData *)(results[i].data_))->data_;
  104.   }
  105.  
  106.   qsort(temp_array, size, sizeof(int), compare);
  107.  
  108.   if (size != 0) {
  109.     sprintf(buffer + strlen(buffer), "Median:%i\n",
  110.             (size % 2 == 0)
  111.                 ? (temp_array[size / 2] + temp_array[size / 2 - 1]) / 2
  112.                 : temp_array[size / 2]);
  113.   } else {
  114.     sprintf(buffer + strlen(buffer), "Median:0\n");
  115.   }
  116.  
  117.   sprintf(buffer + strlen(buffer), "Average:%li\n\n",
  118.           (size == 0 ? 0 : avg / size));
  119.   write(fd, buffer, strlen(buffer));
  120. }
  121.  
  122. /**
  123.  * Given an input line in the form <timestamp>:<value>:<type>, this method
  124.  * parses the infomration from the string, into the given timestamp, and mallocs
  125.  * space for SampleData, and stores the type and value within
  126.  */
  127. void extract_key(char *line, long *timestamp, SampleData **ret) {
  128.   *ret = malloc(sizeof(SampleData));
  129.   sscanf(line, "%zu:%i:%s\n", timestamp, &((*ret)->data_), (*ret)->type_);
  130.   // eat the trailing ":"
  131.   (*ret)->type_[strlen((*ret)->type_) - 1] = '\0';
  132. }
  133.  
  134. /**
  135.  * Given an input line in the form <timestamp>:<timestamp>, this method
  136.  * parses the infomration from the string, into the given timestamps
  137.  */
  138. void extract_timestamps(char *line, long *timestamp1, long *timestamp2) {
  139.   sscanf(line, "%zu:%zu\n", timestamp1, timestamp2);
  140. }
  141.  
  142. // Use a buffer of length 64!
  143. // TODO read data from the socket until -1 is returned by read
  144. // char buffer[64];
  145. // while (read(socketfd, buffer, 64) > 0) ... // or do you need recv???
  146.  
  147. // A slight simplification for this MP: you can assume that data will always
  148. // arrive in 64 byte packets because the wearable clients are actually running on
  149. // the same localhost. Thus, you can read socket data in blocks of 64
  150. // bytes and parse each block as a new data item. A real server might have to
  151. // stitch multiple blocks together because the packets can arrive in arbitrary
  152. // sizes as they are routed around the Internet.
  153.  
  154. void *wearable_processor_thread(void *args) {
  155.  
  156.   thread_data *td;
  157.   SampleData *wearable_data;
  158.   long    timestamp;
  159.   int socketfd;
  160.   int client_is_connected = 1;
  161.   char  buffer[MSG_SIZE];
  162.   //int len;
  163.   int num;
  164.  
  165.   td = (thread_data *)args;
  166.   socketfd = td->fd;
  167.   printf("Wearable Processor Thread: SocketFD: %d\n", socketfd);
  168.   while(client_is_connected) {
  169.     //len = 0;
  170.     while (1) {
  171.       num = read(socketfd, buffer, MSG_SIZE);
  172.       //len += num;
  173.  
  174.       if (!num) {
  175.         client_is_connected = 0;
  176.       }
  177.       break;
  178.       // if (buffer[len - 1] == '\n')
  179.       //   break;
  180.     }
  181.  
  182.     // Error or client closed the connection, so time to close this specific
  183.     // client connection
  184.     if (!client_is_connected) {
  185.       printf("User %d left\n", socketfd);
  186.       break;
  187.     } else {
  188.       // Process the buffer
  189.       extract_key(buffer, &timestamp, &wearable_data);
  190.       // Add wearable_data to the queue
  191.  
  192.       pthread_mutex_lock(&queue_lock_);
  193.       queue_insert(&receieved_data_, timestamp, (void *) (wearable_data));
  194.  
  195.       if(strcmp((wearable_data)->type_,TYPE1) )
  196.         pthread_cond_signal(&type1_cond);
  197.  
  198.       if(strcmp((wearable_data)->type_,TYPE1) )
  199.         pthread_cond_signal(&type1_cond);
  200.  
  201.       if(strcmp((wearable_data)->type_,TYPE1) )
  202.         pthread_cond_signal(&type1_cond);
  203.  
  204.       pthread_mutex_unlock(&queue_lock_);
  205.     }
  206.  
  207.   }
  208.  
  209.  
  210.   close(socketfd);
  211.   pthread_mutex_lock(&mutex);
  212.   wearablesConnected--;
  213.   pthread_mutex_unlock(&mutex);
  214.  
  215.   return NULL;
  216. }
  217.  
  218. // read data from the socket until -1 is returned by read
  219. // Requests will be in the form
  220. //<timestamp1>:<timestamp2>, then write out statistics for data between
  221. // those timestamp ranges
  222.  
  223. void *user_request_thread(void *args) {
  224.  
  225.   int     socketfd = *((int *)args);
  226.   //timestamp_entry *timestamp_entry
  227.   timestamp_entry *type1_ret, *type2_ret, *type3_ret;
  228.   long      timestamp_start, timestamp_end;
  229.   int     client_is_connected = 1;
  230.   char      buffer[MSG_SIZE];
  231.   int      type1_sz, type2_sz, type3_sz;
  232.   int     num;
  233.  
  234.   // Use a buffer of length 64!
  235.   // TODO read data from the socket until -1 is returned by read
  236.   // char buffer[64];
  237.   // while (read(socketfd, buffer, 64) > 0) ... // or do you need recv???
  238.   printf("User Request Thread: SocketFD: %d\n", socketfd);
  239.   while(client_is_connected) {
  240.  //   len=0;
  241.     while (1) {
  242.       num = read(socketfd, buffer , MSG_SIZE);
  243.       if (num == -1) {
  244.         // error condition
  245.         break;
  246.       }
  247.       //len += num;
  248.  
  249.       if (!num) {
  250.         client_is_connected = 0;
  251.       }
  252.       break;
  253.     }
  254.  
  255.     // Error or client closed the connection, so time to close this specific
  256.     // client connection
  257.     if (!client_is_connected) {
  258.       printf("User %d left\n", socketfd);
  259.       break;
  260.     } else {
  261.  
  262.       // Process the buffer
  263.  
  264.       extract_timestamps(buffer, &timestamp_start, &timestamp_end);
  265.  
  266.       // Send the results back to the request client
  267.       // Write \r \n as a marker
  268.       // queue_gather returns address of an array
  269.       // - Need to check if the array has entries inclusive of the two timestamps
  270.  
  271.       pthread_mutex_lock(&queue_lock_);
  272.       type1_ret = queue_gather(&receieved_data_,timestamp_start, timestamp_end, select1, &type1_sz);
  273.       //timestamp_entry = type1_ret + type1_sz;
  274.       pthread_mutex_unlock(&queue_lock_);
  275.  
  276.       pthread_mutex_lock(&queue_lock_);
  277.       type2_ret = queue_gather(&receieved_data_,timestamp_start, timestamp_end, select2, &type2_sz);
  278.       pthread_mutex_unlock(&queue_lock_);
  279.  
  280.       pthread_mutex_lock(&queue_lock_);
  281.       type3_ret = queue_gather(&receieved_data_,timestamp_start, timestamp_end, select3, &type3_sz);
  282.       pthread_mutex_unlock(&queue_lock_);
  283.  
  284.       write_results(socketfd, TYPE1, type1_ret, type1_sz);
  285.       write_results(socketfd, TYPE2, type2_ret, type2_sz);
  286.       write_results(socketfd, TYPE3, type3_ret, type3_sz);
  287.  
  288.       write(socketfd,"\r \n", 4);                         // We may need to change this to 3
  289.     }
  290.  
  291.   }
  292.  
  293.   close(socketfd);
  294.   return NULL;
  295. }
  296.  
  297. // IMPLEMENT!
  298. // given a string with the port value, set up a
  299. // serversocket file descriptor and return it
  300. int open_server_socket(const char *port) {
  301.   // TODO
  302.   int serverSocket;
  303.   int s, optval = 1;
  304.  
  305.   serverSocket = socket(AF_INET, SOCK_STREAM, 0);
  306.   memset(&hints, 0, sizeof(struct addrinfo));    
  307.   hints.ai_family = AF_INET;
  308.   hints.ai_socktype = SOCK_STREAM;
  309.   hints.ai_flags = AI_PASSIVE;
  310.  
  311.   s = getaddrinfo(NULL, port, &hints, &result);
  312.   if(s < 0) {
  313.       fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
  314.       exit(1);
  315.   }
  316.  
  317.   setsockopt(serverSocket, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
  318.  
  319.   if (bind(serverSocket, result->ai_addr, result->ai_addrlen) != 0) {
  320.       perror("bind");
  321.       exit(1);
  322.   }
  323.  
  324.   return serverSocket;
  325. }
  326.  
  327. void signal_received(int sig) {
  328.   // TODO close server socket, free anything you don't free in main
  329. }
  330.  
  331. int main(int argc, const char *argv[]) {
  332.  
  333.   int request_server_fd, request_socket, wearable_socket;
  334.   int i = 0;
  335.   pthread_t wearable_thread[MAX_CLIENTS];
  336.   thread_data threadData;
  337.  
  338.   if (argc != 3) {
  339.     printf("Invalid input size\n");
  340.     exit(EXIT_FAILURE);
  341.   }
  342.  
  343.  
  344.   // setup sig handler for SIGINT
  345.   //signal(SIGINT, signal_received);
  346.  
  347.  
  348.   // Initialize the queue
  349.   queue_init(&receieved_data_);
  350.  
  351.   // Initialize the lock/mutex to protect the queue
  352.   pthread_mutex_init(&queue_lock_, NULL);
  353.  
  354.   request_server_fd = open_server_socket(argv[2]);
  355.   if(listen(request_server_fd, 1)) {
  356.         perror("listen");
  357.         exit(1);
  358.   }
  359.  
  360.   wearable_server_fd = open_server_socket(argv[1]);
  361.   if(listen(wearable_server_fd, MAX_CLIENTS)) {
  362.       perror("listen");
  363.       exit(1);
  364.   }
  365.  
  366.   // Accept request connect ( stats thread)
  367.  
  368.   pthread_t request_thread;
  369.   request_socket = accept(request_server_fd, NULL, NULL);
  370.   printf("Main: Connection Made: request_client_fd = %d\n", request_socket);
  371.   pthread_create(&request_thread, NULL, user_request_thread, &request_socket);
  372.  
  373.   // Accept wearable connects
  374.   // create threads for handling the wearbles
  375.  
  376.   while((wearablesConnected < MAX_CLIENTS) && (wearable_socket = accept(wearable_server_fd, NULL, NULL))) {
  377.     threadData.fd = wearable_socket;
  378.     printf("Main: Connection Made: wearable_client_fd = %d\n", wearable_socket);
  379.     pthread_mutex_lock(&mutex);
  380.     wearablesConnected++;
  381.     pthread_mutex_unlock(&mutex);
  382.     wearable_clients[i] = wearable_socket;
  383.     pthread_create(&wearable_thread[i], NULL,  wearable_processor_thread, &threadData);
  384.     i++;
  385.  
  386.   }
  387.   // Only one request socket
  388.  
  389.   // Join thread we spawned from the request
  390.   pthread_join(request_thread, NULL);
  391.  
  392.   // Join all threads we spawned from the wearables
  393.   for(i=0;i<MAX_CLIENTS;i++)
  394.     pthread_join(wearable_thread[i],NULL);
  395.  
  396.   // Free all the data in the queue
  397.   queue_destroy(&receieved_data_, 1);
  398.  
  399.   close(request_server_fd);
  400.   close(wearable_server_fd);
  401.  
  402.   return 0;
  403. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement