話說 直插排序串行只需要十幾行 並行就大不一樣。。。
基本思路是
1/主線程控制數據讀取 分發和匯總
2/從線程中數據按線程號非降有序 每次有新數據 主線程廣播 從線程按照數據大小判斷是否在自己處理范圍內 如果在的話 在自己保存的數組中進行直接插入
3/最後 主線程按序收集顯示所有數據
- #include "mpi.h"
- #include <unistd.h>
- #include <fcntl.h>
- #include <ctype.h>
- #include <stdio.h>
- #include <stdlib.h>
-
- #define END 0
- #define INIT 1
- #define DATA 2
- #define ROOT 0
-
- int readValue(char* file,int* values)
- {
- int fd,size,count=0;
- char buffer[80],num[11];
- fd=open(file,O_RDONLY);
- do
- {
- size=read(fd,buffer,sizeof(buffer));
- int j=0;
- for(int i=0;i<size;++i)
- {
- if(buffer[i]<'9'&&buffer[i]>'0')
- {
- num[j++]=buffer[i];
- }else
- {
- num[j]='\0';
- values[count++]=atoi(num);
- j=0;
- }
- }
- }while(size!=0);
- close(fd);
- return count;
- }
- void insert(int* arr,int pos,int temp)
- {
- while(pos>=0&&temp<arr[pos])
- {
- arr[pos+1]=arr[pos];
- --pos;
- }
- arr[pos+1]=temp;
- }
- void serial(int* arr,int size)
- {
- for(int i=1;i<size;++i)
- {
- insert(arr,i-1,arr[i]);
- }
- }
-
- int main(int argc,char *argv[])
- {
- int* values;
- int self,size,length,temp;
- MPI_Init(&argc,&argv);
- MPI_Comm_size(MPI_COMM_WORLD,&size);
- MPI_Comm_rank(MPI_COMM_WORLD,&self);
- MPI_Status status;
- if(self==0)
- {
- //read value from file
- values=(int*)malloc(100*sizeof(int));
- length=readValue("/home/gt/parellel/sort/data.in",values);
- //initial root value of each process
- serial(values,size);
- //broadcast the max length of each process
- MPI_Bcast(&length,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- //the boundary
- for(int i=1;i<size;++i)
- {
- MPI_Ssend(&values[i-1],1,MPI_INT,i,INIT,MPI_COMM_WORLD);
- }
- MPI_Recv(&temp,1,MPI_INT,self+1,DATA,MPI_COMM_WORLD,&status);
- //broadcast each value and do insert in each process
- for(int i=size-1;i<length;++i)
- {
- temp=values[i];
- MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- printf("broadcast %d \n",values[i]);
- //MPI_Barrier(MPI_COMM_WORLD);
- }
-
- //ends
- printf("ends \n");
- temp=END;
- MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- //recieve from each process in order
- int pos=0;
- int len=0;
- for(int i=1;i<size;++i)
- {
- MPI_Recv(&len,1,MPI_INT,i,DATA,MPI_COMM_WORLD,&status);
- for(int j=0;j<len;++j)
- {
- MPI_Recv(&temp,1,MPI_INT,i,DATA,MPI_COMM_WORLD,&status);
- values[pos++]=temp;
- }
- }
- for(int i=0;i<length;++i)
- {
- printf("%d \n",values[i]);
- }
- free(values);
- }
- else
- {
- MPI_Request request;
- //recieve max length
- MPI_Bcast(&length,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- //position
- int pos=0;
- //post irecv
- MPI_Irecv(&temp,1,MPI_INT,ROOT,INIT,MPI_COMM_WORLD,&request);
- //data on this process
- int* local=(int*) malloc(length*sizeof(int)+1);
- //wait for the recieve
- MPI_Wait(&request,&status);
- //exchange upper limit
- int upper=999999;//max of int
- if(self!=size-1)MPI_Irecv(&upper,1,MPI_INT,self+1,DATA,MPI_COMM_WORLD,&request);
- MPI_Ssend(&temp,1,MPI_INT,self-1,DATA,MPI_COMM_WORLD);
- if(self!=size-1)MPI_Wait(&request,&status);
- local[pos++]=temp;
- MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- //MPI_Barrier(MPI_COMM_WORLD);
- while(temp!=END)
- {
- printf("%d recieve %d \n",self,temp);
- if(temp<=upper&&(self==1||local[0]<temp))
- {
- printf("%d insert %d \n",self,temp);
- local[pos++]=temp;
- insert(local,pos-2,temp);
- }
- MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
- //MPI_Barrier(MPI_COMM_WORLD);
- }
- //ends
- //sends the data in order to root process
- MPI_Ssend(&pos,1,MPI_INT,ROOT,DATA,MPI_COMM_WORLD);
- for(int i=0;i<pos;++i)
- {
- MPI_Ssend(&local[i],1,MPI_INT,ROOT,DATA,MPI_COMM_WORLD);
- printf("% d send back %d \n",self,local[i]);
- }
- free(local);
- }
- MPI_Finalize();
- return 0;
- }