Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <cmath>
- #include <ctime>
- #include <mpi.h>
- #include <algorithm>
- using namespace std;
- static int ProcNum = 0; // Number of available processes
- static int ProcRank = -1; // Rank of current process
- static int DataSize;
- const double RandomDataMultiplier = 1000.0;
- // Function for simple setting the data to be sorted
- void DummyDataInitialization(double *&pData, int &DataSize)
- {
- for(int i = 0; i < DataSize; i++)
- pData[i] = DataSize - i;
- }
- // Function for initializing the data by the random generator
- void RandomDataInitialization(double *&pData, int &DataSize)
- {
- srand( (unsigned)time(0) );
- for(int i = 0; i < DataSize; i++)
- pData[i] = double(rand()) / RAND_MAX * RandomDataMultiplier;
- }
- void PrintData(double *pData, int DataSize)
- {
- cout << ProcRank << " says: ";
- for (int i = 0; i < DataSize; i++)
- {
- cout << pData[i] << " ";
- }
- cout << endl;
- }
- // Function for allocating the memory and setting the initial values
- void InitProcess(double *&pProcData, int &ProcDataSize)
- {
- setvbuf(stdout, 0, _IONBF, 0);
- if (ProcRank == 0)
- {
- bool Correct = true;
- do
- {
- Correct = true;
- cout << "Enter the size of data to be sorted: ";
- cin >> DataSize;
- /*if (DataSize < ProcNum)
- {
- cout << "Data size should be greater than number of processes\n";
- Correct = false;
- }*/
- if (DataSize % ProcNum != 0)
- {
- cout << "Data size should be divider of the number of processes\n";
- Correct = false;
- }
- }
- while (!Correct);
- cout << "Sorting " << DataSize << " data items\n";
- }
- // Broadcasting the data size
- MPI_Bcast(&DataSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
- ProcDataSize = DataSize / ProcNum;
- pProcData = new double[ProcDataSize];
- double *pData = 0;
- if (ProcRank == 0)
- {
- pData = new double[DataSize];
- // Data initalization
- RandomDataInitialization(pData, DataSize);
- //DummyDataInitialization(pData, DataSize);
- }
- MPI_Scatter(pData, ProcDataSize, MPI_DOUBLE,
- pProcData, ProcDataSize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
- // cout << ProcRank << " received " << ProcDataSize << " elems at " << pProcData << endl;
- if (ProcRank == 0)
- delete[] pData;
- // PrintData(pProcData, ProcDataSize);
- }
- // Function for computational process termination
- void TerminateProcess(double *pProcData, int &ProcDataSize)
- {
- // cout << ProcRank << " is ready to free " << ProcDataSize << " elems from " << pProcData << endl;
- int *pRecvSize = NULL;
- int *pRecvOff = NULL;
- double *Sorted = NULL;
- if (ProcRank == 0)
- {
- pRecvSize = new int[ProcNum];
- pRecvOff = new int[ProcNum];
- }
- MPI_Gather(&ProcDataSize, 1, MPI_INT, pRecvSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
- if (ProcRank == 0)
- {
- pRecvOff[0] = 0;
- for (int i = 1; i < ProcNum; i++)
- pRecvOff[i] = pRecvOff[i-1] + pRecvSize[i-1];
- Sorted = new double[DataSize];
- cout << "Buffers sizes: ";
- for (int i = 0; i < ProcNum; i++)
- {
- cout << pRecvOff[i] << ":" << pRecvSize[i] << " ";
- }
- cout << endl;
- }
- MPI_Gatherv(pProcData, ProcDataSize, MPI_DOUBLE, Sorted, pRecvSize, pRecvOff, MPI_DOUBLE, 0, MPI_COMM_WORLD);
- if (ProcRank == 0)
- {
- //cout << "Sorted data: ";
- //PrintData(Sorted, DataSize);
- delete Sorted;
- }
- delete[] pProcData;
- }
- void QuickSort(double *pData, int i1, int i2)
- {
- if (i1 < i2)
- {
- double pivot = pData[i1];
- int is = i1;
- for (int i = i1 + 1; i < i2; i++)
- if (pData[i] <= pivot)
- {
- is = is + 1;
- swap(pData[is], pData[i]);
- }
- swap(pData[i1], pData[is]);
- QuickSort(pData, i1, is);
- QuickSort(pData, is + 1, i2);
- }
- }
- void LocalDataSort(double *pProcData, int ProcDataSize)
- {
- //QuickSort(pProcData, 0, ProcDataSize);
- std::sort(pProcData, pProcData + ProcDataSize);
- }
- int GetProcDataDivisionPos(double *pProcData, int ProcDataSize, double Pivot)
- {
- int Result = 0;
- while ((Result < ProcDataSize) && (pProcData[Result] < Pivot))
- Result++;
- return Result;
- }
- void PivotDistribution (double *pProcData, int ProcDataSize, int Dim, int Mask, int Iter, double *pPivot)
- {
- MPI_Group WorldGroup;
- MPI_Group SubcubeGroup; // группа процессов - подгиперкуб
- MPI_Comm SubcubeComm; // коммуникатор подгиперкуба
- int j = 0;
- int GroupNum = ProcNum / (int)pow(2.0, Dim-Iter);
- int *ProcRanks = new int[GroupNum];
- // формирование списка рангов процессов для гиперкуба
- int StartProc = ProcRank - GroupNum;
- if (StartProc < 0 )
- StartProc = 0;
- int EndProc = ProcRank + GroupNum;
- if (EndProc > ProcNum )
- EndProc = ProcNum;
- for (int proc = StartProc; proc < EndProc; proc++)
- {
- if ((ProcRank & Mask)>>(Iter) == (proc & Mask)>>(Iter))
- {
- ProcRanks[j++] = proc;
- }
- }
- //объединение процессов подгиперкуба в одну группу
- MPI_Comm_group(MPI_COMM_WORLD, &WorldGroup);
- MPI_Group_incl(WorldGroup, GroupNum, ProcRanks, &SubcubeGroup);
- MPI_Comm_create(MPI_COMM_WORLD, SubcubeGroup, &SubcubeComm);
- // поиск и рассылка ведущего элемента всем процессам подгиперкуба
- if (ProcRank == ProcRanks[0])
- *pPivot = pProcData[(ProcDataSize - 1)/2];
- MPI_Bcast(pPivot, 1, MPI_DOUBLE, 0, SubcubeComm);
- MPI_Group_free(&SubcubeGroup);
- MPI_Comm_free(&SubcubeComm);
- delete[] ProcRanks;
- }
- void ParallelHyperQuickSort(double *&pProcData, int &ProcDataSize)
- {
- MPI_Status status;
- int CommProcRank; // ранг процессора, с которым выполняется взаимодействие
- double *pData, // часть блока, остающаяся на процессоре
- *pSendData, // часть блока, передаваемая процессору CommProcRank
- *pRecvData, // часть блока, получаемая от процессора CommProcRank
- *pMergeData; // блок данных, получаемый после слияния
- int DataSize, // размер блока данных
- SendDataSize, // размер блока отосланных CommProcRank данных
- RecvDataSize, // размер блока полученных от CommProcRank данных
- MergeDataSize; // размер блока слитых данных
- int HypercubeDim = (int)ceil(log(float(ProcNum))/log(2.0f)); //размерность гиперкуба
- int Mask = ProcNum;
- double Pivot;
- // первоначальная сортировка блоков данных на каждом процессоре
- LocalDataSort(pProcData, ProcDataSize);
- // итерации обобщенной быстрой сортировки
- for (int i = HypercubeDim; i > 0; i--)
- {
- // определение ведущего значения и его рассылка всем процессорам
- PivotDistribution(pProcData, ProcDataSize, HypercubeDim, Mask, i, &Pivot);
- Mask = Mask >> 1;
- // определение границы разделения блока
- int pos = GetProcDataDivisionPos(pProcData, ProcDataSize, Pivot);
- // cout << ProcRank << " says: pivot is " << Pivot << ", pos is " << pos << endl;
- // разделение блока на части
- if (((ProcRank & Mask) >> (i-1)) == 0)
- { // старший бит = 0
- pSendData = &pProcData[pos+1];
- SendDataSize = ProcDataSize - pos - 1;
- if ( SendDataSize < 0 )
- SendDataSize = 0;
- CommProcRank = ProcRank + Mask;
- pData = &pProcData[0];
- DataSize = pos + 1;
- }
- else
- { // старший бит = 1
- pSendData = &pProcData[0];
- SendDataSize = pos + 1;
- if (SendDataSize > ProcDataSize)
- SendDataSize = pos;
- CommProcRank = ProcRank - Mask;
- pData = &pProcData[pos+1];
- DataSize = ProcDataSize - pos - 1;
- if (DataSize < 0)
- DataSize = 0;
- }
- // пересылка размеров частей блоков данных
- MPI_Sendrecv(&SendDataSize, 1, MPI_INT, CommProcRank, 0,
- &RecvDataSize, 1, MPI_INT, CommProcRank, 0, MPI_COMM_WORLD, &status);
- // пересылка частей блоков данных
- pRecvData = new double[RecvDataSize];
- MPI_Sendrecv(pSendData, SendDataSize, MPI_DOUBLE,
- CommProcRank, 0, pRecvData, RecvDataSize, MPI_DOUBLE,
- CommProcRank, 0, MPI_COMM_WORLD, &status);
- // слияние частей
- MergeDataSize = DataSize + RecvDataSize;
- pMergeData = new double[MergeDataSize];
- // cout << ProcRank << " says: RecvDataSize is " << RecvDataSize << endl;
- //DataMerge(pMergeData, pMergeData, pData, DataSize, pRecvData, RecvDataSize);
- //std::merge(pProcData, pProcData + BlockSize, pDualData, pDualData + DualBlockSize, pMergedData);
- std::merge(pData, pData + DataSize, pRecvData, pRecvData + RecvDataSize, pMergeData);
- delete[] pProcData;
- delete[] pRecvData;
- pProcData = pMergeData;
- ProcDataSize = MergeDataSize;
- // cout << "After merging at iter " << i << ": ";
- // cout << "Merged size is " << MergeDataSize << "; ";
- // PrintData(pMergeData, MergeDataSize);
- }
- }
- int main(int argc, char *argv[])
- {
- double *pProcData; // Блок данных для процесса
- int ProcDataSize; // Размер блока данных
- double start, finish, duration; // Времязасекалки
- MPI_Init(&argc, &argv);
- MPI_Comm_rank(MPI_COMM_WORLD, &ProcRank);
- MPI_Comm_size(MPI_COMM_WORLD, &ProcNum);
- // Инициализация данных и их распределение между процессами
- InitProcess(pProcData, ProcDataSize);
- //cout << "Buffer of " << ProcRank << " was " << pProcData << " with size " << ProcDataSize << endl;
- // параллельная сортировка
- start = MPI_Wtime();
- ParallelHyperQuickSort(pProcData, ProcDataSize);
- finish = MPI_Wtime();
- //cout << "After all buffer of " << ProcRank << " became " << pProcData << " with size " << ProcDataSize << endl;
- // Завершение вычислений процесса
- TerminateProcess(pProcData, ProcDataSize);
- duration = finish - start;
- if(ProcRank == 0)
- cout << "Time of execution: " << duration << endl;
- MPI_Finalize();
- //int i;
- //cin >> i;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement