Advertisement
Guest User

Producer/consumer

a guest
Apr 8th, 2015
337
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 46.80 KB | None | 0 0
  1. Паттерн producer/consumer достаточно часто встречается в многопоточном программировании. Его смысл состоит в том, что один или несколько потоков _производят_ данные, и параллельно этому один или несколько потоков _потребляют_ их.
  2.  
  3. Например, произведённые данные могут быть представлять вычислительно интенсивное _задание_. В этом случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления). Или производящий поток загружает данные из сети, а по окончанию загрузки выполняющий поток производит разбор загруженных данных.
  4.  
  5. Нетривиальность проблемы заключается в том, что потенциально как создание новых данных, так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.
  6.  
  7. Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».
  8.  
  9. Реализация данного паттерна различается по степени сложности в различных языках.
  10.  
  11. ---
  12.  
  13. **Имплементация на C с pthreads**
  14.  
  15. В C, в соответствии с духом языка, нет встроенных высокоуровневых синхронизирующихся коллекций. Наверное самой популярной и широко используемой библиотекой, реализующей многопоточность, является pthreads. С её помощью паттерн можно реализовать так (спасибо за код @avp):
  16.  
  17. #include <pthread.h>
  18.  
  19. // объявляем структуру данных для одного задания
  20. struct producer_consumer_queue_item {
  21. struct producer_consumer_queue_item *next;
  22. // здесь идут собственно данные. вы можете поменять этот кусок,
  23. // использовав структуру, более специфичную для вашей задачи
  24. void *data;
  25. };
  26.  
  27. // объявляем очередь с дополнительными структурами для синхронизации.
  28. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  29. struct producer_consumer_queue {
  30. struct producer_consumer_queue_item *head, *tail;
  31. // head == tail == 0, если очередь пуста
  32. pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
  33. pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  34. int is_alive; // показывает, не закончила ли очередь свою работу
  35. };
  36.  
  37. Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  38.  
  39. void
  40. enqueue (void *data, struct producer_consumer_queue *q)
  41. {
  42. // упакуем задание в новую структуру
  43. struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  44. p->data = data;
  45. p->next = 0;
  46.  
  47. // получим "эксклюзивный" доступ к очереди заданий
  48. pthread_mutex_lock(&q->lock);
  49. // ... и добавим новое задание туда:
  50. if (q->tail)
  51. q->tail->next = p;
  52. else {
  53. q->head = p;
  54. // очередь была пуста, а теперь нет -- надо разбудить потребителей
  55. pthread_cond_broadcast(&q->cond);
  56. }
  57. q->tail = p;
  58.  
  59. // разрешаем доступ всем снова
  60. pthread_mutex_unlock(&q->lock);
  61. }
  62.  
  63. void *
  64. dequeue(struct producer_consumer_queue *q)
  65. {
  66. // получаем эксклюзивный доступ к очереди:
  67. pthread_mutex_lock(&q->lock);
  68.  
  69. while (!q->head && q->is_alive) {
  70. // очередь пуста, делать нечего, ждем...
  71. pthread_cond_wait(&q->cond, &q->lock);
  72. // wait разрешает доступ другим на время ожидания
  73. }
  74.  
  75. // запоминаем текущий элемент или 0, если очередь умерла
  76. struct producer_consumer_queue_item *p = q->head;
  77.  
  78. if (p)
  79. {
  80. // и удаляем его из очереди
  81. q->head = q->head->next;
  82. if (!q->head)
  83. q->tail = q->head;
  84. }
  85.  
  86. // возвращаем эксклюзивный доступ другим участникам
  87. pthread_mutex_unlock(&q->lock);
  88.  
  89. // отдаём данные
  90. void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  91. // согласно 7.20.3.2/2, можно не проверять на 0
  92. free(p);
  93. return data;
  94. }
  95.  
  96. Ещё нужна процедура для инициализации очереди:
  97.  
  98. struct producer_consumer_queue *
  99. producer_consumer_queue_create()
  100. {
  101. struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  102. q->head = q->tail = 0;
  103. q->is_alive = 1;
  104. pthread_mutex_init(&q->lock, 0);
  105. pthread_cond_init(&q->cond, 0);
  106.  
  107. return q;
  108. }
  109.  
  110. И процедура для закрытия очереди:
  111.  
  112. void
  113. producer_consumer_queue_stop(struct producer_consumer_queue *q)
  114. {
  115. // для обращения к разделяемым переменным необходим эксклюзивный доступ
  116. pthread_mutex_lock(&q->lock);
  117. q->is_alive = 0;
  118. pthread_cond_broadcast(&q->cond);
  119. pthread_mutex_unlock(&q->lock);
  120. }
  121.  
  122. Отлично, у нас есть всё, что нам надо.
  123.  
  124. Как использовать это? Нужно:
  125.  
  126. * запустить несколько потоков-«производителей» и несколько «потребителей»
  127. * придумать структуру данных для задания
  128.  
  129. Пример: (производитель — главный поток, потребители — 2 потока)
  130.  
  131. // это поток-потребитель
  132. void *
  133. consumer_thread (void *arg)
  134. {
  135. struct producer_consumer_queue *q = (typeof(q))arg;
  136.  
  137. for (;;) {
  138. void *data = dequeue(q);
  139. // это сигнал, что очередь окончена
  140. if (!data)
  141. break; // значит, пора закрывать поток
  142.  
  143. char *str = (char *)data;
  144. // тут наша обработка данных
  145. printf ("consuming: %s\n", str);
  146. sleep(2);
  147. printf ("consumed: %s\n", str);
  148. free(str);
  149. }
  150. return 0;
  151. }
  152.  
  153. int
  154. main ()
  155. {
  156. pthread_t consumer_threads[2];
  157.  
  158. void *res = 0;
  159.  
  160. char *in = NULL;
  161. size_t sz;
  162.  
  163. // создадим очередь:
  164. struct producer_consumer_queue *q = producer_consumer_queue_create();
  165.  
  166. // и потоки-«потребители»
  167. pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
  168. pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
  169.  
  170. // главный цикл
  171. // получаем данные с клавиатуры:
  172. while (getline(&in, &sz, stdin) > 0) {
  173. enqueue(in, q);
  174. in = NULL;
  175. }
  176.  
  177. producer_consumer_queue_stop(q);
  178. if (pthread_join(consumer_threads[0], &res) ||
  179. pthread_join(consumer_threads[1], &res))
  180. perror("join");
  181.  
  182. return (long)res;
  183. }
  184.  
  185. ---
  186.  
  187. **Имплементация на C#**
  188.  
  189. Если вы работаете со старой версией C#, вы можете воспользоваться встроенным классом `Monitor`, который является аналогом condition variable из pthreads.
  190.  
  191. public class ProducerConsumer<T> where T : class
  192. {
  193. object mutex = new object();
  194. Queue<T> queue = new Queue<T>();
  195. bool isDead = false;
  196.  
  197. public void Enqueue(T task)
  198. {
  199. if (task == null)
  200. throw new ArgumentNullException("task");
  201. lock (mutex)
  202. {
  203. if (isDead)
  204. throw new InvalidOperationException("Queue already stopped");
  205. queue.Enqueue(task);
  206. Monitor.Pulse(mutex);
  207. }
  208. }
  209.  
  210. public T Dequeue()
  211. {
  212. lock (mutex)
  213. {
  214. while (queue.Count == 0 && !isDead)
  215. Monitor.Wait(mutex);
  216.  
  217. if (queue.Count == 0)
  218. return null;
  219.  
  220. return queue.Dequeue();
  221. }
  222. }
  223.  
  224. public void Stop()
  225. {
  226. lock (mutex)
  227. {
  228. isDead = true;
  229. Monitor.PulseAll(mutex);
  230. }
  231. }
  232. }
  233.  
  234. Использование (аналогичный пример):
  235.  
  236. class Program
  237. {
  238. static public void Main()
  239. {
  240. new Program().Run();
  241. }
  242.  
  243. ProducerConsumer<string> q = new ProducerConsumer<string>();
  244.  
  245. void Run()
  246. {
  247. var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
  248. foreach (var t in threads)
  249. t.Start();
  250.  
  251. string s;
  252. while ((s = Console.ReadLine()).Length != 0)
  253. q.Enqueue(s);
  254.  
  255. q.Stop();
  256.  
  257. foreach (var t in threads)
  258. t.Join();
  259. }
  260.  
  261. void Consumer()
  262. {
  263. while (true)
  264. {
  265. string s = q.Dequeue();
  266. if (s == null)
  267. break;
  268. Console.WriteLine("Processing: {0}", s);
  269. Thread.Sleep(2000);
  270. Console.WriteLine("Processed: {0}", s);
  271. }
  272. }
  273. }
  274.  
  275. ---
  276.  
  277. Для современных версий языка (начиная с C# 4.0), лучше, по совету @Flammable, [воспользоваться готовым классом `BlockingCollection`](http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT), представляющим ту же функциональность. Пример:
  278.  
  279. class Program
  280. {
  281. static public void Main()
  282. {
  283. new Program().Run();
  284. }
  285.  
  286. BlockingCollection<string> q = new BlockingCollection<string>();
  287.  
  288. void Run()
  289. {
  290. var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
  291. foreach (var t in threads)
  292. t.Start();
  293.  
  294. string s;
  295. while ((s = Console.ReadLine()).Length != 0)
  296. q.Add(s);
  297.  
  298. q.CompleteAdding(); // останавливаем
  299.  
  300. foreach (var t in threads)
  301. t.Join();
  302. }
  303.  
  304. void Consumer()
  305. {
  306. foreach (var s in q.GetConsumingEnumerable())
  307. {
  308. Console.WriteLine("Processing: {0}", s);
  309. Thread.Sleep(2000);
  310. Console.WriteLine("Processed: {0}", s);
  311. }
  312. }
  313. }
  314.  
  315. ==================================================================
  316.  
  317. Comments:
  318.  
  319. Я поправил сишную часть по мелочам, добавил struct перед producer_consumer_queue, заменил sleep(2000) на sleep(2) и убрал `l` из `main()`, все равно не используется.
  320.  
  321. Единственная серьезная правка, это добавил в `producer_consumer_queue_stop()` `pthread_cond_broadcast(&q->cond);`.
  322.  
  323. Без этого вызова потоки остаются ждать на `cond_wait` и `main` "зависает" на `join`.
  324.  
  325. --
  326.  
  327. Наверное стоит добавить (про си), что этот вариант правильно работает только для очереди "бесконечной" длины, т.е. когда производители не переходят в ожидание.
  328.  
  329. Вечерком набросаю код для этого случая.
  330. (20 Май '13 21:38) avp
  331.  
  332. @avp: спасибо, что заметили! в шарповом коде в `Stop()` я не забыл `Monitor.PulseAll`, а тут забыл. Кстати, у нас немного разные стратегии в `enqueue`: вы делаете broadcast только когда очередь была пустой, а я делаю signal каждый раз. Судя по всему, и так и так корректно.
  333. (20 Май '13 22:45) VladD
  334.  
  335. Да, в такой ситуации корректно. pthread_cond сигналы не накапливаются.
  336.  
  337. Более того, если в момент вызова pthread_cond_broadcast()/pthread_cond_signal() нет потоков, ожидающих на cond, то сигнал попросту теряется. Нельзя сигнализировать "впрок".
  338.  
  339. --
  340.  
  341. Это серьезное отличие от семафоров, источник потенциальных ошибок синхронизации. В случае семафоров при каждой доставке данных в очередь, можно вызывать post и счетчик семафора будет содержать коолчество элементов очереди. Естественно, потребитель говорит wait на семафор каждый раз, когда он собирается взять данные из очереди.
  342. (20 Май '13 23:21) avp
  343.  
  344. А не могли бы вы развить данное исследование до уровня синхронизации между процессами, при обращении к общим ресурсам (не только между потоками)?
  345. (22 Май '13 14:11) mister
  346.  
  347. @mister: синхронизация между процессами — другое дело, т. к. отсутствует разделяемый ресурс: общая память. Например, можно устроить один хост-диспетчер, который будет (1) принимать задачи от хостов-поставщиков, (2) класть их во внутреннюю producer-consumer-очередь, и (3) раздавать хостам-потребителям на выполнение.
  348. (22 Май '13 14:53) VladD
  349.  
  350. Ок, спасибо за ответы.
  351. (22 Май '13 15:23) mister
  352.  
  353. @VladD, я сделал еще пару тестовых программок для очередей с ограничением на количество данных и ожиданием производителя, пока в очереди не появится свободное место и положил их все в pastebin все 3 + утилитка в одном файле.
  354.  
  355. В pq2.c активизируется произвольный ждущий производитель, а в pq3.c "правильный" (тот, что раньше пришел). Плюс поправил работу с памятью между потоками, добавив кое-где volatile.
  356.  
  357. --
  358.  
  359. Посмотрите, добавьте в текст исследования, что сочтете интересным.
  360.  
  361. Если появятся какие-то вопросы, то лучше пишите мне на почту (доступна через мой ник).
  362. (23 Май '13 1:30) avp
  363.  
  364. Извиняюсь за глупый вопросы. Метод `Dequeue`
  365.  
  366. Почему `while`, а не `if`?
  367.  
  368. ... while (queue.Count == 0 && !isDead) Monitor.Wait(mutex);
  369.  
  370. (27 Май '14 13:19) Veikedo
  371.  
  372. @VladD, благодарю. Наверное самое хорошее русскоязычное описание паттерна Producer-Consumer. Очень помогло при записи из разных потоков в один файл.
  373. (13 Июл '14 22:52) z668
  374.  
  375. pthread_mutex_lock(&q->lock); // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } q->tail = p;
  376.  
  377. Почему, если tail существовал, мы сначала ему в next записываем p, а потом перезатираем сам tail этим p? Зачем вообще эта операция. И второе - правильно ли я понимаю, что даже если мы уже сделали бродкаст, пока мы не вызовем "pthread_mutex_unlock", потребители будут висеть на pthread_cond_wait?
  378. (29 Окт '14 14:34) smallFish
  379.  
  380. @smallFish, по поводу broadcast/wait/unlock правильно понимаете.
  381.  
  382. Что же касается первого вопроса, то сейчас я уже не помню этих деталей и опять влезать в код неохота. Можете поправить, как считаете будет правильно.
  383.  
  384. --
  385.  
  386. Кстати, а у Вас оповещения об изменениях здесь на почту приходят?
  387.  
  388. (у меня нет, так что время реакции непредсказуемо).
  389. (10 Дек '14 3:03) avp
  390.  
  391.  
  392. ====================================================================================================
  393.  
  394. /*
  395. avp 3 тестовые pthread программки для очередей
  396. */
  397.  
  398. // pq1.c потребитель-производитель с "бесконечной" очередью
  399.  
  400. #include <stdio.h>
  401. #include <stdlib.h>
  402. #include <sys/types.h>
  403. #include <unistd.h>
  404. #include <pthread.h>
  405.  
  406. // объявляем структуру данных для одного задания
  407. struct producer_consumer_queue_item {
  408. struct producer_consumer_queue_item *next;
  409. // здесь идут собственно данные. вы можете поменять этот кусок,
  410. // использовав структуру, более специфичную для вашей задачи
  411. void *data;
  412. };
  413.  
  414. // объявляем очередь с дополнительными структурами для синхронизации.
  415. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  416. struct producer_consumer_queue {
  417. struct producer_consumer_queue_item *head, *tail;
  418. // head == tail == 0, если очередь пуста
  419. pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
  420. pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  421. int is_alive; // показывает, не закончила ли очередь свою работу
  422. };
  423.  
  424. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  425.  
  426. void
  427. enqueue (void *data, struct producer_consumer_queue *aq)
  428. {
  429. volatile struct producer_consumer_queue *q = aq;
  430. // упакуем задание в новую структуру
  431. struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  432. p->data = data;
  433. p->next = 0;
  434.  
  435. // получим "эксклюзивный" доступ к очереди заданий
  436. pthread_mutex_lock(&aq->lock);
  437. // ... и добавим новое задание туда:
  438. if (q->tail)
  439. q->tail->next = p;
  440. else {
  441. q->head = p;
  442. // очередь была пуста, а теперь нет -- надо разбудить потребителей
  443. pthread_cond_broadcast(&aq->cond);
  444. }
  445. q->tail = p;
  446. asm volatile ("" : : : "memory");
  447. // зафиксируем изменения очереди в памяти
  448.  
  449. // разрешаем доступ всем снова
  450. pthread_mutex_unlock(&aq->lock);
  451. }
  452.  
  453. void *
  454. dequeue(struct producer_consumer_queue *aq)
  455. {
  456. volatile struct producer_consumer_queue *q = aq;
  457. // получаем эксклюзивный доступ к очереди:
  458. pthread_mutex_lock(&aq->lock);
  459.  
  460. while (!q->head && q->is_alive) {
  461. // очередь пуста, делать нечего, ждем...
  462. pthread_cond_wait(&aq->cond, &aq->lock);
  463. // wait разрешает доступ другим на время ожидания
  464. }
  465.  
  466. // запоминаем текущий элемент или 0, если очередь умерла
  467. struct producer_consumer_queue_item *p = q->head;
  468.  
  469. if (p)
  470. {
  471. // и удаляем его из очереди
  472. q->head = q->head->next;
  473. if (!q->head)
  474. q->tail = q->head;
  475. asm volatile ("" : : : "memory");
  476. // зафиксируем изменения очереди в памяти
  477. }
  478.  
  479. // возвращаем эксклюзивный доступ другим участникам
  480. pthread_mutex_unlock(&aq->lock);
  481.  
  482. // отдаём данные
  483. void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  484. // согласно 7.20.3.2/2, можно не проверять на 0
  485. free(p);
  486. return data;
  487. }
  488.  
  489. struct producer_consumer_queue *
  490. producer_consumer_queue_create()
  491. {
  492. struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  493. q->head = q->tail = 0;
  494. q->is_alive = 1;
  495. pthread_mutex_init(&q->lock, 0);
  496. pthread_cond_init(&q->cond, 0);
  497.  
  498. return q;
  499. }
  500.  
  501. // И процедура для закрытия очереди:
  502.  
  503. void
  504. producer_consumer_queue_stop(struct producer_consumer_queue *aq)
  505. {
  506. volatile struct producer_consumer_queue *q = aq;
  507. // для обращения к разделяемым переменным необходим эксклюзивный доступ
  508. pthread_mutex_lock(&aq->lock);
  509. q->is_alive = 0;
  510. asm volatile ("" : : : "memory");
  511. // зафиксируем изменения очереди в памяти
  512. pthread_cond_broadcast(&aq->cond); // !!!!! avp
  513. pthread_mutex_unlock(&aq->lock);
  514. }
  515.  
  516. // это поток-потребитель
  517. void *
  518. consumer_thread (void *arg)
  519. {
  520. struct producer_consumer_queue *q = (typeof(q))arg;
  521.  
  522. for (;;) {
  523. void *data = dequeue(q);
  524. // это сигнал, что очередь окончена
  525. if (!data)
  526. break; // значит, пора закрывать поток
  527.  
  528. char *str = (char *)data;
  529. // тут наша обработка данных
  530. printf ("consuming: %s\n", str);
  531. sleep(2); // 2000 заменил на 2 avp
  532. printf ("consumed: %s\n", str);
  533. free(str);
  534. }
  535. return 0;
  536. }
  537.  
  538. int
  539. main ()
  540. {
  541. pthread_t consumer_threads[2];
  542.  
  543. void *res = 0;
  544.  
  545. char *in = NULL;
  546. size_t sz;
  547. int l;
  548.  
  549. // создадим очередь:
  550. struct producer_consumer_queue *q = producer_consumer_queue_create(); //add struct
  551.  
  552. // и потоки-«потребители» (изм. consumer на consumer_thread)
  553. pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
  554. pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
  555.  
  556. // главный цикл
  557. // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  558. while ((puts("Enter"), getline(&in, &sz, stdin)) > 0) {
  559. enqueue(in, q);
  560. in = NULL;
  561. }
  562.  
  563. producer_consumer_queue_stop(q);
  564. puts("Fin2");
  565.  
  566. if (pthread_join(consumer_threads[0], &res) ||
  567. pthread_join(consumer_threads[1], &res))
  568. perror("join");
  569.  
  570. return (long)res;
  571. }
  572. // pq2.c очередь ограниченного размера без упорядочивания ожидающих производителей
  573.  
  574. #include <stdio.h>
  575. #include <stdlib.h>
  576. #include <string.h>
  577. #include <sys/types.h>
  578. #include <unistd.h>
  579. #include <pthread.h>
  580.  
  581.  
  582. // объявляем структуру данных для одного задания
  583. struct producer_consumer_queue_item {
  584. struct producer_consumer_queue_item *next;
  585. // здесь идут собственно данные. вы можете поменять этот кусок,
  586. // использовав структуру, более специфичную для вашей задачи
  587. void *data;
  588. };
  589.  
  590. // объявляем очередь с дополнительными структурами для синхронизации.
  591. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  592. struct producer_consumer_queue {
  593. struct producer_consumer_queue_item *head, *tail;
  594. // head == tail == 0, если очередь пуста
  595. pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
  596. pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  597. pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО
  598. int is_alive; // показывает, не закончила ли очередь свою работу
  599. int max, cnt, // максимальный размер очереди и число заданий в ней
  600. pqcnt; // количество производителей, ждущих свободного места в очереди
  601. };
  602.  
  603. void print_queue (struct producer_consumer_queue *q, int lock);
  604.  
  605. extern
  606. #ifdef __cplusplus
  607. "C"
  608. #endif
  609. pid_t gettid();
  610.  
  611.  
  612. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  613.  
  614. void
  615. enqueue (void *data, struct producer_consumer_queue *aq)
  616. {
  617. volatile struct producer_consumer_queue *q = aq;
  618. // упакуем задание в новую структуру
  619. struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  620. p->data = data;
  621. p->next = 0;
  622.  
  623. // получим "эксклюзивный" доступ к очереди заданий
  624. pthread_mutex_lock(&aq->lock);
  625.  
  626. // проверим не переполнена ли она
  627. if (q->max <= q->cnt) {
  628. q->pqcnt++;
  629. asm volatile ("" : : : "memory");
  630. // зафиксируем изменения очереди в памяти
  631. // будем ждать пока потребители ее слегка не опустошат
  632. while(q->max <= q->cnt & q->is_alive)
  633. pthread_cond_wait(&aq->condc, &aq->lock);
  634. q->pqcnt--;
  635. asm volatile ("" : : : "memory");
  636. }
  637. // ... и добавим новое задание туда:
  638. if (q->tail)
  639. q->tail->next = p;
  640. else {
  641. q->head = p;
  642. // очередь была пуста, а теперь нет -- надо разбудить потребителей
  643. pthread_cond_broadcast(&aq->condp);
  644. }
  645. q->tail = p;
  646. q->cnt++;
  647. asm volatile ("" : : : "memory");
  648.  
  649. // разрешаем доступ всем снова
  650. pthread_mutex_unlock(&aq->lock);
  651. }
  652.  
  653.  
  654. void *
  655. dequeue(struct producer_consumer_queue *aq)
  656. {
  657. volatile struct producer_consumer_queue *q = aq;
  658. // получаем эксклюзивный доступ к очереди:
  659. pthread_mutex_lock(&aq->lock);
  660.  
  661. if (q->pqcnt && q->max > q->cnt)
  662. // в очереди есть место, а кто-то спит, разбудим их
  663. pthread_cond_broadcast(&aq->condc);
  664.  
  665. while (!q->head && q->is_alive) {
  666. // очередь пуста, делать нечего, ждем...
  667. pthread_cond_wait(&aq->condp, &aq->lock);
  668. // wait разрешает доступ другим на время ожидания
  669. }
  670.  
  671. // запоминаем текущий элемент или 0, если очередь умерла
  672. struct producer_consumer_queue_item *p = q->head;
  673. if (p) {
  674. // и удаляем его из очереди
  675. q->head = q->head->next;
  676. if (!q->head)
  677. q->tail = q->head;
  678. q->cnt--;
  679. asm volatile ("" : : : "memory");
  680. // зафиксируем изменения очереди в памяти
  681. // разбудим поставщиков в их очереди
  682. pthread_cond_broadcast(&aq->condc);
  683. }
  684.  
  685. // возвращаем эксклюзивный доступ другим участникам
  686. pthread_mutex_unlock(&aq->lock);
  687.  
  688. // отдаём данные
  689. void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  690. // согласно 7.20.3.2/2, можно не проверять на 0
  691. free(p);
  692. return data;
  693. }
  694.  
  695. struct producer_consumer_queue *
  696. producer_consumer_queue_create(int max)
  697. {
  698. struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  699. q->head = q->tail = 0;
  700. q->is_alive = 1;
  701. q->max = max;
  702. q->cnt = 0;
  703. q->pqcnt = 0;
  704. pthread_mutex_init(&q->lock, 0);
  705. pthread_cond_init(&q->condc, 0);
  706. pthread_cond_init(&q->condp, 0);
  707.  
  708. return q;
  709. }
  710.  
  711. // И процедура для закрытия очереди:
  712.  
  713. void
  714. producer_consumer_queue_stop(struct producer_consumer_queue *aq)
  715. {
  716. volatile struct producer_consumer_queue *q = aq;
  717. // для обращения к разделяемым переменным необходим эксклюзивный доступ
  718. pthread_mutex_lock(&aq->lock);
  719. q->is_alive = 0;
  720. asm volatile ("" : : : "memory");
  721. pthread_cond_broadcast(&aq->condc);
  722. pthread_cond_broadcast(&aq->condp);
  723. pthread_mutex_unlock(&aq->lock);
  724. }
  725.  
  726. // это поток-потребитель
  727. void *
  728. consumer_thread (void *arg)
  729. {
  730. struct producer_consumer_queue *q = (typeof(q))arg;
  731.  
  732. printf("Consumer: %ld\n", (long)gettid());
  733. for (;;) {
  734. void *data = dequeue(q);
  735. // это сигнал, что очередь окончена
  736. if (!data)
  737. break; // значит, пора закрывать поток
  738.  
  739. char *str = (char *)data;
  740. // тут наша обработка данных
  741. printf ("%ld consuming: %s", (long)gettid(), str);
  742. sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
  743. // printf ("%ld consumed: %s", (long)gettid(), str);
  744. free(str);
  745. }
  746. printf ("%ld cons exit\n", (long)gettid());
  747. return 0;
  748. }
  749.  
  750. char *
  751. getdata()
  752. {
  753. char *str = (char *)malloc(100);
  754. volatile static int seq = 0;
  755. static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
  756.  
  757. pthread_mutex_lock(&seqlock);
  758. sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
  759. printf("%s", str);
  760. pthread_mutex_unlock(&seqlock);
  761.  
  762. return str;
  763. }
  764.  
  765. void *
  766. producer_thread (void *arg)
  767. {
  768. struct producer_consumer_queue *q = (typeof(q))arg;
  769.  
  770. printf("Producer: %ld\n", (long)gettid());
  771. while (q->is_alive) {
  772. enqueue(getdata(), q);
  773. sleep(rand() % 2 + 1);
  774. }
  775. printf ("%ld prod exit\n", (long)gettid());
  776. return 0;
  777. }
  778.  
  779. void
  780. print_queue (struct producer_consumer_queue *q, int lock)
  781. {
  782. if (lock)
  783. pthread_mutex_lock(&q->lock);
  784.  
  785. printf ("cnt = %d pqcnt = %d queue:\n", q->cnt, q->pqcnt);
  786. struct producer_consumer_queue_item *p;
  787. for (p = q->head; p; p = p->next)
  788. printf ("%s", (char *)(p->data));
  789. puts("");
  790.  
  791. if (lock)
  792. pthread_mutex_unlock(&q->lock);
  793. }
  794.  
  795. int
  796. main (int ac, char *av[])
  797. {
  798. int swp = 0, n = av[1] ? atoi(av[1]) : 5;
  799. if (n < 0) {
  800. n = -n; swp = 1;
  801. }
  802. if (n < 2)
  803. n = 5;
  804.  
  805. int i, nc = n / 2, np = n - nc;
  806. if (swp) {
  807. swp = nc; nc = np; np = swp;
  808. }
  809. printf ("test %d consumers and %d producers\n", nc, np);
  810.  
  811. pthread_t threads[n];
  812. void *res = 0;
  813.  
  814. char *in = NULL;
  815. size_t sz;
  816.  
  817.  
  818.  
  819. // создадим очередь:
  820. struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
  821.  
  822. // и потоки-«потребители» (изм. consumer на consumer_thread)
  823. for (i = 0; i < nc; i++)
  824. pthread_create(threads + i, 0, consumer_thread, (void *)q);
  825.  
  826. // и потоки "производители"
  827. for (; i < n; i++)
  828. pthread_create(threads + i, 0, producer_thread, (void *)q);
  829.  
  830. // главный цикл
  831. // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  832. while ((puts("Enter (for info or ^D for exit)"),
  833. getline(&in, &sz, stdin)) > 0) {
  834. #if 0
  835. enqueue(in, q);
  836. in = NULL;
  837. #endif
  838. print_queue(q, 1);
  839. }
  840.  
  841. producer_consumer_queue_stop(q);
  842. puts("Fin2");
  843. print_queue(q, 1);
  844.  
  845. for (i = 0; i < n; i++)
  846. if (pthread_join(threads[i], &res))
  847. perror("join");
  848.  
  849. return (long)res;
  850. }
  851. // pq3.c очередь ограниченного размера с упорядочиванием ожидающих производителей
  852.  
  853. #include <stdio.h>
  854. #include <stdlib.h>
  855. #include <string.h>
  856. #include <sys/types.h>
  857. #include <unistd.h>
  858. #include <pthread.h>
  859.  
  860. // объявляем структуру данных для одного задания
  861. struct producer_consumer_queue_item {
  862. struct producer_consumer_queue_item *next;
  863. // здесь идут собственно данные. вы можете поменять этот кусок,
  864. // использовав структуру, более специфичную для вашей задачи
  865. void *data;
  866. };
  867.  
  868. // струкура данных для спящего (ждущего свободного места) потока-производителя
  869. struct producer_queue_item {
  870. struct producer_queue_item *next;
  871. struct producer_consumer_queue_item *item; // данные для которых нет места
  872. pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место
  873.  
  874. #if DEBUG
  875. pid_t tid; // linux thread id for debug print
  876. int signaled; // индикатор "побудки" for debug print
  877. #endif
  878. };
  879.  
  880. // объявляем очередь данных с дополнительными структурами для синхронизации.
  881. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  882. struct producer_consumer_queue {
  883. struct producer_consumer_queue_item *head, *tail;
  884. // head == tail == 0, если очередь пуста
  885. pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
  886. pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  887. int is_alive; // показывает, не закончила ли очередь свою работу
  888. int max, cnt; // максимальный размер очереди и число заданий в ней
  889. // очередь потоков-производителей, ждущих свободного места для своих данных
  890. struct producer_queue_item *pqhead,
  891. *pqtail;
  892. };
  893.  
  894. extern
  895. #ifdef __cplusplus
  896. "C"
  897. #endif
  898. pid_t gettid();
  899.  
  900. void print_queue (struct producer_consumer_queue *q, int lock);
  901.  
  902. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  903.  
  904. void
  905. enqueue (void *data, struct producer_consumer_queue *q)
  906. {
  907. volatile struct producer_consumer_queue *vq = q;
  908. // упакуем задание в новую структуру
  909. struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  910. p->data = data;
  911. p->next = 0;
  912.  
  913. // получим "эксклюзивный" доступ к очереди заданий
  914. pthread_mutex_lock(&q->lock);
  915.  
  916. #if DEBUG
  917. printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
  918. #endif
  919.  
  920. // ... и добавим новое задание туда:
  921. if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
  922.  
  923. #if DEBUG
  924. if (vq->cnt < vq->max) {
  925. puts("========================");
  926. print_queue(q, 0);
  927. puts("========================");
  928. }
  929. #endif
  930.  
  931. struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
  932. pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят
  933. pq->next = 0;
  934. pq->item = p; // сохраним данные на время сна
  935.  
  936. #if DEBUG
  937. pq->tid = gettid();
  938. #endif
  939.  
  940. // поместим себя в очередь спящих производителей
  941. if (vq->pqtail)
  942. vq->pqtail->next = pq;
  943. else
  944. vq->pqhead = pq;
  945. vq->pqtail = pq;
  946. asm volatile ("" : : : "memory");
  947. // зафиксируем изменения очереди в памяти
  948.  
  949. #if DEBUG
  950. int at = 0; // счетчик циклов пробуждения
  951. #endif
  952.  
  953. do { // пойдем спать до появления свободного места в очереди данных
  954.  
  955. #if DEBUG
  956. printf ("%ld prod cond wait (cnt: %d at: %d) %s",
  957. (long)gettid(), vq->cnt, at++, (char *)(p->data));
  958. pq->signaled = 0;
  959. #endif
  960.  
  961. pthread_cond_wait(&pq->cond, &q->lock);
  962. } while(vq->max <= vq->cnt && vq->is_alive);
  963. // проснулись и владеем очередью
  964.  
  965. /*
  966. Вот тонкий момент. Порядок активизации потоков не определен,
  967. а нам надо соблюдать очередность данных.
  968. Поэтому переустановим локальные переменные из очереди,
  969. хотя это могут быть данные, положенные туда другим потоком.
  970. */
  971. #if DEBUG
  972. if (pq != vq->pqhead) {
  973. printf ("BAAAD %ld (cnt: %d at: %d) %s",
  974. (long)gettid(), vq->cnt, at, (char *)(p->data));
  975. print_queue(q, 0);
  976. if (vq->is_alive)
  977. exit(1); // совсем плохо, такого быть не должно
  978. else
  979. puts("CONTINUE");
  980. }
  981. #endif
  982.  
  983. pq = vq->pqhead; // в любом случае берем голову очереди производителей
  984. if ((vq->pqhead = pq->next) == 0) // и удаляем ее
  985. vq->pqtail = 0;
  986. asm volatile ("" : : : "memory");
  987. p = pq->item;
  988. free(pq);
  989.  
  990. #if DEBUG
  991. printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s",
  992. (long)gettid(), vq->cnt, at, (char *)(p->data));
  993. #endif
  994.  
  995. }
  996.  
  997. // вот тут реально кладем data в очередь для потребителей
  998. if (vq->tail)
  999. vq->tail->next = p;
  1000. else {
  1001. vq->head = p;
  1002. // очередь была пуста, а теперь нет -- надо разбудить потребителей
  1003. pthread_cond_broadcast(&q->cond);
  1004. }
  1005. vq->tail = p;
  1006. vq->cnt++;
  1007. asm volatile ("" : : : "memory");
  1008. // сбросим изменения очереди в память
  1009.  
  1010. // разрешаем доступ всем снова
  1011. pthread_mutex_unlock(&q->lock);
  1012. }
  1013.  
  1014. #if DEBUG
  1015. #define cond_signal_producer(q) ({ \
  1016. if ((q)->pqhead) { \
  1017. (q)->pqhead->signaled = 1; \
  1018. pthread_cond_signal(&(q)->pqhead->cond); \
  1019. } \
  1020. })
  1021. #else
  1022. #define cond_signal_producer(q) ({ \
  1023. if ((q)->pqhead) \
  1024. pthread_cond_signal(&(q)->pqhead->cond); \
  1025. })
  1026. #endif
  1027.  
  1028. void *
  1029. dequeue(struct producer_consumer_queue *q)
  1030. {
  1031. volatile struct producer_consumer_queue *vq = q;
  1032.  
  1033. // получаем эксклюзивный доступ к очереди:
  1034. pthread_mutex_lock(&q->lock);
  1035.  
  1036. // если есть спящие производители, то разбудим первого
  1037. cond_signal_producer(vq);
  1038. while (!vq->head && vq->is_alive) {
  1039. // очередь пуста, делать нечего, ждем...
  1040. pthread_cond_wait(&q->cond, &q->lock);
  1041. // wait разрешает доступ другим на время ожидания
  1042. }
  1043.  
  1044. // запоминаем текущий элемент или 0, если очередь умерла
  1045. struct producer_consumer_queue_item *p = vq->head;
  1046. if (p) {
  1047. // и удаляем его из очереди
  1048. vq->head = vq->head->next;
  1049. if (!vq->head)
  1050. vq->tail = vq->head;
  1051. vq->cnt--;
  1052. asm volatile ("" : : : "memory");
  1053. // сбросим изменения очереди в память
  1054. // разбудим первого поставщика в их очереди
  1055. cond_signal_producer(vq);
  1056. }
  1057.  
  1058. // возвращаем эксклюзивный доступ другим участникам
  1059. pthread_mutex_unlock(&q->lock);
  1060.  
  1061. // отдаём данные
  1062. void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  1063. // согласно 7.20.3.2/2, можно не проверять на 0
  1064. free(p);
  1065. return data;
  1066. }
  1067.  
  1068. struct producer_consumer_queue *
  1069. producer_consumer_queue_create(int max)
  1070. {
  1071. struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  1072. q->head = q->tail = 0;
  1073. q->pqhead = q->pqtail = 0;
  1074. q->is_alive = 1;
  1075. q->max = max;
  1076. q->cnt = 0;
  1077. pthread_mutex_init(&q->lock, 0);
  1078. pthread_cond_init(&q->cond, 0);
  1079.  
  1080. return q;
  1081. }
  1082.  
  1083. // И процедура для закрытия очереди:
  1084.  
  1085. void
  1086. producer_consumer_queue_stop(struct producer_consumer_queue *q)
  1087. {
  1088. volatile struct producer_consumer_queue *vq = q;
  1089. // для обращения к разделяемым переменным необходим эксклюзивный доступ
  1090. pthread_mutex_lock(&q->lock);
  1091.  
  1092. vq->is_alive = 0;
  1093. pthread_cond_broadcast(&q->cond); // разбудим потребителей
  1094. volatile struct producer_queue_item *pq;
  1095. for (pq = vq->pqhead; pq; pq = pq->next) {
  1096. #if DEBUG
  1097. pq->signaled = 1;
  1098. asm volatile ("" : : : "memory");
  1099. #endif
  1100. // будим каждого ждущего производителя
  1101. pthread_cond_signal((pthread_cond_t *)&pq->cond);
  1102. }
  1103.  
  1104. pthread_mutex_unlock(&q->lock);
  1105. }
  1106.  
  1107. // это поток-потребитель
  1108. void *
  1109. consumer_thread (void *arg)
  1110. {
  1111. struct producer_consumer_queue *q = (typeof(q))arg;
  1112.  
  1113. printf("Consumer: %ld\n", (long)gettid());
  1114. for (;;) {
  1115. void *data = dequeue(q);
  1116. // это сигнал, что очередь окончена
  1117. if (!data)
  1118. break; // значит, пора закрывать поток
  1119.  
  1120. char *str = (char *)data;
  1121. // тут наша обработка данных
  1122. printf ("%ld consuming: %s", (long)gettid(), str);
  1123. sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
  1124. // printf ("%ld consumed: %s", (long)gettid(), str);
  1125. free(str);
  1126. }
  1127. printf ("%ld cons exit\n", (long)gettid());
  1128. return 0;
  1129. }
  1130.  
  1131. // делаем тестовые строки с последовательными номерами
  1132. char *
  1133. getdata()
  1134. {
  1135. char *str = (char *)malloc(100);
  1136. volatile static int seq = 0;
  1137. static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
  1138.  
  1139. pthread_mutex_lock(&seqlock);
  1140. sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
  1141. printf("%s", str);
  1142. pthread_mutex_unlock(&seqlock);
  1143.  
  1144. return str;
  1145. }
  1146.  
  1147. // это поток-производитель
  1148. void *
  1149. producer_thread (void *arg)
  1150. {
  1151. struct producer_consumer_queue *q = (typeof(q))arg;
  1152.  
  1153. printf("Producer: %ld\n", (long)gettid());
  1154. while (q->is_alive) {
  1155. enqueue(getdata(), q);
  1156. sleep(rand() % 2 + 1);
  1157. }
  1158. printf ("%ld prod exit\n", (long)gettid());
  1159. return 0;
  1160. }
  1161.  
  1162. void
  1163. print_queue (struct producer_consumer_queue *q, int lock)
  1164. {
  1165. if (lock)
  1166. pthread_mutex_lock(&q->lock);
  1167.  
  1168. printf ("cnt = %d queue:\n", q->cnt);
  1169. struct producer_consumer_queue_item *p;
  1170. for (p = q->head; p; p = p->next)
  1171. printf ("%s", (char *)(p->data));
  1172. puts ("producers queue:");
  1173. struct producer_queue_item *pq;
  1174. for (pq= q->pqhead; pq; pq = pq->next)
  1175. #if DEBUG
  1176. printf ("%ld (%d) %s",
  1177. (long)pq->tid, pq->signaled, (char *)(pq->item->data));
  1178. #else
  1179. printf ("%s", (char *)(pq->item->data));
  1180. #endif
  1181.  
  1182. puts("");
  1183.  
  1184. if (lock)
  1185. pthread_mutex_unlock(&q->lock);
  1186. }
  1187.  
  1188. int
  1189. main (int ac, char *av[])
  1190. {
  1191. int swp = 0, n = av[1] ? atoi(av[1]) : 5;
  1192. if (n < 0) {
  1193. n = -n; swp = 1;
  1194. }
  1195. if (n < 2)
  1196. n = 5;
  1197.  
  1198. int i, nc = n / 2, np = n - nc;
  1199. if (swp) {
  1200. swp = nc; nc = np; np = swp;
  1201. }
  1202. printf ("test %d consumers and %d producers\n", nc, np);
  1203.  
  1204. pthread_t threads[n];
  1205. void *res = 0;
  1206.  
  1207. char *in = NULL;
  1208. size_t sz;
  1209.  
  1210.  
  1211.  
  1212. // создадим очередь:
  1213. struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
  1214.  
  1215. // и потоки-«потребители» (изм. consumer на consumer_thread)
  1216. for (i = 0; i < nc; i++)
  1217. pthread_create(threads + i, 0, consumer_thread, (void *)q);
  1218.  
  1219. // и потоки "производители"
  1220. for (; i < n; i++)
  1221. pthread_create(threads + i, 0, producer_thread, (void *)q);
  1222.  
  1223. // главный цикл
  1224. // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  1225. while ((puts("Enter (for info or ^D for exit)"),
  1226. getline(&in, &sz, stdin)) > 0) {
  1227. #if 0
  1228. enqueue(in, q);
  1229. in = NULL;
  1230. #endif
  1231. print_queue(q, 1);
  1232. }
  1233.  
  1234. producer_consumer_queue_stop(q);
  1235. puts("Fin2");
  1236. print_queue(q, 1);
  1237.  
  1238. for (i = 0; i < n; i++)
  1239. if (pthread_join(threads[i], &res))
  1240. perror("join");
  1241.  
  1242. return (long)res;
  1243. }
  1244. /*
  1245. avp 2011
  1246. there are no gettid() in Linux libc
  1247. so make it
  1248. */
  1249.  
  1250. #include <sys/types.h>
  1251. #include <sys/syscall.h>
  1252.  
  1253. pid_t
  1254. gettid()
  1255. {
  1256. return syscall(SYS_gettid);
  1257. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement