Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <string>
- #include <iostream>
- #include <fstream>
- #include <sstream>
- #include <vector>
- #include <map>
- #include <math.h>
- #include <mpi.h>
- using namespace std;
- char* res_part_file = "./PartialDataSet/residences.dat";
- char* serv_part_file = "./PartialDataSet/services.dat";
- char* res_full_file = "./FullDataSet/residences.dat";
- char* serv_full_file = "./FullDataSet/services.dat";
- double calculateDistances(vector<map<string, double>> serv, double rX, double rY)
- {
- map<string, double> closest = map<string, double>();
- double d = DBL_MAX;
- const double mToKm = 1000.00;
- for(vector<map<string, double>>::iterator it = serv.begin(); it != serv.end(); it++){
- double distance = 0.0;
- distance = sqrt(pow(it->find("xLoc")->second - rX, 2) + pow(it->find("yLoc")->second - rY, 2));
- if(distance < d)
- d = distance;
- }
- return d/mToKm;
- }
- map<string, int> analyzeResidents(vector<map<string, double>> params, int rank, int numProcs) {
- string line;
- unsigned int i_ID;
- int i = 0;
- char xLoc[13];
- char yLoc[14];
- map<string, int> residents = map<string, int>();
- char* file = res_part_file;
- ifstream resFile(file);
- if (!resFile.is_open() || resFile.fail())
- {
- cerr << "ERROR (process " << rank << "): Could not open file '"
- << file << "'" << endl;
- }
- else
- {
- string rec;
- getline(resFile, rec);
- int rowBytes = resFile.tellg();
- resFile.seekg(0, ios::end);
- int fBytes = (int) resFile.tellg();
- int numRows = fBytes/rowBytes;
- int numRowsPerProc = numRows/numProcs;
- resFile.seekg(rank * numRowsPerProc * rowBytes, ios::beg);
- int numRowsMine = numRowsPerProc;
- if(rank == numProcs - 1)
- numRowsMine += numRows % numProcs;
- for (int i = 0; i < numRowsMine; ++i)
- {
- resFile >> xLoc >> yLoc;
- if (resFile.eof())
- break;
- double d = calculateDistances(params, stod(xLoc), stod(yLoc));
- if(d <= 1.0)
- residents["0-1"]++;
- else if(d > 1.0 && d <= 2.0)
- residents["1-2"]++;
- else if(d > 2.0 && d <= 5.0)
- residents["2-5"]++;
- else
- residents[">5"]++;
- }
- //resFile.close();
- }
- return residents;
- }
- //vector<map<string, double>> analyzeServices(unsigned int params, int rank, int numProcs) {
- // string line;
- // unsigned int i_ID;
- // int i = 0;
- // char id[7];
- // char xLoc[13];
- // char yLoc[14];
- // vector<map<string, double>> services = vector<map<string, double>>();
- // char* file = serv_part_file;
- // ifstream servFile(file);
- //
- // if (!servFile.is_open() || servFile.fail())
- // {
- // cerr << "ERROR (process " << rank << "): Could not open file '"
- // << file << "'" << endl;
- // }
- // else
- // {
- // //Determine number of bytes per record
- // string rec;
- // getline(servFile, rec);
- // int rowBytes = (int)servFile.tellg();
- //
- // servFile.seekg(0, ios::end);
- // int fBytes = (int) servFile.tellg();
- // int numRows = fBytes/rowBytes;
- // int numRowsPerProc = numRows/numProcs;
- //
- // servFile.seekg(rank * numRowsPerProc * rowBytes, ios::beg);
- // int numRowsMine = numRowsPerProc;
- // if(rank == numProcs - 1)
- // numRowsMine += numRows % numProcs;
- //
- // for (int i = 0; i < numRowsMine; ++i)
- // {
- // servFile >> id >> xLoc >> yLoc;
- //
- // if (servFile.eof())
- // break;
- //
- // i_ID = (unsigned int) stoi(id);
- // if(i_ID == params){
- // i++;
- // map<string, double> serv = map<string, double>();
- // serv.insert(pair<string, double>("id", stod(id)));
- // serv.insert(pair<string, double>("xLoc", stod(xLoc)));
- // serv.insert(pair<string, double>("yLoc", stod(yLoc)));
- // services.push_back(serv);
- // }
- // }
- //
- // //servFile.close();
- // memset(&id, 0, sizeof(id));
- // memset(&xLoc, 0, sizeof(xLoc));
- // memset(&yLoc, 0, sizeof(yLoc));
- // }
- // return services;
- //}
- vector<map<string, double>> analyzeServices(unsigned int params) {
- string line;
- unsigned int i_ID;
- int i = 0;
- char id[7];
- char xLoc[13];
- char yLoc[14];
- vector<map<string, double>> services = vector<map<string, double>>();
- ifstream servFile("./FullDataSet/services.dat");
- while(getline(servFile, line)){
- istringstream iss(line);
- iss >> id;
- iss >> xLoc;
- iss >> yLoc;
- i_ID = (unsigned int) stoi(id);
- if(i_ID == params){
- i++;
- //cout << id << " : " << xLoc << " : " << yLoc << endl;
- map<string, double> serv = map<string, double>();
- serv.insert(pair<string, double>("id", stod(id)));
- serv.insert(pair<string, double>("xLoc", stod(xLoc)));
- serv.insert(pair<string, double>("yLoc", stod(yLoc)));
- services.push_back(serv);
- }
- }
- servFile.close();
- memset(&id, 0, sizeof(id));
- memset(&xLoc, 0, sizeof(xLoc));
- memset(&yLoc, 0, sizeof(yLoc));
- return services;
- }
- double CalcFrequency(int occurances, int numHands){
- double p = 100.00;
- double num = double(numHands);
- return (occurances / num) * p;
- }
- void displayOutput(map<string, int> res, int numProcs, double elapsedTime, int rank, int serviceCount, int serviceNum){
- map<string, int> total = map<string,int>();
- MPI_Reduce(&res["0-1"], &total["0-1"], 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
- MPI_Reduce(&res["1-2"], &total["1-2"], 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
- MPI_Reduce(&res["2-5"], &total["2-5"], 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
- MPI_Reduce(&res[">5"], &total[">5"], 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
- int msgBuff[100];
- static MPI_Status status;
- if(rank > 0)
- {
- int totalres = res["0-1"] + res["1-2"] + res["2-5"] + res[">5"];
- MPI_Recv(&msgBuff, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &status); // arbitrary hold
- cout << endl << "Process #" << rank + 1 << " Results for " << totalres << " Addresses..." << endl;
- cout << "Nearest Service (km) # of Addresses % of Addresses" << endl;
- //printf("%20s %14s %14s\n", "Nearest Service (km)", "# of Addresses", " % of Addresses");
- for(map<string, int>::iterator it = res.begin(); it != res.end(); it++){
- //printf("%20s %14s %14s\n", it->first, it->second, CalcFrequency(it->second, totalres));
- cout << it->first << " : " << it->second << " : " << CalcFrequency(it->second, totalres) << endl;
- }
- cout << endl;
- if(numProcs - 1 != rank)
- MPI_Send(&msgBuff[0], 1, MPI_INT, rank + 1, 1, MPI_COMM_WORLD); //tell each slave in order to finish
- else
- MPI_Send(&msgBuff[0], 1, MPI_INT, 0, 2, MPI_COMM_WORLD); // tell master im done
- }
- if(rank == 0)
- {
- int totalres = total["0-1"] + total["1-2"] + total["2-5"] + total[">5"];
- printf("%28s%16s\n", "Service:", "PLACEHOLDER");
- printf("%28s%16d\n", "Service Code:", serviceNum);
- printf("%28s%16d\n", "Number of Service Locations:", serviceCount);
- printf("%28s%16d\n", "Number of Processes:", numProcs);
- printf("%28s%16g\n", "Elapsed Time in Seconds:", elapsedTime);
- cout << endl;
- cout << "Process #" << rank + 1 << " Results for " << totalres / numProcs << " Addresses..." << endl;
- //printf("%20s %14s %14s\n", "Nearest Service (km)", "# of Addresses", " % of Addresses");
- cout << "Nearest Service (km) # of Addresses % of Addresses" << endl;
- for(map<string, int>::iterator it = res.begin(); it != res.end(); it++){
- //printf("%20s %14s %14s\n", it->first, it->second, CalcFrequency(it->second, totalres));
- cout << it->first << " : " << it->second << " : " << CalcFrequency(it->second, totalres) << endl;
- }
- MPI_Send(&msgBuff[0], 1, MPI_INT, 1, 1, MPI_COMM_WORLD); // start other threads
- MPI_Recv(&msgBuff, 1, MPI_INT, MPI_ANY_SOURCE, 2, MPI_COMM_WORLD, &status); // arbitrary hold
- //aggregate results
- cout << endl;
- cout << "Aggregate Results for all " << totalres << " Addresses..." << endl;
- cout << "Nearest Service (km) # of Addresses % of Addresses" << endl;
- for(map<string, int>::iterator it = total.begin(); it != total.end(); it++){
- cout << it->first << " : " << it->second << " : " << CalcFrequency(it->second, totalres) << endl;
- }
- }
- }
- int main(int argc, char* argv[]){
- if(MPI_Init(&argc, &argv) == MPI_SUCCESS){
- // Get the # of processes and rank of this process
- int numProcs, rank;
- MPI_Comm_size(MPI_COMM_WORLD, &numProcs);
- MPI_Comm_rank(MPI_COMM_WORLD, &rank);
- if(argc == 2) {
- int params = stoi(argv[1]);
- if(params > 0) {
- double startTime = MPI_Wtime();
- if (rank == 0)
- cout << "Proximities of Residential Addresses to Services in Toronto" << endl << string(59, '-') << endl;
- vector<map<string, double>> services = analyzeServices(params);
- map<string, int> resCount = analyzeResidents(services, rank, numProcs);
- double elapsedTime = MPI_Wtime() - startTime;
- displayOutput(resCount, numProcs, elapsedTime, rank, services.size(), params);
- } else {
- cout << "Parameter must be positive integer" << endl;
- return EXIT_FAILURE;
- }
- } else if (rank == 0) {
- cout << "Invalid Arguments" << endl;
- return EXIT_FAILURE;
- }
- MPI_Finalize();
- }
- return EXIT_SUCCESS;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement