Advertisement
Guest User

3 c/c++ test programms for pthread queues

a guest
May 22nd, 2013
440
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. /*
  2. avp 3 тестовые pthread программки для очередей
  3. */
  4.  
  5. // pq1.c потребитель-производитель с "бесконечной" очередью
  6.  
  7. #include <stdio.h>
  8. #include <stdlib.h>
  9. #include <sys/types.h>
  10. #include <unistd.h>
  11. #include <pthread.h>
  12.  
  13. // объявляем структуру данных для одного задания
  14. struct producer_consumer_queue_item {
  15. struct producer_consumer_queue_item *next;
  16. // здесь идут собственно данные. вы можете поменять этот кусок,
  17. // использовав структуру, более специфичную для вашей задачи
  18. void *data;
  19. };
  20.  
  21. // объявляем очередь с дополнительными структурами для синхронизации.
  22. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  23. struct producer_consumer_queue {
  24. struct producer_consumer_queue_item *head, *tail;
  25. // head == tail == 0, если очередь пуста
  26. pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
  27. pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  28. int is_alive; // показывает, не закончила ли очередь свою работу
  29. };
  30.  
  31. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  32.  
  33. void
  34. enqueue (void *data, struct producer_consumer_queue *aq)
  35. {
  36. volatile struct producer_consumer_queue *q = aq;
  37. // упакуем задание в новую структуру
  38. struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  39. p->data = data;
  40. p->next = 0;
  41.  
  42. // получим "эксклюзивный" доступ к очереди заданий
  43. pthread_mutex_lock(&aq->lock);
  44. // ... и добавим новое задание туда:
  45. if (q->tail)
  46. q->tail->next = p;
  47. else {
  48. q->head = p;
  49. // очередь была пуста, а теперь нет -- надо разбудить потребителей
  50. pthread_cond_broadcast(&aq->cond);
  51. }
  52. q->tail = p;
  53. asm volatile ("" : : : "memory");
  54. // зафиксируем изменения очереди в памяти
  55.  
  56. // разрешаем доступ всем снова
  57. pthread_mutex_unlock(&aq->lock);
  58. }
  59.  
  60. void *
  61. dequeue(struct producer_consumer_queue *aq)
  62. {
  63. volatile struct producer_consumer_queue *q = aq;
  64. // получаем эксклюзивный доступ к очереди:
  65. pthread_mutex_lock(&aq->lock);
  66.  
  67. while (!q->head && q->is_alive) {
  68. // очередь пуста, делать нечего, ждем...
  69. pthread_cond_wait(&aq->cond, &aq->lock);
  70. // wait разрешает доступ другим на время ожидания
  71. }
  72.  
  73. // запоминаем текущий элемент или 0, если очередь умерла
  74. struct producer_consumer_queue_item *p = q->head;
  75.  
  76. if (p)
  77. {
  78. // и удаляем его из очереди
  79. q->head = q->head->next;
  80. if (!q->head)
  81. q->tail = q->head;
  82. asm volatile ("" : : : "memory");
  83. // зафиксируем изменения очереди в памяти
  84. }
  85.  
  86. // возвращаем эксклюзивный доступ другим участникам
  87. pthread_mutex_unlock(&aq->lock);
  88.  
  89. // отдаём данные
  90. void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  91. // согласно 7.20.3.2/2, можно не проверять на 0
  92. free(p);
  93. return data;
  94. }
  95.  
  96. struct producer_consumer_queue *
  97. producer_consumer_queue_create()
  98. {
  99. struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  100. q->head = q->tail = 0;
  101. q->is_alive = 1;
  102. pthread_mutex_init(&q->lock, 0);
  103. pthread_cond_init(&q->cond, 0);
  104.  
  105. return q;
  106. }
  107.  
  108. // И процедура для закрытия очереди:
  109.  
  110. void
  111. producer_consumer_queue_stop(struct producer_consumer_queue *aq)
  112. {
  113. volatile struct producer_consumer_queue *q = aq;
  114. // для обращения к разделяемым переменным необходим эксклюзивный доступ
  115. pthread_mutex_lock(&aq->lock);
  116. q->is_alive = 0;
  117. asm volatile ("" : : : "memory");
  118. // зафиксируем изменения очереди в памяти
  119. pthread_cond_broadcast(&aq->cond); // !!!!! avp
  120. pthread_mutex_unlock(&aq->lock);
  121. }
  122.  
  123. // это поток-потребитель
  124. void *
  125. consumer_thread (void *arg)
  126. {
  127. struct producer_consumer_queue *q = (typeof(q))arg;
  128.  
  129. for (;;) {
  130. void *data = dequeue(q);
  131. // это сигнал, что очередь окончена
  132. if (!data)
  133. break; // значит, пора закрывать поток
  134.  
  135. char *str = (char *)data;
  136. // тут наша обработка данных
  137. printf ("consuming: %s\n", str);
  138. sleep(2); // 2000 заменил на 2 avp
  139. printf ("consumed: %s\n", str);
  140. free(str);
  141. }
  142. return 0;
  143. }
  144.  
  145. int
  146. main ()
  147. {
  148. pthread_t consumer_threads[2];
  149.  
  150. void *res = 0;
  151.  
  152. char *in = NULL;
  153. size_t sz;
  154. int l;
  155.  
  156. // создадим очередь:
  157. struct producer_consumer_queue *q = producer_consumer_queue_create(); //add struct
  158.  
  159. // и потоки-«потребители» (изм. consumer на consumer_thread)
  160. pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
  161. pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
  162.  
  163. // главный цикл
  164. // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  165. while ((puts("Enter"), getline(&in, &sz, stdin)) > 0) {
  166. enqueue(in, q);
  167. in = NULL;
  168. }
  169.  
  170. producer_consumer_queue_stop(q);
  171. puts("Fin2");
  172.  
  173. if (pthread_join(consumer_threads[0], &res) ||
  174. pthread_join(consumer_threads[1], &res))
  175. perror("join");
  176.  
  177. return (long)res;
  178. }
  179. // pq2.c очередь ограниченного размера без упорядочивания ожидающих производителей
  180.  
  181. #include <stdio.h>
  182. #include <stdlib.h>
  183. #include <string.h>
  184. #include <sys/types.h>
  185. #include <unistd.h>
  186. #include <pthread.h>
  187.  
  188.  
  189. // объявляем структуру данных для одного задания
  190. struct producer_consumer_queue_item {
  191. struct producer_consumer_queue_item *next;
  192. // здесь идут собственно данные. вы можете поменять этот кусок,
  193. // использовав структуру, более специфичную для вашей задачи
  194. void *data;
  195. };
  196.  
  197. // объявляем очередь с дополнительными структурами для синхронизации.
  198. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  199. struct producer_consumer_queue {
  200. struct producer_consumer_queue_item *head, *tail;
  201. // head == tail == 0, если очередь пуста
  202. pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
  203. pthread_cond_t condp; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  204. pthread_cond_t condc; // этот cond "сигналим", когда в очереди ПОЯВИЛОСЬ СВОБОДНОЕ МЕСТО
  205. int is_alive; // показывает, не закончила ли очередь свою работу
  206. int max, cnt, // максимальный размер очереди и число заданий в ней
  207. pqcnt; // количество производителей, ждущих свободного места в очереди
  208. };
  209.  
  210. void print_queue (struct producer_consumer_queue *q, int lock);
  211.  
  212. extern
  213. #ifdef __cplusplus
  214. "C"
  215. #endif
  216. pid_t gettid();
  217.  
  218.  
  219. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  220.  
  221. void
  222. enqueue (void *data, struct producer_consumer_queue *aq)
  223. {
  224. volatile struct producer_consumer_queue *q = aq;
  225. // упакуем задание в новую структуру
  226. struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  227. p->data = data;
  228. p->next = 0;
  229.  
  230. // получим "эксклюзивный" доступ к очереди заданий
  231. pthread_mutex_lock(&aq->lock);
  232.  
  233. // проверим не переполнена ли она
  234. if (q->max <= q->cnt) {
  235. q->pqcnt++;
  236. asm volatile ("" : : : "memory");
  237. // зафиксируем изменения очереди в памяти
  238. // будем ждать пока потребители ее слегка не опустошат
  239. while(q->max <= q->cnt & q->is_alive)
  240. pthread_cond_wait(&aq->condc, &aq->lock);
  241. q->pqcnt--;
  242. asm volatile ("" : : : "memory");
  243. }
  244. // ... и добавим новое задание туда:
  245. if (q->tail)
  246. q->tail->next = p;
  247. else {
  248. q->head = p;
  249. // очередь была пуста, а теперь нет -- надо разбудить потребителей
  250. pthread_cond_broadcast(&aq->condp);
  251. }
  252. q->tail = p;
  253. q->cnt++;
  254. asm volatile ("" : : : "memory");
  255.  
  256. // разрешаем доступ всем снова
  257. pthread_mutex_unlock(&aq->lock);
  258. }
  259.  
  260.  
  261. void *
  262. dequeue(struct producer_consumer_queue *aq)
  263. {
  264. volatile struct producer_consumer_queue *q = aq;
  265. // получаем эксклюзивный доступ к очереди:
  266. pthread_mutex_lock(&aq->lock);
  267.  
  268. if (q->pqcnt && q->max > q->cnt)
  269. // в очереди есть место, а кто-то спит, разбудим их
  270. pthread_cond_broadcast(&aq->condc);
  271.  
  272. while (!q->head && q->is_alive) {
  273. // очередь пуста, делать нечего, ждем...
  274. pthread_cond_wait(&aq->condp, &aq->lock);
  275. // wait разрешает доступ другим на время ожидания
  276. }
  277.  
  278. // запоминаем текущий элемент или 0, если очередь умерла
  279. struct producer_consumer_queue_item *p = q->head;
  280. if (p) {
  281. // и удаляем его из очереди
  282. q->head = q->head->next;
  283. if (!q->head)
  284. q->tail = q->head;
  285. q->cnt--;
  286. asm volatile ("" : : : "memory");
  287. // зафиксируем изменения очереди в памяти
  288. // разбудим поставщиков в их очереди
  289. pthread_cond_broadcast(&aq->condc);
  290. }
  291.  
  292. // возвращаем эксклюзивный доступ другим участникам
  293. pthread_mutex_unlock(&aq->lock);
  294.  
  295. // отдаём данные
  296. void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  297. // согласно 7.20.3.2/2, можно не проверять на 0
  298. free(p);
  299. return data;
  300. }
  301.  
  302. struct producer_consumer_queue *
  303. producer_consumer_queue_create(int max)
  304. {
  305. struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  306. q->head = q->tail = 0;
  307. q->is_alive = 1;
  308. q->max = max;
  309. q->cnt = 0;
  310. q->pqcnt = 0;
  311. pthread_mutex_init(&q->lock, 0);
  312. pthread_cond_init(&q->condc, 0);
  313. pthread_cond_init(&q->condp, 0);
  314.  
  315. return q;
  316. }
  317.  
  318. // И процедура для закрытия очереди:
  319.  
  320. void
  321. producer_consumer_queue_stop(struct producer_consumer_queue *aq)
  322. {
  323. volatile struct producer_consumer_queue *q = aq;
  324. // для обращения к разделяемым переменным необходим эксклюзивный доступ
  325. pthread_mutex_lock(&aq->lock);
  326. q->is_alive = 0;
  327. asm volatile ("" : : : "memory");
  328. pthread_cond_broadcast(&aq->condc);
  329. pthread_cond_broadcast(&aq->condp);
  330. pthread_mutex_unlock(&aq->lock);
  331. }
  332.  
  333. // это поток-потребитель
  334. void *
  335. consumer_thread (void *arg)
  336. {
  337. struct producer_consumer_queue *q = (typeof(q))arg;
  338.  
  339. printf("Consumer: %ld\n", (long)gettid());
  340. for (;;) {
  341. void *data = dequeue(q);
  342. // это сигнал, что очередь окончена
  343. if (!data)
  344. break; // значит, пора закрывать поток
  345.  
  346. char *str = (char *)data;
  347. // тут наша обработка данных
  348. printf ("%ld consuming: %s", (long)gettid(), str);
  349. sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
  350. // printf ("%ld consumed: %s", (long)gettid(), str);
  351. free(str);
  352. }
  353. printf ("%ld cons exit\n", (long)gettid());
  354. return 0;
  355. }
  356.  
  357. char *
  358. getdata()
  359. {
  360. char *str = (char *)malloc(100);
  361. volatile static int seq = 0;
  362. static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
  363.  
  364. pthread_mutex_lock(&seqlock);
  365. sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
  366. printf("%s", str);
  367. pthread_mutex_unlock(&seqlock);
  368.  
  369. return str;
  370. }
  371.  
  372. void *
  373. producer_thread (void *arg)
  374. {
  375. struct producer_consumer_queue *q = (typeof(q))arg;
  376.  
  377. printf("Producer: %ld\n", (long)gettid());
  378. while (q->is_alive) {
  379. enqueue(getdata(), q);
  380. sleep(rand() % 2 + 1);
  381. }
  382. printf ("%ld prod exit\n", (long)gettid());
  383. return 0;
  384. }
  385.  
  386. void
  387. print_queue (struct producer_consumer_queue *q, int lock)
  388. {
  389. if (lock)
  390. pthread_mutex_lock(&q->lock);
  391.  
  392. printf ("cnt = %d pqcnt = %d queue:\n", q->cnt, q->pqcnt);
  393. struct producer_consumer_queue_item *p;
  394. for (p = q->head; p; p = p->next)
  395. printf ("%s", (char *)(p->data));
  396. puts("");
  397.  
  398. if (lock)
  399. pthread_mutex_unlock(&q->lock);
  400. }
  401.  
  402. int
  403. main (int ac, char *av[])
  404. {
  405. int swp = 0, n = av[1] ? atoi(av[1]) : 5;
  406. if (n < 0) {
  407. n = -n; swp = 1;
  408. }
  409. if (n < 2)
  410. n = 5;
  411.  
  412. int i, nc = n / 2, np = n - nc;
  413. if (swp) {
  414. swp = nc; nc = np; np = swp;
  415. }
  416. printf ("test %d consumers and %d producers\n", nc, np);
  417.  
  418. pthread_t threads[n];
  419. void *res = 0;
  420.  
  421. char *in = NULL;
  422. size_t sz;
  423.  
  424.  
  425.  
  426. // создадим очередь:
  427. struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
  428.  
  429. // и потоки-«потребители» (изм. consumer на consumer_thread)
  430. for (i = 0; i < nc; i++)
  431. pthread_create(threads + i, 0, consumer_thread, (void *)q);
  432.  
  433. // и потоки "производители"
  434. for (; i < n; i++)
  435. pthread_create(threads + i, 0, producer_thread, (void *)q);
  436.  
  437. // главный цикл
  438. // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  439. while ((puts("Enter (for info or ^D for exit)"),
  440. getline(&in, &sz, stdin)) > 0) {
  441. #if 0
  442. enqueue(in, q);
  443. in = NULL;
  444. #endif
  445. print_queue(q, 1);
  446. }
  447.  
  448. producer_consumer_queue_stop(q);
  449. puts("Fin2");
  450. print_queue(q, 1);
  451.  
  452. for (i = 0; i < n; i++)
  453. if (pthread_join(threads[i], &res))
  454. perror("join");
  455.  
  456. return (long)res;
  457. }
  458. // pq3.c очередь ограниченного размера с упорядочиванием ожидающих производителей
  459.  
  460. #include <stdio.h>
  461. #include <stdlib.h>
  462. #include <string.h>
  463. #include <sys/types.h>
  464. #include <unistd.h>
  465. #include <pthread.h>
  466.  
  467. // объявляем структуру данных для одного задания
  468. struct producer_consumer_queue_item {
  469. struct producer_consumer_queue_item *next;
  470. // здесь идут собственно данные. вы можете поменять этот кусок,
  471. // использовав структуру, более специфичную для вашей задачи
  472. void *data;
  473. };
  474.  
  475. // струкура данных для спящего (ждущего свободного места) потока-производителя
  476. struct producer_queue_item {
  477. struct producer_queue_item *next;
  478. struct producer_consumer_queue_item *item; // данные для которых нет места
  479. pthread_cond_t cond; // этот cond "сигналим", когда в очереди появилось место
  480.  
  481. #if DEBUG
  482. pid_t tid; // linux thread id for debug print
  483. int signaled; // индикатор "побудки" for debug print
  484. #endif
  485. };
  486.  
  487. // объявляем очередь данных с дополнительными структурами для синхронизации.
  488. // в этой очереди будут храниться произведённые, но ещё не потреблённые задания.
  489. struct producer_consumer_queue {
  490. struct producer_consumer_queue_item *head, *tail;
  491. // head == tail == 0, если очередь пуста
  492. pthread_mutex_t lock; // мьютекс для всех манипуляций с очередью
  493. pthread_cond_t cond; // этот cond "сигналим", когда очередь стала НЕ ПУСТОЙ
  494. int is_alive; // показывает, не закончила ли очередь свою работу
  495. int max, cnt; // максимальный размер очереди и число заданий в ней
  496. // очередь потоков-производителей, ждущих свободного места для своих данных
  497. struct producer_queue_item *pqhead,
  498. *pqtail;
  499. };
  500.  
  501. extern
  502. #ifdef __cplusplus
  503. "C"
  504. #endif
  505. pid_t gettid();
  506.  
  507. void print_queue (struct producer_consumer_queue *q, int lock);
  508.  
  509. // Теперь нам нужны процедуры добавления и извлечения заданий из очереди.
  510.  
  511. void
  512. enqueue (void *data, struct producer_consumer_queue *q)
  513. {
  514. volatile struct producer_consumer_queue *vq = q;
  515. // упакуем задание в новую структуру
  516. struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
  517. p->data = data;
  518. p->next = 0;
  519.  
  520. // получим "эксклюзивный" доступ к очереди заданий
  521. pthread_mutex_lock(&q->lock);
  522.  
  523. #if DEBUG
  524. printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
  525. #endif
  526.  
  527. // ... и добавим новое задание туда:
  528. if (vq->max <= vq->cnt || vq->pqtail) {// производитель должен ждать
  529.  
  530. #if DEBUG
  531. if (vq->cnt < vq->max) {
  532. puts("========================");
  533. print_queue(q, 0);
  534. puts("========================");
  535. }
  536. #endif
  537.  
  538. struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
  539. pthread_cond_init(&pq->cond, 0); // cond по которому его разбудят
  540. pq->next = 0;
  541. pq->item = p; // сохраним данные на время сна
  542.  
  543. #if DEBUG
  544. pq->tid = gettid();
  545. #endif
  546.  
  547. // поместим себя в очередь спящих производителей
  548. if (vq->pqtail)
  549. vq->pqtail->next = pq;
  550. else
  551. vq->pqhead = pq;
  552. vq->pqtail = pq;
  553. asm volatile ("" : : : "memory");
  554. // зафиксируем изменения очереди в памяти
  555.  
  556. #if DEBUG
  557. int at = 0; // счетчик циклов пробуждения
  558. #endif
  559.  
  560. do { // пойдем спать до появления свободного места в очереди данных
  561.  
  562. #if DEBUG
  563. printf ("%ld prod cond wait (cnt: %d at: %d) %s",
  564. (long)gettid(), vq->cnt, at++, (char *)(p->data));
  565. pq->signaled = 0;
  566. #endif
  567.  
  568. pthread_cond_wait(&pq->cond, &q->lock);
  569. } while(vq->max <= vq->cnt && vq->is_alive);
  570. // проснулись и владеем очередью
  571.  
  572. /*
  573. Вот тонкий момент. Порядок активизации потоков не определен,
  574. а нам надо соблюдать очередность данных.
  575. Поэтому переустановим локальные переменные из очереди,
  576. хотя это могут быть данные, положенные туда другим потоком.
  577. */
  578. #if DEBUG
  579. if (pq != vq->pqhead) {
  580. printf ("BAAAD %ld (cnt: %d at: %d) %s",
  581. (long)gettid(), vq->cnt, at, (char *)(p->data));
  582. print_queue(q, 0);
  583. if (vq->is_alive)
  584. exit(1); // совсем плохо, такого быть не должно
  585. else
  586. puts("CONTINUE");
  587. }
  588. #endif
  589.  
  590. pq = vq->pqhead; // в любом случае берем голову очереди производителей
  591. if ((vq->pqhead = pq->next) == 0) // и удаляем ее
  592. vq->pqtail = 0;
  593. asm volatile ("" : : : "memory");
  594. p = pq->item;
  595. free(pq);
  596.  
  597. #if DEBUG
  598. printf ("%ld prod enqueued after wait (cnt: %d at: %d) %s",
  599. (long)gettid(), vq->cnt, at, (char *)(p->data));
  600. #endif
  601.  
  602. }
  603.  
  604. // вот тут реально кладем data в очередь для потребителей
  605. if (vq->tail)
  606. vq->tail->next = p;
  607. else {
  608. vq->head = p;
  609. // очередь была пуста, а теперь нет -- надо разбудить потребителей
  610. pthread_cond_broadcast(&q->cond);
  611. }
  612. vq->tail = p;
  613. vq->cnt++;
  614. asm volatile ("" : : : "memory");
  615. // сбросим изменения очереди в память
  616.  
  617. // разрешаем доступ всем снова
  618. pthread_mutex_unlock(&q->lock);
  619. }
  620.  
  621. #if DEBUG
  622. #define cond_signal_producer(q) ({ \
  623. if ((q)->pqhead) { \
  624. (q)->pqhead->signaled = 1; \
  625. pthread_cond_signal(&(q)->pqhead->cond); \
  626. } \
  627. })
  628. #else
  629. #define cond_signal_producer(q) ({ \
  630. if ((q)->pqhead) \
  631. pthread_cond_signal(&(q)->pqhead->cond); \
  632. })
  633. #endif
  634.  
  635. void *
  636. dequeue(struct producer_consumer_queue *q)
  637. {
  638. volatile struct producer_consumer_queue *vq = q;
  639.  
  640. // получаем эксклюзивный доступ к очереди:
  641. pthread_mutex_lock(&q->lock);
  642.  
  643. // если есть спящие производители, то разбудим первого
  644. cond_signal_producer(vq);
  645. while (!vq->head && vq->is_alive) {
  646. // очередь пуста, делать нечего, ждем...
  647. pthread_cond_wait(&q->cond, &q->lock);
  648. // wait разрешает доступ другим на время ожидания
  649. }
  650.  
  651. // запоминаем текущий элемент или 0, если очередь умерла
  652. struct producer_consumer_queue_item *p = vq->head;
  653. if (p) {
  654. // и удаляем его из очереди
  655. vq->head = vq->head->next;
  656. if (!vq->head)
  657. vq->tail = vq->head;
  658. vq->cnt--;
  659. asm volatile ("" : : : "memory");
  660. // сбросим изменения очереди в память
  661. // разбудим первого поставщика в их очереди
  662. cond_signal_producer(vq);
  663. }
  664.  
  665. // возвращаем эксклюзивный доступ другим участникам
  666. pthread_mutex_unlock(&q->lock);
  667.  
  668. // отдаём данные
  669. void *data = p ? p->data : 0; // 0 означает, что данных больше не будет
  670. // согласно 7.20.3.2/2, можно не проверять на 0
  671. free(p);
  672. return data;
  673. }
  674.  
  675. struct producer_consumer_queue *
  676. producer_consumer_queue_create(int max)
  677. {
  678. struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
  679. q->head = q->tail = 0;
  680. q->pqhead = q->pqtail = 0;
  681. q->is_alive = 1;
  682. q->max = max;
  683. q->cnt = 0;
  684. pthread_mutex_init(&q->lock, 0);
  685. pthread_cond_init(&q->cond, 0);
  686.  
  687. return q;
  688. }
  689.  
  690. // И процедура для закрытия очереди:
  691.  
  692. void
  693. producer_consumer_queue_stop(struct producer_consumer_queue *q)
  694. {
  695. volatile struct producer_consumer_queue *vq = q;
  696. // для обращения к разделяемым переменным необходим эксклюзивный доступ
  697. pthread_mutex_lock(&q->lock);
  698.  
  699. vq->is_alive = 0;
  700. pthread_cond_broadcast(&q->cond); // разбудим потребителей
  701. volatile struct producer_queue_item *pq;
  702. for (pq = vq->pqhead; pq; pq = pq->next) {
  703. #if DEBUG
  704. pq->signaled = 1;
  705. asm volatile ("" : : : "memory");
  706. #endif
  707. // будим каждого ждущего производителя
  708. pthread_cond_signal((pthread_cond_t *)&pq->cond);
  709. }
  710.  
  711. pthread_mutex_unlock(&q->lock);
  712. }
  713.  
  714. // это поток-потребитель
  715. void *
  716. consumer_thread (void *arg)
  717. {
  718. struct producer_consumer_queue *q = (typeof(q))arg;
  719.  
  720. printf("Consumer: %ld\n", (long)gettid());
  721. for (;;) {
  722. void *data = dequeue(q);
  723. // это сигнал, что очередь окончена
  724. if (!data)
  725. break; // значит, пора закрывать поток
  726.  
  727. char *str = (char *)data;
  728. // тут наша обработка данных
  729. printf ("%ld consuming: %s", (long)gettid(), str);
  730. sleep(rand() % 3 + 1); // 2000 заменил на 2 avp
  731. // printf ("%ld consumed: %s", (long)gettid(), str);
  732. free(str);
  733. }
  734. printf ("%ld cons exit\n", (long)gettid());
  735. return 0;
  736. }
  737.  
  738. // делаем тестовые строки с последовательными номерами
  739. char *
  740. getdata()
  741. {
  742. char *str = (char *)malloc(100);
  743. volatile static int seq = 0;
  744. static pthread_mutex_t seqlock = PTHREAD_MUTEX_INITIALIZER;
  745.  
  746. pthread_mutex_lock(&seqlock);
  747. sprintf(str, "%ld seq = %d\n", (long)gettid(), ++seq);
  748. printf("%s", str);
  749. pthread_mutex_unlock(&seqlock);
  750.  
  751. return str;
  752. }
  753.  
  754. // это поток-производитель
  755. void *
  756. producer_thread (void *arg)
  757. {
  758. struct producer_consumer_queue *q = (typeof(q))arg;
  759.  
  760. printf("Producer: %ld\n", (long)gettid());
  761. while (q->is_alive) {
  762. enqueue(getdata(), q);
  763. sleep(rand() % 2 + 1);
  764. }
  765. printf ("%ld prod exit\n", (long)gettid());
  766. return 0;
  767. }
  768.  
  769. void
  770. print_queue (struct producer_consumer_queue *q, int lock)
  771. {
  772. if (lock)
  773. pthread_mutex_lock(&q->lock);
  774.  
  775. printf ("cnt = %d queue:\n", q->cnt);
  776. struct producer_consumer_queue_item *p;
  777. for (p = q->head; p; p = p->next)
  778. printf ("%s", (char *)(p->data));
  779. puts ("producers queue:");
  780. struct producer_queue_item *pq;
  781. for (pq= q->pqhead; pq; pq = pq->next)
  782. #if DEBUG
  783. printf ("%ld (%d) %s",
  784. (long)pq->tid, pq->signaled, (char *)(pq->item->data));
  785. #else
  786. printf ("%s", (char *)(pq->item->data));
  787. #endif
  788.  
  789. puts("");
  790.  
  791. if (lock)
  792. pthread_mutex_unlock(&q->lock);
  793. }
  794.  
  795. int
  796. main (int ac, char *av[])
  797. {
  798. int swp = 0, n = av[1] ? atoi(av[1]) : 5;
  799. if (n < 0) {
  800. n = -n; swp = 1;
  801. }
  802. if (n < 2)
  803. n = 5;
  804.  
  805. int i, nc = n / 2, np = n - nc;
  806. if (swp) {
  807. swp = nc; nc = np; np = swp;
  808. }
  809. printf ("test %d consumers and %d producers\n", nc, np);
  810.  
  811. pthread_t threads[n];
  812. void *res = 0;
  813.  
  814. char *in = NULL;
  815. size_t sz;
  816.  
  817.  
  818.  
  819. // создадим очередь:
  820. struct producer_consumer_queue *q = producer_consumer_queue_create(5); //add struct
  821.  
  822. // и потоки-«потребители» (изм. consumer на consumer_thread)
  823. for (i = 0; i < nc; i++)
  824. pthread_create(threads + i, 0, consumer_thread, (void *)q);
  825.  
  826. // и потоки "производители"
  827. for (; i < n; i++)
  828. pthread_create(threads + i, 0, producer_thread, (void *)q);
  829.  
  830. // главный цикл
  831. // получаем данные с клавиатуры: (изм. убрал l, добавил puts())
  832. while ((puts("Enter (for info or ^D for exit)"),
  833. getline(&in, &sz, stdin)) > 0) {
  834. #if 0
  835. enqueue(in, q);
  836. in = NULL;
  837. #endif
  838. print_queue(q, 1);
  839. }
  840.  
  841. producer_consumer_queue_stop(q);
  842. puts("Fin2");
  843. print_queue(q, 1);
  844.  
  845. for (i = 0; i < n; i++)
  846. if (pthread_join(threads[i], &res))
  847. perror("join");
  848.  
  849. return (long)res;
  850. }
  851. /*
  852. avp 2011
  853. there are no gettid() in Linux libc
  854. so make it
  855. */
  856.  
  857. #include <sys/types.h>
  858. #include <sys/syscall.h>
  859.  
  860. pid_t
  861. gettid()
  862. {
  863. return syscall(SYS_gettid);
  864. }
Advertisement
RAW Paste Data Copied
Advertisement