Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- avp 3 тестовые pthread программки для очередей
- */
- // pq1.c потребитель-производитель с "бесконечной" очередью
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <pthread.h>
- // объявляем структуру данных для одного задания
- struct producer_consumer_queue_item {
- struct producer_consumer_queue_item *next;
- // здесь идут собственно данные. вы можете поменять этот кусок,
- // использовав структуру, более специфичную для вашей задачи
- void *data;
- };
- // объявляем очередь с дополнительными структурами для синхронизации.
- // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
- struct producer_consumer_queue {
- struct producer_consumer_queue_item *head, *tail;
- // head == tail == 0, если очередь пуста
- pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
- pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
- int is_alive; // показывает, не закончила ли очередь свою работу
- };
- // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
- void
- enqueue (void *data, struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // упакуем задание в новую структуру
- struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
- p->data = data;
- p->next = 0;
- // получим "эксклюзивный" доступ к очереди заданий
- pthread_mutex_lock(&aq->lock);
- // ... и добавим новое задание туда:
- if (q->tail)
- q->tail->next = p;
- else {
- q->head = p;
- // очередь была пуста, а теперь нет -- надо разбудить потребителей
- pthread_cond_broadcast(&aq->cond);
- }
- q->tail = p;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- // разрешаем доступ всем снова
- pthread_mutex_unlock(&aq->lock);
- }
- void *
- dequeue(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // получаем эксклюзивный доступ к очереди:
- pthread_mutex_lock(&aq->lock);
- while (!q->head && q->is_alive) {
- // очередь пуста, делать нечего, ждем...
- pthread_cond_wait(&aq->cond, &aq->lock);
- // wait разрешает доступ другим на время ожидания
- }
- // запоминаем текущий элемент или 0, если очередь умерла
- struct producer_consumer_queue_item *p = q->head;
- if (p)
- {
- // и удаляем его из очереди
- q->head = q->head->next;
- if (!q->head)
- q->tail = q->head;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- }
- // возвращаем эксклюзивный доступ другим участникам
- pthread_mutex_unlock(&aq->lock);
- // отдаём данные
- void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
- // согласно 7.20.3.2/2, можно не проверять на 0
- free(p);
- return data;
- }
- struct producer_consumer_queue *
- producer_consumer_queue_create()
- {
- struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
- q->head = q->tail = 0;
- q->is_alive = 1;
- pthread_mutex_init(&q->lock, 0);
- pthread_cond_init(&q->cond, 0);
- return q;
- }
- // И процедура для закрытия очереди:
- void
- producer_consumer_queue_stop(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // для обращения к разделяемым переменным необходим эксклюзивный доступ
- pthread_mutex_lock(&aq->lock);
- q->is_alive = 0;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- pthread_cond_broadcast(&aq->cond); // !!!!! avp
- pthread_mutex_unlock(&aq->lock);
- }
- // это поток-потребитель
- void *
- consumer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- for (;;) {
- void *data = dequeue(q);
- // это сигнал, что очередь окончена
- if (!data)
- break; // значит, пора закрывать поток
- char *str = (char *)data;
- // тут наша обработка данных
- printf ("consuming: %s\n", str);
- sleep(2); // 2000 заменил на 2 avp
- printf ("consumed: %s\n", str);
- free(str);
- }
- return 0;
- }
- int
- main ()
- {
- pthread_t consumer_threads[2];
- void *res = 0;
- char *in = NULL;
- size_t sz;
- int l;
- // создадим очередь:
- struct producer_consumer_queue *q = producer_consumer_queue_create(); //add struct
- // и потоки-«потребители» (изм. consumer на consumer_thread)
- pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
- pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
- // главный цикл
- // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
- while ((puts("Enter"), getline(&in, &sz, stdin)) > 0) {
- enqueue(in, q);
- in = NULL;
- }
- producer_consumer_queue_stop(q);
- puts("Fin2");
- if (pthread_join(consumer_threads[0], &res) ||
- pthread_join(consumer_threads[1], &res))
- perror("join");
- return (long)res;
- }
- // pq2.c очередь ограниченного размера без упорядочивания ожидающих производителей
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <pthread.h>
- // объявляем структуру данных для одного задания
- struct producer_consumer_queue_item {
- struct producer_consumer_queue_item *next;
- // здесь идут собственно данные. вы можете поменять этот кусок,
- // использовав структуру, более специфичную для вашей задачи
- void *data;
- };
- // объявляем очередь с дополнительными структурами для синхронизации.
- // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
- struct producer_consumer_queue {
- struct producer_consumer_queue_item *head, *tail;
- // head == tail == 0, если очередь пуста
- pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
- pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
- pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО
- int is_alive; // показывает, не закончила ли очередь свою работу
- int max, cnt, // максимальный размер очереди и число заданий в ней
- pqcnt; // количество производителей, ждущих свободного места в очереди
- };
- void print_queue (struct producer_consumer_queue *q, int lock);
- extern
- #ifdef __cplusplus
- "C"
- #endif
- pid_t gettid();
- // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
- void
- enqueue (void *data, struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // упакуем задание в новую структуру
- struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
- p->data = data;
- p->next = 0;
- // получим "эксклюзивный" доступ к очереди заданий
- pthread_mutex_lock(&aq->lock);
- // проверим не переполнена ли она
- if (q->max <= q->cnt) {
- q->pqcnt++;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- // будем ждать пока потребители ее слегка не опустошат
- while(q->max <= q->cnt & q->is_alive)
- pthread_cond_wait(&aq->condc, &aq->lock);
- q->pqcnt--;
- asm volatile ("" : : : "memory");
- }
- // ... и добавим новое задание туда:
- if (q->tail)
- q->tail->next = p;
- else {
- q->head = p;
- // очередь была пуста, а теперь нет -- надо разбудить потребителей
- pthread_cond_broadcast(&aq->condp);
- }
- q->tail = p;
- q->cnt++;
- asm volatile ("" : : : "memory");
- // разрешаем доступ всем снова
- pthread_mutex_unlock(&aq->lock);
- }
- void *
- dequeue(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // получаем эксклюзивный доступ к очереди:
- pthread_mutex_lock(&aq->lock);
- if (q->pqcnt && q->max > q->cnt)
- // в очереди есть место, а кто-то спит, разбудим их
- pthread_cond_broadcast(&aq->condc);
- while (!q->head && q->is_alive) {
- // очередь пуста, делать нечего, ждем...
- pthread_cond_wait(&aq->condp, &aq->lock);
- // wait разрешает доступ другим на время ожидания
- }
- // запоминаем текущий элемент или 0, если очередь умерла
- struct producer_consumer_queue_item *p = q->head;
- if (p) {
- // и удаляем его из очереди
- q->head = q->head->next;
- if (!q->head)
- q->tail = q->head;
- q->cnt--;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- // разбудим поставщиков в их очереди
- pthread_cond_broadcast(&aq->condc);
- }
- // возвращаем эксклюзивный доступ другим участникам
- pthread_mutex_unlock(&aq->lock);
- // отдаём данные
- void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
- // согласно 7.20.3.2/2, можно не проверять на 0
- free(p);
- return data;
- }
- struct producer_consumer_queue *
- producer_consumer_queue_create(int max)
- {
- struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
- q->head = q->tail = 0;
- q->is_alive = 1;
- q->max = max;
- q->cnt = 0;
- q->pqcnt = 0;
- pthread_mutex_init(&q->lock, 0);
- pthread_cond_init(&q->condc, 0);
- pthread_cond_init(&q->condp, 0);
- return q;
- }
- // И процедура для закрытия очереди:
- void
- producer_consumer_queue_stop(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // для обращения к разделяемым переменным необходим эксклюзивный доступ
- pthread_mutex_lock(&aq->lock);
- q->is_alive = 0;
- asm volatile ("" : : : "memory");
- pthread_cond_broadcast(&aq->condc);
- pthread_cond_broadcast(&aq->condp);
- pthread_mutex_unlock(&aq->lock);
- }
- // это поток-потребитель
- void *
- consumer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Consumer: %ld\n", (long)gettid());
- for (;;) {
- void *data = dequeue(q);
- // это сигнал, что очередь окончена
- if (!data)
- break; // значит, пора закрывать поток
- char *str = (char *)data;
- // тут наша обработка данных
- printf ("%ld consuming: %s", (long)gettid(), str);
- sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
- // printf ("%ld consumed: %s", (long)gettid(), str);
- free(str);
- }
- printf ("%ld cons exit\n", (long)gettid());
- return 0;
- }
- char *
- getdata()
- {
- char *str = (char *)malloc(100);
- volatile static int seq = 0;
- static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
- pthread_mutex_lock(&seqlock);
- sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
- printf("%s", str);
- pthread_mutex_unlock(&seqlock);
- return str;
- }
- void *
- producer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Producer: %ld\n", (long)gettid());
- while (q->is_alive) {
- enqueue(getdata(), q);
- sleep(rand() % 2 + 1);
- }
- printf ("%ld prod exit\n", (long)gettid());
- return 0;
- }
- void
- print_queue (struct producer_consumer_queue *q, int lock)
- {
- if (lock)
- pthread_mutex_lock(&q->lock);
- printf ("cnt = %d pqcnt = %d queue:\n", q->cnt, q->pqcnt);
- struct producer_consumer_queue_item *p;
- for (p = q->head; p; p = p->next)
- printf ("%s", (char *)(p->data));
- puts("");
- if (lock)
- pthread_mutex_unlock(&q->lock);
- }
- int
- main (int ac, char *av[])
- {
- int swp = 0, n = av[1] ? atoi(av[1]) : 5;
- if (n < 0) {
- n = -n; swp = 1;
- }
- if (n < 2)
- n = 5;
- int i, nc = n / 2, np = n - nc;
- if (swp) {
- swp = nc; nc = np; np = swp;
- }
- printf ("test %d consumers and %d producers\n", nc, np);
- pthread_t threads[n];
- void *res = 0;
- char *in = NULL;
- size_t sz;
- // создадим очередь:
- struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
- // и потоки-«потребители» (изм. consumer на consumer_thread)
- for (i = 0; i < nc; i++)
- pthread_create(threads + i, 0, consumer_thread, (void *)q);
- // и потоки "производители"
- for (; i < n; i++)
- pthread_create(threads + i, 0, producer_thread, (void *)q);
- // главный цикл
- // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
- while ((puts("Enter (for info or ^D for exit)"),
- getline(&in, &sz, stdin)) > 0) {
- #if 0
- enqueue(in, q);
- in = NULL;
- #endif
- print_queue(q, 1);
- }
- producer_consumer_queue_stop(q);
- puts("Fin2");
- print_queue(q, 1);
- for (i = 0; i < n; i++)
- if (pthread_join(threads[i], &res))
- perror("join");
- return (long)res;
- }
- // pq3.c очередь ограниченного размера с упорядочиванием ожидающих производителей
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <pthread.h>
- // объявляем структуру данных для одного задания
- struct producer_consumer_queue_item {
- struct producer_consumer_queue_item *next;
- // здесь идут собственно данные. вы можете поменять этот кусок,
- // использовав структуру, более специфичную для вашей задачи
- void *data;
- };
- // струкура данных для спящего (ждущего свободного места) потока-производителя
- struct producer_queue_item {
- struct producer_queue_item *next;
- struct producer_consumer_queue_item *item; // данные для которых нет места
- pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место
- #if DEBUG
- pid_t tid; // linux thread id for debug print
- int signaled; // индикатор "побудки" for debug print
- #endif
- };
- // объявляем очередь данных с дополнительными структурами для синхронизации.
- // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
- struct producer_consumer_queue {
- struct producer_consumer_queue_item *head, *tail;
- // head == tail == 0, если очередь пуста
- pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
- pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
- int is_alive; // показывает, не закончила ли очередь свою работу
- int max, cnt; // максимальный размер очереди и число заданий в ней
- // очередь потоков-производителей, ждущих свободного места для своих данных
- struct producer_queue_item *pqhead,
- *pqtail;
- };
- extern
- #ifdef __cplusplus
- "C"
- #endif
- pid_t gettid();
- void print_queue (struct producer_consumer_queue *q, int lock);
- // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
- void
- enqueue (void *data, struct producer_consumer_queue *q)
- {
- volatile struct producer_consumer_queue *vq = q;
- // упакуем задание в новую структуру
- struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
- p->data = data;
- p->next = 0;
- // получим "эксклюзивный" доступ к очереди заданий
- pthread_mutex_lock(&q->lock);
- #if DEBUG
- printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
- #endif
- // ... и добавим новое задание туда:
- if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
- #if DEBUG
- if (vq->cnt < vq->max) {
- puts("========================");
- print_queue(q, 0);
- puts("========================");
- }
- #endif
- struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
- pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят
- pq->next = 0;
- pq->item = p; // сохраним данные на время сна
- #if DEBUG
- pq->tid = gettid();
- #endif
- // поместим себя в очередь спящих производителей
- if (vq->pqtail)
- vq->pqtail->next = pq;
- else
- vq->pqhead = pq;
- vq->pqtail = pq;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- #if DEBUG
- int at = 0; // счетчик циклов пробуждения
- #endif
- do { // пойдем спать до появления свободного места в очереди данных
- #if DEBUG
- printf ("%ld prod cond wait (cnt: %d at: %d) %s",
- (long)gettid(), vq->cnt, at++, (char *)(p->data));
- pq->signaled = 0;
- #endif
- pthread_cond_wait(&pq->cond, &q->lock);
- } while(vq->max <= vq->cnt && vq->is_alive);
- // проснулись и владеем очередью
- /*
- Вот тонкий момент. Порядок активизации потоков не определен,
- а нам надо соблюдать очередность данных.
- Поэтому переустановим локальные переменные из очереди,
- хотя это могут быть данные, положенные туда другим потоком.
- */
- #if DEBUG
- if (pq != vq->pqhead) {
- printf ("BAAAD %ld (cnt: %d at: %d) %s",
- (long)gettid(), vq->cnt, at, (char *)(p->data));
- print_queue(q, 0);
- if (vq->is_alive)
- exit(1); // совсем плохо, такого быть не должно
- else
- puts("CONTINUE");
- }
- #endif
- pq = vq->pqhead; // в любом случае берем голову очереди производителей
- if ((vq->pqhead = pq->next) == 0) // и удаляем ее
- vq->pqtail = 0;
- asm volatile ("" : : : "memory");
- p = pq->item;
- free(pq);
- #if DEBUG
- printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s",
- (long)gettid(), vq->cnt, at, (char *)(p->data));
- #endif
- }
- // вот тут реально кладем data в очередь для потребителей
- if (vq->tail)
- vq->tail->next = p;
- else {
- vq->head = p;
- // очередь была пуста, а теперь нет -- надо разбудить потребителей
- pthread_cond_broadcast(&q->cond);
- }
- vq->tail = p;
- vq->cnt++;
- asm volatile ("" : : : "memory");
- // сбросим изменения очереди в память
- // разрешаем доступ всем снова
- pthread_mutex_unlock(&q->lock);
- }
- #if DEBUG
- #define cond_signal_producer(q) ({ \
- if ((q)->pqhead) { \
- (q)->pqhead->signaled = 1; \
- pthread_cond_signal(&(q)->pqhead->cond); \
- } \
- })
- #else
- #define cond_signal_producer(q) ({ \
- if ((q)->pqhead) \
- pthread_cond_signal(&(q)->pqhead->cond); \
- })
- #endif
- void *
- dequeue(struct producer_consumer_queue *q)
- {
- volatile struct producer_consumer_queue *vq = q;
- // получаем эксклюзивный доступ к очереди:
- pthread_mutex_lock(&q->lock);
- // если есть спящие производители, то разбудим первого
- cond_signal_producer(vq);
- while (!vq->head && vq->is_alive) {
- // очередь пуста, делать нечего, ждем...
- pthread_cond_wait(&q->cond, &q->lock);
- // wait разрешает доступ другим на время ожидания
- }
- // запоминаем текущий элемент или 0, если очередь умерла
- struct producer_consumer_queue_item *p = vq->head;
- if (p) {
- // и удаляем его из очереди
- vq->head = vq->head->next;
- if (!vq->head)
- vq->tail = vq->head;
- vq->cnt--;
- asm volatile ("" : : : "memory");
- // сбросим изменения очереди в память
- // разбудим первого поставщика в их очереди
- cond_signal_producer(vq);
- }
- // возвращаем эксклюзивный доступ другим участникам
- pthread_mutex_unlock(&q->lock);
- // отдаём данные
- void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
- // согласно 7.20.3.2/2, можно не проверять на 0
- free(p);
- return data;
- }
- struct producer_consumer_queue *
- producer_consumer_queue_create(int max)
- {
- struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
- q->head = q->tail = 0;
- q->pqhead = q->pqtail = 0;
- q->is_alive = 1;
- q->max = max;
- q->cnt = 0;
- pthread_mutex_init(&q->lock, 0);
- pthread_cond_init(&q->cond, 0);
- return q;
- }
- // И процедура для закрытия очереди:
- void
- producer_consumer_queue_stop(struct producer_consumer_queue *q)
- {
- volatile struct producer_consumer_queue *vq = q;
- // для обращения к разделяемым переменным необходим эксклюзивный доступ
- pthread_mutex_lock(&q->lock);
- vq->is_alive = 0;
- pthread_cond_broadcast(&q->cond); // разбудим потребителей
- volatile struct producer_queue_item *pq;
- for (pq = vq->pqhead; pq; pq = pq->next) {
- #if DEBUG
- pq->signaled = 1;
- asm volatile ("" : : : "memory");
- #endif
- // будим каждого ждущего производителя
- pthread_cond_signal((pthread_cond_t *)&pq->cond);
- }
- pthread_mutex_unlock(&q->lock);
- }
- // это поток-потребитель
- void *
- consumer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Consumer: %ld\n", (long)gettid());
- for (;;) {
- void *data = dequeue(q);
- // это сигнал, что очередь окончена
- if (!data)
- break; // значит, пора закрывать поток
- char *str = (char *)data;
- // тут наша обработка данных
- printf ("%ld consuming: %s", (long)gettid(), str);
- sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
- // printf ("%ld consumed: %s", (long)gettid(), str);
- free(str);
- }
- printf ("%ld cons exit\n", (long)gettid());
- return 0;
- }
- // делаем тестовые строки с последовательными номерами
- char *
- getdata()
- {
- char *str = (char *)malloc(100);
- volatile static int seq = 0;
- static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
- pthread_mutex_lock(&seqlock);
- sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
- printf("%s", str);
- pthread_mutex_unlock(&seqlock);
- return str;
- }
- // это поток-производитель
- void *
- producer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Producer: %ld\n", (long)gettid());
- while (q->is_alive) {
- enqueue(getdata(), q);
- sleep(rand() % 2 + 1);
- }
- printf ("%ld prod exit\n", (long)gettid());
- return 0;
- }
- void
- print_queue (struct producer_consumer_queue *q, int lock)
- {
- if (lock)
- pthread_mutex_lock(&q->lock);
- printf ("cnt = %d queue:\n", q->cnt);
- struct producer_consumer_queue_item *p;
- for (p = q->head; p; p = p->next)
- printf ("%s", (char *)(p->data));
- puts ("producers queue:");
- struct producer_queue_item *pq;
- for (pq= q->pqhead; pq; pq = pq->next)
- #if DEBUG
- printf ("%ld (%d) %s",
- (long)pq->tid, pq->signaled, (char *)(pq->item->data));
- #else
- printf ("%s", (char *)(pq->item->data));
- #endif
- puts("");
- if (lock)
- pthread_mutex_unlock(&q->lock);
- }
- int
- main (int ac, char *av[])
- {
- int swp = 0, n = av[1] ? atoi(av[1]) : 5;
- if (n < 0) {
- n = -n; swp = 1;
- }
- if (n < 2)
- n = 5;
- int i, nc = n / 2, np = n - nc;
- if (swp) {
- swp = nc; nc = np; np = swp;
- }
- printf ("test %d consumers and %d producers\n", nc, np);
- pthread_t threads[n];
- void *res = 0;
- char *in = NULL;
- size_t sz;
- // создадим очередь:
- struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
- // и потоки-«потребители» (изм. consumer на consumer_thread)
- for (i = 0; i < nc; i++)
- pthread_create(threads + i, 0, consumer_thread, (void *)q);
- // и потоки "производители"
- for (; i < n; i++)
- pthread_create(threads + i, 0, producer_thread, (void *)q);
- // главный цикл
- // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
- while ((puts("Enter (for info or ^D for exit)"),
- getline(&in, &sz, stdin)) > 0) {
- #if 0
- enqueue(in, q);
- in = NULL;
- #endif
- print_queue(q, 1);
- }
- producer_consumer_queue_stop(q);
- puts("Fin2");
- print_queue(q, 1);
- for (i = 0; i < n; i++)
- if (pthread_join(threads[i], &res))
- perror("join");
- return (long)res;
- }
- /*
- avp 2011
- there are no gettid() in Linux libc
- so make it
- */
- #include <sys/types.h>
- #include <sys/syscall.h>
- pid_t
- gettid()
- {
- return syscall(SYS_gettid);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement