Advertisement
Guest User

Untitled

a guest
May 27th, 2019
108
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.29 KB | None | 0 0
  1. #include <mpi.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #define HOSTNAME_MAXSIZE 100
  7. #define every(x, i) (int i=0;i<x;i++)
  8. #define PURPOSE_SERVER 0
  9. #define PURPOSE_CLIENT 1
  10. #define PURPOSE_MAIN 2
  11. #define MAIN_SERVER_RANK 0
  12.  
  13. #define SERVER_TAG 10
  14. #define CLIENT_TAG 11
  15. #define REPLICA_NAME_MAX 100
  16. struct argv;
  17. typedef struct argv argvinfo;
  18.  
  19.  
  20. /***
  21. * Command Line arguments
  22. */
  23. struct argv{
  24. unsigned short server_nodes;
  25. unsigned short client_nodes;
  26.  
  27. };
  28. argvinfo*alloc_argv(){
  29. return malloc(sizeof(struct argv));
  30. }
  31. argvinfo*free_argv(argvinfo*ptr){
  32. free(ptr);
  33. return NULL;
  34. }
  35.  
  36. struct argv*parseargv(int argc,char*argv[]){
  37. argvinfo*retval=alloc_argv();
  38. for(int i=0;i<argc;i++){
  39. if(!strcmp("-s",argv[i])){
  40. retval->server_nodes=strtol(argv[i+1],NULL,10);
  41. }
  42. else if(!strcmp("-c",argv[i])){
  43. retval->client_nodes=strtol(argv[i+1],NULL,10);
  44. }
  45. }
  46. return retval;
  47.  
  48. }
  49.  
  50.  
  51.  
  52. void **alloc2D(int n,int m,ssize_t typesize){
  53. int i=0;
  54. void **retval=malloc(n*sizeof(char*));
  55. if(!retval)printf("[WARN]Null returned at alloc2D , it will crash maybe due to LOWMEM");
  56. for (i=0;i<n;i++)
  57. {
  58. retval[i]=(char*)malloc(typesize*m);
  59. if(!retval[i])printf("[WARN]Null returned at alloc2D , it will crash maybe due to LOWMEM");
  60. }
  61. return retval;
  62.  
  63. }
  64. void** free2D(void**ptr,int n){
  65. for(int i=0;i<n;i++)free(ptr[i]);
  66. return NULL;
  67. }
  68. char **receiveHostnames(int procsize){
  69. int source=0;
  70. int tag=0;
  71. char message[100];
  72. MPI_Status status;
  73. char **hosts=(char**)alloc2D(procsize,HOSTNAME_MAXSIZE,sizeof(char));
  74. for(source=1;source<procsize;source++){
  75. MPI_Recv(hosts[source],HOSTNAME_MAXSIZE,MPI_CHAR,source,tag,MPI_COMM_WORLD,&status);
  76.  
  77. }
  78. return hosts;
  79.  
  80. }
  81. int getRandomPort(){
  82. rand();
  83. }
  84.  
  85.  
  86. int *generatePorts(int procsize){
  87. int source=0;
  88. int tag=0;
  89. MPI_Status status;
  90. int*ports=(int*)malloc(sizeof(int)*procsize);
  91. for(source=1;source<procsize;source++){
  92. ports[source]=getRandomPort();
  93. MPI_Send(ports+source,1,MPI_INT,source,tag,MPI_COMM_WORLD);
  94. }
  95. return ports;
  96.  
  97. }
  98. int nodeAs(int rank,int purpose){
  99. MPI_Send(&purpose,1,MPI_INT,rank,0,MPI_COMM_WORLD);
  100. return purpose;
  101. }
  102. int nodeAsMain(int rank){
  103. return PURPOSE_MAIN;
  104. }
  105. int nodeAsServer(int rank){
  106. return nodeAs(rank,PURPOSE_SERVER);
  107. }
  108. int nodeAsClient(int rank){
  109. return nodeAs(rank,PURPOSE_CLIENT);
  110. }
  111. char *clearNullChar(char*any){
  112. any[strlen(any)-1]='\\';
  113. return any;
  114. }
  115. void setupPath(char*hostname,int port){
  116. char command[1000];
  117. hostname=clearNullChar(hostname);
  118. sprintf(command,"mkdir -p %s/instance%d",hostname,port);
  119. system(command);
  120. return;
  121.  
  122. }
  123. void startReplicaServer(char*hostname,int port,char*replicaName){
  124. printf("%s\n",replicaName);
  125. }
  126. char*getReplicaName(){
  127. MPI_Status status;
  128. char*replicaName=(char*)malloc(sizeof(char)*REPLICA_NAME_MAX);
  129. MPI_Recv(replicaName,REPLICA_NAME_MAX,MPI_CHAR,MAIN_SERVER_RANK,SERVER_TAG,MPI_COMM_WORLD,&status);
  130. return replicaName;
  131.  
  132. }
  133. void server_start(char*hostname,int port){
  134. setupPath(hostname,port);
  135. char*replicaName=getReplicaName();
  136. startReplicaServer(hostname,port,replicaName);
  137.  
  138. }
  139.  
  140. void client_start(char*hostname,int port){
  141.  
  142. }
  143.  
  144. void applyPurpose(int purpose,char*hostname,int port){
  145. switch(purpose){
  146. case PURPOSE_SERVER:server_start(hostname,port);break;
  147. case PURPOSE_CLIENT:client_start(hostname,port);break;
  148.  
  149. }
  150. }
  151. char*getHostname(){
  152. char *hostname=(char*)malloc(sizeof(char)*HOSTNAME_MAXSIZE);
  153. char path[HOSTNAME_MAXSIZE];
  154. FILE *fp=popen("/bin/hostname","r");
  155. while (fgets(path, sizeof(path)-1, fp) != NULL) {
  156. sprintf(hostname,"%s", path);
  157. }
  158.  
  159. return hostname;
  160.  
  161. }
  162. int*assignPurposes(int procsize){
  163. int*purposes=(int*)malloc(sizeof(int)*procsize);
  164. purposes[0]=nodeAsMain(0);
  165. for(int i=1;i<procsize;i++){
  166. if(i%2)purposes[i]=nodeAsClient(i);
  167. else purposes[i]=nodeAsServer(i);
  168. }
  169. return purposes;
  170.  
  171. }
  172. void sentReplicaNames(int procsize,int replicas_per_shard){
  173. char replicaPrefix[3]="rs";
  174. char replicaBuffer[REPLICA_NAME_MAX];
  175. for(int i=1;i+replicas_per_shard<procsize;i+=replicas_per_shard){
  176. sprintf(replicaBuffer,"%s_%d\0",replicaPrefix,i);
  177. printf("%s\n",replicaBuffer);
  178. for(int j=0;j<replicas_per_shard;j++){
  179. MPI_Send(replicaBuffer,strlen(replicaBuffer),MPI_CHAR,i+j,SERVER_TAG,MPI_COMM_WORLD);
  180. printf("%d\t",i+j);
  181. }
  182. printf("\n");
  183. }
  184.  
  185. }
  186. void setupConfigServer(){
  187.  
  188. }
  189. void setupMongos(){
  190.  
  191.  
  192. }
  193.  
  194. void setup(int procsize,char** hosts,int*ports,int*purposes,int replicas_per_shard){
  195.  
  196. sentReplicaNames(procsize,replicas_per_shard);
  197. //setupConfigServer();
  198. //setupMongos();
  199.  
  200.  
  201.  
  202.  
  203. }
  204. void run(){/*while(1);*/}
  205. int main(int argc,char*argv[]){
  206.  
  207. int my_rank;
  208. int p;
  209. int source;
  210. int dest=MAIN_SERVER_RANK;
  211. int tag=0;
  212. int provided;
  213.  
  214.  
  215.  
  216. char *hostname;
  217. int port=getRandomPort();
  218. int purpose=0;
  219.  
  220.  
  221.  
  222.  
  223. FILE *fp;
  224. int mainServer=0;
  225. MPI_Status status;
  226. MPI_Init(&argc,&argv);
  227.  
  228. MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
  229. MPI_Comm_size(MPI_COMM_WORLD,&p);
  230.  
  231.  
  232. /*
  233. * Secondary Processors
  234. */
  235. if(my_rank!=0){
  236.  
  237. char sentbuffer[HOSTNAME_MAXSIZE];
  238. char*replicaName[REPLICA_NAME_MAX];
  239.  
  240. hostname=getHostname();
  241. strncpy(sentbuffer,hostname,strlen(hostname)); //copy the hostname because MPI destroys the buffer
  242. MPI_Send(sentbuffer,strlen(sentbuffer)+1,MPI_CHAR,dest,tag,MPI_COMM_WORLD); //sent my hostname to main
  243. MPI_Recv(&port,1,MPI_INT,mainServer,tag,MPI_COMM_WORLD,&status); //receive my port number to main
  244. MPI_Recv(&purpose,1,MPI_INT,mainServer,tag,MPI_COMM_WORLD,&status); //receive purpose
  245.  
  246. applyPurpose(purpose,hostname,port);
  247.  
  248. free(hostname);
  249. }
  250. /**
  251. * The main processor
  252. * The main processor is also the configuration server
  253. * */
  254. else{
  255. MPI_Status status;
  256. int tmp;
  257. char**hosts;
  258. int*ports;
  259. int*purposes;
  260. printf("Size of cluster %d\n",p);
  261. argvinfo*user_params=parseargv(argc,argv);
  262. hosts=receiveHostnames(p);
  263. for(int i=1;i<p;i++){
  264. //printf("node %d hostname %s\n",i,hosts[i]);
  265. }
  266. ports=generatePorts(p);
  267. purposes=assignPurposes(p);
  268.  
  269.  
  270. setup(p,
  271. hosts,
  272. ports,
  273. purposes,
  274. 5); //TODO : make it taking from command line
  275.  
  276. run();
  277.  
  278.  
  279. free2D(hosts,p);
  280. free(ports);
  281. free(purposes);
  282. }
  283.  
  284. MPI_Finalize();
  285. fflush(stdout);
  286. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement