00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifdef PARALLEL
00014 #include "mpi.h"
00015
00016 typedef MPI_Request mdp_request;
00017
00018 class mdp_communicator : public mdp_log {
00019 private:
00020 MPI_Comm communicator;
00021 double mytime;
00022 int wormholes_open;
00023 int my_id;
00024 int my_nproc;
00025 public:
00026 double comm_time;
00027 void open_wormholes(int argc, char** argv,
00028 MPI_Comm communicator_=MPI_COMM_WORLD) {
00029 if(wormholes_open) return;
00030 communicator=communicator_;
00031 MPI_Init(&argc, &argv);
00032 MPI_Comm_rank(communicator, &(my_id));
00033 MPI_Comm_size(communicator, &(my_nproc));
00034 print=true;
00035 for(int i=0; i<nproc(); i++) {
00036 if(me()==i)
00037 (*this) << "MPI PROCESS " << me()
00038 << " of " << nproc()
00039 << " STARTING ON " << getname() << "\n";
00040 barrier();
00041 }
00042 if(me()==0) print=true; else print=false;
00043
00044 begin_function("PROGRAM");
00045 begin_function("open_wormholes");
00046 (*this) <<
00047 "<head>\n"
00048 "*************************************************\n"
00049 "* Starting [Matrix Distributed Processing] *\n"
00050 "* created by Massimo Di Pierro *\n"
00051 "* copyrighted by www.metacryption.com *\n"
00052 "*************************************************\n";
00053 "</head>\n";
00054
00055
00056 reset_time();
00057 wormholes_open=TRUE;
00058
00059 double a,b;
00060 getcpuusage(a,b);
00061 end_function("open_wormholes");
00062 }
00063 int tag(int i, int j) {
00064 return i*nproc()+j;
00065 }
00066 inline const int me() {
00067 return my_id;
00068 }
00069 inline const int nproc() {
00070 return my_nproc;
00071 }
00072 void print_stats() {
00073 #ifndef NO_POSIX
00074 int i;
00075 double *a=new double[nproc()];
00076 double *b=new double[nproc()];
00077 double *c=new double[nproc()];
00078 for(i=0; i<nproc(); i++) a[i]=b[i]=c[i]=0;
00079 getcpuusage(a[me()],b[me()]);
00080 c[me()]=100.0*comm_time/(time());
00081 add(a,nproc());
00082 add(b,nproc());
00083 add(c,nproc());
00084 char buffer[256];
00085 for(i=0; i<nproc(); i++) {
00086 sprintf(buffer,
00087 "* Process %i stats: CPU=%.2f%% PROCESS=%.2f%% COMM=%.2f%%\n",
00088 i, a[i],b[i],c[i]);
00089 (*this) << buffer;
00090 }
00091 (*this) << "* (above numbers make no sense under windows)\n";
00092 delete[] a;
00093 delete[] b;
00094 delete[] c;
00095 #endif
00096 }
00097 void close_wormholes() {
00098 begin_function("close_wormholes");
00099 (*this) <<
00100 "<foot>\n"
00101 "*************************************************\n"
00102 "* Ending [Matrix Distributed Processing] *\n";
00103 print_stats();
00104 (*this) <<
00105 "*************************************************\n";
00106 "</foot>\n";
00107 (*this) << "PROCESS " << me() << " ENDING AFTER " << time() << " sec.\n";
00108 wormholes_open=FALSE;
00109 MPI_Finalize();
00110 end_function("close_wormholes");
00111 end_function("PROGRAM");
00112 }
00113 void abort() {
00114 MPI_Abort(communicator,1);
00115 }
00116 mdp_communicator() {
00117 wormholes_open=FALSE;
00118 }
00119 virtual ~mdp_communicator() {
00120 }
00121 template<class T>
00122 void put(T &obj, int destination) {
00123 mdp_request r;
00124 MPI_Isend(&obj, sizeof(T)/sizeof(char), MPI_CHAR, destination,
00125 tag(me(),destination), communicator, &r);
00126 wait(r);
00127 }
00128 template<class T>
00129 void put(T &obj, int destination, mdp_request &r) {
00130 MPI_Isend(&obj, sizeof(T)/sizeof(char), MPI_CHAR, destination,
00131 tag(me(),destination), communicator, &r);
00132 }
00133 template<class T>
00134 void get(T &obj, int source) {
00135 MPI_Status status;
00136 MPI_Recv(&obj, sizeof(T)/sizeof(char), MPI_CHAR, source,
00137 tag(source,me()), communicator, &status);
00138 }
00139 template<class T>
00140 void put(T* objptr, long length, int destination) {
00141 mdp_request r;
00142 MPI_Isend(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, destination,
00143 tag(me(),destination), communicator, &r);
00144 wait(r);
00145 }
00146 template<class T>
00147 void put(T* objptr, long length, int destination, mdp_request &r) {
00148 if(MPI_Isend(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, destination,
00149 tag(me(),destination), communicator, &r)!=MPI_SUCCESS)
00150 error("Failure to send");
00151 }
00152 template<class T>
00153 void get(T* objptr, long length, int source) {
00154 MPI_Status status;
00155 if(MPI_Recv(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, source,
00156 tag(source,me()), communicator, &status)!=MPI_SUCCESS)
00157 error("Failure to receive");
00158 }
00159 void wait(mdp_request &r) {
00160 MPI_Status status;
00161 MPI_Wait(&r, &status);
00162 }
00163 void wait(mdp_request *r, int length) {
00164 MPI_Status status;
00165 MPI_Waitall(length, r, &status);
00166 }
00167
00168
00169 void add(float &obj1, float &obj2) {
00170 MPI_Allreduce(&obj1, &obj2, 1, MPI_FLOAT, MPI_SUM, communicator);
00171 }
00172 void add(float* obj1, float *obj2, long length) {
00173 MPI_Allreduce(obj1, obj2, length, MPI_FLOAT, MPI_SUM, communicator);
00174 }
00175 void add(double &obj1, double &obj2) {
00176 MPI_Allreduce(&obj1, &obj2, 1, MPI_DOUBLE, MPI_SUM, communicator);
00177 }
00178 void add(double* obj1, double *obj2, long length) {
00179 MPI_Allreduce(obj1, obj2, length, MPI_DOUBLE, MPI_SUM, communicator);
00180 }
00181 void add(int &obj1) {
00182 int obj2=0;
00183 MPI_Allreduce(&obj1, &obj2, 1, MPI_INT, MPI_SUM, communicator);
00184 obj1=obj2;
00185 }
00186 void add(long &obj1) {
00187 long obj2=0;
00188 MPI_Allreduce(&obj1, &obj2, 1, MPI_LONG, MPI_SUM, communicator);
00189 obj1=obj2;
00190 }
00191 void add(float &obj1) {
00192 float obj2=0;
00193 MPI_Allreduce(&obj1, &obj2, 1, MPI_FLOAT, MPI_SUM, communicator);
00194 obj1=obj2;
00195 }
00196 void add(double &obj1) {
00197 double obj2=0;
00198 MPI_Allreduce(&obj1, &obj2, 1, MPI_DOUBLE, MPI_SUM, communicator);
00199 obj1=obj2;
00200 }
00201 void add(int *obj1, long length) {
00202 long i;
00203 int *obj2=new int[length];
00204 for(i=0; i<length; i++) obj2[i]=0;
00205 MPI_Allreduce(obj1, obj2, length, MPI_INT, MPI_SUM, communicator);
00206 for(i=0; i<length; i++) obj1[i]=obj2[i];
00207 delete[] obj2;
00208 }
00209 void add(long *obj1, long length) {
00210 long i;
00211 long *obj2=new long[length];
00212 for(i=0; i<length; i++) obj2[i]=0;
00213 MPI_Allreduce(obj1, obj2, length, MPI_LONG, MPI_SUM, communicator);
00214 for(i=0; i<length; i++) obj1[i]=obj2[i];
00215 delete[] obj2;
00216 }
00217 void add(float *obj1, long length) {
00218 long i;
00219 float *obj2=new float[length];
00220 for(i=0; i<length; i++) obj2[i]=0;
00221 MPI_Allreduce(obj1, obj2, length, MPI_FLOAT, MPI_SUM, communicator);
00222 for(i=0; i<length; i++) obj1[i]=obj2[i];
00223 delete[] obj2;
00224 }
00225 void add(double *obj1, long length) {
00226 long i;
00227 double *obj2=new double[length];
00228 for(i=0; i<length; i++) obj2[i]=0;
00229 MPI_Allreduce(obj1, obj2, length, MPI_DOUBLE, MPI_SUM, communicator);
00230 for(i=0; i<length; i++) obj1[i]=obj2[i];
00231 delete[] obj2;
00232 }
00233 void add(mdp_complex &obj1) {
00234 mdp_complex obj2=0;
00235 #if !defined(USE_DOUBLE_PRECISION)
00236 MPI_Allreduce(&obj1, &obj2, 2, MPI_FLOAT, MPI_SUM, communicator);
00237 #else
00238 MPI_Allreduce(&obj1, &obj2, 2, MPI_DOUBLE, MPI_SUM, communicator);
00239 #endif
00240 obj1=obj2;
00241 }
00242 void add(mdp_complex *obj1, long length) {
00243 long i;
00244 mdp_complex *obj2=new mdp_complex[length];
00245 for(i=0; i<length; i++) obj2[i]=0;
00246 #if !defined(USE_DOUBLE_PRECISION)
00247 MPI_Allreduce(obj1, obj2, 2*length, MPI_FLOAT, MPI_SUM, communicator);
00248 #else
00249 MPI_Allreduce(obj1, obj2, 2*length, MPI_DOUBLE, MPI_SUM, communicator);
00250 #endif
00251 for(i=0; i<length; i++) obj1[i]=obj2[i];
00252 delete[] obj2;
00253 }
00254 void add(mdp_matrix &a) {
00255 add(a.address(), a.rowmax()*a.colmax());
00256 }
00257 void add(mdp_matrix *a, long length) {
00258 long i;
00259 for(i=0; i<length; i++) add(a[i]);
00260 }
00261
00262 void barrier() {
00263 MPI_Barrier(communicator);
00264 }
00265 template<class T>
00266 void broadcast(T &obj, int p) {
00267 MPI_Bcast(&obj, sizeof(T)/sizeof(char), MPI_CHAR, p, communicator);
00268 }
00269 template<class T>
00270 void broadcast(T* obj, long length, int p) {
00271 MPI_Bcast(obj, length*sizeof(T)/sizeof(char), MPI_CHAR, p,communicator);
00272 }
00273 void reset_time() {
00274 mytime=MPI_Wtime();
00275 comm_time=0;
00276 }
00277 double time() {
00278 return MPI_Wtime()-mytime;
00279 }
00280 };
00281
00282 #else
00283 #include "time.h"
00284 typedef int mdp_request;
00285
00298 class mdp_communicator : public mdp_log {
00299 private:
00300 #ifndef NO_POSIX
00301 mdp_psim *nodes;
00302 #endif
00303 int communicator;
00304 int wormholes_open;
00305 double mytime;
00306 double MPI_Wtime() {
00307 return walltime();
00308 }
00309 int my_id;
00310 int my_nproc;
00311 public:
00312 double comm_time;
00313 mdp_communicator() {
00314 wormholes_open=FALSE;
00315 }
00316 template<class T>
00317 void put(T &obj, int destination) {
00318 #ifndef NO_POSIX
00319 nodes->send(destination,"",obj);
00320 #endif
00321 }
00322 template<class T>
00323 void put(T &obj, int destination, mdp_request &r) {
00324 #ifndef NO_POSIX
00325 nodes->send(destination,"",obj);
00326 #endif
00327 }
00328 template<class T>
00329 void get(T &obj, int source) {
00330 #ifndef NO_POSIX
00331 nodes->recv(source,"",obj);
00332 #endif
00333 }
00334 template<class T>
00335 void put(T* objptr, long length, int destination) {
00336 #ifndef NO_POSIX
00337 nodes->send(destination,"",objptr, length);
00338 #endif
00339 }
00340 template<class T>
00341 void put(T* objptr, long length, int destination, mdp_request &r) {
00342 #ifndef NO_POSIX
00343 nodes->send(destination,"",objptr, length);
00344 #endif
00345 }
00346 template<class T>
00347 void get(T* objptr, long length, int source) {
00348 #ifndef NO_POSIX
00349 nodes->recv(source,"",objptr,length);
00350 #endif
00351 }
00352
00353
00354
00355 void add(float &obj1, float &obj2) {
00356 #ifndef NO_POSIX
00357 obj2=nodes->add(obj1);
00358 #endif
00359 }
00360 void add(float* obj1, float *obj2, long length) {
00361 #ifndef NO_POSIX
00362 for(int i=0; i<length; i++) {
00363 obj2[i]=nodes->add(obj1[i]);
00364 }
00365 #endif
00366 }
00367 void add(double &obj1, double &obj2) {
00368 #ifndef NO_POSIX
00369 obj2=nodes->add(obj1);
00370 #endif
00371 }
00372 void add(double* obj1, double *obj2, long length) {
00373 #ifndef NO_POSIX
00374 for(int i=0; i<length; i++) {
00375 obj2[i]=nodes->add(obj1[i]);
00376 }
00377 #endif
00378 }
00379
00380
00381 void add(int &obj1) {
00382 #ifndef NO_POSIX
00383 obj1=nodes->add(obj1);
00384 #endif
00385 }
00386 void add(long &obj1) {
00387 #ifndef NO_POSIX
00388 obj1=nodes->add(obj1);
00389 #endif
00390 }
00391 void add(float &obj1) {
00392 #ifndef NO_POSIX
00393 obj1=nodes->add(obj1);
00394 #endif
00395 }
00396 void add(double &obj1) {
00397 #ifndef NO_POSIX
00398 obj1=nodes->add(obj1);
00399 #endif
00400 }
00401 void add(int *obj1, long length) {
00402 #ifndef NO_POSIX
00403 for(int i=0; i<length; i++) {
00404 obj1[i]=nodes->add(obj1[i]);
00405 }
00406 #endif
00407 }
00408 void add(long *obj1, long length) {
00409 #ifndef NO_POSIX
00410 for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00411 #endif
00412 }
00413 void add(float *obj1, long length) {
00414 #ifndef NO_POSIX
00415 for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00416 #endif
00417 }
00418 void add(double *obj1, long length) {
00419 #ifndef NO_POSIX
00420 for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00421 #endif
00422 }
00423 void add(mdp_complex &obj1) {
00424 #ifndef NO_POSIX
00425 obj1=nodes->add(obj1);
00426 #endif
00427 }
00428 void add(mdp_complex *obj1, long length) {
00429 #ifndef NO_POSIX
00430 for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00431 #endif
00432 }
00433 void add(mdp_matrix &a) {
00434 #ifndef NO_POSIX
00435 for(long i=0; i<a.size(); i++)
00436 a.address()[i]=nodes->add(a.address()[i]);
00437 #endif
00438 }
00439 void add(mdp_matrix *a, long length) {
00440 #ifndef NO_POSIX
00441 for(long j=0; j<length; j++)
00442 for(long i=0; i<a[j].size(); i++)
00443 a[j].address()[i]=nodes->add(a[j].address()[i]);
00444 #endif
00445 }
00446 template<class T>
00447 void broadcast(T &obj, int p) {
00448 #ifndef NO_POSIX
00449 nodes->broadcast(p,obj);
00450 #endif
00451 }
00452 template<class T>
00453 void broadcast(T* obj, long length, int p) {
00454 #ifndef NO_POSIX
00455 nodes->broadcast(p,obj,length);
00456 #endif
00457 }
00458 void wait(mdp_request &r) {}
00459 void wait(mdp_request *r, int length) {}
00460 inline const int me() {
00461 return my_id;
00462 }
00463 inline const int nproc() {
00464 return my_nproc;
00465 }
00466 void barrier() {}
00467 int tag(int i, int j) {
00468 return i*nproc()+j;
00469 }
00470 void reset_time() {
00471 mytime=MPI_Wtime();
00472 comm_time=0;
00473 }
00475 double time() {
00476 return MPI_Wtime()-mytime;
00477 }
00480 void open_wormholes(int argc, char** argv) {
00481 if(wormholes_open) return;
00482
00483 #ifndef NO_POSIX
00484 nodes=new mdp_psim(argc, argv);
00485 my_id=nodes->id();
00486 my_nproc=nodes->nprocs();
00487 #else
00488 my_id=0;
00489 my_nproc=1;
00490 #endif
00491 if(me()==0) print=true; else print=false;
00492 begin_function("PROGRAM");
00493 begin_function("open_wormholes");
00494 (*this) <<
00495 "<head>\n"
00496 "*************************************************\n"
00497 "* Starting [mdp_matrix Distributed Processing] *\n"
00498 "* created by Massimo Di Pierro *\n"
00499 "* copyrighted by www.metacryption.com *\n"
00500 "*************************************************\n";
00501 "</head>\n";
00502 reset_time();
00503 double a,b;
00504 getcpuusage(a,b);
00505 wormholes_open=TRUE;
00506 end_function("open_wormholes");
00507 }
00509 void print_stats() {
00510 #ifndef NO_POSIX
00511 int i;
00512 double *a=new double[nproc()];
00513 double *b=new double[nproc()];
00514 double *c=new double[nproc()];
00515 for(i=0; i<nproc(); i++) a[i]=b[i]=c[i]=0;
00516 getcpuusage(a[me()],b[me()]);
00517 c[me()]=100.0*comm_time/(time());
00518 add(a,nproc());
00519 add(b,nproc());
00520 add(c,nproc());
00521 char buffer[256];
00522 for(i=0; i<nproc(); i++) {
00523 sprintf(buffer,
00524 "* Process %i stats: CPU=%.2f%% PROCESS=%.2f%% COMM=%.2f%%\n",
00525 i, a[i],b[i],c[i]);
00526 (*this) << buffer;
00527 }
00528 (*this) << "* (above numbers make no sense under windows)\n";
00529 delete[] a;
00530 delete[] b;
00531 delete[] c;
00532 #endif
00533 }
00535 void close_wormholes() {
00536 begin_function("close_wormholes");
00537 (*this) <<
00538 "<foot>\n"
00539 "*************************************************\n"
00540 "* Ending [mdp_matrix Distributed Processing] *\n";
00541 print_stats();
00542 (*this) <<
00543 "*************************************************\n";
00544 "</foot>\n";
00545 (*this) << "PROCESS " << me() << " ENDING AFTER " << time() << " sec.\n";
00546 wormholes_open=FALSE;
00547 end_function("close_wormholes");
00548 end_function("PROGRAM");
00549 #ifndef NO_POSIX
00550 if(nodes) delete nodes;
00551 #endif
00552 }
00554 void abort() {
00555 (*this) << "calling abort...";
00556 exit(-1);
00557 }
00558 };
00559
00560 #endif
00561
00563 mdp_communicator mdp;
00564
00566 mdp_communicator& mpi=mdp;
00567
00568 void _mpi_error_message(string a, string b, int c) {
00569 mpi.error_message(a,b,c);
00570 }
00571
00573 inline void begin_function(string s) {
00574 mpi.begin_function(s);
00575 }
00576
00578 inline void end_function(string s) {
00579 mpi.end_function(s);
00580 }
00581