Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * Machine Problem: Wearables
- * CS 241 - Spring 2016
- */
- #include <fcntl.h>
- #include <netdb.h>
- #include <signal.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <pthread.h>
- #include "queue.h"
- #define MAX_CLIENTS 15
- #define MSG_SIZE 64
- const char *TYPE1 = "heart_beat";
- const char *TYPE2 = "blood_sugar";
- const char *TYPE3 = "body_temp";
- struct addrinfo hints, *result;
- int wearable_clients[MAX_CLIENTS];
- // The wearable server socket, which all wearables connect to.
- int wearable_server_fd;
- // A lock for your queue.
- pthread_mutex_t queue_lock_;
- // Lock to keep track of connected wearables
- pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
- // Lock to keep track of connected wearables
- pthread_cond_t type1_cond = PTHREAD_COND_INITIALIZER;
- pthread_cond_t type2_cond = PTHREAD_COND_INITIALIZER;
- pthread_cond_t type3_cond = PTHREAD_COND_INITIALIZER;
- int wearablesConnected = 0;
- // A queue for all received data.
- queue_t receieved_data_;
/* One parsed wearable reading, as produced by extract_key(). */
struct SampleData {
  /* Type tag; the selectors compare it against TYPE1/TYPE2/TYPE3. */
  char type_[50];
  /* The integer value carried by the message. */
  int data_;
};
typedef struct SampleData SampleData;
/**
 * qsort() comparator that orders two ints ascending.
 *
 * Returns a negative value, zero or a positive value when *a is less than,
 * equal to or greater than *b. The result comes from comparisons rather than
 * from the subtraction `*a - *b`, which overflows (undefined behaviour) for
 * operands that are far apart, e.g. INT_MIN and 1.
 */
int compare(const void *a, const void *b) {
  const int lhs = *(const int *)a;
  const int rhs = *(const int *)b;
  return (lhs > rhs) - (lhs < rhs);
}
- int select1(void *data) {
- return strcmp(((SampleData *)data)->type_, TYPE1) == 0;
- }
- int select2(void *data) {
- return strcmp(((SampleData *)data)->type_, TYPE2) == 0;
- }
- int select3(void *data) {
- return strcmp(((SampleData *)data)->type_, TYPE3) == 0;
- }
/* Selector that accepts every entry: the argument is ignored and 1 is returned. */
int selectorall(void *__attribute__((unused)) val) {
  const int accept_everything = 1;
  return accept_everything;
}
/* Per-connection bookkeeping passed to a wearable worker thread. */
typedef struct {
  pthread_t thread; /* handle of the worker thread */
  int fd;           /* connected client socket the worker reads from */
  long timestamp;   /* not read or written anywhere in this file */
  /* TODO: extend with further per-connection state if needed. */
} thread_data;

/* Table of worker records; never assigned in this file, so it stays NULL. */
thread_data **wearable_threads = NULL;
/* Entry count for wearable_threads. */
int wearable_threads_size = 0;
- /**
- * Used to write out the statistics of a given results set (of
- * timestamp_entry's). To generate the result set see queue_gather(). fd is the
- * file descriptor to which the information is sent out. The type is the type of
- * data that is written out (TYPE1, TYPE2, TYPE3). results is the array of
- * timestamp_entrys, and size is the size of that array. NOTE: that you should
- * call method for every type (TYPE1, TYPE2, TYPE3), and then write out the
- * infomration "\r\n" to signify that you have finished sending out the results.
- */
- void write_results(int fd, const char *type, timestamp_entry *results,
- int size) {
- long avg = 0;
- int i;
- char buffer[1024];
- int temp_array[size];
- sprintf(buffer, "Results for %s:\n", type);
- sprintf(buffer + strlen(buffer), "Size:%i\n", size);
- for (i = 0; i < size; i++) {
- temp_array[i] = ((SampleData *)(results[i].data_))->data_;
- avg += ((SampleData *)(results[i].data_))->data_;
- }
- qsort(temp_array, size, sizeof(int), compare);
- if (size != 0) {
- sprintf(buffer + strlen(buffer), "Median:%i\n",
- (size % 2 == 0)
- ? (temp_array[size / 2] + temp_array[size / 2 - 1]) / 2
- : temp_array[size / 2]);
- } else {
- sprintf(buffer + strlen(buffer), "Median:0\n");
- }
- sprintf(buffer + strlen(buffer), "Average:%li\n\n",
- (size == 0 ? 0 : avg / size));
- write(fd, buffer, strlen(buffer));
- }
- /**
- * Given an input line in the form <timestamp>:<value>:<type>, this method
- * parses the infomration from the string, into the given timestamp, and mallocs
- * space for SampleData, and stores the type and value within
- */
- void extract_key(char *line, long *timestamp, SampleData **ret) {
- *ret = malloc(sizeof(SampleData));
- sscanf(line, "%zu:%i:%s\n", timestamp, &((*ret)->data_), (*ret)->type_);
- // eat the trailing ":"
- (*ret)->type_[strlen((*ret)->type_) - 1] = '\0';
- }
/**
 * Parses a request line of the form <timestamp>:<timestamp> into its two
 * timestamps.
 *
 * @param line       NUL-terminated request text
 * @param timestamp1 receives the first timestamp
 * @param timestamp2 receives the second timestamp
 *
 * If the line does not contain two ':'-separated integers, both outputs are
 * set to 0 so the caller never reads indeterminate values.
 */
void extract_timestamps(char *line, long *timestamp1, long *timestamp2) {
  long first = 0;
  long second = 0;

  /* %ld matches the long* arguments (the old %zu expected size_t*). */
  if (sscanf(line, "%ld:%ld", &first, &second) != 2) {
    first = 0;
    second = 0;
  }
  *timestamp1 = first;
  *timestamp2 = second;
}
- // Use a buffer of length 64!
- // TODO read data from the socket until -1 is returned by read
- // char buffer[64];
- // while (read(socketfd, buffer, 64) > 0) ... // or do you need recv???
- // A slight simplification for this MP: you can assume that data will always
- // arrive in 64 byte packets because the wearable clients are actually running on
- // the same localhost. Thus, you can read socket data in in blocks of 64
- // bytes and parse each block as a new data item. A real server might have to
- // stitch multiple blocks together because the packets can arrive in arbitrary
- // sizes as they are routed around the Internet.
- void *wearable_processor_thread(void *args) {
- thread_data *td;
- SampleData *wearable_data;
- long timestamp;
- int socketfd;
- int client_is_connected = 1;
- char buffer[MSG_SIZE];
- //int len;
- int num;
- td = (thread_data *)args;
- socketfd = td->fd;
- printf("Wearable Processor Thread: SocketFD: %d\n", socketfd);
- while(client_is_connected) {
- //len = 0;
- while (1) {
- num = read(socketfd, buffer, MSG_SIZE);
- //len += num;
- if (!num) {
- client_is_connected = 0;
- }
- break;
- // if (buffer[len - 1] == '\n')
- // break;
- }
- // Error or client closed the connection, so time to close this specific
- // client connection
- if (!client_is_connected) {
- printf("User %d left\n", socketfd);
- break;
- } else {
- // Process the buffer
- extract_key(buffer, ×tamp, &wearable_data);
- // Add wearable_data to the queue
- pthread_mutex_lock(&queue_lock_);
- queue_insert(&receieved_data_, timestamp, (void *) (wearable_data));
- if(strcmp((wearable_data)->type_,TYPE1) )
- pthread_cond_signal(&type1_cond);
- if(strcmp((wearable_data)->type_,TYPE1) )
- pthread_cond_signal(&type1_cond);
- if(strcmp((wearable_data)->type_,TYPE1) )
- pthread_cond_signal(&type1_cond);
- pthread_mutex_unlock(&queue_lock_);
- }
- }
- close(socketfd);
- pthread_mutex_lock(&mutex);
- wearablesConnected--;
- pthread_mutex_unlock(&mutex);
- return NULL;
- }
- // read data from the socket until -1 is returned by read
- // Requests will be in the form
- //<timestamp1>:<timestamp2>, then write out statistics for data between
- // those timestamp ranges
- void *user_request_thread(void *args) {
- int socketfd = *((int *)args);
- //timestamp_entry *timestamp_entry
- timestamp_entry *type1_ret, *type2_ret, *type3_ret;
- long timestamp_start, timestamp_end;
- int client_is_connected = 1;
- char buffer[MSG_SIZE];
- int type1_sz, type2_sz, type3_sz;
- int num;
- // Use a buffer of length 64!
- // TODO read data from the socket until -1 is returned by read
- // char buffer[64];
- // while (read(socketfd, buffer, 64) > 0) ... // or do you need recv???
- printf("User Request Thread: SocketFD: %d\n", socketfd);
- while(client_is_connected) {
- // len=0;
- while (1) {
- num = read(socketfd, buffer , MSG_SIZE);
- if (num == -1) {
- // error condition
- break;
- }
- //len += num;
- if (!num) {
- client_is_connected = 0;
- }
- break;
- }
- // Error or client closed the connection, so time to close this specific
- // client connection
- if (!client_is_connected) {
- printf("User %d left\n", socketfd);
- break;
- } else {
- // Process the buffer
- extract_timestamps(buffer, ×tamp_start, ×tamp_end);
- // Send the results back to the request client
- // Write \r \n as a marker
- // queue_gather returns address of an array
- // - Need to check if the array has entries inclusive of the two timestamps
- pthread_mutex_lock(&queue_lock_);
- type1_ret = queue_gather(&receieved_data_,timestamp_start, timestamp_end, select1, &type1_sz);
- //timestamp_entry = type1_ret + type1_sz;
- pthread_mutex_unlock(&queue_lock_);
- pthread_mutex_lock(&queue_lock_);
- type2_ret = queue_gather(&receieved_data_,timestamp_start, timestamp_end, select2, &type2_sz);
- pthread_mutex_unlock(&queue_lock_);
- pthread_mutex_lock(&queue_lock_);
- type3_ret = queue_gather(&receieved_data_,timestamp_start, timestamp_end, select3, &type3_sz);
- pthread_mutex_unlock(&queue_lock_);
- write_results(socketfd, TYPE1, type1_ret, type1_sz);
- write_results(socketfd, TYPE2, type2_ret, type2_sz);
- write_results(socketfd, TYPE3, type3_ret, type3_sz);
- write(socketfd,"\r \n", 4); // We may need to change this to 3
- }
- }
- close(socketfd);
- return NULL;
- }
/**
 * Creates a TCP/IPv4 socket bound to the wildcard address on the given port.
 *
 * The socket is returned bound but not listening; the caller calls listen().
 * Any failure to resolve, create or bind is fatal: a diagnostic is printed and
 * the process exits with status 1.
 *
 * @param port service name or decimal port number, as accepted by getaddrinfo()
 * @return the bound socket descriptor
 */
int open_server_socket(const char *port) {
  int optval = 1;

  memset(&hints, 0, sizeof hints);
  hints.ai_family = AF_INET;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;

  /* getaddrinfo() reports failure with any nonzero code, not just negative. */
  int s = getaddrinfo(NULL, port, &hints, &result);
  if (s != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
    exit(1);
  }

  int serverSocket =
      socket(result->ai_family, result->ai_socktype, result->ai_protocol);
  if (serverSocket < 0) {
    perror("socket");
    exit(1);
  }

  /* Best effort: a failure here only affects how quickly the port can be
   * reused, so warn and carry on.
   * NOTE(review): SO_REUSEADDR is the usual option for restarting a server
   * while old connections linger; confirm SO_REUSEPORT is intended. */
  if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEPORT, &optval,
                 sizeof(optval)) != 0) {
    perror("setsockopt");
  }

  if (bind(serverSocket, result->ai_addr, result->ai_addrlen) != 0) {
    perror("bind");
    exit(1);
  }

  /* The address list is no longer needed. Release it so the next call does
   * not overwrite the pointer and leak it. */
  freeaddrinfo(result);
  result = NULL;

  return serverSocket;
}
/*
 * Placeholder signal handler: currently does nothing. The signal() call that
 * would install it for SIGINT is commented out in main().
 */
void signal_received(int sig) {
  (void)sig; /* not needed yet */
  // TODO close server socket, free anything you don't free in main
}
- int main(int argc, const char *argv[]) {
- int request_server_fd, request_socket, wearable_socket;
- int i = 0;
- pthread_t wearable_thread[MAX_CLIENTS];
- thread_data threadData;
- if (argc != 3) {
- printf("Invalid input size\n");
- exit(EXIT_FAILURE);
- }
- // setup sig handler for SIGINT
- //signal(SIGINT, signal_received);
- // Initialize the queue
- queue_init(&receieved_data_);
- // Initialize the lock/mutex to protect the queue
- pthread_mutex_init(&queue_lock_, NULL);
- request_server_fd = open_server_socket(argv[2]);
- if(listen(request_server_fd, 1)) {
- perror("listen");
- exit(1);
- }
- wearable_server_fd = open_server_socket(argv[1]);
- if(listen(wearable_server_fd, MAX_CLIENTS)) {
- perror("listen");
- exit(1);
- }
- // Accept request connect ( stats thread)
- pthread_t request_thread;
- request_socket = accept(request_server_fd, NULL, NULL);
- printf("Main: Connection Made: request_client_fd = %d\n", request_socket);
- pthread_create(&request_thread, NULL, user_request_thread, &request_socket);
- // Accept wearable connects
- // create threads for handling the wearbles
- while((wearablesConnected < MAX_CLIENTS) && (wearable_socket = accept(wearable_server_fd, NULL, NULL))) {
- threadData.fd = wearable_socket;
- printf("Main: Connection Made: wearable_client_fd = %d\n", wearable_socket);
- pthread_mutex_lock(&mutex);
- wearablesConnected++;
- pthread_mutex_unlock(&mutex);
- wearable_clients[i] = wearable_socket;
- pthread_create(&wearable_thread[i], NULL, wearable_processor_thread, &threadData);
- i++;
- }
- // Only one request socket
- // Join thread we spawned from the request
- pthread_join(request_thread, NULL);
- // Join all threads we spawned from the wearables
- for(i=0;i<MAX_CLIENTS;i++)
- pthread_join(wearable_thread[i],NULL);
- // Free all the data in the queue
- queue_destroy(&receieved_data_, 1);
- close(request_server_fd);
- close(wearable_server_fd);
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement