Advertisement
Guest User

Untitled

a guest
May 20th, 2019
68
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.24 KB | None | 0 0
#define _POSIX_C_SOURCE 200809
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/prctl.h>
#include <sys/wait.h>
  14.  
  15. /**
  16. * Testing arbitrary and cycling piping setups.
  17. *
  18. * Current logic error: an EOF can be sent to a successor once the current process has closed,
  19. * even though the current process may still have output to propagate. Might need
  20. * a way to pump output from the close_proc fn.
  21. *
  22. * Compile with clang -g -std=c11 pipeon.c -lpthread
  23. */
  24.  
  25. #define DEFAULT_SUCCS 10
  26. #define DEBUG true
  27. /*#define DEBUG false */
  28. bool needs_signal_cleanup;
  29. bool run_waiter;
  30.  
  31. struct process_comms {
  32. // stdin, stdout
  33. int to_child[2];
  34. int from_child[2];
  35. pid_t pid;
  36.  
  37. const char* proc_name;
  38.  
  39. // this process can have multiple successors
  40. int num_succs;
  41. // amount of space in the successors array
  42. int capacity;
  43. struct process_comms** successors;
  44.  
  45. // as we need to know whether all inputs to a prog have closed?
  46. int num_preds;
  47.  
  48. // whether we should ignore this in the cleanup
  49. bool closed;
  50.  
  51. // whether all output from this process has been forwarded on
  52. atomic_bool can_close_successors;
  53. };
  54.  
  55. // chain of the active processes (could do with a better name)
  56. struct active_processes {
  57. struct active_processes* prev;
  58. struct active_processes* next;
  59.  
  60. struct process_comms* payload;
  61. };
  62.  
  63. /* append succ to the successor of parent, resizing if necessary */
  64. void add_successor(struct process_comms* parent, struct process_comms* succ) {
  65. if (parent->num_succs == parent->capacity) {
  66. parent->capacity *= 2;
  67. parent->successors = realloc(parent->successors, sizeof(struct process_comms*) * parent->capacity);
  68. }
  69.  
  70. parent->successors[parent->num_succs++] = succ;
  71. succ->num_preds++;
  72. }
  73.  
  74. // be sure to use the mutex when reading or writing!
  75. struct active_processes* root = NULL;
  76. pthread_mutex_t linked_list_mut;
  77.  
  78. // append into the linked list
  79. void append_active_proc(struct process_comms* proc) {
  80. pthread_mutex_lock(&linked_list_mut);
  81.  
  82. if (root == NULL) {
  83. root = malloc(sizeof(struct active_processes));
  84. root->prev = NULL;
  85. root->next = NULL;
  86. root->payload = proc;
  87. } else {
  88. struct active_processes* new = malloc(sizeof(struct active_processes));
  89. struct active_processes* last = root;
  90. while (last->next != NULL) last = last->next;
  91. last->next = new;
  92. new->prev = last;
  93. new->next = NULL;
  94. new->payload = proc;
  95. }
  96.  
  97. pthread_mutex_unlock(&linked_list_mut);
  98. }
  99.  
  100. // called when signal occurs
  101. void process_closure(int signum) {
  102. assert(signum == SIGCHLD);
  103. // XXX: hmm, we probably should just flip a signal here?
  104. __atomic_store_n(&needs_signal_cleanup, true, __ATOMIC_SEQ_CST);
  105. }
  106.  
  107. /* make the process and add it into the linked list */
  108. struct process_comms* make_process(char* const * program) {
  109. struct process_comms* proc = malloc(sizeof(struct process_comms));
  110. proc->capacity = DEFAULT_SUCCS;
  111. proc->successors = malloc(sizeof(struct process_comms*)* proc->capacity);
  112. proc->num_succs = 0;
  113. proc->num_preds = 0;
  114. proc->closed = false;
  115. proc->proc_name = program[0];
  116. atomic_store(&proc->can_close_successors, false);
  117.  
  118. bool res = pipe(proc->to_child) < 0;
  119. res |= pipe(proc->from_child) < 0;
  120.  
  121. pid_t pid = fork();
  122. // our process to exec
  123. if (pid == 0) {
  124. close(proc->to_child[1]);
  125. close(proc->from_child[0]);
  126. dup2(proc->to_child[0], STDIN_FILENO);
  127. dup2(proc->from_child[1], STDOUT_FILENO);
  128.  
  129. prctl(PR_SET_PDEATHSIG, SIGTERM);
  130. execvp(program[0], program);
  131. // reach here? something borked
  132. exit(EXIT_FAILURE);
  133. } else {
  134. proc->pid = pid;
  135. // the parent needs to close the ends
  136. // close the reading end of stdin
  137. close(proc->to_child[0]);
  138. // close the writing end of stdout
  139. close(proc->from_child[1]);
  140. }
  141.  
  142. return proc;
  143. }
  144.  
  145. /* close this process, potentially recursively closing others if the successors for this one
  146. * have all predecessors closed
  147. * closing means to close the stdin. */
  148. void close_proc(struct process_comms* proc) {
  149. // awkward recursive closure case?
  150. if (proc->closed) return;
  151.  
  152. // close the stdin (this may either be a terminated process, or a running process)
  153. int res = close(proc->to_child[1]);
  154. if (DEBUG) printf("close status: %d\n", res);
  155. proc->closed = true;
  156.  
  157. if (DEBUG) printf("about to close %s\n", proc->proc_name);
  158.  
  159. // spin until we can continue. XXX: add some yield?
  160. while (!atomic_load(&proc->can_close_successors));
  161.  
  162. if (DEBUG) printf("closing %s\n", proc->proc_name);
  163.  
  164. for (int i = 0; i < proc->num_succs; ++i) {
  165. struct process_comms* next = proc->successors[i];
  166. next->num_preds--;
  167. if (next->num_preds == 0) {
  168. if (next->closed) continue;
  169. if (DEBUG) printf("EOF sent to: %s\n", next->proc_name);
  170. // changed this to be non-recursive...the signal can catch the later ones
  171. close(next->to_child[1]);
  172.  
  173. // close this one too, as it has no predecessors remaining.
  174. /*close_proc(proc->successors[i]);*/
  175. }
  176. }
  177. }
  178.  
  179. // assumes mutex has been properly obtained
  180. void remove_linked_list_node(struct active_processes* proc) {
  181. if (proc->prev) {
  182. proc->prev->next = proc->next;
  183. }
  184. if (proc->next) {
  185. proc->next->prev = proc->prev;
  186. }
  187.  
  188. if (root == proc) {
  189. root = proc->next;
  190. }
  191.  
  192. free(proc);
  193. }
  194.  
  195. /* a long running thread that checks whether there are signals and deals with them */
  196. void* proc_waiter(void* arg) {
  197. // silence IDE's warning, bleh
  198. (void) (void*) arg;
  199.  
  200. while (__atomic_load_n(&run_waiter, __ATOMIC_SEQ_CST)) {
  201. // XXX: switch to weak? we're in a loop
  202. // use CAS to toggle needs_signal_cleanup to false if it's true, and enter the conditional
  203. bool does_need_cleanup = true;
  204. if (__atomic_compare_exchange_n(&needs_signal_cleanup, &does_need_cleanup, false, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
  205.  
  206. // go through and find the matching process(es) [signal can catch multiple at once]
  207. // ensure that we have all children close events
  208. while (true) {
  209. int status;
  210. pid_t captured_pid = waitpid(-1, &status, WNOHANG);
  211. if (captured_pid <= 0) break;
  212. if (DEBUG) printf("closing pid: %d\n", captured_pid);
  213.  
  214. pthread_mutex_lock(&linked_list_mut);
  215. // find the pid and remove it from the linked list
  216. struct active_processes* curr = root;
  217. while (curr != NULL) {
  218.  
  219. if (curr->payload->pid == captured_pid) {
  220. // close this process, and potentially close successors if they're ready
  221. close_proc(curr->payload);
  222.  
  223. remove_linked_list_node(curr);
  224.  
  225. break;
  226. }
  227.  
  228. curr = curr->next;
  229. }
  230. pthread_mutex_unlock(&linked_list_mut);
  231. }
  232. }
  233. }
  234.  
  235. return NULL;
  236. }
  237.  
  238. /* read the output for proc and if it has a successor, pipe to them, otherwise printf */
  239. void* pump_output(void* arg) {
  240. struct process_comms* proc = arg;
  241. const int buflen = 1024;
  242. char buffer[buflen];
  243. FILE* p1_reader = fdopen(proc->from_child[0], "r");
  244. char* res;
  245. while ((res = fgets(buffer, buflen - 1, p1_reader))) {
  246. if (DEBUG) printf("read line for %s: %s", proc->proc_name, buffer);
  247. int output_len = strlen(buffer);
  248. if (proc->num_succs > 0) {
  249. if (DEBUG) printf("piping");
  250. for (int i = 0; i < proc->num_succs; ++i) {
  251. write(proc->successors[i]->to_child[1], buffer, output_len);
  252. }
  253. } else {
  254. printf("%s", buffer);
  255. }
  256. }
  257.  
  258. // let the proc_waiter thread know it can finish
  259. atomic_store(&proc->can_close_successors, true);
  260. if (DEBUG) printf("can close true for: %s\n", proc->proc_name);
  261.  
  262. return NULL;
  263. }
  264.  
  265. pthread_t pumper_thread(struct process_comms* proc) {
  266. pthread_t proc_pumper;
  267. int t_suc = pthread_create(&proc_pumper, NULL, pump_output, proc);
  268. if (DEBUG) printf("THREAD 2: %d\n", t_suc);
  269. return proc_pumper;
  270. }
  271.  
  272. void wait_pumper(pthread_t proc_thr) {
  273. int res = pthread_join(proc_thr, NULL);
  274. if (DEBUG) printf("THREAD join: %d\n", res);
  275. }
  276.  
  277. int main(int argc, char* argv[]) {
  278. if (DEBUG) puts("****** START *******");
  279.  
  280. // whether the waitpid thread should keep spinning
  281. __atomic_store_n(&run_waiter, true, __ATOMIC_SEQ_CST);
  282. pthread_mutex_init(&linked_list_mut, NULL);
  283.  
  284. // setup for us to indicate whether we need to close process' pipes
  285. signal(SIGCHLD, process_closure);
  286.  
  287. // thread to lookout for SIGCHLDs
  288. pthread_t process_waiter;
  289. int t_suc = pthread_create(&process_waiter, NULL, proc_waiter, NULL);
  290. if (DEBUG) printf("THREAD 1: %d\n", t_suc);
  291.  
  292. char* prog1[] = {"/bin/echo", "burgers are highly regarded", NULL};
  293. char* prog2[] = {"/bin/grep", "-o", "gh", NULL};
  294. char* prog3[] = {"/bin/cat", NULL};
  295.  
  296. struct process_comms* proc1 = make_process(prog1);
  297. struct process_comms* proc2 = make_process(prog2);
  298. struct process_comms* proc3 = make_process(prog3);
  299. append_active_proc(proc1);
  300. append_active_proc(proc2);
  301. append_active_proc(proc3);
  302.  
  303. // connect echo to grep
  304. add_successor(proc1, proc2);
  305. // can even have this too, to forward output
  306. /*add_successor(proc1, proc2);*/
  307. add_successor(proc2, proc3);
  308.  
  309. pthread_t proc1_pumper = pumper_thread(proc1);
  310. pthread_t proc2_pumper = pumper_thread(proc2);
  311. pthread_t proc3_pumper = pumper_thread(proc3);
  312.  
  313. wait_pumper(proc1_pumper);
  314. wait_pumper(proc2_pumper);
  315. wait_pumper(proc3_pumper);
  316.  
  317. __atomic_store_n(&run_waiter, false, __ATOMIC_SEQ_CST);
  318. int res = pthread_join(process_waiter, NULL);
  319. if (DEBUG) printf("THREAD 1 join: %d\n", res);
  320.  
  321. // cleanup
  322. struct active_processes* curr = root;
  323. while (curr != NULL) {
  324. struct active_processes* tmp = curr->next;
  325. remove_linked_list_node(curr);
  326. curr = tmp;
  327. }
  328.  
  329. if (DEBUG) puts("****** END *******");
  330. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement