Advertisement
Guest User

Untitled

a guest
Dec 7th, 2019
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.49 KB | None | 0 0
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <time.h>
  5. #include <dlfcn.h>
  6. #include <pthread.h>
  7.  
  8. #include "pubsub.h"
  9.  
  10. // each item will have an amount, place of purchase, and description.
  11. // the timestamp will be filled in by the pubsub engine when publish is called.
  12. // next is used to build a linked list
  13. struct item {
  14. int64_t timestamp_ms;
  15. float amount;
  16. char place[MAX_PLACE_LEN];
  17. char description[MAX_DESCRIPTION_LEN];
  18. struct item *next;
  19. };
  20.  
  21. int activePubs = 0; //number of currently active publisher
  22.  
  23. struct pub_args {
  24. pub_init_t pub_init_fn;
  25. char* arg;
  26. publish_t publish_fn;
  27. };
  28.  
  29. void pub_caller (pub_args* arg){
  30. activePubs++;
  31. arg.pub_init_fn(arg.arg, arg.publish_fn);
  32. activePubs--;
  33. }
  34.  
  35. // this is a simple implementation of a linked list.
  36. // we don't need to worry about thread safety because this simple implementation
  37. // is single threaded
  38. struct item *head = NULL;
  39. // keeping track of the tail simplifies the code a bit and makes the implementation
  40. // more efficient
  41. struct item *tail = NULL;
  42.  
  43. pthread_mutex_t lock; //Lock for mutex
  44.  
  45.  
  46. // return the current time in milliseconds
  47. int64_t getnow_ms() {
  48. struct timespec res;
  49. clock_gettime(CLOCK_REALTIME, &res);
  50. return res.tv_sec * 1000 + res.tv_nsec / 1000000;
  51. }
  52.  
  53. // publish a purchase to all subscribers. we will make a copy of the strings
  54. // because there is no guarantee they will stick around after the function returns.
  55. void simple_publish(float amount, const char *place, const char *description) {
  56. pthread_mutex_lock(&lock);
  57.  
  58. struct item *i = malloc(sizeof(*i));
  59. i->timestamp_ms = getnow_ms();
  60. i->amount = amount;
  61. strcpy(i->place, place);
  62. strcpy(i->description, description);
  63.  
  64. // add to end end of the list
  65. i->next = NULL;
  66. if (tail == NULL) {
  67. head = tail = i;
  68. } else {
  69. tail->next = i;
  70. tail = i;
  71. }
  72.  
  73. pthread_mutex_unlock(&lock);
  74. }
  75.  
  76. struct item *current;
  77. // returns the next element of the list of purchases published. normally this would block
  78. // if there is nothing to return and a publisher is still running.
  79. // when all publishers are finished and there is nothing left to return timestamp_ms will be -1
  80. void simple_retrieve(int64_t *timestamp_ms, float *amount, char *place, char *description) {
  81. if (current == NULL) {
  82. *timestamp_ms = -1;
  83. return;
  84. }
  85. *timestamp_ms = current->timestamp_ms;
  86. *amount = current->amount;
  87. strcpy(place, current->place);
  88. strcpy(description, current->description);
  89. current = current->next;
  90. }
  91.  
  92. int main(int argc, char **argv)
  93. {
  94. if (argc < 2 || (argc % 2) == 0) {
  95. printf("USAGE: %s pub_sub_so1 param1 pub_sub_so2 param2 ...\n", argv[0]);
  96. return 2;
  97. }
  98. int pub_count = 0;
  99. int sub_count = 0;
  100. pthread_mutex_init(&lock, NULL);
  101.  
  102. // we are allocating for the maximum possible, probably every
  103. // argument will not be both a pub and a sub
  104. pub_init_t *pubs = malloc(sizeof(*pubs) * (argc/2));
  105. sub_init_t *subs = malloc(sizeof(*subs) * (argc/2));
  106. char **pubs_arg = malloc(sizeof(*pubs_arg) * (argc/2));
  107. char **subs_arg = malloc(sizeof(*subs_arg) * (argc/2));
  108.  
  109. // we load in all the libraries specified on the command line. the library may
  110. // have a publisher, subscriber, or both!
  111. for (int i = 1; i < argc; i += 2) {
  112. void *dh = dlopen(argv[i], RTLD_LAZY);
  113. if (dh == NULL) {
  114. fprintf(stderr, "%s\n", dlerror());
  115. continue;
  116. }
  117. pub_init_t p = dlsym(dh, "pub_init");
  118. sub_init_t s = dlsym(dh, "sub_init");
  119. if (p) {
  120. pubs_arg[pub_count] = argv[i+1];
  121. pubs[pub_count++] = p;
  122.  
  123. }
  124. if (s) {
  125. subs_arg[sub_count] = argv[i+1];
  126. subs[sub_count++] = s;
  127. }
  128. }
  129.  
  130. pthread_t pubTID[pub_count];
  131. pthread_t subTID[sub_count];
  132. // do all the pubs first (this might fail if the pubs are also subs...)
  133. for (int i = 0; i < pub_count; i++) {
  134. pub_args newPub;
  135. newPub.pub_init_fn = pubs[i];
  136. newPub.arg = pubs_arg[i];
  137. newPub.publish_fn = simple_publish;
  138. pthread_create(&(pubTID[i]), NULL, pub_caller, newPub);
  139. }
  140.  
  141. for (int i = 0; i < sub_count; i++) {
  142. current = head;
  143. subs[i](subs_arg[i], simple_retrieve);
  144. pthread_create(&(subTID[i]), NULL, &subs[i](subs_arg[i], simple_retrieve), NULL)
  145. }
  146.  
  147. return 0;
  148. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement