Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <stdlib.h> /*malloc, free*/
- #include <stdio.h> /*printf*/
- #include <ucontext.h> /*context functionality*/
- #include <assert.h> /*assert*/
- #include <signal.h>
- #include <sys/time.h>
- #include "green.h"
- #include <pthread.h>
- #include <time.h>
- #define PERIOD 100
- #define FALSE 0
- #define TRUE 1
- #define STACK_SIZE 4096
- #define MAX 1000000
- static ucontext_t main_cntx = {0};
- static green_t main_green = {&main_cntx, NULL, NULL, NULL, NULL, FALSE};
- static green_t* running = &main_green;
- static sigset_t block;
- pthread_cond_t emptyP, fullP;
- pthread_mutex_t mutexP;
- static void init () __attribute__ ((constructor));
- void timer_handler(int sig);
- void init () {
- getcontext (&main_cntx);
- sigemptyset(&block);
- sigaddset(&block, SIGVTALRM);
- struct sigaction act = {0};
- struct timeval interval;
- struct itimerval period;
- act.sa_handler = timer_handler;
- assert(sigaction(SIGVTALRM, &act, NULL) == 0);
- interval.tv_sec = 0;
- interval.tv_usec = PERIOD;
- period.it_interval = interval;
- period.it_value = interval;
- setitimer(ITIMER_VIRTUAL, &period, NULL);
- }
- struct Queue {
- green_t* front;
- green_t* end;
- };
- struct Queue* queue;
- // Function to add a key to queue
- void myEnQueue(green_t* new) {
- if (queue->end == NULL) {
- queue->front = queue->end = new;
- return;
- }
- queue->end->next = new;
- queue->end = new;
- }
- // Function to remove a key from queue
- struct green_t* myDeQueue() {
- if (queue->front == NULL) {
- return NULL;
- }
- //printf("1: queue->front = %p \n", queue->front);
- struct green_t* temp = queue->front;
- //temp->next = NULL;
- queue->front = queue->front->next;
- temp->next = NULL;
- if (queue->front == NULL) {
- queue->end = NULL;
- } else {
- //printf("2: queue->front = %p \n", queue->front);
- }
- return temp;
- }
- void removeFromReadyList(green_t* remove) {
- struct green_t* temp = queue->front;
- struct green_t* afterTemp = temp->next;
- if (temp == remove) {
- queue->front = queue->front->next;
- //printf("if (temp == remove), queue->front = %p \n", queue->front);
- return;
- }
- while (afterTemp != remove) {
- temp = temp->next;
- afterTemp = afterTemp->next;
- }
- if (afterTemp != NULL) {
- temp->next = afterTemp->next;
- }
- }
- int green_mutex_init(green_mutex_t *mutex) {
- mutex->taken = FALSE;
- mutex->susp = NULL;
- }
- int green_mutex_lock(green_mutex_t *mutex) {
- // block timer interrupt
- sigprocmask(SIG_BLOCK, &block, NULL);
- green_t *susp = running;
- while(mutex->taken) {
- // suspend the running thread. Should I have a susp list?
- // add susp to susp queue
- if (mutex->susp == NULL){
- mutex->susp = running;
- } else {
- //I want to make sure that we add it at the end of the queue.
- green_t* current = mutex->susp;
- while(current->next != NULL){
- if (current->next == susp) {
- break;
- }
- //printf(" current->next != %p, current = %p \n", current->next, current);
- current = current->next;
- }
- current->next = running;
- running->next = NULL;
- }
- // select the next thread for execution
- struct green_t* next = myDeQueue();
- // find the next thread
- running = next;
- swapcontext(susp->context, next->context);
- }
- // take the lock. This is enough, the caller function will wait until it gets back,
- // and do whatever with the locks.
- mutex->taken = TRUE;
- //mutex->susp = running;
- // unblock
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- return 0;
- }
- int green_mutex_unlock(green_mutex_t *mutex) {
- // block timer interrupt
- sigprocmask(SIG_BLOCK, &block, NULL);
- green_t *susp = running;
- green_t* suspended = mutex->susp;
- // move suspended threads to ready queue
- while (suspended != NULL){
- if (suspended == susp) {
- break;
- }
- //printf(" mutex->susp != %p \n", mutex->susp);
- myEnQueue(suspended);
- if (suspended == suspended->next) {
- break;
- }
- suspended = suspended->next;
- //mutex->susp = NULL;
- }
- mutex->taken = FALSE;
- mutex->susp = NULL;
- // unblock
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- return 0;
- }
- /*
- Initialize a green condition variable.
- */
- void green_cond_init(green_cond_t* cond) {
- cond = malloc(sizeof(struct green_cond_t));
- }
- /*
- Suspend the current thread
- on the condition.
- */
- void green_cond_wait(green_cond_t* cond, green_mutex_t* mutex) {
- sigprocmask(SIG_BLOCK, &block, NULL);
- //Add to cond queue
- green_t* this = running;
- if (cond->end == NULL) {
- cond->front = cond->end = this;
- }
- else {
- cond->end->next = this;
- cond->end = this;
- }
- green_t *susp = running;
- green_t* suspended = mutex->susp;
- //If we have a mutex. Enqueues from seups.
- if(mutex != NULL) {
- while (suspended != NULL){
- if (suspended == susp) {
- break;
- }
- //printf(" mutex->susp != %p \n", mutex->susp);
- myEnQueue(suspended);
- if (suspended == suspended->next) {
- break;
- }
- suspended = suspended->next;
- //mutex->susp = NULL;
- }
- mutex->taken = FALSE;
- mutex->susp = NULL;
- }
- susp = running;
- //printf("this = %p \n", this);
- //run next
- struct green_t* next = myDeQueue();
- //printf("next = %p \n", next);
- running = next;
- swapcontext (this->context, next->context);
- if(mutex != NULL){
- //Try to take the lock
- while(mutex->taken) {
- //Bad luck, suspended
- // add susp to susp queue
- if (mutex->susp == NULL){
- mutex->susp = running;
- } else {
- //I want to make sure that we add it at the end of the queue.
- green_t* current = mutex->susp;
- while(current->next != NULL){
- if (current->next == susp) {
- break;
- }
- //printf(" current->next != %p, current = %p \n", current->next, current);
- current = current->next;
- }
- current->next = running;
- running->next = NULL;
- }
- // select the next thread for execution
- struct green_t* next = myDeQueue();
- // find the next thread
- running = next;
- swapcontext(susp->context, next->context);
- }
- //Take the lock
- mutex->taken = TRUE;
- }
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- }
- /*
- Move the first suspended
- thread to the ready cond.
- */
- void green_cond_signal(green_cond_t* cond) {
- sigprocmask(SIG_BLOCK, &block, NULL);
- if (cond->front == NULL) {
- return;
- }
- struct green_t* temp = cond->front;
- cond->front = cond->front->next;
- if (cond->front == NULL) {
- cond->end = NULL;
- }
- //printf("temp = %p \n", temp);
- //printf("cond->front = %p \n", cond->front);
- myEnQueue(temp);
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- //return temp;
- }
- /*
- This function is responsible for calling the function provided by the user.
- This function will do (execute) two things:
- 1. Start the execution of the real function (the function provided by the user).
- 2. After returning from the call, terminate the thread.
- The tricky part is what to do when the called function returns.
- The program should check if there is a thread waiting for its termination,
- if so place it in the ready queue.
- The stack that was allocated, and the space for the context,
- should be returned to the memory management system;
- the thread is now a zombie process.
- There should be a thread in the ready queue
- so the program select the first and schedule it for execution.
- */
- void green_thread() {
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- green_t* this = running;
- (*this->function)(this->arg);
- // place waiting (joining) thread in ready queue
- if (this == NULL) {
- //printf("this == NULL \n");
- }
- sigprocmask(SIG_BLOCK, &block, NULL);
- struct green_t* j = this->join;
- if (j != NULL) {
- //printf("j = %p \n", j);
- myEnQueue(j);
- //swapcontext (this->context, this->join->context);
- }
- //printf ("thread %d is done \n", *(int*)this->arg);
- //printf("this = %p \n", this);
- // free alocated memory structures, i.e. free the stack.
- //removeFromReadyList(this);
- free(this->context->uc_stack.ss_sp);
- free(this->context);
- // we're a zombie
- this->zombie = TRUE;
- //printf("Zombie \n");
- // find the next thread to run
- struct green_t* next = myDeQueue();
- if (next != NULL) {
- //printf("next = %p \n", next);
- running = next;
- setcontext (next->context);
- }
- //sigprocmask(SIG_UNBLOCK, &block, NULL);
- //printf("after next != NULL \n");
- //setcontext (this->context);
- }
- /*create a green thread*/
- int green_create (green_t* new, void* (*function)(void*), void* arg) {
- // sigprocmask(SIG_BLOCK, &block, NULL);
- ucontext_t* cntx = (ucontext_t *)malloc(sizeof(ucontext_t));
- getcontext (cntx);
- void* stack = malloc (STACK_SIZE);
- cntx->uc_stack.ss_sp = stack;
- cntx->uc_stack.ss_size = STACK_SIZE;
- makecontext (cntx, green_thread, 0);
- new->context = cntx;
- new->function = function;
- new->arg = arg;
- new->next = NULL;
- new->join = NULL;
- new->zombie = FALSE;
- // add new to the ready queue
- sigprocmask(SIG_BLOCK, &block, NULL);
- myEnQueue(new);
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- return 0;
- }
- /*
- Suspends the current thread and selects a new thread for execution.
- This function will simply put the running thread last in the ready queue
- and then select the first thread from the queue as the next thread to run.
- The call to swapcontext() will do a context switch. It will save the
- current state in susp->context and continue execution from next->context.
- Note that when the suspended thread is scheduled, it will continue the execution
- from exactly this point.
- */
- int green_yield () {
- sigprocmask(SIG_BLOCK, &block, NULL);
- green_t* susp = running;
- // add susp to ready queue
- myEnQueue(susp);
- // select the next thread for execution
- struct green_t* next = myDeQueue();
- running = next;
- swapcontext (susp->context, next->context);
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- return 0;
- }
- /*
- The current thread is suspended waiting for a specific thread to terminate.
- The join operation will wait for a thread to terminate. The program therefore add
- the waiting thread to the join field to the argument and select another
- thread for execution.
- If the thread has already terminated we can of course continue
- as if nothing happened.
- The program could allow several threads to wait for the same thread.
- */
- int green_join (green_t* thread) {
- if (thread->zombie) {
- return 0;
- }
- green_t* susp = running;
- sigprocmask(SIG_BLOCK, &block, NULL);
- // add to waiting threads
- //sigprocmask(SIG_BLOCK, &block, NULL);
- if(thread->join == NULL){
- thread->join = susp;
- } else {
- green_t* current = thread->join;
- while(current->next != NULL){
- current = current->next;
- }
- current->next = susp;
- }
- //printf ("susp = %p, susp->join = %p\n", susp, susp->join);
- // select the next thread for execution
- struct green_t* next = myDeQueue();
- running = next;
- //printf ("gonna swap\n");
- swapcontext (susp->context, next->context);
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- //printf ("back in join\n");
- return 0;
- }
- void timer_handler(int sig) {
- sigprocmask(SIG_BLOCK, &block, NULL);
- green_t * susp = running;
- // add the running to the ready queue. Also have to
- myEnQueue(susp);
- // find the next thread for execution
- green_t* next = myDeQueue();
- running = next;
- swapcontext(susp->context, next->context);
- sigprocmask(SIG_UNBLOCK, &block, NULL);
- }
- /*
- --------------------------------------Testing--------------------------------------
- */
- //new
- int flag = 0;
- green_cond_t cond;
- green_mutex_t mutex;
- int number_of_threads = 10;
- int counter = 0;
- //end new
- void* test(void* arg) {
- int id = *(int*)arg;
- int loop = 4;
- while(loop > 0) {
- if(flag == id) {
- printf("thread %d: %d \n", id, loop);
- printf("Work \n");
- loop--;
- flag = (id + 1) % 2;
- green_cond_signal(&cond);
- } else {
- //printf("thread %d: %d ", id, loop);
- //printf("Wait \n");
- green_cond_wait(&cond, &mutex);
- }
- }
- //printf("while out \n");
- }
- void* testSharedResource(void* arg) {
- int id = *(int*)arg;
- for(int i = 0; i < 1000000; i++) {
- green_mutex_lock(&mutex);
- int temp = counter;
- /*
- int dummy = 0;
- if(temp > 1337) // waste time between read and write
- dummy++;
- */
- printf(" count: %d, thread: %d, ", counter, id);
- temp++;
- counter = temp;
- green_mutex_unlock(&mutex);
- }
- }
- void *test_timer(void *arg){
- int i = *(int*)arg;
- int loop = MAX;
- while(loop>0){
- loop--;
- counter++;
- printf(" count: %d, thread: %d\n",counter, i);
- }
- }
- /*
- void test_atomic(green_cond_t* cond, green_mutex_t* mutex){
- // green_mutex_lock(&mutex);
- while(1){
- if(flag == 0){
- flag = 1;
- green_cond_signal(&cond);
- break;
- }else {
- green_cond_wait(&cond, &mutex);
- flag = 0;
- }
- }
- }*/
- int buffer;
- int productions;
- green_cond_t full, empty;
- void produce() {
- for(int i = 0; i < productions/(number_of_threads/2); i++) {
- green_mutex_lock(&mutex);
- while(buffer == 1) {
- green_cond_wait(&empty, &mutex);
- }
- buffer = 1;
- green_cond_signal(&full);
- green_mutex_unlock(&mutex);
- }
- }
- void consume() {
- for(int i = 0; i < productions/(number_of_threads/2); i++) {
- green_mutex_lock(&mutex);
- while(buffer == 0){
- green_cond_wait(&full, &mutex);
- }
- buffer = 0;
- green_cond_signal(&empty);
- green_mutex_unlock(&mutex);
- }
- }
- void* test_consumer_producer_green(void* arg) {
- int id = *(int*)arg;
- if(id % 2 == 0) {
- produce();
- } else {
- consume();
- }
- }
- void test_green(int* args) {
- green_t threads[number_of_threads];
- for(int i = 0; i < number_of_threads; i++)
- green_create(&threads[i], test_consumer_producer_green, &args[i]);
- for(int i = 0; i < number_of_threads; i++)
- green_join(&threads[i]);
- }
- /***************PTHREADS*************/
- void produce_pthreads() {
- for(int i = 0; i < productions/(number_of_threads/2); i++) {
- pthread_mutex_lock(&mutexP);
- while(buffer == 1){
- pthread_cond_wait(&emptyP, &mutexP);
- }
- buffer = 1;
- pthread_cond_signal(&fullP);
- pthread_mutex_unlock(&mutexP);
- }
- }
- void consume_pthreads() {
- for(int i = 0; i < productions/(number_of_threads/2); i++) {
- pthread_mutex_lock(&mutexP);
- while(buffer == 0) {
- pthread_cond_wait(&fullP, &mutexP);
- }
- buffer = 0;
- //printf("Consumed!\n");
- pthread_cond_signal(&emptyP);
- pthread_mutex_unlock(&mutexP);
- }
- }
- void* test_consumer_producer_pthreads(void* arg) {
- int id = *(int*)arg;
- if(id % 2 == 0) {
- produce_pthreads();
- } else {
- consume_pthreads();
- }
- }
- void test_pthreads(int* args) {
- pthread_t threads[number_of_threads];
- for(int i = 0; i < number_of_threads; i++){
- pthread_create(&threads[i], NULL, test_consumer_producer_pthreads, &args[i]);
- }
- for(int i = 0; i < number_of_threads; i++){
- pthread_join(threads[i], NULL);
- }
- }
- int main () {
- clock_t clock_start, c_stop;
- double clock_ticks_gren = 0, clock_ticks_pthreads = 0;
- queue = malloc(sizeof *queue);
- green_cond_init(&cond);
- green_cond_init(&full);
- green_cond_init(&empty);
- green_mutex_init(&mutex);
- pthread_cond_init(&fullP, NULL);
- pthread_cond_init(&emptyP, NULL);
- pthread_mutex_init(&mutexP, NULL);
- int loops = 10;
- for(int run = 1; (run <= loops) && (counter<4000); run++) {
- buffer = 0;
- productions = 100 * 2 * run;
- int args[number_of_threads];
- for(int i = 0; i < number_of_threads; i++)
- args[i] = i;
- clock_start = clock();
- test_green(args);
- c_stop = clock();
- clock_ticks_gren = ((double)(c_stop - clock_start)) / ((double)CLOCKS_PER_SEC/1000);
- //printf("buffer: %d\n", buffer);
- clock_start = clock();
- test_pthreads(args);
- c_stop = clock();
- clock_ticks_pthreads = ((double)(c_stop - clock_start)) / ((double)CLOCKS_PER_SEC/1000);
- printf("%d\t%f\t%f\n", productions, clock_ticks_gren, clock_ticks_pthreads);
- }
- printf ("done\n");
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement