Advertisement
Guest User

Untitled

a guest
May 22nd, 2019
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 16.08 KB | None | 0 0
  1. #include "mpi.h"
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <assert.h>
  5. #include <pthread.h>
  6. #include <unistd.h>
  7. #include <time.h>
  8. #define NUM_PROC 20
  9. #define MSG_HELLO 100
  10.  
  11. int male = -1;
  12. int state = 0;
  13. int previous_state;
  14. int room = -1;
  15. int timer;
  16. int received_messages = 0;
  17. int expected_messages = NUM_PROC - 1;
  18. int rank;
  19. int proc_num = NUM_PROC;
  20. int max_time = -1;
  21. int mes_queue[NUM_PROC] = {-1};
  22. int mes_queue_indx = 0;
  23. int room_av[9] = {0};
  24. int room_capacity = 2;
  25. int my_room = -1;
  26. // n - kobiety w ntej szatni (np room_av[0], room_av[3], room_av[6])
  27. // n + 1 - mężczyźni w ntej szatni (np room_av[1], room_av[4], room_av[7])
  28. // n + 2 - liczba zajętych szafek w ntej szatni (np room_av[2], room_av[5], room_av[8])
  29. int visited_pool_num = 0;
  30. int was_on_pool = -1;
  31. int messages_sent[NUM_PROC] = {-1};
  32. int additional_messages = 0;
  33.  
  34. pthread_mutex_t lock0 = PTHREAD_MUTEX_INITIALIZER;
  35. pthread_cond_t cond0 = PTHREAD_COND_INITIALIZER;
  36.  
  37. pthread_mutex_t lock1 = PTHREAD_MUTEX_INITIALIZER;
  38. pthread_cond_t cond1 = PTHREAD_COND_INITIALIZER;
  39.  
  40. int own_rand(int start, int end){
  41. int rnd = rand();
  42. int range = end-start;
  43. return start + (rnd%range);
  44. }
  45.  
  46. void exit_with_error(char* err){
  47. printf(err);
  48. printf("Liczba wizyt na basenie: %d\n", visited_pool_num);
  49. exit(1234);
  50. }
  51.  
  52. int better_priority(int r_rank, int r_timer, int r_prev_state){
  53. if(r_prev_state == 4){ // 4 - stan basen
  54. if(previous_state == 4){
  55. if(r_timer == timer){
  56. if(r_rank < rank) return 0;
  57. } else {
  58. if(r_timer < timer) return 0;
  59. }
  60. } else {
  61. return 0;
  62. }
  63. } else {
  64. if(previous_state != 4){
  65. if(r_timer == timer){
  66. if(r_rank < rank) return 0;
  67. } else {
  68. if(r_timer < timer) return 0;
  69. }
  70. }
  71. }
  72.  
  73. return 1;
  74. }
  75.  
  76. void *wait_for_message(void *arguments){
  77. int msg[4] = {-1};
  78. while(1){
  79. sleep(0.01);
  80. MPI_Status status;
  81. MPI_Recv(msg, 4, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
  82. int sender = status.MPI_SOURCE;
  83. // printf("%d Received: sender %d, msg0 %d, msg1 %d, msg2 %d, msg3 %d, state %d\n", rank, sender, msg[0], msg[1], msg[2], msg[3], state);
  84. int received_message_state = msg[0];
  85. int received_time;
  86. int r_timer;
  87. int r_previous_state;
  88.  
  89. switch (state) {
  90. case 0:
  91. // sekcja lokalna
  92. switch(received_message_state){
  93. // wiadomości z liczbą ostatnią cyfrą == 1 (np 21) -> wiadomość pytanie
  94. // wiadomości z liczbą ostatnią cyfrą == 0 (np 20) -> wiadomość odpowiedź
  95. case 0:
  96. received_messages++; //zwiększamy liczbę otrzymanych wiadomości
  97. // printf("%d: received %d messages\n", rank, received_messages);
  98. received_time = msg[1];
  99. if(received_time > max_time) max_time = received_time;
  100. if(received_messages == proc_num - 1){
  101. received_messages = 0;
  102. pthread_cond_signal(&cond0);
  103. }
  104. break;
  105. case 1:
  106. send_case_1(sender);
  107. break;
  108. case 11:
  109. // printf("%d\n", msg[0]);
  110. send_case_11(sender);
  111. break;
  112. case 21:
  113. send_case_21(sender);
  114. break;
  115. case 30:
  116. break;
  117. default:
  118. printf("%d: MSG STATE %d\n", rank, received_message_state);
  119. exit_with_error("ERROR R case 0\n");
  120. break;
  121. }
  122. break;
  123. case 1:
  124. // P1
  125. switch(received_message_state){
  126. case 1:
  127. send_case_1(sender);
  128. break;
  129. case 11:
  130. r_timer = msg[1];
  131. r_previous_state = msg[2];
  132.  
  133. // printf("%d: Sender %d\n", rank, sender);
  134. if(!better_priority(sender, r_timer, r_previous_state && messages_sent[sender] > -1)){ // jeśli ma lepszy priorytet i wysłaliśmmy mu już wiadomość
  135. additional_messages++; //zwiększamy ilość oczekiwanych wiadomości o 1
  136. // printf("sdfsdfsfds\n");
  137. send_msg(sender,11, timer, previous_state, -1); // wysyłamy kolejną wiadomość
  138. }
  139.  
  140. if(better_priority(sender, r_timer, r_previous_state)){ //mamy lepszy priorytet lub proces już nam pozowolił wejść
  141. // kolejkujemy odebraną wiadomość do późniejszego odesłania
  142. mes_queue[mes_queue_indx] = sender;
  143. // printf("%d kolejkuje %d\n", rank, sender);
  144. mes_queue_indx++;
  145. } else {
  146. // printf("%d: Lepsze %d\n", rank, sender);
  147. send_case_11(sender);
  148. }
  149. break;
  150. case 10:
  151. received_messages++; //zwiększamy liczbę otrzymanych wiadomości
  152. messages_sent[sender] = 0; // odznaczamy, że dostaliśmy wiadomość od tego procesu
  153. // printf("%d dostaje %d\n", rank, sender);
  154. if(received_messages == proc_num + additional_messages - 1){
  155. received_messages = 0;
  156. additional_messages = 0;
  157. pthread_cond_signal(&cond0);
  158. }
  159. break;
  160. case 21:
  161. send_case_21(sender);
  162. break;
  163. case 30:
  164. break;
  165. default:
  166. printf("%d: MSG STATE %d\n", rank, received_message_state);
  167. exit_with_error("ERROR R case 1\n");
  168. break;
  169. }
  170. break;
  171. case 2:
  172. // P2
  173. switch(received_message_state){
  174. case 1:
  175. send_case_1(sender);
  176. break;
  177. case 11:
  178. // if(msg[2] == 4 && previous_state != 4) { // jeśli ktoś przychodzi z basenu to nie będziemy go blokować skoro nie możemy wejść
  179. // send_msg(sender,11, timer, previous_state, -1); // wysyłamy kolejną wiadomość
  180. // additional_messages++; //zwiększamy ilość oczekiwanych wiadomości o 1
  181. // pthread_cond_signal(&cond1); //zwalniamy blokade
  182. // } else {
  183. mes_queue[mes_queue_indx] = sender;
  184. // printf("%d kolejkuje %d\n", rank, sender);
  185. mes_queue_indx++;
  186. // }
  187. break;
  188. case 20:
  189. received_messages++;
  190. // printf("%d Received: sender %d, msg0 %d, w szatni %d, szatnia %d, plec %d\n", rank, sender, msg[0], msg[1], msg[2], msg[3]);
  191. increment_rooms(msg[1], msg[2], msg[3]);
  192.  
  193. if(received_messages == proc_num - 1){
  194. received_messages = 0;
  195. pthread_cond_signal(&cond0);
  196. }
  197. break;
  198. case 21:
  199. // printf("%d: sender %d\n", rank, sender);
  200. exit_with_error("Wątek będąc w stanie 2 odebrał wiadomość od stanu 2\n");
  201. break;
  202. case 30:
  203. decrement_rooms(msg[1], msg[2], msg[3]);
  204. if(available_room() > -1) pthread_cond_signal(&cond1);
  205. break;
  206. default:
  207. printf("%d: MSG STATE %d\n", rank, received_message_state);
  208. exit_with_error("ERROR R case 2\n");
  209.  
  210. break;
  211. }
  212. break;
  213. case 3:
  214. // szatnia
  215. switch(received_message_state){
  216. case 1:
  217. send_case_1(sender);
  218. break;
  219. case 11:
  220. send_case_11(sender);
  221. break;
  222. case 21:
  223. send_case_21(sender);
  224. break;
  225. case 30:
  226. break;
  227. default:
  228. printf("%d: MSG STATE %d\n", rank, received_message_state);
  229. exit_with_error("ERROR R case 3\n");
  230. break;
  231. }
  232. break;
  233. case 4:
  234. //basen
  235. switch(received_message_state){
  236. case 1:
  237. send_case_1(sender);
  238. break;
  239. case 11:
  240. send_case_11(sender);
  241. break;
  242. case 21:
  243. send_case_21(sender);
  244. break;
  245. case 30:
  246. break;
  247. default:
  248. printf("%d: MSG STATE %d\n", rank, received_message_state);
  249. exit_with_error("ERROR R case 4\n");
  250. break;
  251. }
  252. break;
  253. default:
  254. exit_with_error("ERROR Receive\n");
  255. break;
  256. }
  257. }
  258. }
  259.  
  260. void send_case_21(int send_to){
  261. if(state == 3){
  262. send_msg(send_to, 20, 1, my_room, male);
  263. } else {
  264. send_msg(send_to, 20, 0, my_room, male);
  265. }
  266. }
  267.  
  268. void send_msg(int send_to, int m0, int m1, int m2, int m3){
  269. int send_msg[] = {m0, m1, m2, m3};
  270.  
  271. // printf("%d Send: to %d msg0 %d\n", rank, send_to, msg[0]);
  272. MPI_Send(send_msg, 4, MPI_INT, send_to, MSG_HELLO, MPI_COMM_WORLD);
  273. }
  274.  
  275. void send_case_1(int send_to){
  276. send_msg(send_to, 0, timer, -1, -1);
  277. }
  278.  
  279. void send_case_11(int send_to){
  280. send_msg(send_to, 10, -1, -1, -1);
  281. }
  282.  
  283. void increment_rooms(int m1, int m2, int m3){
  284. if(m1 > 0){// czy jest w szatni
  285. room_av[3 * m2 + m3]++; // zwiększamy licznik danej płci w szatni
  286. room_av[3 * m2 + 2]++; // zajmuje szafke
  287. } else if(m2 >= 0) { // jeśli jest poza szatnią, ale ma zajętą szafke
  288. room_av[3 * m2 + 2]++;
  289. }
  290. }
  291.  
  292. void decrement_rooms(int m1, int m2, int m3){
  293. if(m1 == 1){// był na basenie, to znaczy, że może zwolnić szafke
  294. room_av[3 * m2 + m3]--; // zmniejszamy licznik danej płci w szatni
  295. room_av[3 * m2 + 2]--; // zwalnia szafke
  296. } else if(m2 >= 0) { // nie idzie na basen to nie zwalnia szafki, tylko licznik danej płci w szatni
  297. room_av[3 * m2 + m3]--; // zmniejszamy licznik danej płci w szatni
  298. }
  299. }
  300.  
  301. void init(int rank){
  302. timer = rank;
  303. // male = own_rand(0, 2);
  304. male = rank % 2;
  305. }
  306.  
  307. void other_stuff(){
  308. // sleep(own_rand(0,10));
  309. sleep(1);
  310. }
  311.  
  312. void send_to_all(int m0, int m1, int m2, int m3){
  313. for(int i = 0; i < NUM_PROC; i++) {
  314. if(i == rank) continue;
  315. messages_sent[i] = 1;
  316. send_msg(i, m0, m1, m2, m3);
  317. }
  318. }
  319.  
  320. void reset_global_variables(){
  321. //reset msg
  322. received_messages = 0;
  323.  
  324. for(int i = 0; i < 9; i++){
  325. room_av[i] = 0;
  326. }
  327.  
  328. for(int i = 0; i < NUM_PROC; i++){
  329. messages_sent[i] = -1;
  330. }
  331. }
  332.  
  333. void change_state(int new_state){
  334. printf("%d: Zmieniam stan z %d na %d, [szatnia: %d, płeć: %d, timer: %d]\n", rank, state, new_state, my_room, male, timer);
  335. previous_state = state;
  336. state = new_state;
  337. reset_global_variables();
  338. }
  339.  
  340. int available_room() {
  341. for(int i = 0; i < 3; i++){ // dla każdej sztani
  342. // sprawdzam czy jest jakaś wolna szafka
  343. // oraz czy w danej szatni jest aktualnie osoba przeciwnej płci
  344. // jeśli tak to zwracam numer tej szatni
  345. if(room_av[3*i + 2] < room_capacity && room_av[3*i + 1 - male] == 0){
  346. return i;
  347. } else if(room_av[3*i + 2] > room_capacity){
  348. exit_with_error("ERROR więcej zajętych szafek niż dostępnych!\n");
  349. } else if(room_av[3*i + 1] > 0 && room_av[3*i] > 0) {
  350. exit_with_error("ERROR kobieta i mężczyzna w jednej szatni!\n");
  351. }
  352. }
  353. // jeśli nie znajdzie szatni to zwracam -1
  354. return -1;
  355. }
  356.  
  357. void resend_queued_messages(){
  358. for(int i = 0; i < mes_queue_indx; i++ ){ // odsyłam każdemu komu nie odpowiedziałem
  359. // printf("%d resend %d\n", rank, mes_queue[i]);
  360. send_msg(mes_queue[i], 10, -1, -1, -1);
  361. // printf("%d: ods %d\n", rank, mes_queue[i]);
  362. mes_queue[i] = -1;
  363. }
  364. mes_queue_indx = 0;
  365. }
  366.  
  367. int main(int argc, char **argv)
  368. {
  369. srand(rank);
  370. MPI_Init(&argc, &argv);
  371.  
  372. MPI_Comm_rank( MPI_COMM_WORLD, &rank );
  373.  
  374. init(rank);
  375.  
  376.  
  377. int receiver;
  378.  
  379. // ##### THREADS
  380. pthread_t threads[1];
  381. int thread_args[1];
  382. int result_code;
  383. int first_time = 1;
  384. int tmp_room = -1;
  385.  
  386. thread_args[0] = 0;
  387. result_code = pthread_create(&threads[0], NULL, wait_for_message, &thread_args[0]);
  388. assert(!result_code);
  389. // #####
  390.  
  391. printf("%d: Zaczynam od stanu %d, [szatnia: %d, płeć: %d]\n", rank, state, my_room, male);
  392. sleep(1);
  393. while(1){
  394. sleep(0.01);
  395. switch (state) {
  396. case 0: //sekcja lokalna
  397. sleep(timer%4);
  398. // printf("%d Send\n", rank);
  399. send_to_all(1, timer, -1, -1); // wysyłamy wiadomość do wszystkich
  400. pthread_cond_wait(&cond0, &lock0);
  401. // printf("%d : max time: %d\n", rank, max_time);
  402. timer = max_time + 1;
  403. change_state(1);
  404. break;
  405. case 1: // P1
  406. // sleep(1);
  407. // printf("%d wysyłam ALL\n", rank);
  408. send_to_all(11, timer, previous_state, -1);
  409. // sleep(1);
  410. pthread_cond_wait(&cond0, &lock0);
  411. change_state(2);
  412. break;
  413. case 2: // P2
  414. send_to_all(21, -1, -1, -1); //pytamy o to kto w jakiej szatni
  415. // while(tmp_room == -1) {
  416. // pthread_cond_wait(&cond0, &lock0); // czekamy na wszystkie odpowiedzi
  417. // tmp_room = available_room();
  418. // if(tmp_room == -1){ // jeśli nie mamy wolnej szatni
  419. // pthread_cond_wait(&cond1, &lock1); // blokujemy i czekamy aż się zwolni miejsce
  420. // tmp_room = available_room(); // powinno być większe od -1, chyba, że przyszedł ktoś z basenu
  421. // }
  422. // }
  423. // my_room = tmp_room;
  424. // tmp_room = -1;
  425. pthread_cond_wait(&cond0, &lock0); // czekamy na wszystkie odpowiedzi
  426. my_room = available_room();
  427. if(my_room == -1){
  428. change_state(1);
  429. resend_queued_messages();
  430. }
  431. change_state(3);
  432. break;
  433. case 3: // szatnia
  434. resend_queued_messages();
  435. sleep(timer%4 + 1);
  436. if(was_on_pool == -1){
  437. send_to_all(30, 0, my_room, male); // 32, czy był na basenie, nr szatni, plec
  438. change_state(4);
  439. } else {
  440. send_to_all(30, 1, my_room, male); // 32, czy był na basenie, nr szatni, plec
  441. my_room = -1;
  442. was_on_pool = -1;
  443. change_state(0);
  444. }
  445. break;
  446. case 4: // basen
  447. sleep(timer%4 + 1);
  448. visited_pool_num++;
  449. was_on_pool = 1;
  450. change_state(1);
  451. break;
  452. default:
  453. exit_with_error("ERROR Send\n");
  454. break;
  455. }
  456. }
  457.  
  458. pthread_kill(threads[0], NULL);
  459. MPI_Finalize();
  460. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement