00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifdef PARALLEL
00014 #include "mpi.h"
00015
// Request handle type used by the non-blocking put() overloads and by wait().
// In the PARALLEL build it is the MPI request type.
00016 typedef MPI_Request mdp_request;
00017
00018 class mdp_communicator : public mdp_log {
00019 private:
00020 MPI_Comm communicator;
00021 double mytime;
00022 int wormholes_open;
00023 int my_id;
00024 int my_nproc;
00025 public:
00026 double comm_time;
00027 void open_wormholes(int argc, char** argv,
00028 MPI_Comm communicator_=MPI_COMM_WORLD) {
00029 if(wormholes_open) return;
00030 communicator=communicator_;
00031 MPI_Init(&argc, &argv);
00032 MPI_Comm_rank(communicator, &(my_id));
00033 MPI_Comm_size(communicator, &(my_nproc));
00034 print=true;
00035 for(int i=0; i<nproc(); i++) {
00036 if(me()==i)
00037 (*this) << "MPI PROCESS " << me()
00038 << " of " << nproc()
00039 << " STARTING ON " << getname() << "\n";
00040 barrier();
00041 }
00042 if(me()==0) print=true; else print=false;
00043
00044 begin_function("PROGRAM");
00045 begin_function("open_wormholes");
00046 (*this) <<
00047 "<head>\n"
00048 "*************************************************\n"
00049 "* Starting [Matrix Distributed Processing] *\n"
00050 "* created by Massimo Di Pierro *\n"
00051 "* copyrighted by www.metacryption.com *\n"
00052 "*************************************************\n"
00053 "</head>\n";
00054
00055
00056 reset_time();
00057 wormholes_open=true;
00058
00059 double a,b;
00060 getcpuusage(a,b);
00061 end_function("open_wormholes");
00062 }
00063 int tag(int i, int j) {
00064 return i*nproc()+j;
00065 }
00066 inline const int me() {
00067 return my_id;
00068 }
00069 inline const int nproc() {
00070 return my_nproc;
00071 }
00072 void print_stats() {
00073 #ifndef NO_POSIX
00074 int i;
00075 double *a=new double[nproc()];
00076 double *b=new double[nproc()];
00077 double *c=new double[nproc()];
00078 for(i=0; i<nproc(); i++) a[i]=b[i]=c[i]=0;
00079 getcpuusage(a[me()],b[me()]);
00080 c[me()]=100.0*comm_time/(time());
00081 add(a,nproc());
00082 add(b,nproc());
00083 add(c,nproc());
00084 char buffer[256];
00085 for(i=0; i<nproc(); i++) {
00086 sprintf(buffer,
00087 "* Process %i stats: CPU=%.2f%% PROCESS=%.2f%% COMM=%.2f%%\n",
00088 i, a[i],b[i],c[i]);
00089 (*this) << buffer;
00090 }
00091 (*this) << "* (above numbers make no sense under windows)\n";
00092 delete[] a;
00093 delete[] b;
00094 delete[] c;
00095 #endif
00096 }
00097 void close_wormholes() {
00098 begin_function("close_wormholes");
00099 (*this) <<
00100 "<foot>\n"
00101 "*************************************************\n"
00102 "* Ending [Matrix Distributed Processing] *\n";
00103 print_stats();
00104 (*this) <<
00105 "*************************************************\n"
00106 "</foot>\n";
00107 (*this) << "PROCESS " << me() << " ENDING AFTER " << time() << " sec.\n";
00108 wormholes_open=false;
00109 MPI_Finalize();
00110 end_function("close_wormholes");
00111 end_function("PROGRAM");
00112 }
00113 void abort() {
00114 MPI_Abort(communicator,1);
00115 }
00116 mdp_communicator() {
00117 wormholes_open=false;
00118 }
00119 virtual ~mdp_communicator() {
00120 }
00121 template<class T>
00122 void put(T &obj, int destination) {
00123 mdp_request r;
00124 MPI_Isend(&obj, sizeof(T)/sizeof(char), MPI_CHAR, destination,
00125 tag(me(),destination), communicator, &r);
00126 wait(r);
00127 }
00128 template<class T>
00129 void put(T &obj, int destination, mdp_request &r) {
00130 MPI_Isend(&obj, sizeof(T)/sizeof(char), MPI_CHAR, destination,
00131 tag(me(),destination), communicator, &r);
00132 }
00133 template<class T>
00134 void get(T &obj, int source) {
00135 MPI_Status status;
00136 MPI_Recv(&obj, sizeof(T)/sizeof(char), MPI_CHAR, source,
00137 tag(source,me()), communicator, &status);
00138 }
00139 template<class T>
00140 void put(T* objptr, mdp_int length, int destination) {
00141 mdp_request r;
00142 MPI_Isend(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, destination,
00143 tag(me(),destination), communicator, &r);
00144 wait(r);
00145 }
00146 template<class T>
00147 void put(T* objptr, mdp_int length, int destination, mdp_request &r) {
00148 if(MPI_Isend(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, destination,
00149 tag(me(),destination), communicator, &r)!=MPI_SUCCESS)
00150 error("Failure to send");
00151 }
00152 template<class T>
00153 void get(T* objptr, mdp_int length, int source) {
00154 MPI_Status status;
00155 if(MPI_Recv(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, source,
00156 tag(source,me()), communicator, &status)!=MPI_SUCCESS)
00157 error("Failure to receive");
00158 }
00159 void wait(mdp_request &r) {
00160 MPI_Status status;
00161 MPI_Wait(&r, &status);
00162 }
00163 void wait(mdp_request *r, int length) {
00164 MPI_Status status;
00165 MPI_Waitall(length, r, &status);
00166 }
00167
00168
00169 void add(float &obj1, float &obj2) {
00170 MPI_Allreduce(&obj1, &obj2, 1, MPI_FLOAT, MPI_SUM, communicator);
00171 }
00172 void add(float* obj1, float *obj2, mdp_int length) {
00173 MPI_Allreduce(obj1, obj2, length, MPI_FLOAT, MPI_SUM, communicator);
00174 }
00175 void add(double &obj1, double &obj2) {
00176 MPI_Allreduce(&obj1, &obj2, 1, MPI_DOUBLE, MPI_SUM, communicator);
00177 }
00178 void add(double* obj1, double *obj2, mdp_int length) {
00179 MPI_Allreduce(obj1, obj2, length, MPI_DOUBLE, MPI_SUM, communicator);
00180 }
00181 void add(mdp_int &obj1) {
00182 mdp_int obj2=0;
00183 MPI_Allreduce(&obj1, &obj2, 1, MPI_LONG, MPI_SUM, communicator);
00184 obj1=obj2;
00185 }
00186 void add(float &obj1) {
00187 float obj2=0;
00188 MPI_Allreduce(&obj1, &obj2, 1, MPI_FLOAT, MPI_SUM, communicator);
00189 obj1=obj2;
00190 }
00191 void add(double &obj1) {
00192 double obj2=0;
00193 MPI_Allreduce(&obj1, &obj2, 1, MPI_DOUBLE, MPI_SUM, communicator);
00194 obj1=obj2;
00195 }
00196 void add(mdp_int *obj1, mdp_int length) {
00197 mdp_int i;
00198 mdp_int *obj2=new mdp_int[length];
00199 for(i=0; i<length; i++) obj2[i]=0;
00200 MPI_Allreduce(obj1, obj2, length, MPI_LONG, MPI_SUM, communicator);
00201 for(i=0; i<length; i++) obj1[i]=obj2[i];
00202 delete[] obj2;
00203 }
00204 void add(float *obj1, mdp_int length) {
00205 mdp_int i;
00206 float *obj2=new float[length];
00207 for(i=0; i<length; i++) obj2[i]=0;
00208 MPI_Allreduce(obj1, obj2, length, MPI_FLOAT, MPI_SUM, communicator);
00209 for(i=0; i<length; i++) obj1[i]=obj2[i];
00210 delete[] obj2;
00211 }
00212 void add(double *obj1, mdp_int length) {
00213 mdp_int i;
00214 double *obj2=new double[length];
00215 for(i=0; i<length; i++) obj2[i]=0;
00216 MPI_Allreduce(obj1, obj2, length, MPI_DOUBLE, MPI_SUM, communicator);
00217 for(i=0; i<length; i++) obj1[i]=obj2[i];
00218 delete[] obj2;
00219 }
00220 void add(mdp_complex &obj1) {
00221 mdp_complex obj2=0;
00222 #if !defined(USE_DOUBLE_PRECISION)
00223 MPI_Allreduce(&obj1, &obj2, 2, MPI_FLOAT, MPI_SUM, communicator);
00224 #else
00225 MPI_Allreduce(&obj1, &obj2, 2, MPI_DOUBLE, MPI_SUM, communicator);
00226 #endif
00227 obj1=obj2;
00228 }
00229 void add(mdp_complex *obj1, mdp_int length) {
00230 mdp_int i;
00231 mdp_complex *obj2=new mdp_complex[length];
00232 for(i=0; i<length; i++) obj2[i]=0;
00233 #if !defined(USE_DOUBLE_PRECISION)
00234 MPI_Allreduce(obj1, obj2, 2*length, MPI_FLOAT, MPI_SUM, communicator);
00235 #else
00236 MPI_Allreduce(obj1, obj2, 2*length, MPI_DOUBLE, MPI_SUM, communicator);
00237 #endif
00238 for(i=0; i<length; i++) obj1[i]=obj2[i];
00239 delete[] obj2;
00240 }
00241 void add(mdp_matrix &a) {
00242 add(a.address(), a.rowmax()*a.colmax());
00243 }
00244 void add(mdp_matrix *a, mdp_int length) {
00245 mdp_int i;
00246 for(i=0; i<length; i++) add(a[i]);
00247 }
00248 template<class T>
00249 void add(vector<T> &a) {
00250 add(&a[0],a.size());
00251 }
00252
00253 void barrier() {
00254 MPI_Barrier(communicator);
00255 }
00256 template<class T>
00257 void broadcast(T &obj, int p) {
00258 MPI_Bcast(&obj, sizeof(T)/sizeof(char), MPI_CHAR, p, communicator);
00259 }
00260 template<class T>
00261 void broadcast(T* obj, mdp_int length, int p) {
00262 MPI_Bcast(obj, length*sizeof(T)/sizeof(char), MPI_CHAR, p,communicator);
00263 }
00264 void reset_time() {
00265 mytime=MPI_Wtime();
00266 comm_time=0;
00267 }
00268 double time() {
00269 return MPI_Wtime()-mytime;
00270 }
00271 };
00272
00273 #else
00274 #include "time.h"
// Request handle type for builds without PARALLEL: a plain int placeholder.
// The wait() overloads of the communicator in this branch have empty bodies.
00275 typedef int mdp_request;
00276
00289 class mdp_communicator : public mdp_log {
00290 private:
00291 #ifndef NO_POSIX
00292 mdp_psim *nodes;
00293 #endif
00294 int communicator;
00295 int wormholes_open;
00296 double mytime;
00297 double MPI_Wtime() {
00298 return walltime();
00299 }
00300 int my_id;
00301 int my_nproc;
00302 public:
00303 double comm_time;
00304 mdp_communicator() {
00305 wormholes_open=false;
00306 }
00307 template<class T>
00308 void put(T &obj, int destination) {
00309 #ifndef NO_POSIX
00310 nodes->send(destination,"",obj);
00311 #endif
00312 }
00313 template<class T>
00314 void put(T &obj, int destination, mdp_request &r) {
00315 #ifndef NO_POSIX
00316 nodes->send(destination,"",obj);
00317 #endif
00318 }
00319 template<class T>
00320 void get(T &obj, int source) {
00321 #ifndef NO_POSIX
00322 nodes->recv(source,"",obj);
00323 #endif
00324 }
00325 template<class T>
00326 void put(T* objptr, mdp_int length, int destination) {
00327 #ifndef NO_POSIX
00328 nodes->send(destination,"",objptr, length);
00329 #endif
00330 }
00331 template<class T>
00332 void put(T* objptr, mdp_int length, int destination, mdp_request &r) {
00333 #ifndef NO_POSIX
00334 nodes->send(destination,"",objptr, length);
00335 #endif
00336 }
00337 template<class T>
00338 void get(T* objptr, mdp_int length, int source) {
00339 #ifndef NO_POSIX
00340 nodes->recv(source,"",objptr,length);
00341 #endif
00342 }
00343
00344
00345
00346 void add(float &obj1, float &obj2) {
00347 #ifndef NO_POSIX
00348 obj2=nodes->add(obj1);
00349 #endif
00350 }
00351 void add(float* obj1, float *obj2, mdp_int length) {
00352 #ifndef NO_POSIX
00353 for(int i=0; i<length; i++) {
00354 obj2[i]=nodes->add(obj1[i]);
00355 }
00356 #endif
00357 }
00358 void add(double &obj1, double &obj2) {
00359 #ifndef NO_POSIX
00360 obj2=nodes->add(obj1);
00361 #endif
00362 }
00363 void add(double* obj1, double *obj2, mdp_int length) {
00364 #ifndef NO_POSIX
00365 for(int i=0; i<length; i++) {
00366 obj2[i]=nodes->add(obj1[i]);
00367 }
00368 #endif
00369 }
00370
00371
00372 void add(mdp_int &obj1) {
00373 #ifndef NO_POSIX
00374 obj1=nodes->add(obj1);
00375 #endif
00376 }
00377 void add(float &obj1) {
00378 #ifndef NO_POSIX
00379 obj1=nodes->add(obj1);
00380 #endif
00381 }
00382 void add(double &obj1) {
00383 #ifndef NO_POSIX
00384 obj1=nodes->add(obj1);
00385 #endif
00386 }
00387 void add(mdp_int *obj1, mdp_int length) {
00388 #ifndef NO_POSIX
00389 for(int i=0; i<length; i++) {
00390 obj1[i]=nodes->add(obj1[i]);
00391 }
00392 #endif
00393 }
00394
00395
00396
00397
00398
00399
00400
00401 void add(float *obj1, mdp_int length) {
00402 #ifndef NO_POSIX
00403 for(mdp_int i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00404 #endif
00405 }
00406 void add(double *obj1, mdp_int length) {
00407 #ifndef NO_POSIX
00408 for(mdp_int i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00409 #endif
00410 }
00411 void add(mdp_complex &obj1) {
00412 #ifndef NO_POSIX
00413 obj1=nodes->add(obj1);
00414 #endif
00415 }
00416 void add(mdp_complex *obj1, mdp_int length) {
00417 #ifndef NO_POSIX
00418 for(mdp_int i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00419 #endif
00420 }
00421 void add(mdp_matrix &a) {
00422 #ifndef NO_POSIX
00423 for(mdp_int i=0; i<a.size(); i++)
00424 a.address()[i]=nodes->add(a.address()[i]);
00425 #endif
00426 }
00427 void add(mdp_matrix *a, mdp_int length) {
00428 #ifndef NO_POSIX
00429 for(mdp_int j=0; j<length; j++)
00430 for(mdp_int i=0; i<a[j].size(); i++)
00431 a[j].address()[i]=nodes->add(a[j].address()[i]);
00432 #endif
00433 }
00434 template<class T>
00435 void add(vector<T> &a) {
00436 add(&a[0],a.size());
00437 }
00438 template<class T>
00439 void broadcast(T &obj, int p) {
00440 #ifndef NO_POSIX
00441 nodes->broadcast(p,obj);
00442 #endif
00443 }
00444 template<class T>
00445 void broadcast(T* obj, mdp_int length, int p) {
00446 #ifndef NO_POSIX
00447 nodes->broadcast(p,obj,length);
00448 #endif
00449 }
00450 void wait(mdp_request &r) {}
00451 void wait(mdp_request *r, int length) {}
00452 inline const int me() {
00453 return my_id;
00454 }
00455 inline const int nproc() {
00456 return my_nproc;
00457 }
00458 void barrier() {}
00459 int tag(int i, int j) {
00460 return i*nproc()+j;
00461 }
00462 void reset_time() {
00463 mytime=MPI_Wtime();
00464 comm_time=0;
00465 }
00467 double time() {
00468 return MPI_Wtime()-mytime;
00469 }
00472 void open_wormholes(int argc, char** argv) {
00473 if(wormholes_open) return;
00474
00475 #ifndef NO_POSIX
00476 nodes=new mdp_psim(argc, argv);
00477 my_id=nodes->id();
00478 my_nproc=nodes->nprocs();
00479 #else
00480 my_id=0;
00481 my_nproc=1;
00482 #endif
00483 if(me()==0) print=true; else print=false;
00484 begin_function("PROGRAM");
00485 begin_function("open_wormholes");
00486 (*this) <<
00487 "<head>\n"
00488 "*************************************************\n"
00489 "* Starting [Matrix Distributed Processing] *\n"
00490 "* created by Massimo Di Pierro *\n"
00491 "* copyrighted by www.metacryption.com *\n"
00492 "*************************************************\n"
00493 "</head>\n";
00494 reset_time();
00495 double a,b;
00496 getcpuusage(a,b);
00497 wormholes_open=true;
00498 end_function("open_wormholes");
00499 }
00501 void print_stats() {
00502 #ifndef NO_POSIX
00503 int i;
00504 double *a=new double[nproc()];
00505 double *b=new double[nproc()];
00506 double *c=new double[nproc()];
00507 for(i=0; i<nproc(); i++) a[i]=b[i]=c[i]=0;
00508 getcpuusage(a[me()],b[me()]);
00509 c[me()]=100.0*comm_time/(time());
00510 add(a,nproc());
00511 add(b,nproc());
00512 add(c,nproc());
00513 char buffer[256];
00514 for(i=0; i<nproc(); i++) {
00515 sprintf(buffer,
00516 "* Process %i stats: CPU=%.2f%% PROCESS=%.2f%% COMM=%.2f%%\n",
00517 i, a[i],b[i],c[i]);
00518 (*this) << buffer;
00519 }
00520 (*this) << "* (above numbers make no sense under windows)\n";
00521 delete[] a;
00522 delete[] b;
00523 delete[] c;
00524 #endif
00525 }
00527 void close_wormholes() {
00528 begin_function("close_wormholes");
00529 (*this) <<
00530 "<foot>\n"
00531 "*************************************************\n"
00532 "* Ending [Matrix Distributed Processing] *\n";
00533 print_stats();
00534 (*this) <<
00535 "*************************************************\n"
00536 "</foot>\n";
00537 (*this) << "PROCESS " << me() << " ENDING AFTER " << time() << " sec.\n";
00538 wormholes_open=false;
00539 end_function("close_wormholes");
00540 end_function("PROGRAM");
00541 #ifndef NO_POSIX
00542 if(nodes) delete nodes;
00543 #endif
00544 }
00546 void abort() {
00547 (*this) << "calling abort...";
00548 exit(-1);
00549 }
00550 };
00551
00552 #endif
00553
// The single communicator object defined by this file.
00555 mdp_communicator mdp;
00556
// Second name for the same object: `mpi` is a reference bound to `mdp`.
// The free functions defined after it forward through this name.
00558 mdp_communicator& mpi=mdp;
00559
00560 void _mpi_error_message(string a, string b, int c) {
00561 mpi.error_message(a,b,c);
00562 }
00563
00565 inline void begin_function(string s) {
00566 mpi.begin_function(s);
00567 }
00568
00570 inline void end_function(string s) {
00571 mpi.end_function(s);
00572 }
00573