堆排是串行排序中最適合並行化的排序之一
1/分為主線程和從線程
2/主線程分發數據,使用大小與從線程個數相同的堆作為私有堆,進行最後整理用
3/從線程維護一個堆,每次返回給主線程堆頂元素
4/主線程 提取堆頂元素 通知相應從線程提交新的堆頂元素
5/主從線程並行進行重建堆(heapify)的動作
- #include "mpi.h"
- #include <stdio.h>
- #include <stdlib.h>
-
- #define ROOT 0
- #define TAG 0
- #define NEXT 1
- #define END -1
- typedef struct node {
- int v;
- int r;
- } Node;
-
- static inline int left(int i) {
- return i*2+1;
- }
- static inline int right(int i) {
- return left(i)+1;
- }
- void defineIntVector(MPI_Datatype* type,int length) {
- MPI_Type_vector(length,1,1,MPI_INT,type);
- MPI_Type_commit(type);
- }
- //for slaves
- void heapifyI(int *a,int root,int size) {
- int l=left(root);
- int r=right(root);
- int m=root;
- if(l<size&&a[l]<a[m]) {
- m=l;
- }
- if(r<size&&a[r]<a[m]) {
- m=r;
- }
- if(m!=root) {
- int t=a[root];
- a[root]=a[m];
- a[m]=t;
- heapifyI(a,m,size);
- }
- }
- void buildHeapI(int *a,int size) {
- for(int i=(size-1)/2;i>=0;--i) {
- heapifyI(a,i,size);
- }
- }
- //for root
- void heapifyN(Node *a,int root,int size) {
- int l=left(root);
- int r=right(root);
- int m=root;
- if(l<size&&a[l].v<a[m].v) {
- m=l;
- }
- if(r<size&&a[r].v<a[m].v) {
- m=r;
- }
- if(m!=root) {
- Node t=a[root];
- a[root]=a[m];
- a[m]=t;
- heapifyN(a,m,size);
- }
- }
- void buildHeapN(Node *a,int size) {
- for(int i=(size-1)/2;i>=0;--i) {
- heapifyN(a,i,size);
- }
- }
- int main(int argc,char *argv[]) {
- int self,size;
- int part;//threshold
- MPI_Init(&argc,&argv);
- MPI_Comm_rank(MPI_COMM_WORLD,&self);
- MPI_Comm_size(MPI_COMM_WORLD,&size);
- MPI_Status status;
- MPI_Datatype MPI_VEC;
- if(ROOT==self) {
- //all data
- int values[]={1,5,3,8,4,2,9,6,7,54,34,6,13,63,32,14,53,21,65,33,24,213,23,14,21,65,32};
- int length=27;
- //for each process
- part=length/(size-1);
- MPI_Bcast(&part,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- defineIntVector(&MPI_VEC,part);
- //heap for min value from each process
- Node *nodes=(Node*)malloc((size-1)*sizeof(Node));
- for(int i=0;i<size-1;++i) {
- nodes[i].r=i+1;
- }
- //send out all the data
- for(int i=1;i<size;++i) {
- MPI_Ssend(&values[(i-1)*part],1,MPI_VEC,i,TAG,MPI_COMM_WORLD);
- }
- for(int i=1;i<size;++i) {
- MPI_Recv(&(nodes[i-1].v),1,MPI_INT,i,TAG,MPI_COMM_WORLD,&status);
- }
- //end flag
- part=size-1;
- //build a heap representing process
- buildHeapN(nodes,part);
- int p=0;
- while(part>0) {
- values[p++]=nodes[0].v;
- //similar to the message passing system of select sort
- MPI_Ssend(&p,1,MPI_INT,nodes[0].r,NEXT,MPI_COMM_WORLD);
- MPI_Recv(&(nodes[0].v),1,MPI_INT,nodes[0].r,TAG,MPI_COMM_WORLD,&status);
- if(END==nodes[0].v) {
- nodes[0]=nodes[--part];
- }
- heapifyN(nodes,0,part);
- }
- for(int i=0;i<length;++i)
- printf("%d ",values[i]);
- free(nodes);
- } else {
- MPI_Bcast(&part,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- defineIntVector(&MPI_VEC,part);
- int *values=(int*)malloc(part*sizeof(int));
- MPI_Recv(values,1,MPI_VEC,ROOT,TAG,MPI_COMM_WORLD,&status);
- buildHeapI(values,part);
- //send the smallest
- MPI_Ssend(&values[0],1,MPI_INT,ROOT,TAG,MPI_COMM_WORLD);
- //pop out one
- values[0]=values[--part];
- //rebuild heap
- heapifyI(values,0,part);
- while(part>0) {
- MPI_Recv(&values[part],1,MPI_INT,ROOT,NEXT,MPI_COMM_WORLD,&status);
- MPI_Ssend(&values[0],1,MPI_INT,ROOT,TAG,MPI_COMM_WORLD);
- values[0]=values[--part];
- heapifyI(values,0,part);
- }
- MPI_Recv(&values[part],1,MPI_INT,ROOT,NEXT,MPI_COMM_WORLD,&status);
- values[0]=END;
- MPI_Ssend(&values[0],1,MPI_INT,ROOT,TAG,MPI_COMM_WORLD);
- free(values);
- }
- MPI_Finalize();
- return 0;
- }