Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Паттерн producer/consumer достаточно часто встречается в многопоточном программировании. Его смысл состоит в том, что один или несколько потоков _производят_ данные, и параллельно этому один или несколько потоков _потребляют_ их.
- Например, произведённые данные могут быть представлять вычислительно интенсивное _задание_. В этом случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления). Или производящий поток загружает данные из сети, а по окончанию загрузки выполняющий поток производит разбор загруженных данных.
- Нетривиальность проблемы заключается в том, что потенциально как создание новых данных, так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.
- Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».
- Реализация данного паттерна различается по степени сложности в различных языках.
- ---
- **Имплементация на C с pthreads**
- В C, в соответствии с духом языка, нет встроенных высокоуровневых синхронизирующихся коллекций. Наверное самой популярной и широко используемой библиотекой, реализующей многопоточность, является pthreads. С её помощью паттерн можно реализовать так (спасибо за код @avp):
- #include <pthread.h>
- // объявляем структуру данных для одного задания
- struct producer_consumer_queue_item {
- struct producer_consumer_queue_item *next;
- // здесь идут собственно данные. вы можете поменять этот кусок,
- // использовав структуру, более специфичную для вашей задачи
- void *data;
- };
- // объявляем очередь с дополнительными структурами для синхронизации.
- // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
- struct producer_consumer_queue {
- struct producer_consumer_queue_item *head, *tail;
- // head == tail == 0, если очередь пуста
- pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
- pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
- int is_alive; // показывает, не закончила ли очередь свою работу
- };
- Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
- void
- enqueue (void *data, struct producer_consumer_queue *q)
- {
- // упакуем задание в новую структуру
- struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
- p->data = data;
- p->next = 0;
- // получим "эксклюзивный" доступ к очереди заданий
- pthread_mutex_lock(&q->lock);
- // ... и добавим новое задание туда:
- if (q->tail)
- q->tail->next = p;
- else {
- q->head = p;
- // очередь была пуста, а теперь нет -- надо разбудить потребителей
- pthread_cond_broadcast(&q->cond);
- }
- q->tail = p;
- // разрешаем доступ всем снова
- pthread_mutex_unlock(&q->lock);
- }
- void *
- dequeue(struct producer_consumer_queue *q)
- {
- // получаем эксклюзивный доступ к очереди:
- pthread_mutex_lock(&q->lock);
- while (!q->head && q->is_alive) {
- // очередь пуста, делать нечего, ждем...
- pthread_cond_wait(&q->cond, &q->lock);
- // wait разрешает доступ другим на время ожидания
- }
- // запоминаем текущий элемент или 0, если очередь умерла
- struct producer_consumer_queue_item *p = q->head;
- if (p)
- {
- // и удаляем его из очереди
- q->head = q->head->next;
- if (!q->head)
- q->tail = q->head;
- }
- // возвращаем эксклюзивный доступ другим участникам
- pthread_mutex_unlock(&q->lock);
- // отдаём данные
- void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
- // согласно 7.20.3.2/2, можно не проверять на 0
- free(p);
- return data;
- }
- Ещё нужна процедура для инициализации очереди:
- struct producer_consumer_queue *
- producer_consumer_queue_create()
- {
- struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
- q->head = q->tail = 0;
- q->is_alive = 1;
- pthread_mutex_init(&q->lock, 0);
- pthread_cond_init(&q->cond, 0);
- return q;
- }
- И процедура для закрытия очереди:
- void
- producer_consumer_queue_stop(struct producer_consumer_queue *q)
- {
- // для обращения к разделяемым переменным необходим эксклюзивный доступ
- pthread_mutex_lock(&q->lock);
- q->is_alive = 0;
- pthread_cond_broadcast(&q->cond);
- pthread_mutex_unlock(&q->lock);
- }
- Отлично, у нас есть всё, что нам надо.
- Как использовать это? Нужно:
- * запустить несколько потоков-«производителей» и несколько «потребителей»
- * придумать структуру данных для задания
- Пример: (производитель — главный поток, потребители — 2 потока)
- // это поток-потребитель
- void *
- consumer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- for (;;) {
- void *data = dequeue(q);
- // это сигнал, что очередь окончена
- if (!data)
- break; // значит, пора закрывать поток
- char *str = (char *)data;
- // тут наша обработка данных
- printf ("consuming: %s\n", str);
- sleep(2);
- printf ("consumed: %s\n", str);
- free(str);
- }
- return 0;
- }
- int
- main ()
- {
- pthread_t consumer_threads[2];
- void *res = 0;
- char *in = NULL;
- size_t sz;
- // создадим очередь:
- struct producer_consumer_queue *q = producer_consumer_queue_create();
- // и потоки-«потребители»
- pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
- pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
- // главный цикл
- // получаем данные с клавиатуры:
- while (getline(&in, &sz, stdin) > 0) {
- enqueue(in, q);
- in = NULL;
- }
- producer_consumer_queue_stop(q);
- if (pthread_join(consumer_threads[0], &res) ||
- pthread_join(consumer_threads[1], &res))
- perror("join");
- return (long)res;
- }
- ---
- **Имплементация на C#**
- Если вы работаете со старой версией C#, вы можете воспользоваться встроенным классом `Monitor`, который является аналогом condition variable из pthreads.
- public class ProducerConsumer<T> where T : class
- {
- object mutex = new object();
- Queue<T> queue = new Queue<T>();
- bool isDead = false;
- public void Enqueue(T task)
- {
- if (task == null)
- throw new ArgumentNullException("task");
- lock (mutex)
- {
- if (isDead)
- throw new InvalidOperationException("Queue already stopped");
- queue.Enqueue(task);
- Monitor.Pulse(mutex);
- }
- }
- public T Dequeue()
- {
- lock (mutex)
- {
- while (queue.Count == 0 && !isDead)
- Monitor.Wait(mutex);
- if (queue.Count == 0)
- return null;
- return queue.Dequeue();
- }
- }
- public void Stop()
- {
- lock (mutex)
- {
- isDead = true;
- Monitor.PulseAll(mutex);
- }
- }
- }
- Использование (аналогичный пример):
- class Program
- {
- static public void Main()
- {
- new Program().Run();
- }
- ProducerConsumer<string> q = new ProducerConsumer<string>();
- void Run()
- {
- var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
- foreach (var t in threads)
- t.Start();
- string s;
- while ((s = Console.ReadLine()).Length != 0)
- q.Enqueue(s);
- q.Stop();
- foreach (var t in threads)
- t.Join();
- }
- void Consumer()
- {
- while (true)
- {
- string s = q.Dequeue();
- if (s == null)
- break;
- Console.WriteLine("Processing: {0}", s);
- Thread.Sleep(2000);
- Console.WriteLine("Processed: {0}", s);
- }
- }
- }
- ---
- Для современных версий языка (начиная с C# 4.0), лучше, по совету @Flammable, [воспользоваться готовым классом `BlockingCollection`](http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT), представляющим ту же функциональность. Пример:
- class Program
- {
- static public void Main()
- {
- new Program().Run();
- }
- BlockingCollection<string> q = new BlockingCollection<string>();
- void Run()
- {
- var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
- foreach (var t in threads)
- t.Start();
- string s;
- while ((s = Console.ReadLine()).Length != 0)
- q.Add(s);
- q.CompleteAdding(); // останавливаем
- foreach (var t in threads)
- t.Join();
- }
- void Consumer()
- {
- foreach (var s in q.GetConsumingEnumerable())
- {
- Console.WriteLine("Processing: {0}", s);
- Thread.Sleep(2000);
- Console.WriteLine("Processed: {0}", s);
- }
- }
- }
- ==================================================================
- Comments:
- Я поправил сишную часть по мелочам, добавил struct перед producer_consumer_queue, заменил sleep(2000) на sleep(2) и убрал `l` из `main()`, все равно не используется.
- Единственная серьезная правка, это добавил в `producer_consumer_queue_stop()` `pthread_cond_broadcast(&q->cond);`.
- Без этого вызова потоки остаются ждать на `cond_wait` и `main` "зависает" на `join`.
- --
- Наверное стоит добавить (про си), что этот вариант правильно работает только для очереди "бесконечной" длины, т.е. когда производители не переходят в ожидание.
- Вечерком набросаю код для этого случая.
- (20 Май '13 21:38) avp
- @avp: спасибо, что заметили! в шарповом коде в `Stop()` я не забыл `Monitor.PulseAll`, а тут забыл. Кстати, у нас немного разные стратегии в `enqueue`: вы делаете broadcast только когда очередь была пустой, а я делаю signal каждый раз. Судя по всему, и так и так корректно.
- (20 Май '13 22:45) VladD
- Да, в такой ситуации корректно. pthread_cond сигналы не накапливаются.
- Более того, если в момент вызова pthread_cond_broadcast()/pthread_cond_signal() нет потоков, ожидающих на cond, то сигнал попросту теряется. Нельзя сигнализировать "впрок".
- --
- Это серьезное отличие от семафоров, источник потенциальных ошибок синхронизации. В случае семафоров при каждой доставке данных в очередь, можно вызывать post и счетчик семафора будет содержать коолчество элементов очереди. Естественно, потребитель говорит wait на семафор каждый раз, когда он собирается взять данные из очереди.
- (20 Май '13 23:21) avp
- А не могли бы вы развить данное исследование до уровня синхронизации между процессами, при обращении к общим ресурсам (не только между потоками)?
- (22 Май '13 14:11) mister
- @mister: синхронизация между процессами — другое дело, т. к. отсутствует разделяемый ресурс: общая память. Например, можно устроить один хост-диспетчер, который будет (1) принимать задачи от хостов-поставщиков, (2) класть их во внутреннюю producer-consumer-очередь, и (3) раздавать хостам-потребителям на выполнение.
- (22 Май '13 14:53) VladD
- Ок, спасибо за ответы.
- (22 Май '13 15:23) mister
- @VladD, я сделал еще пару тестовых программок для очередей с ограничением на количество данных и ожиданием производителя, пока в очереди не появится свободное место и положил их все в pastebin все 3 + утилитка в одном файле.
- В pq2.c активизируется произвольный ждущий производитель, а в pq3.c "правильный" (тот, что раньше пришел). Плюс поправил работу с памятью между потоками, добавив кое-где volatile.
- --
- Посмотрите, добавьте в текст исследования, что сочтете интересным.
- Если появятся какие-то вопросы, то лучше пишите мне на почту (доступна через мой ник).
- (23 Май '13 1:30) avp
- Извиняюсь за глупый вопросы. Метод `Dequeue`
- Почему `while`, а не `if`?
- ... while (queue.Count == 0 && !isDead) Monitor.Wait(mutex);
- (27 Май '14 13:19) Veikedo
- @VladD, благодарю. Наверное самое хорошее русскоязычное описание паттерна Producer-Consumer. Очень помогло при записи из разных потоков в один файл.
- (13 Июл '14 22:52) z668
- pthread_mutex_lock(&q->lock); // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } q->tail = p;
- Почему, если tail существовал, мы сначала ему в next записываем p, а потом перезатираем сам tail этим p? Зачем вообще эта операция. И второе - правильно ли я понимаю, что даже если мы уже сделали бродкаст, пока мы не вызовем "pthread_mutex_unlock", потребители будут висеть на pthread_cond_wait?
- (29 Окт '14 14:34) smallFish
- @smallFish, по поводу broadcast/wait/unlock правильно понимаете.
- Что же касается первого вопроса, то сейчас я уже не помню этих деталей и опять влезать в код неохота. Можете поправить, как считаете будет правильно.
- --
- Кстати, а у Вас оповещения об изменениях здесь на почту приходят?
- (у меня нет, так что время реакции непредсказуемо).
- (10 Дек '14 3:03) avp
- ====================================================================================================
- /*
- avp 3 тестовые pthread программки для очередей
- */
- // pq1.c потребитель-производитель с "бесконечной" очередью
- #include <stdio.h>
- #include <stdlib.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <pthread.h>
- // объявляем структуру данных для одного задания
- struct producer_consumer_queue_item {
- struct producer_consumer_queue_item *next;
- // здесь идут собственно данные. вы можете поменять этот кусок,
- // использовав структуру, более специфичную для вашей задачи
- void *data;
- };
- // объявляем очередь с дополнительными структурами для синхронизации.
- // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
- struct producer_consumer_queue {
- struct producer_consumer_queue_item *head, *tail;
- // head == tail == 0, если очередь пуста
- pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
- pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
- int is_alive; // показывает, не закончила ли очередь свою работу
- };
- // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
- void
- enqueue (void *data, struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // упакуем задание в новую структуру
- struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
- p->data = data;
- p->next = 0;
- // получим "эксклюзивный" доступ к очереди заданий
- pthread_mutex_lock(&aq->lock);
- // ... и добавим новое задание туда:
- if (q->tail)
- q->tail->next = p;
- else {
- q->head = p;
- // очередь была пуста, а теперь нет -- надо разбудить потребителей
- pthread_cond_broadcast(&aq->cond);
- }
- q->tail = p;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- // разрешаем доступ всем снова
- pthread_mutex_unlock(&aq->lock);
- }
- void *
- dequeue(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // получаем эксклюзивный доступ к очереди:
- pthread_mutex_lock(&aq->lock);
- while (!q->head && q->is_alive) {
- // очередь пуста, делать нечего, ждем...
- pthread_cond_wait(&aq->cond, &aq->lock);
- // wait разрешает доступ другим на время ожидания
- }
- // запоминаем текущий элемент или 0, если очередь умерла
- struct producer_consumer_queue_item *p = q->head;
- if (p)
- {
- // и удаляем его из очереди
- q->head = q->head->next;
- if (!q->head)
- q->tail = q->head;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- }
- // возвращаем эксклюзивный доступ другим участникам
- pthread_mutex_unlock(&aq->lock);
- // отдаём данные
- void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
- // согласно 7.20.3.2/2, можно не проверять на 0
- free(p);
- return data;
- }
- struct producer_consumer_queue *
- producer_consumer_queue_create()
- {
- struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
- q->head = q->tail = 0;
- q->is_alive = 1;
- pthread_mutex_init(&q->lock, 0);
- pthread_cond_init(&q->cond, 0);
- return q;
- }
- // И процедура для закрытия очереди:
- void
- producer_consumer_queue_stop(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // для обращения к разделяемым переменным необходим эксклюзивный доступ
- pthread_mutex_lock(&aq->lock);
- q->is_alive = 0;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- pthread_cond_broadcast(&aq->cond); // !!!!! avp
- pthread_mutex_unlock(&aq->lock);
- }
- // это поток-потребитель
- void *
- consumer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- for (;;) {
- void *data = dequeue(q);
- // это сигнал, что очередь окончена
- if (!data)
- break; // значит, пора закрывать поток
- char *str = (char *)data;
- // тут наша обработка данных
- printf ("consuming: %s\n", str);
- sleep(2); // 2000 заменил на 2 avp
- printf ("consumed: %s\n", str);
- free(str);
- }
- return 0;
- }
- int
- main ()
- {
- pthread_t consumer_threads[2];
- void *res = 0;
- char *in = NULL;
- size_t sz;
- int l;
- // создадим очередь:
- struct producer_consumer_queue *q = producer_consumer_queue_create(); //add struct
- // и потоки-«потребители» (изм. consumer на consumer_thread)
- pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
- pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
- // главный цикл
- // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
- while ((puts("Enter"), getline(&in, &sz, stdin)) > 0) {
- enqueue(in, q);
- in = NULL;
- }
- producer_consumer_queue_stop(q);
- puts("Fin2");
- if (pthread_join(consumer_threads[0], &res) ||
- pthread_join(consumer_threads[1], &res))
- perror("join");
- return (long)res;
- }
- // pq2.c очередь ограниченного размера без упорядочивания ожидающих производителей
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <pthread.h>
- // объявляем структуру данных для одного задания
- struct producer_consumer_queue_item {
- struct producer_consumer_queue_item *next;
- // здесь идут собственно данные. вы можете поменять этот кусок,
- // использовав структуру, более специфичную для вашей задачи
- void *data;
- };
- // объявляем очередь с дополнительными структурами для синхронизации.
- // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
- struct producer_consumer_queue {
- struct producer_consumer_queue_item *head, *tail;
- // head == tail == 0, если очередь пуста
- pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
- pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
- pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО
- int is_alive; // показывает, не закончила ли очередь свою работу
- int max, cnt, // максимальный размер очереди и число заданий в ней
- pqcnt; // количество производителей, ждущих свободного места в очереди
- };
- void print_queue (struct producer_consumer_queue *q, int lock);
- extern
- #ifdef __cplusplus
- "C"
- #endif
- pid_t gettid();
- // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
- void
- enqueue (void *data, struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // упакуем задание в новую структуру
- struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
- p->data = data;
- p->next = 0;
- // получим "эксклюзивный" доступ к очереди заданий
- pthread_mutex_lock(&aq->lock);
- // проверим не переполнена ли она
- if (q->max <= q->cnt) {
- q->pqcnt++;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- // будем ждать пока потребители ее слегка не опустошат
- while(q->max <= q->cnt & q->is_alive)
- pthread_cond_wait(&aq->condc, &aq->lock);
- q->pqcnt--;
- asm volatile ("" : : : "memory");
- }
- // ... и добавим новое задание туда:
- if (q->tail)
- q->tail->next = p;
- else {
- q->head = p;
- // очередь была пуста, а теперь нет -- надо разбудить потребителей
- pthread_cond_broadcast(&aq->condp);
- }
- q->tail = p;
- q->cnt++;
- asm volatile ("" : : : "memory");
- // разрешаем доступ всем снова
- pthread_mutex_unlock(&aq->lock);
- }
- void *
- dequeue(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // получаем эксклюзивный доступ к очереди:
- pthread_mutex_lock(&aq->lock);
- if (q->pqcnt && q->max > q->cnt)
- // в очереди есть место, а кто-то спит, разбудим их
- pthread_cond_broadcast(&aq->condc);
- while (!q->head && q->is_alive) {
- // очередь пуста, делать нечего, ждем...
- pthread_cond_wait(&aq->condp, &aq->lock);
- // wait разрешает доступ другим на время ожидания
- }
- // запоминаем текущий элемент или 0, если очередь умерла
- struct producer_consumer_queue_item *p = q->head;
- if (p) {
- // и удаляем его из очереди
- q->head = q->head->next;
- if (!q->head)
- q->tail = q->head;
- q->cnt--;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- // разбудим поставщиков в их очереди
- pthread_cond_broadcast(&aq->condc);
- }
- // возвращаем эксклюзивный доступ другим участникам
- pthread_mutex_unlock(&aq->lock);
- // отдаём данные
- void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
- // согласно 7.20.3.2/2, можно не проверять на 0
- free(p);
- return data;
- }
- struct producer_consumer_queue *
- producer_consumer_queue_create(int max)
- {
- struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
- q->head = q->tail = 0;
- q->is_alive = 1;
- q->max = max;
- q->cnt = 0;
- q->pqcnt = 0;
- pthread_mutex_init(&q->lock, 0);
- pthread_cond_init(&q->condc, 0);
- pthread_cond_init(&q->condp, 0);
- return q;
- }
- // И процедура для закрытия очереди:
- void
- producer_consumer_queue_stop(struct producer_consumer_queue *aq)
- {
- volatile struct producer_consumer_queue *q = aq;
- // для обращения к разделяемым переменным необходим эксклюзивный доступ
- pthread_mutex_lock(&aq->lock);
- q->is_alive = 0;
- asm volatile ("" : : : "memory");
- pthread_cond_broadcast(&aq->condc);
- pthread_cond_broadcast(&aq->condp);
- pthread_mutex_unlock(&aq->lock);
- }
- // это поток-потребитель
- void *
- consumer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Consumer: %ld\n", (long)gettid());
- for (;;) {
- void *data = dequeue(q);
- // это сигнал, что очередь окончена
- if (!data)
- break; // значит, пора закрывать поток
- char *str = (char *)data;
- // тут наша обработка данных
- printf ("%ld consuming: %s", (long)gettid(), str);
- sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
- // printf ("%ld consumed: %s", (long)gettid(), str);
- free(str);
- }
- printf ("%ld cons exit\n", (long)gettid());
- return 0;
- }
- char *
- getdata()
- {
- char *str = (char *)malloc(100);
- volatile static int seq = 0;
- static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
- pthread_mutex_lock(&seqlock);
- sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
- printf("%s", str);
- pthread_mutex_unlock(&seqlock);
- return str;
- }
- void *
- producer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Producer: %ld\n", (long)gettid());
- while (q->is_alive) {
- enqueue(getdata(), q);
- sleep(rand() % 2 + 1);
- }
- printf ("%ld prod exit\n", (long)gettid());
- return 0;
- }
- void
- print_queue (struct producer_consumer_queue *q, int lock)
- {
- if (lock)
- pthread_mutex_lock(&q->lock);
- printf ("cnt = %d pqcnt = %d queue:\n", q->cnt, q->pqcnt);
- struct producer_consumer_queue_item *p;
- for (p = q->head; p; p = p->next)
- printf ("%s", (char *)(p->data));
- puts("");
- if (lock)
- pthread_mutex_unlock(&q->lock);
- }
- int
- main (int ac, char *av[])
- {
- int swp = 0, n = av[1] ? atoi(av[1]) : 5;
- if (n < 0) {
- n = -n; swp = 1;
- }
- if (n < 2)
- n = 5;
- int i, nc = n / 2, np = n - nc;
- if (swp) {
- swp = nc; nc = np; np = swp;
- }
- printf ("test %d consumers and %d producers\n", nc, np);
- pthread_t threads[n];
- void *res = 0;
- char *in = NULL;
- size_t sz;
- // создадим очередь:
- struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
- // и потоки-«потребители» (изм. consumer на consumer_thread)
- for (i = 0; i < nc; i++)
- pthread_create(threads + i, 0, consumer_thread, (void *)q);
- // и потоки "производители"
- for (; i < n; i++)
- pthread_create(threads + i, 0, producer_thread, (void *)q);
- // главный цикл
- // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
- while ((puts("Enter (for info or ^D for exit)"),
- getline(&in, &sz, stdin)) > 0) {
- #if 0
- enqueue(in, q);
- in = NULL;
- #endif
- print_queue(q, 1);
- }
- producer_consumer_queue_stop(q);
- puts("Fin2");
- print_queue(q, 1);
- for (i = 0; i < n; i++)
- if (pthread_join(threads[i], &res))
- perror("join");
- return (long)res;
- }
- // pq3.c очередь ограниченного размера с упорядочиванием ожидающих производителей
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include <pthread.h>
- // объявляем структуру данных для одного задания
- struct producer_consumer_queue_item {
- struct producer_consumer_queue_item *next;
- // здесь идут собственно данные. вы можете поменять этот кусок,
- // использовав структуру, более специфичную для вашей задачи
- void *data;
- };
- // струкура данных для спящего (ждущего свободного места) потока-производителя
- struct producer_queue_item {
- struct producer_queue_item *next;
- struct producer_consumer_queue_item *item; // данные для которых нет места
- pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место
- #if DEBUG
- pid_t tid; // linux thread id for debug print
- int signaled; // индикатор "побудки" for debug print
- #endif
- };
- // объявляем очередь данных с дополнительными структурами для синхронизации.
- // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
- struct producer_consumer_queue {
- struct producer_consumer_queue_item *head, *tail;
- // head == tail == 0, если очередь пуста
- pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
- pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
- int is_alive; // показывает, не закончила ли очередь свою работу
- int max, cnt; // максимальный размер очереди и число заданий в ней
- // очередь потоков-производителей, ждущих свободного места для своих данных
- struct producer_queue_item *pqhead,
- *pqtail;
- };
- extern
- #ifdef __cplusplus
- "C"
- #endif
- pid_t gettid();
- void print_queue (struct producer_consumer_queue *q, int lock);
- // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
- void
- enqueue (void *data, struct producer_consumer_queue *q)
- {
- volatile struct producer_consumer_queue *vq = q;
- // упакуем задание в новую структуру
- struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
- p->data = data;
- p->next = 0;
- // получим "эксклюзивный" доступ к очереди заданий
- pthread_mutex_lock(&q->lock);
- #if DEBUG
- printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
- #endif
- // ... и добавим новое задание туда:
- if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
- #if DEBUG
- if (vq->cnt < vq->max) {
- puts("========================");
- print_queue(q, 0);
- puts("========================");
- }
- #endif
- struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
- pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят
- pq->next = 0;
- pq->item = p; // сохраним данные на время сна
- #if DEBUG
- pq->tid = gettid();
- #endif
- // поместим себя в очередь спящих производителей
- if (vq->pqtail)
- vq->pqtail->next = pq;
- else
- vq->pqhead = pq;
- vq->pqtail = pq;
- asm volatile ("" : : : "memory");
- // зафиксируем изменения очереди в памяти
- #if DEBUG
- int at = 0; // счетчик циклов пробуждения
- #endif
- do { // пойдем спать до появления свободного места в очереди данных
- #if DEBUG
- printf ("%ld prod cond wait (cnt: %d at: %d) %s",
- (long)gettid(), vq->cnt, at++, (char *)(p->data));
- pq->signaled = 0;
- #endif
- pthread_cond_wait(&pq->cond, &q->lock);
- } while(vq->max <= vq->cnt && vq->is_alive);
- // проснулись и владеем очередью
- /*
- Вот тонкий момент. Порядок активизации потоков не определен,
- а нам надо соблюдать очередность данных.
- Поэтому переустановим локальные переменные из очереди,
- хотя это могут быть данные, положенные туда другим потоком.
- */
- #if DEBUG
- if (pq != vq->pqhead) {
- printf ("BAAAD %ld (cnt: %d at: %d) %s",
- (long)gettid(), vq->cnt, at, (char *)(p->data));
- print_queue(q, 0);
- if (vq->is_alive)
- exit(1); // совсем плохо, такого быть не должно
- else
- puts("CONTINUE");
- }
- #endif
- pq = vq->pqhead; // в любом случае берем голову очереди производителей
- if ((vq->pqhead = pq->next) == 0) // и удаляем ее
- vq->pqtail = 0;
- asm volatile ("" : : : "memory");
- p = pq->item;
- free(pq);
- #if DEBUG
- printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s",
- (long)gettid(), vq->cnt, at, (char *)(p->data));
- #endif
- }
- // вот тут реально кладем data в очередь для потребителей
- if (vq->tail)
- vq->tail->next = p;
- else {
- vq->head = p;
- // очередь была пуста, а теперь нет -- надо разбудить потребителей
- pthread_cond_broadcast(&q->cond);
- }
- vq->tail = p;
- vq->cnt++;
- asm volatile ("" : : : "memory");
- // сбросим изменения очереди в память
- // разрешаем доступ всем снова
- pthread_mutex_unlock(&q->lock);
- }
- #if DEBUG
- #define cond_signal_producer(q) ({ \
- if ((q)->pqhead) { \
- (q)->pqhead->signaled = 1; \
- pthread_cond_signal(&(q)->pqhead->cond); \
- } \
- })
- #else
- #define cond_signal_producer(q) ({ \
- if ((q)->pqhead) \
- pthread_cond_signal(&(q)->pqhead->cond); \
- })
- #endif
- void *
- dequeue(struct producer_consumer_queue *q)
- {
- volatile struct producer_consumer_queue *vq = q;
- // получаем эксклюзивный доступ к очереди:
- pthread_mutex_lock(&q->lock);
- // если есть спящие производители, то разбудим первого
- cond_signal_producer(vq);
- while (!vq->head && vq->is_alive) {
- // очередь пуста, делать нечего, ждем...
- pthread_cond_wait(&q->cond, &q->lock);
- // wait разрешает доступ другим на время ожидания
- }
- // запоминаем текущий элемент или 0, если очередь умерла
- struct producer_consumer_queue_item *p = vq->head;
- if (p) {
- // и удаляем его из очереди
- vq->head = vq->head->next;
- if (!vq->head)
- vq->tail = vq->head;
- vq->cnt--;
- asm volatile ("" : : : "memory");
- // сбросим изменения очереди в память
- // разбудим первого поставщика в их очереди
- cond_signal_producer(vq);
- }
- // возвращаем эксклюзивный доступ другим участникам
- pthread_mutex_unlock(&q->lock);
- // отдаём данные
- void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
- // согласно 7.20.3.2/2, можно не проверять на 0
- free(p);
- return data;
- }
- struct producer_consumer_queue *
- producer_consumer_queue_create(int max)
- {
- struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
- q->head = q->tail = 0;
- q->pqhead = q->pqtail = 0;
- q->is_alive = 1;
- q->max = max;
- q->cnt = 0;
- pthread_mutex_init(&q->lock, 0);
- pthread_cond_init(&q->cond, 0);
- return q;
- }
- // И процедура для закрытия очереди:
- void
- producer_consumer_queue_stop(struct producer_consumer_queue *q)
- {
- volatile struct producer_consumer_queue *vq = q;
- // для обращения к разделяемым переменным необходим эксклюзивный доступ
- pthread_mutex_lock(&q->lock);
- vq->is_alive = 0;
- pthread_cond_broadcast(&q->cond); // разбудим потребителей
- volatile struct producer_queue_item *pq;
- for (pq = vq->pqhead; pq; pq = pq->next) {
- #if DEBUG
- pq->signaled = 1;
- asm volatile ("" : : : "memory");
- #endif
- // будим каждого ждущего производителя
- pthread_cond_signal((pthread_cond_t *)&pq->cond);
- }
- pthread_mutex_unlock(&q->lock);
- }
- // это поток-потребитель
- void *
- consumer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Consumer: %ld\n", (long)gettid());
- for (;;) {
- void *data = dequeue(q);
- // это сигнал, что очередь окончена
- if (!data)
- break; // значит, пора закрывать поток
- char *str = (char *)data;
- // тут наша обработка данных
- printf ("%ld consuming: %s", (long)gettid(), str);
- sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
- // printf ("%ld consumed: %s", (long)gettid(), str);
- free(str);
- }
- printf ("%ld cons exit\n", (long)gettid());
- return 0;
- }
- // делаем тестовые строки с последовательными номерами
- char *
- getdata()
- {
- char *str = (char *)malloc(100);
- volatile static int seq = 0;
- static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
- pthread_mutex_lock(&seqlock);
- sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
- printf("%s", str);
- pthread_mutex_unlock(&seqlock);
- return str;
- }
- // это поток-производитель
- void *
- producer_thread (void *arg)
- {
- struct producer_consumer_queue *q = (typeof(q))arg;
- printf("Producer: %ld\n", (long)gettid());
- while (q->is_alive) {
- enqueue(getdata(), q);
- sleep(rand() % 2 + 1);
- }
- printf ("%ld prod exit\n", (long)gettid());
- return 0;
- }
- void
- print_queue (struct producer_consumer_queue *q, int lock)
- {
- if (lock)
- pthread_mutex_lock(&q->lock);
- printf ("cnt = %d queue:\n", q->cnt);
- struct producer_consumer_queue_item *p;
- for (p = q->head; p; p = p->next)
- printf ("%s", (char *)(p->data));
- puts ("producers queue:");
- struct producer_queue_item *pq;
- for (pq= q->pqhead; pq; pq = pq->next)
- #if DEBUG
- printf ("%ld (%d) %s",
- (long)pq->tid, pq->signaled, (char *)(pq->item->data));
- #else
- printf ("%s", (char *)(pq->item->data));
- #endif
- puts("");
- if (lock)
- pthread_mutex_unlock(&q->lock);
- }
- int
- main (int ac, char *av[])
- {
- int swp = 0, n = av[1] ? atoi(av[1]) : 5;
- if (n < 0) {
- n = -n; swp = 1;
- }
- if (n < 2)
- n = 5;
- int i, nc = n / 2, np = n - nc;
- if (swp) {
- swp = nc; nc = np; np = swp;
- }
- printf ("test %d consumers and %d producers\n", nc, np);
- pthread_t threads[n];
- void *res = 0;
- char *in = NULL;
- size_t sz;
- // создадим очередь:
- struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
- // и потоки-«потребители» (изм. consumer на consumer_thread)
- for (i = 0; i < nc; i++)
- pthread_create(threads + i, 0, consumer_thread, (void *)q);
- // и потоки "производители"
- for (; i < n; i++)
- pthread_create(threads + i, 0, producer_thread, (void *)q);
- // главный цикл
- // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
- while ((puts("Enter (for info or ^D for exit)"),
- getline(&in, &sz, stdin)) > 0) {
- #if 0
- enqueue(in, q);
- in = NULL;
- #endif
- print_queue(q, 1);
- }
- producer_consumer_queue_stop(q);
- puts("Fin2");
- print_queue(q, 1);
- for (i = 0; i < n; i++)
- if (pthread_join(threads[i], &res))
- perror("join");
- return (long)res;
- }
- /*
- avp 2011
- there are no gettid() in Linux libc
- so make it
- */
- #include <sys/types.h>
- #include <sys/syscall.h>
- pid_t
- gettid()
- {
- return syscall(SYS_gettid);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement