Advertisement
Yurry

MPI hypercube quicksort

Sep 26th, 2013
492
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 10.26 KB | None | 0 0
  1. #include <iostream>
  2. #include <cmath>
  3. #include <ctime>
  4. #include <mpi.h>
  5. #include <algorithm>
  6.  
  7. using namespace std;
  8.  
  9. static int ProcNum = 0;     // Number of available processes
  10. static int ProcRank = -1;   // Rank of current process
  11. static int DataSize;
  12.  
  13. const double RandomDataMultiplier = 1000.0;
  14.  
  15. // Function for simple setting the data to be sorted
  16. void DummyDataInitialization(double *&pData, int &DataSize)
  17. {
  18.     for(int i = 0; i < DataSize; i++)
  19.         pData[i] = DataSize - i;
  20. }
  21.  
  22. // Function for initializing the data by the random generator
  23. void RandomDataInitialization(double *&pData, int &DataSize)
  24. {
  25.     srand( (unsigned)time(0) );
  26.  
  27.     for(int i = 0; i < DataSize; i++)
  28.         pData[i] = double(rand()) / RAND_MAX * RandomDataMultiplier;
  29. }
  30.  
  31. void PrintData(double *pData, int DataSize)
  32. {
  33.     cout << ProcRank << " says: ";
  34.     for (int i = 0; i < DataSize; i++)
  35.     {
  36.         cout << pData[i] << " ";
  37.     }
  38.     cout << endl;
  39. }
  40.  
  41. // Function for allocating the memory and setting the initial values
  42. void InitProcess(double *&pProcData, int &ProcDataSize)
  43. {
  44.     setvbuf(stdout, 0, _IONBF, 0);
  45.     if (ProcRank == 0)
  46.     {
  47.         bool Correct = true;
  48.         do
  49.         {
  50.             Correct = true;
  51.             cout << "Enter the size of data to be sorted: ";
  52.             cin >> DataSize;
  53.             /*if (DataSize < ProcNum)
  54.             {
  55.                 cout << "Data size should be greater than number of processes\n";
  56.                 Correct = false;
  57.             }*/
  58.             if (DataSize % ProcNum != 0)
  59.             {
  60.                 cout << "Data size should be divider of the number of processes\n";
  61.                 Correct = false;
  62.             }
  63.         }
  64.         while (!Correct);
  65.  
  66.         cout << "Sorting " << DataSize << " data items\n";
  67.     }
  68.  
  69.     // Broadcasting the data size
  70.     MPI_Bcast(&DataSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
  71.  
  72.     ProcDataSize = DataSize / ProcNum;
  73.  
  74.     pProcData = new double[ProcDataSize];
  75.  
  76.     double *pData = 0;
  77.     if (ProcRank == 0)
  78.     {
  79.         pData = new double[DataSize];
  80.  
  81.         // Data initalization
  82.         RandomDataInitialization(pData, DataSize);
  83.         //DummyDataInitialization(pData, DataSize);
  84.     }
  85.     MPI_Scatter(pData, ProcDataSize, MPI_DOUBLE,
  86.                 pProcData, ProcDataSize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
  87. //  cout << ProcRank << " received " << ProcDataSize << " elems at " << pProcData << endl;
  88.  
  89.     if (ProcRank == 0)
  90.         delete[] pData;
  91.  
  92. //  PrintData(pProcData, ProcDataSize);
  93. }
  94.  
  95. // Function for computational process termination
  96. void TerminateProcess(double *pProcData, int &ProcDataSize)
  97. {
  98. //  cout << ProcRank << " is ready to free " << ProcDataSize << " elems from " << pProcData << endl;
  99.  
  100.     int *pRecvSize = NULL;
  101.     int *pRecvOff = NULL;
  102.     double *Sorted = NULL;
  103.     if (ProcRank == 0)
  104.     {
  105.         pRecvSize = new int[ProcNum];
  106.         pRecvOff = new int[ProcNum];
  107.     }
  108.     MPI_Gather(&ProcDataSize, 1, MPI_INT, pRecvSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
  109.     if (ProcRank == 0)
  110.     {
  111.         pRecvOff[0] = 0;
  112.         for (int i = 1; i < ProcNum; i++)
  113.             pRecvOff[i] = pRecvOff[i-1] + pRecvSize[i-1];
  114.         Sorted = new double[DataSize];
  115.  
  116.         cout << "Buffers sizes: ";
  117.         for (int i = 0; i < ProcNum; i++)
  118.         {
  119.             cout << pRecvOff[i] << ":" << pRecvSize[i] << " ";
  120.         }
  121.         cout << endl;
  122.     }
  123.     MPI_Gatherv(pProcData, ProcDataSize, MPI_DOUBLE, Sorted, pRecvSize, pRecvOff, MPI_DOUBLE, 0, MPI_COMM_WORLD);
  124.     if (ProcRank == 0)
  125.     {
  126.         //cout << "Sorted data: ";
  127.         //PrintData(Sorted, DataSize);
  128.         delete Sorted;
  129.     }
  130.     delete[] pProcData;
  131. }
  132.  
  133. void QuickSort(double *pData, int i1, int i2)
  134. {
  135.     if (i1 < i2)
  136.     {
  137.         double pivot = pData[i1];
  138.         int is = i1;
  139.         for (int i = i1 + 1; i < i2; i++)
  140.             if (pData[i] <= pivot)
  141.             {
  142.                 is = is + 1;
  143.                 swap(pData[is], pData[i]);
  144.             }
  145.         swap(pData[i1], pData[is]);
  146.         QuickSort(pData, i1,     is);
  147.         QuickSort(pData, is + 1, i2);
  148.     }
  149. }
  150.  
  151. void LocalDataSort(double *pProcData, int ProcDataSize)
  152. {
  153.     //QuickSort(pProcData, 0, ProcDataSize);
  154.     std::sort(pProcData, pProcData + ProcDataSize);
  155. }
  156.  
  157. int GetProcDataDivisionPos(double *pProcData, int ProcDataSize, double Pivot)
  158. {
  159.     int Result = 0;
  160.     while ((Result < ProcDataSize) && (pProcData[Result] < Pivot))
  161.         Result++;
  162.     return Result;
  163. }
  164.  
  165. void PivotDistribution (double *pProcData, int ProcDataSize, int Dim, int Mask, int Iter, double *pPivot)
  166. {
  167.     MPI_Group WorldGroup;  
  168.     MPI_Group SubcubeGroup; // группа процессов - подгиперкуб
  169.     MPI_Comm  SubcubeComm;  // коммуникатор подгиперкуба
  170.     int j = 0;
  171.  
  172.     int GroupNum = ProcNum / (int)pow(2.0, Dim-Iter);
  173.     int *ProcRanks = new int[GroupNum];  
  174.  
  175.     // формирование списка рангов процессов для гиперкуба
  176.     int StartProc = ProcRank - GroupNum;
  177.     if (StartProc < 0 )
  178.         StartProc = 0;
  179.     int EndProc = ProcRank + GroupNum;
  180.     if (EndProc > ProcNum )
  181.         EndProc = ProcNum;
  182.     for (int proc = StartProc; proc < EndProc; proc++)
  183.     {
  184.         if ((ProcRank & Mask)>>(Iter) == (proc & Mask)>>(Iter))
  185.         {
  186.             ProcRanks[j++] = proc;  
  187.         }
  188.     }
  189.     //объединение процессов подгиперкуба в одну группу  
  190.     MPI_Comm_group(MPI_COMM_WORLD, &WorldGroup);  
  191.     MPI_Group_incl(WorldGroup, GroupNum, ProcRanks, &SubcubeGroup);  
  192.     MPI_Comm_create(MPI_COMM_WORLD, SubcubeGroup, &SubcubeComm);
  193.        // поиск и рассылка ведущего элемента всем процессам подгиперкуба
  194.     if (ProcRank == ProcRanks[0])
  195.         *pPivot = pProcData[(ProcDataSize - 1)/2];
  196.  
  197.     MPI_Bcast(pPivot, 1, MPI_DOUBLE, 0, SubcubeComm);
  198.     MPI_Group_free(&SubcubeGroup);
  199.     MPI_Comm_free(&SubcubeComm);
  200.     delete[] ProcRanks;
  201. }
  202.  
  203. void ParallelHyperQuickSort(double *&pProcData, int &ProcDataSize)
  204. {  
  205.     MPI_Status status;
  206.     int CommProcRank; // ранг процессора, с которым выполняется взаимодействие
  207.     double *pData,      // часть блока, остающаяся на процессоре
  208.            *pSendData,  // часть блока, передаваемая процессору CommProcRank
  209.            *pRecvData,  // часть блока, получаемая от процессора CommProcRank
  210.            *pMergeData; // блок данных, получаемый после слияния
  211.    
  212.     int DataSize,       // размер блока данных
  213.         SendDataSize,   // размер блока отосланных CommProcRank данных
  214.         RecvDataSize,   // размер блока полученных от CommProcRank данных
  215.         MergeDataSize;  // размер блока слитых данных
  216.    
  217.     int HypercubeDim = (int)ceil(log(float(ProcNum))/log(2.0f)); //размерность гиперкуба
  218.     int Mask = ProcNum;  
  219.     double Pivot;  
  220.  
  221.     // первоначальная сортировка блоков данных на каждом процессоре  
  222.     LocalDataSort(pProcData, ProcDataSize);  
  223.  
  224.     // итерации обобщенной быстрой сортировки
  225.     for (int i = HypercubeDim; i > 0; i--)
  226.     {
  227.         // определение ведущего значения и его рассылка всем процессорам
  228.         PivotDistribution(pProcData, ProcDataSize, HypercubeDim, Mask, i, &Pivot);
  229.         Mask = Mask >> 1;
  230.  
  231.         // определение границы разделения блока
  232.         int pos = GetProcDataDivisionPos(pProcData, ProcDataSize, Pivot);
  233. //      cout << ProcRank << " says: pivot is " << Pivot << ", pos is " << pos << endl;
  234.  
  235.         // разделение блока на части
  236.         if (((ProcRank & Mask) >> (i-1)) == 0)
  237.         { // старший бит = 0  
  238.             pSendData = &pProcData[pos+1];
  239.             SendDataSize = ProcDataSize - pos - 1;
  240.             if ( SendDataSize < 0 )
  241.                 SendDataSize = 0;
  242.             CommProcRank = ProcRank + Mask;
  243.             pData = &pProcData[0];
  244.             DataSize = pos + 1;  
  245.         }
  246.         else
  247.         { // старший бит = 1
  248.             pSendData = &pProcData[0];
  249.             SendDataSize = pos + 1;
  250.             if (SendDataSize > ProcDataSize)
  251.                 SendDataSize = pos;
  252.             CommProcRank = ProcRank - Mask;
  253.             pData = &pProcData[pos+1];
  254.             DataSize = ProcDataSize - pos - 1;  
  255.             if (DataSize < 0)
  256.                 DataSize = 0;
  257.         }
  258.         // пересылка размеров частей блоков данных
  259.         MPI_Sendrecv(&SendDataSize, 1, MPI_INT, CommProcRank, 0,  
  260.                      &RecvDataSize, 1, MPI_INT, CommProcRank, 0, MPI_COMM_WORLD, &status);
  261.  
  262.         // пересылка частей блоков данных
  263.         pRecvData = new double[RecvDataSize];
  264.         MPI_Sendrecv(pSendData, SendDataSize, MPI_DOUBLE,
  265.                      CommProcRank, 0, pRecvData, RecvDataSize, MPI_DOUBLE,
  266.                      CommProcRank, 0, MPI_COMM_WORLD, &status);
  267.  
  268.         // слияние частей
  269.         MergeDataSize = DataSize + RecvDataSize;
  270.         pMergeData = new double[MergeDataSize];
  271. //      cout << ProcRank << " says: RecvDataSize is " << RecvDataSize << endl;
  272.         //DataMerge(pMergeData, pMergeData, pData, DataSize, pRecvData, RecvDataSize);
  273.         //std::merge(pProcData, pProcData + BlockSize, pDualData, pDualData + DualBlockSize, pMergedData);
  274.         std::merge(pData, pData + DataSize, pRecvData, pRecvData + RecvDataSize, pMergeData);
  275.         delete[] pProcData;
  276.         delete[] pRecvData;
  277.         pProcData = pMergeData;
  278.         ProcDataSize = MergeDataSize;
  279. //      cout << "After merging at iter " << i << ": ";
  280. //      cout << "Merged size is " << MergeDataSize << "; ";
  281. //      PrintData(pMergeData, MergeDataSize);
  282.     }
  283. }
  284.  
  285. int main(int argc, char *argv[])
  286. {
  287.     double *pProcData;     // Блок данных для процесса
  288.     int ProcDataSize;  // Размер блока данных
  289.     double start, finish, duration; // Времязасекалки
  290.  
  291.     MPI_Init(&argc, &argv);
  292.     MPI_Comm_rank(MPI_COMM_WORLD, &ProcRank);
  293.     MPI_Comm_size(MPI_COMM_WORLD, &ProcNum);
  294.  
  295.     // Инициализация данных и их распределение между процессами
  296.     InitProcess(pProcData, ProcDataSize);
  297.  
  298.     //cout << "Buffer of " << ProcRank << " was " << pProcData << " with size " << ProcDataSize << endl;
  299.  
  300.     // параллельная сортировка
  301.     start = MPI_Wtime();
  302.     ParallelHyperQuickSort(pProcData, ProcDataSize);
  303.     finish = MPI_Wtime();
  304.     //cout << "After all buffer of " << ProcRank << " became " << pProcData << " with size " << ProcDataSize << endl;
  305.  
  306.     // Завершение вычислений процесса
  307.     TerminateProcess(pProcData, ProcDataSize);
  308.  
  309.     duration = finish - start;
  310.     if(ProcRank == 0)
  311.         cout << "Time of execution: " << duration << endl;
  312.  
  313.     MPI_Finalize();
  314.  
  315.     //int i;
  316.     //cin >> i;
  317. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement