SHARE
TWEET

Producer/consumer

a guest Apr 8th, 2015 283 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. Паттерн producer/consumer достаточно часто встречается в многопоточном программировании. Его смысл состоит в том, что один или несколько потоков _производят_ данные, и параллельно этому один или несколько потоков _потребляют_ их.
  2.  
  3. Например, произведённые данные могут быть представлять вычислительно интенсивное _задание_. В этом случае разумно иметь единственный производящий поток, и несколько выполняющих потоков (например, столько, сколько в системе ядер процессора, если узкое место обработки — вычисления). Или производящий поток загружает данные из сети, а по окончанию загрузки выполняющий поток производит разбор загруженных данных.
  4.  
  5. Нетривиальность проблемы заключается в том, что потенциально как создание новых данных, так и их потребление могут занимать длительное время, и хотелось бы, чтобы обработка шла без простоев, на максимально возможной скорости.
  6.  
  7. Производящий поток (или потоки) называется «производитель», «поставщик» или просто «producer», потребляющий (-ие) — «потребитель» или «consumer».
  8.  
  9. Реализация данного паттерна различается по степени сложности в различных языках.
  10.  
  11. ---
  12.  
  13. **Имплементация на C с pthreads**
  14.  
  15. В C, в соответствии с духом языка, нет встроенных высокоуровневых синхронизирующихся коллекций. Наверное самой популярной и широко используемой библиотекой, реализующей многопоточность, является pthreads. С её помощью паттерн можно реализовать так (спасибо за код @avp):
  16.  
  17.     #include <pthread.h>
  18.  
  19.     // объявляем структуру данных для одного задания
  20.     struct producer_consumer_queue_item {
  21.       struct producer_consumer_queue_item *next;
  22.       // здесь идут собственно данные. вы можете поменять этот кусок,
  23.       // использовав структуру, более специфичную для вашей задачи
  24.       void *data;
  25.     };
  26.  
  27.     // объявляем очередь с дополнительными структурами для синхронизации.
  28.     // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  29.     struct producer_consumer_queue {
  30.       struct producer_consumer_queue_item *head, *tail;
  31.                                   // head == tail == 0, если очередь пуста
  32.       pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  33.       pthread_cond_t cond;        // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  34.       int is_alive;               // показывает, не закончила ли очередь свою работу
  35.     };
  36.  
  37. Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  38.  
  39.     void
  40.     enqueue (void *data, struct producer_consumer_queue *q)
  41.     {
  42.       // упакуем задание в новую структуру
  43.       struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  44.       p->data = data;
  45.       p->next = 0;
  46.  
  47.       // получим "эксклюзивный" доступ к очереди заданий
  48.       pthread_mutex_lock(&q->lock);
  49.       // ... и добавим новое задание туда:
  50.       if (q->tail)
  51.         q->tail->next = p;
  52.       else {
  53.         q->head = p;
  54.         // очередь была пуста, а теперь нет -- надо разбудить потребителей
  55.         pthread_cond_broadcast(&q->cond);
  56.       }
  57.       q->tail = p;
  58.  
  59.       // разрешаем доступ всем снова
  60.       pthread_mutex_unlock(&q->lock);
  61.     }
  62.  
  63.     void *
  64.     dequeue(struct producer_consumer_queue *q)
  65.     {
  66.       // получаем эксклюзивный доступ к очереди:
  67.       pthread_mutex_lock(&q->lock);
  68.  
  69.       while (!q->head && q->is_alive) {
  70.         // очередь пуста, делать нечего, ждем...
  71.         pthread_cond_wait(&q->cond, &q->lock);
  72.         // wait разрешает доступ другим на время ожидания
  73.       }
  74.  
  75.       // запоминаем текущий элемент или 0, если очередь умерла
  76.       struct producer_consumer_queue_item *p = q->head;
  77.  
  78.       if (p)
  79.       {
  80.         // и удаляем его из очереди
  81.         q->head = q->head->next;
  82.         if (!q->head)
  83.           q->tail = q->head;
  84.       }
  85.  
  86.       // возвращаем эксклюзивный доступ другим участникам
  87.       pthread_mutex_unlock(&q->lock);
  88.  
  89.       // отдаём данные
  90.       void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  91.       // согласно 7.20.3.2/2, можно не проверять на 0
  92.       free(p);
  93.       return data;
  94.     }
  95.  
  96. Ещё нужна процедура для инициализации очереди:
  97.  
  98.     struct producer_consumer_queue *
  99.     producer_consumer_queue_create()
  100.     {
  101.       struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  102.       q->head = q->tail = 0;
  103.       q->is_alive = 1;
  104.       pthread_mutex_init(&q->lock, 0);
  105.       pthread_cond_init(&q->cond, 0);
  106.  
  107.       return q;
  108.     }
  109.  
  110. И процедура для закрытия очереди:
  111.  
  112.     void
  113.     producer_consumer_queue_stop(struct producer_consumer_queue *q)
  114.     {
  115.       // для обращения к разделяемым переменным необходим эксклюзивный доступ
  116.       pthread_mutex_lock(&q->lock);
  117.       q->is_alive = 0;
  118.       pthread_cond_broadcast(&q->cond);
  119.       pthread_mutex_unlock(&q->lock);
  120.     }
  121.  
  122. Отлично, у нас есть всё, что нам надо.
  123.  
  124. Как использовать это? Нужно:
  125.  
  126.  * запустить несколько потоков-«производителей» и несколько «потребителей»
  127.  * придумать структуру данных для задания
  128.  
  129. Пример: (производитель — главный поток, потребители — 2 потока)
  130.  
  131.     // это поток-потребитель
  132.     void *
  133.     consumer_thread (void *arg)
  134.     {
  135.       struct producer_consumer_queue *q = (typeof(q))arg;
  136.  
  137.       for (;;) {
  138.         void *data = dequeue(q);
  139.         // это сигнал, что очередь окончена
  140.         if (!data)
  141.           break; // значит, пора закрывать поток
  142.  
  143.         char *str = (char *)data;
  144.         // тут наша обработка данных
  145.         printf ("consuming: %s\n", str);
  146.         sleep(2);
  147.         printf ("consumed: %s\n", str);
  148.         free(str);
  149.       }
  150.       return 0;
  151.     }
  152.  
  153.     int
  154.     main ()
  155.     {
  156.       pthread_t consumer_threads[2];
  157.  
  158.       void *res = 0;
  159.  
  160.       char *in = NULL;
  161.       size_t sz;
  162.  
  163.       // создадим очередь:
  164.       struct producer_consumer_queue *q = producer_consumer_queue_create();
  165.  
  166.       // и потоки-«потребители»
  167.       pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
  168.       pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
  169.  
  170.       // главный цикл
  171.       // получаем данные с клавиатуры:
  172.       while (getline(&in, &sz, stdin) > 0) {
  173.         enqueue(in, q);
  174.         in = NULL;
  175.       }
  176.  
  177.       producer_consumer_queue_stop(q);
  178.       if (pthread_join(consumer_threads[0], &res) ||
  179.           pthread_join(consumer_threads[1], &res))
  180.         perror("join");
  181.  
  182.       return (long)res;
  183.     }
  184.  
  185. ---
  186.  
  187. **Имплементация на C#**
  188.  
  189. Если вы работаете со старой версией C#, вы можете воспользоваться встроенным классом `Monitor`, который является аналогом condition variable из pthreads.
  190.  
  191.     public class ProducerConsumer<T> where T : class
  192.     {
  193.         object mutex = new object();
  194.         Queue<T> queue = new Queue<T>();
  195.         bool isDead = false;
  196.  
  197.         public void Enqueue(T task)
  198.         {
  199.             if (task == null)
  200.                 throw new ArgumentNullException("task");
  201.             lock (mutex)
  202.             {
  203.                 if (isDead)
  204.                     throw new InvalidOperationException("Queue already stopped");
  205.                 queue.Enqueue(task);
  206.                 Monitor.Pulse(mutex);
  207.             }
  208.         }
  209.  
  210.         public T Dequeue()
  211.         {
  212.             lock (mutex)
  213.             {
  214.                 while (queue.Count == 0 && !isDead)
  215.                     Monitor.Wait(mutex);
  216.  
  217.                 if (queue.Count == 0)
  218.                     return null;
  219.  
  220.                 return queue.Dequeue();
  221.             }
  222.         }
  223.  
  224.         public void Stop()
  225.         {
  226.             lock (mutex)
  227.             {
  228.                 isDead = true;
  229.                 Monitor.PulseAll(mutex);
  230.             }
  231.         }
  232.     }
  233.  
  234. Использование (аналогичный пример):
  235.  
  236.     class Program
  237.     {
  238.         static public void Main()
  239.         {
  240.             new Program().Run();
  241.         }
  242.  
  243.         ProducerConsumer<string> q = new ProducerConsumer<string>();
  244.  
  245.         void Run()
  246.         {
  247.             var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
  248.             foreach (var t in threads)
  249.                 t.Start();
  250.  
  251.             string s;
  252.             while ((s = Console.ReadLine()).Length != 0)
  253.                 q.Enqueue(s);
  254.  
  255.             q.Stop();
  256.  
  257.             foreach (var t in threads)
  258.                 t.Join();
  259.         }
  260.  
  261.         void Consumer()
  262.         {
  263.             while (true)
  264.             {
  265.                 string s = q.Dequeue();
  266.                 if (s == null)
  267.                     break;
  268.                 Console.WriteLine("Processing: {0}", s);
  269.                 Thread.Sleep(2000);
  270.                 Console.WriteLine("Processed: {0}", s);
  271.             }
  272.         }
  273.     }
  274.  
  275. ---
  276.  
  277. Для современных версий языка (начиная с C# 4.0), лучше, по совету @Flammable, [воспользоваться готовым классом `BlockingCollection`](http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT), представляющим ту же функциональность. Пример:
  278.  
  279.     class Program
  280.     {
  281.         static public void Main()
  282.         {
  283.             new Program().Run();
  284.         }
  285.  
  286.         BlockingCollection<string> q = new BlockingCollection<string>();
  287.  
  288.         void Run()
  289.         {
  290.             var threads = new [] { new Thread(Consumer), new Thread(Consumer) };
  291.             foreach (var t in threads)
  292.                 t.Start();
  293.  
  294.             string s;
  295.             while ((s = Console.ReadLine()).Length != 0)
  296.                 q.Add(s);
  297.  
  298.             q.CompleteAdding(); // останавливаем
  299.  
  300.             foreach (var t in threads)
  301.                 t.Join();
  302.         }
  303.  
  304.         void Consumer()
  305.         {
  306.             foreach (var s in q.GetConsumingEnumerable())
  307.             {
  308.                 Console.WriteLine("Processing: {0}", s);
  309.                 Thread.Sleep(2000);
  310.                 Console.WriteLine("Processed: {0}", s);
  311.             }
  312.         }
  313.     }
  314.  
  315. ==================================================================
  316.  
  317. Comments:
  318.  
  319. Я поправил сишную часть по мелочам, добавил struct перед producer_consumer_queue, заменил sleep(2000) на sleep(2) и убрал `l` из `main()`, все равно не используется.
  320.  
  321. Единственная серьезная правка, это добавил в `producer_consumer_queue_stop()` `pthread_cond_broadcast(&q->cond);`.
  322.  
  323. Без этого вызова потоки остаются ждать на `cond_wait` и `main` "зависает" на `join`.
  324.  
  325. --
  326.  
  327. Наверное стоит добавить (про си), что этот вариант правильно работает только для очереди "бесконечной" длины, т.е. когда производители не переходят в ожидание.
  328.  
  329. Вечерком набросаю код для этого случая.
  330. (20 Май '13 21:38) avp
  331.  
  332. @avp: спасибо, что заметили! в шарповом коде в `Stop()` я не забыл `Monitor.PulseAll`, а тут забыл. Кстати, у нас немного разные стратегии в `enqueue`: вы делаете broadcast только когда очередь была пустой, а я делаю signal каждый раз. Судя по всему, и так и так корректно.
  333. (20 Май '13 22:45) VladD
  334.  
  335. Да, в такой ситуации корректно. pthread_cond сигналы не накапливаются.
  336.  
  337. Более того, если в момент вызова pthread_cond_broadcast()/pthread_cond_signal() нет потоков, ожидающих на cond, то сигнал попросту теряется. Нельзя сигнализировать "впрок".
  338.  
  339. --
  340.  
  341. Это серьезное отличие от семафоров, источник потенциальных ошибок синхронизации. В случае семафоров при каждой доставке данных в очередь, можно вызывать post и счетчик семафора будет содержать коолчество элементов очереди. Естественно, потребитель говорит wait на семафор каждый раз, когда он собирается взять данные из очереди.
  342. (20 Май '13 23:21) avp
  343.  
  344. А не могли бы вы развить данное исследование до уровня синхронизации между процессами, при обращении к общим ресурсам (не только между потоками)?
  345. (22 Май '13 14:11) mister
  346.  
  347. @mister: синхронизация между процессами — другое дело, т. к. отсутствует разделяемый ресурс: общая память. Например, можно устроить один хост-диспетчер, который будет (1) принимать задачи от хостов-поставщиков, (2) класть их во внутреннюю producer-consumer-очередь, и (3) раздавать хостам-потребителям на выполнение.
  348. (22 Май '13 14:53) VladD
  349.  
  350. Ок, спасибо за ответы.
  351. (22 Май '13 15:23) mister
  352.  
  353. @VladD, я сделал еще пару тестовых программок для очередей с ограничением на количество данных и ожиданием производителя, пока в очереди не появится свободное место и положил их все в pastebin все 3 + утилитка в одном файле.
  354.  
  355. В pq2.c активизируется произвольный ждущий производитель, а в pq3.c "правильный" (тот, что раньше пришел). Плюс поправил работу с памятью между потоками, добавив кое-где volatile.
  356.  
  357. --
  358.  
  359. Посмотрите, добавьте в текст исследования, что сочтете интересным.
  360.  
  361. Если появятся какие-то вопросы, то лучше пишите мне на почту (доступна через мой ник).
  362. (23 Май '13 1:30) avp
  363.  
  364. Извиняюсь за глупый вопросы. Метод `Dequeue`
  365.  
  366. Почему `while`, а не `if`?
  367.  
  368.     ... while (queue.Count == 0 && !isDead) Monitor.Wait(mutex);
  369.  
  370. (27 Май '14 13:19) Veikedo
  371.  
  372. @VladD, благодарю. Наверное самое хорошее русскоязычное описание паттерна Producer-Consumer. Очень помогло при записи из разных потоков в один файл.
  373. (13 Июл '14 22:52) z668
  374.  
  375. pthread_mutex_lock(&q->lock); // ... и добавим новое задание туда: if (q->tail) q->tail->next = p; else { q->head = p; // очередь была пуста, а теперь нет -- надо разбудить потребителей pthread_cond_broadcast(&q->cond); } q->tail = p;
  376.  
  377. Почему, если tail существовал, мы сначала ему в next записываем p, а потом перезатираем сам tail этим p? Зачем вообще эта операция. И второе - правильно ли я понимаю, что даже если мы уже сделали бродкаст, пока мы не вызовем "pthread_mutex_unlock", потребители будут висеть на pthread_cond_wait?
  378. (29 Окт '14 14:34) smallFish
  379.  
  380. @smallFish, по поводу broadcast/wait/unlock правильно понимаете.
  381.  
  382. Что же касается первого вопроса, то сейчас я уже не помню этих деталей и опять влезать в код неохота. Можете поправить, как считаете будет правильно.
  383.  
  384. --
  385.  
  386. Кстати, а у Вас оповещения об изменениях здесь на почту приходят?
  387.  
  388. (у меня нет, так что время реакции непредсказуемо).
  389. (10 Дек '14 3:03) avp
  390.  
  391.  
  392. ====================================================================================================
  393.  
  394. /*
  395.   avp 3 тестовые pthread программки для очередей
  396.  */
  397.  
  398. // pq1.c потребитель-производитель с "бесконечной" очередью
  399.  
  400. #include <stdio.h>
  401. #include <stdlib.h>
  402. #include <sys/types.h>
  403. #include <unistd.h>
  404. #include <pthread.h>
  405.  
  406. // объявляем структуру данных для одного задания
  407. struct producer_consumer_queue_item {
  408.   struct producer_consumer_queue_item *next;
  409.   // здесь идут собственно данные. вы можете поменять этот кусок,
  410.   // использовав структуру, более специфичную для вашей задачи
  411.   void *data;
  412. };
  413.  
  414. // объявляем очередь с дополнительными структурами для синхронизации.
  415. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  416. struct producer_consumer_queue {
  417.   struct producer_consumer_queue_item *head, *tail;
  418.                               // head == tail == 0, если очередь пуста
  419.   pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  420.   pthread_cond_t cond;        // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  421.   int is_alive;               // показывает, не закончила ли очередь свою работу
  422. };
  423.  
  424. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  425.  
  426. void
  427. enqueue (void *data, struct producer_consumer_queue *aq)
  428. {
  429.   volatile struct producer_consumer_queue *q = aq;
  430.   // упакуем задание в новую структуру
  431.   struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  432.   p->data = data;
  433.   p->next = 0;
  434.  
  435.   // получим "эксклюзивный" доступ к очереди заданий
  436.   pthread_mutex_lock(&aq->lock);
  437.   // ... и добавим новое задание туда:
  438.   if (q->tail)
  439.     q->tail->next = p;
  440.   else {
  441.     q->head = p;
  442.     // очередь была пуста, а теперь нет -- надо разбудить потребителей
  443.     pthread_cond_broadcast(&aq->cond);
  444.   }
  445.   q->tail = p;
  446.   asm volatile ("" : : : "memory");
  447.   // зафиксируем изменения очереди в памяти
  448.  
  449.   // разрешаем доступ всем снова
  450.   pthread_mutex_unlock(&aq->lock);
  451. }
  452.  
  453. void *
  454. dequeue(struct producer_consumer_queue *aq)
  455. {
  456.   volatile struct producer_consumer_queue *q = aq;
  457.   // получаем эксклюзивный доступ к очереди:
  458.   pthread_mutex_lock(&aq->lock);
  459.  
  460.   while (!q->head && q->is_alive) {
  461.     // очередь пуста, делать нечего, ждем...
  462.     pthread_cond_wait(&aq->cond, &aq->lock);
  463.     // wait разрешает доступ другим на время ожидания
  464.   }
  465.  
  466.   // запоминаем текущий элемент или 0, если очередь умерла
  467.   struct producer_consumer_queue_item *p = q->head;
  468.  
  469.   if (p)
  470.   {
  471.     // и удаляем его из очереди
  472.     q->head = q->head->next;
  473.     if (!q->head)
  474.       q->tail = q->head;
  475.     asm volatile ("" : : : "memory");
  476.     // зафиксируем изменения очереди в памяти
  477.   }
  478.  
  479.   // возвращаем эксклюзивный доступ другим участникам
  480.   pthread_mutex_unlock(&aq->lock);
  481.  
  482.   // отдаём данные
  483.   void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  484.   // согласно 7.20.3.2/2, можно не проверять на 0
  485.   free(p);
  486.   return data;
  487. }
  488.  
  489. struct producer_consumer_queue *
  490. producer_consumer_queue_create()
  491. {
  492.   struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  493.   q->head = q->tail = 0;
  494.   q->is_alive = 1;
  495.   pthread_mutex_init(&q->lock, 0);
  496.   pthread_cond_init(&q->cond, 0);
  497.  
  498.   return q;
  499. }
  500.  
  501. // И процедура для закрытия очереди:
  502.  
  503. void
  504. producer_consumer_queue_stop(struct producer_consumer_queue *aq)
  505. {
  506.   volatile struct producer_consumer_queue *q = aq;
  507.   // для обращения к разделяемым переменным необходим эксклюзивный доступ
  508.   pthread_mutex_lock(&aq->lock);
  509.   q->is_alive = 0;
  510.   asm volatile ("" : : : "memory");
  511.   // зафиксируем изменения очереди в памяти
  512.   pthread_cond_broadcast(&aq->cond); // !!!!! avp
  513.   pthread_mutex_unlock(&aq->lock);
  514. }
  515.  
  516. // это поток-потребитель
  517. void *
  518. consumer_thread (void *arg)
  519. {
  520.   struct producer_consumer_queue *q = (typeof(q))arg;
  521.  
  522.   for (;;) {
  523.     void *data = dequeue(q);
  524.     // это сигнал, что очередь окончена
  525.     if (!data)
  526.       break; // значит, пора закрывать поток
  527.  
  528.     char *str = (char *)data;
  529.     // тут наша обработка данных
  530.     printf ("consuming: %s\n", str);
  531.     sleep(2); // 2000 заменил на 2 avp
  532.     printf ("consumed: %s\n", str);
  533.     free(str);
  534.   }
  535.   return 0;
  536. }
  537.  
  538. int
  539. main ()
  540. {
  541.   pthread_t consumer_threads[2];
  542.  
  543.   void *res = 0;
  544.  
  545.   char *in = NULL;
  546.   size_t sz;
  547.   int l;
  548.  
  549.   // создадим очередь:
  550.   struct producer_consumer_queue *q = producer_consumer_queue_create(); //add struct
  551.  
  552.   // и потоки-«потребители» (изм. consumer на consumer_thread)
  553.   pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
  554.   pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
  555.  
  556.   // главный цикл
  557.   // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  558.   while ((puts("Enter"), getline(&in, &sz, stdin)) > 0) {
  559.     enqueue(in, q);
  560.     in = NULL;
  561.   }
  562.  
  563.   producer_consumer_queue_stop(q);
  564.   puts("Fin2");
  565.  
  566.   if (pthread_join(consumer_threads[0], &res) ||
  567.       pthread_join(consumer_threads[1], &res))
  568.     perror("join");
  569.  
  570.   return (long)res;
  571. }
  572. // pq2.c очередь ограниченного размера без упорядочивания ожидающих производителей
  573.  
  574. #include <stdio.h>
  575. #include <stdlib.h>
  576. #include <string.h>
  577. #include <sys/types.h>
  578. #include <unistd.h>
  579. #include <pthread.h>
  580.  
  581.  
  582. // объявляем структуру данных для одного задания
  583. struct producer_consumer_queue_item {
  584.   struct producer_consumer_queue_item *next;
  585.   // здесь идут собственно данные. вы можете поменять этот кусок,
  586.   // использовав структуру, более специфичную для вашей задачи
  587.   void *data;
  588. };
  589.  
  590. // объявляем очередь с дополнительными структурами для синхронизации.
  591. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  592. struct producer_consumer_queue {
  593.   struct producer_consumer_queue_item *head, *tail;
  594.                               // head == tail == 0, если очередь пуста
  595.   pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  596.   pthread_cond_t condp;       // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  597.   pthread_cond_t condc;       // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО
  598.   int is_alive;               // показывает, не закончила ли очередь свою работу
  599.   int max, cnt,               // максимальный размер очереди и число заданий в ней
  600.     pqcnt;                    // количество производителей, ждущих свободного места в очереди
  601. };
  602.  
  603. void print_queue (struct producer_consumer_queue *q, int lock);
  604.  
  605. extern
  606. #ifdef __cplusplus
  607.  "C"
  608. #endif
  609. pid_t gettid();
  610.  
  611.  
  612. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  613.  
  614. void
  615. enqueue (void *data, struct producer_consumer_queue *aq)
  616. {
  617.   volatile struct producer_consumer_queue *q = aq;
  618.   // упакуем задание в новую структуру
  619.   struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  620.   p->data = data;
  621.   p->next = 0;
  622.  
  623.   // получим "эксклюзивный" доступ к очереди заданий
  624.   pthread_mutex_lock(&aq->lock);
  625.  
  626.   // проверим не переполнена ли она
  627.   if (q->max <= q->cnt) {
  628.     q->pqcnt++;
  629.     asm volatile ("" : : : "memory");
  630.     // зафиксируем изменения очереди в памяти
  631.     // будем ждать пока потребители ее слегка не опустошат
  632.     while(q->max <= q->cnt & q->is_alive)
  633.       pthread_cond_wait(&aq->condc, &aq->lock);
  634.     q->pqcnt--;
  635.     asm volatile ("" : : : "memory");
  636.   }
  637.   // ... и добавим новое задание туда:
  638.   if (q->tail)
  639.     q->tail->next = p;
  640.   else {
  641.     q->head = p;
  642.     // очередь была пуста, а теперь нет -- надо разбудить потребителей
  643.     pthread_cond_broadcast(&aq->condp);
  644.   }
  645.   q->tail = p;
  646.   q->cnt++;
  647.   asm volatile ("" : : : "memory");
  648.  
  649.   // разрешаем доступ всем снова
  650.   pthread_mutex_unlock(&aq->lock);
  651. }
  652.  
  653.  
  654. void *
  655. dequeue(struct producer_consumer_queue *aq)
  656. {
  657.   volatile struct producer_consumer_queue *q = aq;
  658.   // получаем эксклюзивный доступ к очереди:
  659.   pthread_mutex_lock(&aq->lock);
  660.  
  661.   if (q->pqcnt && q->max > q->cnt)
  662.     // в очереди есть место, а кто-то спит, разбудим их
  663.     pthread_cond_broadcast(&aq->condc);
  664.  
  665.   while (!q->head && q->is_alive) {
  666.     // очередь пуста, делать нечего, ждем...
  667.     pthread_cond_wait(&aq->condp, &aq->lock);
  668.     // wait разрешает доступ другим на время ожидания
  669.   }
  670.  
  671.   // запоминаем текущий элемент или 0, если очередь умерла
  672.   struct producer_consumer_queue_item *p = q->head;
  673.   if (p) {
  674.     // и удаляем его из очереди
  675.     q->head = q->head->next;
  676.     if (!q->head)
  677.       q->tail = q->head;
  678.     q->cnt--;
  679.     asm volatile ("" : : : "memory");
  680.     // зафиксируем изменения очереди в памяти
  681.     // разбудим поставщиков в их очереди
  682.     pthread_cond_broadcast(&aq->condc);
  683.   }
  684.  
  685.   // возвращаем эксклюзивный доступ другим участникам
  686.   pthread_mutex_unlock(&aq->lock);
  687.  
  688.   // отдаём данные
  689.   void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  690.   // согласно 7.20.3.2/2, можно не проверять на 0
  691.   free(p);
  692.   return data;
  693. }
  694.  
  695. struct producer_consumer_queue *
  696. producer_consumer_queue_create(int max)
  697. {
  698.   struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  699.   q->head = q->tail = 0;
  700.   q->is_alive = 1;
  701.   q->max = max;
  702.   q->cnt = 0;
  703.   q->pqcnt = 0;
  704.   pthread_mutex_init(&q->lock, 0);
  705.   pthread_cond_init(&q->condc, 0);
  706.   pthread_cond_init(&q->condp, 0);
  707.  
  708.   return q;
  709. }
  710.  
  711. // И процедура для закрытия очереди:
  712.  
  713. void
  714. producer_consumer_queue_stop(struct producer_consumer_queue *aq)
  715. {
  716.   volatile struct producer_consumer_queue *q = aq;
  717.   // для обращения к разделяемым переменным необходим эксклюзивный доступ
  718.   pthread_mutex_lock(&aq->lock);
  719.   q->is_alive = 0;
  720.   asm volatile ("" : : : "memory");
  721.   pthread_cond_broadcast(&aq->condc);
  722.   pthread_cond_broadcast(&aq->condp);
  723.   pthread_mutex_unlock(&aq->lock);
  724. }
  725.  
  726. // это поток-потребитель
  727. void *
  728. consumer_thread (void *arg)
  729. {
  730.   struct producer_consumer_queue *q = (typeof(q))arg;
  731.  
  732.   printf("Consumer: %ld\n", (long)gettid());
  733.   for (;;) {
  734.     void *data = dequeue(q);
  735.     // это сигнал, что очередь окончена
  736.     if (!data)
  737.       break; // значит, пора закрывать поток
  738.  
  739.     char *str = (char *)data;
  740.     // тут наша обработка данных
  741.     printf ("%ld consuming: %s", (long)gettid(), str);
  742.     sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
  743.     //    printf ("%ld consumed: %s", (long)gettid(), str);
  744.     free(str);
  745.   }
  746.   printf ("%ld cons exit\n", (long)gettid());
  747.   return 0;
  748. }
  749.  
  750. char *
  751. getdata()
  752. {
  753.   char *str = (char *)malloc(100);
  754.   volatile static int seq = 0;
  755.   static pthread_mutex_t  seqlock = PTHREAD_MUTEX_INITIALIZER;
  756.  
  757.   pthread_mutex_lock(&seqlock);
  758.   sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
  759.   printf("%s", str);
  760.   pthread_mutex_unlock(&seqlock);
  761.  
  762.   return str;
  763. }
  764.  
  765. void *
  766. producer_thread (void *arg)
  767. {
  768.   struct producer_consumer_queue *q = (typeof(q))arg;
  769.  
  770.   printf("Producer: %ld\n", (long)gettid());
  771.   while (q->is_alive) {
  772.     enqueue(getdata(), q);
  773.     sleep(rand() % 2 + 1);
  774.   }
  775.   printf ("%ld prod exit\n", (long)gettid());
  776.   return 0;
  777. }
  778.  
  779. void
  780. print_queue (struct producer_consumer_queue *q, int lock)
  781. {
  782.   if (lock)
  783.     pthread_mutex_lock(&q->lock);
  784.  
  785.   printf ("cnt = %d pqcnt = %d queue:\n", q->cnt, q->pqcnt);
  786.   struct producer_consumer_queue_item *p;
  787.   for (p = q->head; p; p = p->next)
  788.     printf ("%s", (char *)(p->data));
  789.   puts("");
  790.  
  791.   if (lock)
  792.     pthread_mutex_unlock(&q->lock);
  793. }
  794.  
  795. int
  796. main (int ac, char *av[])
  797. {
  798.   int swp = 0, n = av[1] ? atoi(av[1]) : 5;
  799.   if (n < 0) {
  800.     n = -n; swp = 1;
  801.   }
  802.   if (n < 2)
  803.     n = 5;
  804.  
  805.   int i, nc = n / 2, np = n - nc;
  806.   if (swp) {
  807.     swp = nc; nc = np; np = swp;
  808.   }
  809.   printf ("test %d consumers and %d producers\n", nc, np);
  810.  
  811.   pthread_t threads[n];
  812.   void *res = 0;
  813.  
  814.   char *in = NULL;
  815.   size_t sz;
  816.  
  817.  
  818.  
  819.   // создадим очередь:
  820.   struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
  821.  
  822.   // и потоки-«потребители» (изм. consumer на consumer_thread)
  823.   for (i = 0; i < nc; i++)
  824.     pthread_create(threads + i, 0, consumer_thread, (void *)q);
  825.  
  826.   // и потоки "производители"
  827.   for (; i < n; i++)
  828.     pthread_create(threads + i, 0, producer_thread, (void *)q);
  829.  
  830.   // главный цикл
  831.   // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  832.   while ((puts("Enter (for info or ^D for exit)"),
  833.           getline(&in, &sz, stdin)) > 0) {
  834. #if 0
  835.     enqueue(in, q);
  836.     in = NULL;
  837. #endif
  838.     print_queue(q, 1);
  839.   }
  840.  
  841.   producer_consumer_queue_stop(q);
  842.   puts("Fin2");
  843.   print_queue(q, 1);
  844.  
  845.   for (i = 0; i < n; i++)
  846.     if (pthread_join(threads[i], &res))
  847.       perror("join");
  848.  
  849.   return (long)res;
  850. }
  851. // pq3.c очередь ограниченного размера с упорядочиванием ожидающих производителей
  852.  
  853. #include <stdio.h>
  854. #include <stdlib.h>
  855. #include <string.h>
  856. #include <sys/types.h>
  857. #include <unistd.h>
  858. #include <pthread.h>
  859.  
  860. // объявляем структуру данных для одного задания
  861. struct producer_consumer_queue_item {
  862.   struct producer_consumer_queue_item *next;
  863.   // здесь идут собственно данные. вы можете поменять этот кусок,
  864.   // использовав структуру, более специфичную для вашей задачи
  865.   void *data;
  866. };
  867.  
  868. // струкура данных для спящего (ждущего свободного места) потока-производителя
  869. struct producer_queue_item {
  870.   struct producer_queue_item *next;
  871.   struct producer_consumer_queue_item *item; // данные для которых нет места
  872.   pthread_cond_t cond;  // этот cond "сигналим", когда в очереди появилось место
  873.  
  874. #if DEBUG
  875.   pid_t tid;    // linux thread id for debug print
  876.   int signaled; // индикатор "побудки" for debug print
  877. #endif
  878. };
  879.  
  880. // объявляем очередь данных с дополнительными структурами для синхронизации.
  881. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  882. struct producer_consumer_queue {
  883.   struct producer_consumer_queue_item *head, *tail;
  884.                               // head == tail == 0, если очередь пуста
  885.   pthread_mutex_t lock;       // мьютекс для всех манипуляций с очередью
  886.   pthread_cond_t cond;        // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  887.   int is_alive;               // показывает, не закончила ли очередь свою работу
  888.   int max, cnt;               // максимальный размер очереди и число заданий в ней
  889.   // очередь  потоков-производителей, ждущих свободного места для своих данных
  890.   struct producer_queue_item *pqhead,
  891.     *pqtail;
  892. };
  893.  
  894. extern
  895. #ifdef __cplusplus
  896.  "C"
  897. #endif
  898. pid_t gettid();
  899.  
  900. void print_queue (struct producer_consumer_queue *q, int lock);
  901.  
  902. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  903.  
  904. void
  905. enqueue (void *data, struct producer_consumer_queue *q)
  906. {
  907.   volatile struct producer_consumer_queue *vq = q;
  908.   // упакуем задание в новую структуру
  909.   struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  910.   p->data = data;
  911.   p->next = 0;
  912.  
  913.   // получим "эксклюзивный" доступ к очереди заданий
  914.   pthread_mutex_lock(&q->lock);
  915.  
  916. #if DEBUG
  917.   printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
  918. #endif
  919.  
  920.   // ... и добавим новое задание туда:
  921.   if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
  922.  
  923. #if DEBUG
  924.     if (vq->cnt < vq->max) {
  925.       puts("========================");
  926.       print_queue(q, 0);
  927.       puts("========================");
  928.     }
  929. #endif
  930.  
  931.     struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
  932.     pthread_cond_init(&pq->cond, 0);  // cond по которому его разбудят
  933.     pq->next = 0;
  934.     pq->item = p;  // сохраним данные на время сна
  935.  
  936. #if DEBUG
  937.     pq->tid = gettid();
  938. #endif
  939.  
  940.     // поместим себя в очередь спящих производителей
  941.     if (vq->pqtail)
  942.       vq->pqtail->next = pq;
  943.     else
  944.       vq->pqhead = pq;
  945.     vq->pqtail = pq;
  946.     asm volatile ("" : : : "memory");
  947.     // зафиксируем изменения очереди в памяти
  948.  
  949. #if DEBUG
  950.     int at = 0; // счетчик циклов пробуждения
  951. #endif
  952.  
  953.     do { // пойдем спать до появления свободного места в очереди данных
  954.  
  955. #if DEBUG
  956.       printf ("%ld prod cond wait (cnt: %d  at: %d) %s",
  957.               (long)gettid(), vq->cnt, at++, (char *)(p->data));
  958.       pq->signaled = 0;
  959. #endif
  960.  
  961.       pthread_cond_wait(&pq->cond, &q->lock);
  962.     } while(vq->max <= vq->cnt && vq->is_alive);
  963.     // проснулись и владеем очередью
  964.  
  965.     /*
  966.       Вот тонкий момент. Порядок активизации потоков не определен,
  967.       а нам надо соблюдать очередность данных.
  968.       Поэтому переустановим локальные переменные из очереди,
  969.       хотя это могут быть данные, положенные туда другим потоком.
  970.     */
  971. #if DEBUG
  972.     if (pq != vq->pqhead) {
  973.       printf ("BAAAD %ld (cnt: %d  at: %d) %s",
  974.               (long)gettid(), vq->cnt, at, (char *)(p->data));
  975.       print_queue(q, 0);
  976.       if (vq->is_alive)
  977.         exit(1); // совсем плохо, такого быть не должно
  978.       else
  979.         puts("CONTINUE");
  980.     }
  981. #endif
  982.    
  983.     pq = vq->pqhead;  // в любом случае берем голову очереди производителей
  984.     if ((vq->pqhead = pq->next) == 0) // и удаляем ее
  985.       vq->pqtail = 0;
  986.     asm volatile ("" : : : "memory");
  987.     p = pq->item;
  988.     free(pq);
  989.  
  990. #if DEBUG
  991.     printf ("%ld prod enqueued after wait (cnt: %d  at: %d) %s",
  992.             (long)gettid(), vq->cnt, at, (char *)(p->data));    
  993. #endif
  994.  
  995.   }
  996.  
  997.   // вот тут реально кладем data в очередь для потребителей
  998.   if (vq->tail)
  999.     vq->tail->next = p;
  1000.   else {
  1001.     vq->head = p;
  1002.     // очередь была пуста, а теперь нет -- надо разбудить потребителей
  1003.     pthread_cond_broadcast(&q->cond);
  1004.   }
  1005.   vq->tail = p;
  1006.   vq->cnt++;
  1007.   asm volatile ("" : : : "memory");
  1008.   // сбросим изменения очереди в память
  1009.  
  1010.   // разрешаем доступ всем снова
  1011.   pthread_mutex_unlock(&q->lock);
  1012. }
  1013.  
  1014. #if DEBUG                                      
  1015. #define cond_signal_producer(q) ({                      \
  1016.       if ((q)->pqhead) {                                \
  1017.           (q)->pqhead->signaled = 1;                    \
  1018.           pthread_cond_signal(&(q)->pqhead->cond);      \
  1019.       }                                                 \
  1020.     })
  1021. #else
  1022. #define cond_signal_producer(q) ({                      \
  1023.       if ((q)->pqhead)                                  \
  1024.           pthread_cond_signal(&(q)->pqhead->cond);      \
  1025.     })
  1026. #endif
  1027.  
  1028. void *
  1029. dequeue(struct producer_consumer_queue *q)
  1030. {
  1031.   volatile struct producer_consumer_queue *vq = q;
  1032.  
  1033.   // получаем эксклюзивный доступ к очереди:
  1034.   pthread_mutex_lock(&q->lock);
  1035.  
  1036.   // если есть спящие производители, то разбудим первого
  1037.   cond_signal_producer(vq);  
  1038.   while (!vq->head && vq->is_alive) {
  1039.     // очередь пуста, делать нечего, ждем...
  1040.     pthread_cond_wait(&q->cond, &q->lock);
  1041.     // wait разрешает доступ другим на время ожидания
  1042.   }
  1043.  
  1044.   // запоминаем текущий элемент или 0, если очередь умерла
  1045.   struct producer_consumer_queue_item *p = vq->head;
  1046.   if (p) {
  1047.     // и удаляем его из очереди
  1048.     vq->head = vq->head->next;
  1049.     if (!vq->head)
  1050.       vq->tail = vq->head;
  1051.     vq->cnt--;
  1052.     asm volatile ("" : : : "memory");
  1053.     // сбросим изменения очереди в память
  1054.     // разбудим первого поставщика в их очереди
  1055.     cond_signal_producer(vq);
  1056.   }
  1057.  
  1058.   // возвращаем эксклюзивный доступ другим участникам
  1059.   pthread_mutex_unlock(&q->lock);
  1060.  
  1061.   // отдаём данные
  1062.   void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  1063.   // согласно 7.20.3.2/2, можно не проверять на 0
  1064.   free(p);
  1065.   return data;
  1066. }
  1067.  
  1068. struct producer_consumer_queue *
  1069. producer_consumer_queue_create(int max)
  1070. {
  1071.   struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  1072.   q->head = q->tail = 0;
  1073.   q->pqhead = q->pqtail = 0;
  1074.   q->is_alive = 1;
  1075.   q->max = max;
  1076.   q->cnt = 0;
  1077.   pthread_mutex_init(&q->lock, 0);
  1078.   pthread_cond_init(&q->cond, 0);
  1079.  
  1080.   return q;
  1081. }
  1082.  
  1083. // И процедура для закрытия очереди:
  1084.  
  1085. void
  1086. producer_consumer_queue_stop(struct producer_consumer_queue *q)
  1087. {
  1088.   volatile struct producer_consumer_queue *vq = q;
  1089.   // для обращения к разделяемым переменным необходим эксклюзивный доступ
  1090.   pthread_mutex_lock(&q->lock);
  1091.  
  1092.   vq->is_alive = 0;
  1093.   pthread_cond_broadcast(&q->cond); // разбудим потребителей
  1094.   volatile struct producer_queue_item *pq;
  1095.   for (pq = vq->pqhead; pq; pq = pq->next) {
  1096. #if DEBUG
  1097.     pq->signaled = 1;
  1098.     asm volatile ("" : : : "memory");
  1099. #endif
  1100.     // будим каждого ждущего производителя
  1101.     pthread_cond_signal((pthread_cond_t *)&pq->cond);
  1102.   }
  1103.  
  1104.   pthread_mutex_unlock(&q->lock);
  1105. }
  1106.  
  1107. // это поток-потребитель
  1108. void *
  1109. consumer_thread (void *arg)
  1110. {
  1111.   struct producer_consumer_queue *q = (typeof(q))arg;
  1112.  
  1113.   printf("Consumer: %ld\n", (long)gettid());
  1114.   for (;;) {
  1115.     void *data = dequeue(q);
  1116.     // это сигнал, что очередь окончена
  1117.     if (!data)
  1118.       break; // значит, пора закрывать поток
  1119.  
  1120.     char *str = (char *)data;
  1121.     // тут наша обработка данных
  1122.     printf ("%ld consuming: %s", (long)gettid(), str);
  1123.     sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
  1124.     //    printf ("%ld consumed: %s", (long)gettid(), str);
  1125.     free(str);
  1126.   }
  1127.   printf ("%ld cons exit\n", (long)gettid());
  1128.   return 0;
  1129. }
  1130.  
  1131. // делаем тестовые строки с последовательными номерами
  1132. char *
  1133. getdata()
  1134. {
  1135.   char *str = (char *)malloc(100);
  1136.   volatile static int seq = 0;
  1137.   static pthread_mutex_t  seqlock = PTHREAD_MUTEX_INITIALIZER;
  1138.  
  1139.   pthread_mutex_lock(&seqlock);
  1140.   sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
  1141.   printf("%s", str);
  1142.   pthread_mutex_unlock(&seqlock);
  1143.  
  1144.   return str;
  1145. }
  1146.  
  1147. // это поток-производитель
  1148. void *
  1149. producer_thread (void *arg)
  1150. {
  1151.   struct producer_consumer_queue *q = (typeof(q))arg;
  1152.  
  1153.   printf("Producer: %ld\n", (long)gettid());
  1154.   while (q->is_alive) {
  1155.     enqueue(getdata(), q);
  1156.     sleep(rand() % 2 + 1);
  1157.   }
  1158.   printf ("%ld prod exit\n", (long)gettid());
  1159.   return 0;
  1160. }
  1161.  
  1162. void
  1163. print_queue (struct producer_consumer_queue *q, int lock)
  1164. {
  1165.   if (lock)
  1166.     pthread_mutex_lock(&q->lock);
  1167.  
  1168.   printf ("cnt = %d queue:\n", q->cnt);
  1169.   struct producer_consumer_queue_item *p;
  1170.   for (p = q->head; p; p = p->next)
  1171.     printf ("%s", (char *)(p->data));
  1172.   puts ("producers queue:");
  1173.   struct producer_queue_item *pq;
  1174.   for (pq= q->pqhead; pq; pq = pq->next)
  1175. #if DEBUG
  1176.     printf ("%ld (%d) %s",
  1177.             (long)pq->tid, pq->signaled, (char *)(pq->item->data));
  1178. #else
  1179.     printf ("%s", (char *)(pq->item->data));
  1180. #endif
  1181.  
  1182.   puts("");
  1183.  
  1184.   if (lock)
  1185.     pthread_mutex_unlock(&q->lock);
  1186. }
  1187.  
  1188. int
  1189. main (int ac, char *av[])
  1190. {
  1191.   int swp = 0, n = av[1] ? atoi(av[1]) : 5;
  1192.   if (n < 0) {
  1193.     n = -n; swp = 1;
  1194.   }
  1195.   if (n < 2)
  1196.     n = 5;
  1197.  
  1198.   int i, nc = n / 2, np = n - nc;
  1199.   if (swp) {
  1200.     swp = nc; nc = np; np = swp;
  1201.   }
  1202.   printf ("test %d consumers and %d producers\n", nc, np);
  1203.  
  1204.   pthread_t threads[n];
  1205.   void *res = 0;
  1206.  
  1207.   char *in = NULL;
  1208.   size_t sz;
  1209.  
  1210.  
  1211.  
  1212.   // создадим очередь:
  1213.   struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
  1214.  
  1215.   // и потоки-«потребители» (изм. consumer на consumer_thread)
  1216.   for (i = 0; i < nc; i++)
  1217.     pthread_create(threads + i, 0, consumer_thread, (void *)q);
  1218.  
  1219.   // и потоки "производители"
  1220.   for (; i < n; i++)
  1221.     pthread_create(threads + i, 0, producer_thread, (void *)q);
  1222.  
  1223.   // главный цикл
  1224.   // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  1225.   while ((puts("Enter (for info or ^D for exit)"),
  1226.           getline(&in, &sz, stdin)) > 0) {
  1227. #if 0
  1228.     enqueue(in, q);
  1229.     in = NULL;
  1230. #endif
  1231.     print_queue(q, 1);
  1232.   }
  1233.  
  1234.   producer_consumer_queue_stop(q);
  1235.   puts("Fin2");
  1236.   print_queue(q, 1);
  1237.  
  1238.   for (i = 0; i < n; i++)
  1239.     if (pthread_join(threads[i], &res))
  1240.       perror("join");
  1241.  
  1242.   return (long)res;
  1243. }
  1244. /*
  1245.   avp 2011
  1246.   there are no gettid() in Linux libc
  1247.   so make it
  1248. */
  1249.  
  1250. #include <sys/types.h>
  1251. #include <sys/syscall.h>
  1252.  
  1253. pid_t
  1254. gettid()
  1255. {
  1256.   return syscall(SYS_gettid);
  1257. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top