Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/* prod.c - the program reads lines of text from the standard input
 * and sends them to the POSIX message queue specified with
 * the first parameter. The optional second parameter specifies
 * the queue length (default: 1), and the optional third parameter
 * the sleep time before each message is sent.
 * Example call:
 *   ./prod /mq 5
- */#include "mqcommon.h"
- int main(int argc, char *argv[])
- {
- char buf[25], *mq_name;
- int msgnr = 0, quelen = 0, timeout=0;
- mqd_t mqdes;
- struct mq_attr attr;
- if (argc < 2 || argc > 4)
- err_quit("usage: prod MQ_NAME [quelen [ timeout]]\n\n");
- mq_name = argv[1];
- if (argc > 2)
- quelen = atoi(argv[2]);
- if (quelen < 1)
- quelen = 1;
- if (argc > 3)
- timeout = atoi(argv[3]);
- //remove existing queue (we don't care if it succeeds)
- Mq_unlink(mq_name);
- attr.mq_maxmsg = quelen; // queue length
- attr.mq_msgsize = sizeof(buf); // message length
- // create the queue
- mqdes = Mq_open(mq_name, O_RDWR | O_CREAT, FILE_MODE, &attr);
- fprintf(stderr, "Message queue %s created, PID=%ld\n", mq_name,
- (long)getpid());
- Mq_getattr(mqdes, &attr);
- fprintf(stderr, "maxmsg=%ld, msgsize%ld, in queue: %ld\n",
- attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
- while (fgets(buf, sizeof(buf), stdin)) {
- char *ptr;
- int pri = msgnr % 3;
- ptr = strchr(buf, '\n');
- if (ptr)
- *ptr = '\0';
- else
- buf[sizeof(buf) - 1] = '\0';
- if (timeout>0) sleep(timeout);
- if (Mq_send(mqdes, buf, strlen(buf) + 1, pri))
- break;
- msgnr++;
- printf("Prod: sent msg nr %d, pri=%d\n", msgnr, pri);
- }
- // close the producer access to the queue
- Mq_close(mqdes);
- printf("Prod: exiting, msgnr=%d\n", msgnr);
- exit(0);
- }
/* cons.c - the program reads messages from the POSIX message queue
 * specified with the first parameter. Optional parameters can specify:
 *  - the delay after reading a message (before an attempt to read again),
 *  - the timeout, i.e. the maximum waiting time for a message.
 * Example call:
 *   ./cons /mq 1 20
 */
- #include "mqcommon.h"
- #include <signal.h>
- #include <stdlib.h>
- #include <time.h>
- #include <unistd.h>
- static struct sigaction sa;
/*
 * SIGALRM handler that deliberately does nothing; the signal number
 * passed in is ignored.
 */
void alrm(int sig)
{
    (void)sig; /* parameter intentionally unused */
}
/*
 * Consumer entry point.
 *
 * Usage: cons MQ_NAME [sleep_time [timeout]]
 *   MQ_NAME    - name of the POSIX message queue to open
 *   sleep_time - seconds to pause after each received message (default 0)
 *   timeout    - seconds after which alarm() delivers SIGALRM while waiting
 *                for a message (default 0 = no alarm is armed)
 *
 * Receives messages in a loop, printing the sequence number, priority,
 * return value and text of each one. The loop ends when Mq_receive()
 * returns a negative value.
 */
int main(int argc, char *argv[])
{
    char buf[25], *mq_name;
    int msgnr = 0, sleep_time = 0, timeout = 0;
    mqd_t mqdes;
    struct mq_attr attr;
    struct timespec req_sleep_time;
    unsigned int pri;

    if (argc < 2 || argc > 4)
        err_quit("usage: cons MQ_NAME [sleep_time [timeout]]\n\n");

    mq_name = argv[1];
    if (argc >= 3)
        sleep_time = atoi(argv[2]);
    req_sleep_time.tv_sec = sleep_time;
    req_sleep_time.tv_nsec = 0;
    if (argc >= 4)
        timeout = atoi(argv[3]);

    mqdes = Mq_open_short(mq_name);

    /* Install the no-op SIGALRM handler so an expiring alarm() is handled
     * by the program instead of triggering the default signal action. */
    sa.sa_handler = alrm;
    if (sigaction(SIGALRM, &sa, NULL) == -1) {
        perror("sigaction");
        exit(1);
    }

    Mq_getattr(mqdes, &attr);
    fprintf(stderr, "maxmsg=%ld, msgsize=%ld, in queue: %ld\n",
            attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /* The receive buffer must be able to hold the largest queue message. */
    if ((size_t)attr.mq_msgsize > sizeof(buf)) {
        fprintf(stderr, "Buffer is too short\n");
        exit(1);
    }

    while (1) {
        int ret;
        size_t end;

        if (timeout > 0)
            alarm((unsigned int)timeout);
        if ((ret = Mq_receive(mqdes, buf, sizeof(buf), &pri)) < 0)
            break;
        alarm(0); /* message arrived: cancel the pending alarm */

        msgnr++;
        printf("Cons: received msg nr %d, pri=%u, ret=%d\n", msgnr, pri,
               ret);

        /* Terminate right after the received bytes (clamped to the buffer)
         * so leftovers from an earlier, longer message are never printed. */
        end = (size_t)ret < sizeof(buf) ? (size_t)ret : sizeof(buf) - 1;
        buf[end] = '\0';
        puts(buf);

        if (sleep_time > 0) {
            /* Work on a copy: the configured delay must stay intact for the
             * next message even if this sleep is interrupted and resumed. */
            struct timespec pending = req_sleep_time;
            struct timespec rem;

            while (nanosleep(&pending, &rem) == -1 && errno == EINTR)
                pending = rem;
        }
    }

    Mq_close(mqdes);
    printf("Cons: exiting, msgnr=%d\n", msgnr);
    exit(0);
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement