/*
 * Event monitor demo.
 *
 * Once per second every server's trigger is polled on its own short-lived
 * thread.  When a trigger fires, the server's action is started on a
 * background thread.  A finished action reports its result by dropping a file
 * named "thread_<tid>" into `path`; a dedicated checker thread collects those
 * files, reaps the finished thread and releases its bookkeeping entry.
 */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

/* Number of monitored servers; one trigger thread per server on every poll. */
#define NTHREAD 3

/* Directory in which action threads store their result files. */
const char path[] = "/home/zythar";

/*
 * Upper bound for "<path>/thread_<tid>" including the terminating NUL.
 * 3 decimal digits per byte is always enough to print an unsigned long.
 */
#define RESULT_PATH_MAX (sizeof path + sizeof "/thread_" + 3 * sizeof(unsigned long))

/*
 * Main working structure.  The callbacks here are dummies; a real program
 * would plug in real event checks and actions.
 */
struct server_t {
    char *name;
    int (*triggered)(struct server_t *); /* monitored event: returns 0 when it fired */
    int (*action)(struct server_t *);    /* action to run once the event fired */
};
typedef struct server_t server_t;

/* Bookkeeping for one running action thread. */
typedef struct act_threads {
    pthread_t tid;
    char *servername; /* heap copy of the server name, owned by this entry */
} act_threads_t;

/* List of running action threads; every access is guarded by checker_mutex. */
typedef struct act_checker {
    act_threads_t **actions;
    int count;
} act_checker_t;

/* Per-poll state handed to a trigger thread and read back after joining it. */
struct trigger_job {
    pthread_t tid;
    server_t *server;
    int result;  /* value returned by server->triggered() */
    int started; /* non-zero when the thread was actually created */
};

/*
 * Write the result-file name for thread `tid` into `buf`.
 * `buf` must hold at least RESULT_PATH_MAX bytes, which can never truncate.
 * NOTE(review): pthread_t is printed as unsigned long, as the original code
 * did; this assumes pthread_t converts to an integer (true on Linux).
 */
static void format_result_path(char *buf, size_t size, pthread_t tid)
{
    snprintf(buf, size, "%s/thread_%lu", path, (unsigned long) tid);
}

/*
 * Publish the result of a finished action.
 *
 * Writes `retval` to "<path>/thread_<tid>".  The value is written to a
 * temporary file first and then renamed into place, so the checker thread
 * never observes a half-written result file.
 *
 * Returns 0 on success; terminates the whole process with a failure status
 * when the file cannot be created.
 */
int return_thread(pthread_t tid, int retval)
{
    char final_path[RESULT_PATH_MAX];
    char tmp_path[RESULT_PATH_MAX + sizeof ".tmp"];
    FILE *out;

    format_result_path(final_path, sizeof final_path, tid);
    snprintf(tmp_path, sizeof tmp_path, "%s.tmp", final_path);

    out = fopen(tmp_path, "w");
    if (out != NULL) {
        int wrote = fprintf(out, "%d", retval) >= 0;
        int closed = fclose(out) == 0;

        if (wrote && closed && rename(tmp_path, final_path) == 0) {
            printf("%s created successfully\n", final_path);
            return 0;
        }
        remove(tmp_path); /* best effort: do not leave a stale temp file */
    }

    printf("Cannot create file for thread #%lu.\n", (unsigned long) tid);
    printf("Exiting...\n");
    exit(EXIT_FAILURE);
}

/* ---- dummy events ------------------------------------------------------ */

/* Returns 0 when the current UNIX time is a multiple of `period`, else -1. */
static int fires_every(int period)
{
    return (time(NULL) % period == 0) ? 0 : -1;
}

int triggered1(struct server_t *server)
{
    (void) server;
    return fires_every(5);
}

int triggered2(struct server_t *server)
{
    (void) server;
    return fires_every(13);
}

int triggered3(struct server_t *server)
{
    (void) server;
    return fires_every(17);
}

/* ---- dummy actions ----------------------------------------------------- */

/* Simulates `seconds` seconds of work in one-second steps; always returns 0. */
static int dummy_work(int seconds)
{
    int i;

    for (i = 0; i < seconds; i++) {
        sleep(1);
    }
    return 0;
}

int action1(struct server_t *server)
{
    (void) server;
    return dummy_work(17);
}

int action2(struct server_t *server)
{
    (void) server;
    return dummy_work(41);
}

int action3(struct server_t *server)
{
    (void) server;
    return dummy_work(10);
}

/* ---- thread entry points ----------------------------------------------- */

/*
 * pthread entry point for one trigger poll.  pthread_create() requires a
 * void *(*)(void *) routine, so the server callback is invoked through this
 * trampoline and its int result is handed back via the job structure.
 */
static void *trigger_thread_main(void *arg)
{
    struct trigger_job *job = arg;

    job->result = job->server->triggered(job->server);
    return NULL;
}

/*
 * pthread entry point for one action: runs the server's action and publishes
 * its return value through return_thread().
 */
static void *action_thread_main(void *arg)
{
    server_t *server = arg;
    int rc = server->action(server);

    return_thread(pthread_self(), rc);
    return NULL;
}

/* Guards every access to the act_checker_t list. */
pthread_mutex_t checker_mutex;

/*
 * Checker thread body.  `arg` is the act_checker_t * to watch.
 *
 * Once per second, looks for the result file of every registered action.
 * When a file exists the action has finished: the result is read, the file
 * is deleted, the thread is joined and its list entry is freed.  Never
 * returns; terminates the process if a result file cannot be removed.
 */
void *check_threads(void *arg)
{
    act_checker_t *checker = arg;
    char fullpath[RESULT_PATH_MAX];

    printf("\n\n\t\tin %s\n\n", __func__);

    for (;;) {
        int i = 0;

        pthread_mutex_lock(&checker_mutex);

        while (i < checker->count) {
            act_threads_t *entry = checker->actions[i];
            int status = -1;
            FILE *result;

            format_result_path(fullpath, sizeof fullpath, entry->tid);
            result = fopen(fullpath, "r");
            if (result == NULL) {
                /* No result file yet: this action is still running. */
                i++;
                continue;
            }

            if (fscanf(result, "%d", &status) != 1) {
                status = -1; /* unreadable result counts as a failure */
            }
            fclose(result);

            if (remove(fullpath) != 0) {
                printf("Cannot remove file for thread #%lu.\n",
                       (unsigned long) entry->tid);
                printf("Exiting...\n");
                exit(EXIT_FAILURE);
            }
            printf("%s removed successfully\n", fullpath);

            if (status != 0) {
                printf("Action for %s finished with status %d\n",
                       entry->servername, status);
            }

            /* The thread returns right after publishing, so this is brief. */
            pthread_join(entry->tid, NULL);

            free(entry->servername);
            free(entry);

            /*
             * Fill the hole with the last entry so the array stays dense.
             * `i` is not advanced: the moved entry still has to be checked.
             */
            checker->count--;
            checker->actions[i] = checker->actions[checker->count];
        }

        pthread_mutex_unlock(&checker_mutex);
        sleep(1);
    }

    return NULL; /* not reached */
}

/*
 * Start `server`'s action on a new thread and register it in `checker`.
 *
 * On allocation or thread-creation failure an error is reported on stderr
 * and the list is left unchanged.
 */
void create_action_thread(act_checker_t *checker, server_t *server)
{
    act_threads_t *entry;
    act_threads_t **grown;
    size_t name_len;
    int rc;

    printf("\n\n\t\tin %s\n\n", __func__);

    /* Build the entry before taking the lock to keep the critical section short. */
    entry = malloc(sizeof *entry);
    if (entry == NULL) {
        fprintf(stderr, "Out of memory while starting action for %s\n", server->name);
        return;
    }
    name_len = strlen(server->name) + 1;
    entry->servername = malloc(name_len);
    if (entry->servername == NULL) {
        fprintf(stderr, "Out of memory while starting action for %s\n", server->name);
        free(entry);
        return;
    }
    memcpy(entry->servername, server->name, name_len);

    pthread_mutex_lock(&checker_mutex);

    grown = realloc(checker->actions,
                    ((size_t) checker->count + 1) * sizeof *grown);
    if (grown == NULL) {
        pthread_mutex_unlock(&checker_mutex);
        fprintf(stderr, "Out of memory while starting action for %s\n", server->name);
        free(entry->servername);
        free(entry);
        return;
    }
    checker->actions = grown;

    rc = pthread_create(&entry->tid, NULL, action_thread_main, server);
    if (rc != 0) {
        pthread_mutex_unlock(&checker_mutex);
        fprintf(stderr, "Cannot start action thread for %s: %s\n",
                server->name, strerror(rc));
        free(entry->servername);
        free(entry);
        return;
    }

    /* Only a successfully started thread is ever visible to the checker. */
    checker->actions[checker->count] = entry;
    checker->count++;

    pthread_mutex_unlock(&checker_mutex);
}

int main(void)
{
    /*
     * NOTE(review): the original table wired server3 to action2, leaving
     * action3 unused; treated as a copy-paste slip and corrected here.
     */
    server_t servers[NTHREAD] = {
        {"server1", triggered1, action1},
        {"server2", triggered2, action2},
        {"server3", triggered3, action3},
    };
    struct trigger_job jobs[NTHREAD];
    act_checker_t checker = { NULL, 0 };
    pthread_t action_check;
    int rc;
    int i;

    printf("Initing mutex...");
    rc = pthread_mutex_init(&checker_mutex, NULL);
    if (rc != 0) {
        printf("pthread_mutex_init return %d. exiting...", rc);
        exit(EXIT_FAILURE);
    }
    printf("Seems ok. %d returned\n", rc);

    /* Separate thread that watches the file system for finished actions. */
    rc = pthread_create(&action_check, NULL, check_threads, &checker);
    if (rc != 0) {
        fprintf(stderr, "Cannot start checker thread: %s\n", strerror(rc));
        exit(EXIT_FAILURE);
    }

    for (;;) {
        /* Poll every trigger concurrently... */
        for (i = 0; i < NTHREAD; i++) {
            jobs[i].server = &servers[i];
            jobs[i].result = -1;
            rc = pthread_create(&jobs[i].tid, NULL, trigger_thread_main, &jobs[i]);
            jobs[i].started = (rc == 0);
            if (rc != 0) {
                fprintf(stderr, "Cannot start trigger thread for %s: %s\n",
                        servers[i].name, strerror(rc));
            }
        }

        /* ...then start an action for each event that fired. */
        for (i = 0; i < NTHREAD; i++) {
            if (!jobs[i].started) {
                continue; /* never join a thread that was not created */
            }
            pthread_join(jobs[i].tid, NULL);
            if (jobs[i].result == 0) {
                create_action_thread(&checker, &servers[i]);
            }
        }

        sleep(1);
    }

    /* Not reached today; kept so a future shutdown path cleans up correctly. */
    pthread_mutex_destroy(&checker_mutex);
    return 0;
}