00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
#include "cerrno"
#include "cstdio"
#include "cstdlib"
#include "cstring"
#include "ctime"
#include "iostream"
#include "map"
#include "string"
#include "vector"
#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>
00025 using namespace std;
00026
00051 class mdp_psim {
00052 private:
00053
00054
00055 const static int PROCESS_COUNT_MIN= 1;
00056 const static int PROCESS_COUNT_MAX= 128;
00057 const static int CONN_LIST_IGNORE = -1;
00058 const static int PROCESS_PARENT = 0;
00059 const static int COMM_RECV = 0;
00060 const static int COMM_SEND= 1;
00061 const static int COMM_TIMEOUT_DEFAULT = 86400;
00062
00063
00064 enum enumBegEnd { LOG_BEGIN, LOG_END };
00065 enum enumSendRecv { LOG_SR_SEND, LOG_SR_RECV };
00066 enum enumSendRecvStep { LOG_SR_START, LOG_SR_FAIL, LOG_SR_SUCCESS };
00067
00068
00069
00070
00071 int _verbatim;
00072 int _processCount;
00073 string _logFileName;
00074 bool _doLogging;
00075 FILE* _logfileFD;
00076 int _processID;
00077
00078 int (*_socketFD)[2];
00079 int _commTimeout;
00080
00081 map< string, vector<char> >* _hash;
00082
00083
00084
00085
00086
00087
00088
00089
00090 void psim_begin(int processCount, string logFileName, int verbatim) {
00091 _processCount=processCount;
00092 _logFileName=logFileName;
00093 _verbatim=verbatim;
00094
00095 open_log();
00096 if ((processCount<PROCESS_COUNT_MIN) || (processCount<PROCESS_COUNT_MIN)) {
00097 log("PSIM ERROR: Invalid number of processes");
00098 throw string("PSIM ERROR: Invalid number of processes");
00099 }
00100
00101 initialize(processCount);
00102 create_sockets();
00103 fork_processes();
00104
00105 char buffer[256];
00106 sprintf(buffer, "process %i of %i created with pid=%i",
00107 _processID, _processCount, getpid());
00108 log(buffer,1);
00109
00110 }
00111
00112
00113
00114
00115
00116
00117
00118
00119 void psim_end() {
00120 for (int source=0; source<_processCount; source++) {
00121 for (int dest=0; dest<_processCount; dest++) {
00122 close(_socketFD[_processCount*source+dest][COMM_SEND]);
00123 close(_socketFD[_processCount*source+dest][COMM_RECV]);
00124 }
00125 }
00126 if (_socketFD != NULL)
00127 delete [] _socketFD;
00128 _socketFD = NULL;
00129
00130
00131 if (_hash != NULL) {
00132 delete [] _hash;
00133 _hash = NULL;
00134 }
00135
00136 char buffer[256];
00137 sprintf(buffer, "process %i terminating", _processID);
00138 log(buffer,1);
00139
00140 close_log();
00141 }
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151 void initialize(int processCount) {
00152 _processCount = processCount;
00153 _processID = PROCESS_PARENT;
00154 _commTimeout = COMM_TIMEOUT_DEFAULT;
00155
00156 _hash = new map< string, vector<char> >[_processCount];
00157 if (_hash == NULL) {
00158 log("PSIM ERROR: failure to allocate hash");
00159 throw string("PSIM ERROR: failure to allocate hash");
00160 }
00161 _socketFD = new int[_processCount*_processCount][2];
00162 if (_socketFD == NULL) {
00163 log("PSIM ERROR: failed to create socket array");
00164 throw string("PSIM ERROR: failed to create socket array");
00165 }
00166 }
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176 void create_sockets() {
00177 for (int source=0; source<_processCount; source++) {
00178 for (int dest=0; dest<_processCount; dest++) {
00179 if (socketpair(AF_LOCAL, SOCK_STREAM, 0,
00180 _socketFD[_processCount*source+dest]) < 0) {
00181 log("PSIM ERROR: socketpair");
00182 throw string("PSIM ERROR: socketpair");
00183 }
00184 }
00185 }
00186 char buffer[256];
00187 for (int source=0; source<_processCount; source++)
00188 for (int dest=0; dest<_processCount; dest++) {
00189 sprintf(buffer,"_socketFD[%i*%i+%i]={%i,%i}",
00190 source,_processCount,dest,
00191 _socketFD[_processCount*source+dest][COMM_SEND],
00192 _socketFD[_processCount*source+dest][COMM_RECV]);
00193 log(buffer);
00194 }
00195 }
00196
00197
00198
00199
00200
00201
00202 void fork_processes() {
00203 _processID=0;
00204 for (int i=1; i<_processCount; i++) {
00205 int pid = fork();
00206
00207 if (pid == -1) {
00208 log("PSIM ERROR: fork");
00209 throw("PSIM ERROR: fork");
00210 } else if (pid == 0) {
00211 _processID = i;
00212 break;
00213 }
00214 }
00215 }
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225 void check_process_id(int processID) {
00226
00227 if ((processID == _processID) ||
00228 (processID < PROCESS_PARENT) ||
00229 (processID >= _processCount)) {
00230
00231 char buffer[256];
00232 sprintf(buffer, "PSIM ERROR: process %i referencing %i.",
00233 _processID, processID);
00234 log(buffer);
00235 throw string( buffer );
00236 }
00237 }
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247 void open_log() {
00248 _doLogging = false;
00249 if (_logFileName.length()==0) {
00250 return;
00251 }
00252
00253
00254 if ((_logfileFD = fopen(_logFileName.c_str(), "w")) == NULL) {
00255 log("PSIM ERROR: unable to create logfile");
00256 throw string("PSIM ERROR: unable to create logfile");
00257 }
00258
00259 close_log();
00260
00261
00262 if ((_logfileFD = fopen(_logFileName.c_str(), "a")) == NULL) {
00263 log("PSIM ERROR: unable to open logfile");
00264 throw string("PSIM ERROR: unable to open logfile");
00265 }
00266 _doLogging=true;
00267
00268 }
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278 void close_log() {
00279 if (_doLogging)
00280 fclose(_logfileFD);
00281
00282 }
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293 void logSendRecv(int sourcedestProcessID,
00294 string tag,
00295 enumSendRecv method,
00296 enumSendRecvStep step){
00297
00298 char buffer[256];
00299 const char cmdSendRecv[2][8] = { "send", "recv" };
00300 const char stepSendRecv[3][12] = { "starting...", "failed!", "success." };
00301
00302 sprintf(buffer, "%i %s(%i,%s) %s",
00303 _processID, cmdSendRecv[method],
00304 sourcedestProcessID, tag.c_str(), stepSendRecv[step]);
00305 log(buffer);
00306 }
00307
00308
00309
00310
00311
00312
00313 int get_source_index(int source) {
00314 check_process_id(source);
00315 return _processCount*source+_processID;
00316 }
00317
00318
00319
00320
00321
00322
00323 int get_dest_index(int dest) {
00324 check_process_id(dest);
00325 return _processCount*_processID+dest;
00326 }
00327
00328
00329
00330
00331
00332
00333
00334
00335 void send_buffer(int destProcessID,
00336 const void* pdataToSend, mdp_int dataSize) {
00337
00338 int destIndex = get_dest_index(destProcessID);
00339 if (write(_socketFD[destIndex][COMM_SEND], pdataToSend, dataSize)
00340 != dataSize) {
00341 log("PSIM ERROR: failure to write to socket");
00342 throw string("PSIM ERROR: failure to write to socket");
00343 }
00344 }
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354 void send_binary(int destProcessID,
00355 const string& tag,
00356 const vector<char>& data) {
00357
00358 int tagSize = tag.size();
00359 int dataSize = data.size();
00360 send_buffer(destProcessID, &tagSize, sizeof(tagSize));
00361 send_buffer(destProcessID, tag.c_str(), tagSize);
00362 send_buffer(destProcessID, &dataSize, sizeof(dataSize));
00363 send_buffer(destProcessID, &data[0], dataSize);
00364 }
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374 void recv_buffer(int sourceProcessID,
00375 void* pdataToReceive, mdp_int dataSize) {
00376 mdp_int bytes = 0;
00377 mdp_int t0=time(NULL);
00378 int sourceIndex = get_source_index(sourceProcessID);
00379
00380
00381 do{
00382 bytes+=read(_socketFD[sourceIndex][COMM_RECV],
00383 (char*) pdataToReceive+bytes,dataSize-bytes);
00384 if(time(NULL)-t0>_commTimeout) {
00385 log("PSIM ERROR: timeout error in readin from socket");
00386 throw string("PSIM ERROR: timeout error in readin from socket");
00387 }
00388 } while (bytes<dataSize);
00389 }
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400 void recv_binary(int sourceProcessID,
00401 const string& tag,
00402 vector<char>& data ) {
00403
00404 static vector<char> dataTmp;
00405 static string tagReceived;
00406 int size;
00407
00408 map< string, vector<char> >::iterator itr;
00409
00410 while (true) {
00411 itr = _hash[sourceProcessID].find(tag);
00412
00413 if (itr != _hash[sourceProcessID].end()) {
00414 data = _hash[sourceProcessID][tag];
00415 _hash[sourceProcessID].erase(itr);
00416 break;
00417 } else {
00418 recv_buffer(sourceProcessID, &size, sizeof(size));
00419 char* buffer= new char[size+1];
00420 recv_buffer(sourceProcessID, buffer, size);
00421 buffer[size] = 0;
00422 tagReceived = buffer;
00423 delete buffer;
00424
00425 if (tagReceived == tag) {
00426 recv_buffer(sourceProcessID, &size, sizeof(size));
00427 data.resize(size);
00428 recv_buffer(sourceProcessID, &data[0], size);
00429 break;
00430 } else {
00431 recv_buffer(sourceProcessID, &size, sizeof(size));
00432 dataTmp.resize(size);
00433 recv_buffer(sourceProcessID, &dataTmp[0], size);
00434 _hash[sourceProcessID][tagReceived] = dataTmp;
00435 }
00436 }
00437 }
00438 }
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450 void doBegEndLog(string method, enumBegEnd begEnd) {
00451 char buffer[256];
00452 char* be;
00453
00454 if (begEnd == LOG_BEGIN) be = "BEGIN";
00455 else be = "END";
00456
00457 sprintf(buffer, "%i %s [%s]", _processID, be, method.c_str());
00458 log(buffer);
00459 }
00460
00461 public:
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480 mdp_psim(int processCount, string logFileName=".psim.log", int verbatim=0) {
00481 psim_begin(processCount, logFileName, verbatim);
00482 }
00483
00484 mdp_psim(int argc, char** argv) {
00485 int processCount=parse_argv_nprocs(argc,argv);
00486 string logFileName=parse_argv_logfile(argc,argv);
00487 int verbatim=parse_argv_verbatim(argc,argv);
00488 psim_begin(processCount, logFileName, verbatim);
00489 }
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500 virtual ~mdp_psim() {
00501 psim_end();
00502 }
00503
00504
00505
00506
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516 void log(string message,int level=2) {
00517 if (_doLogging) {
00518 int fd=fileno(_logfileFD);
00519 flock(fd,LOCK_EX);
00520 fwrite("PSIM LOG: ", 10, 1, _logfileFD);
00521 fwrite(message.c_str(), message.length(), 1, _logfileFD);
00522 fwrite("\n", 1, 1, _logfileFD);
00523
00524 fflush(_logfileFD);
00525 flock(fd,LOCK_UN);
00526 }
00527 if(_verbatim>=level) {
00528 cout << "PSIM LOG: " << message << endl;
00529 cout.flush();
00530 }
00531 }
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542 int id() {
00543 return _processID;
00544 }
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554 int nprocs() {
00555 return _processCount;
00556 }
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568 void setCommTimeout(unsigned int commTimeout) {
00569 _commTimeout = commTimeout;
00570 }
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584 template<class T>
00585 void send(int destProcessID, string dataTag, T &dataToSend) {
00586 logSendRecv(destProcessID, dataTag,LOG_SR_SEND, LOG_SR_START);
00587 vector<char> data(sizeof(T));
00588 for(unsigned int k=0; k<sizeof(T); k++)
00589 data[k]=((char*)&dataToSend)[k];
00590 send_binary(destProcessID, dataTag, data);
00591
00592 logSendRecv(destProcessID, dataTag, LOG_SR_SEND, LOG_SR_SUCCESS);
00593 }
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605 template<class T>
00606 void send(int destProcessID, string dataTag,
00607 T *pdataToSend, mdp_int dataSize) {
00608 logSendRecv(destProcessID, dataTag, LOG_SR_SEND, LOG_SR_START);
00609 vector<char> data(sizeof(T)*dataSize);
00610 for(mdp_int k=0; k<data.size(); k++)
00611 data[k]=((char*)pdataToSend)[k];
00612 send_binary(destProcessID, dataTag, data);
00613 logSendRecv(destProcessID, dataTag,LOG_SR_SEND, LOG_SR_SUCCESS);
00614 }
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625 template<class T>
00626 void recv(int sourceProcessID, string dataTag, T &dataToReceive) {
00627 logSendRecv(sourceProcessID, dataTag,LOG_SR_RECV, LOG_SR_START);
00628 vector<char> data;
00629 recv_binary(sourceProcessID, dataTag, data);
00630 if(data.size()!=sizeof(T)) {
00631 log("PSIM ERROR: recv invalid data)");
00632 throw string("PSIM ERROR: recv invalid data)");
00633 };
00634 for(unsigned int k=0; k<sizeof(T); k++)
00635 ((char*)&dataToReceive)[k]=data[k];
00636
00637 logSendRecv(sourceProcessID, dataTag, LOG_SR_RECV, LOG_SR_SUCCESS);
00638 }
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648
00649
00650 template<class T>
00651 void recv(int sourceProcessID, string dataTag,
00652 T *pdataToReceive, mdp_int dataSize) {
00653 logSendRecv(sourceProcessID, dataTag, LOG_SR_RECV, LOG_SR_START);
00654 vector<char> data;
00655 recv_binary(sourceProcessID, dataTag, data);
00656 if(data.size()!=sizeof(T)*dataSize) {
00657 log("PSIM ERROR: recv invalid data size");
00658 throw string("PSIM ERROR: recv invalid data size");
00659 }
00660 for(mdp_int k=0; k<data.size(); k++)
00661 ((char*)pdataToReceive)[k]=data[k];
00662 logSendRecv(sourceProcessID, dataTag, LOG_SR_RECV, LOG_SR_SUCCESS);
00663 }
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674 template<class T>
00675 void broadcast(int sourceProcessID, T &data) {
00676 static string tag = "BROADCAST:0";
00677 doBegEndLog(tag, LOG_BEGIN);
00678 if (_processID == sourceProcessID) {
00679 for (int i=0; i<_processCount; i++) {
00680 if (i != sourceProcessID) {
00681 send(i, tag, data);
00682 }
00683 }
00684 } else {
00685 recv(sourceProcessID, tag, data);
00686 }
00687 if(tag=="BROADCAST:0") tag="BROADCAST:1"; else tag="BROADCAST:0";
00688 doBegEndLog(tag, LOG_END);
00689 }
00690
00691 template<class T>
00692 void broadcast(int sourceProcessID, T *data, int dataSize) {
00693 static string tag = "BROADCASTV:0";
00694 doBegEndLog(tag, LOG_BEGIN);
00695 if (_processID == sourceProcessID) {
00696 for (int i=0; i<_processCount; i++) {
00697 if (i != sourceProcessID)
00698 send(i, tag, data, dataSize);
00699 }
00700 } else {
00701 recv(sourceProcessID, tag, data, dataSize);
00702 }
00703 if(tag=="BROADCASTV:0") tag="BROADCASTV:1"; else tag="BROADCASTV:0";
00704 doBegEndLog(tag, LOG_END);
00705 }
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718 template<class T>
00719 vector<T> collect(int dest, T &data) {
00720 static string tag="COLLECT";
00721 vector<T> dataList;
00722 T dataToReceive;
00723 dataList.resize(_processCount);
00724 doBegEndLog(tag, LOG_BEGIN);
00725
00726 if (_processID != dest) {
00727 send(dest, tag, data);
00728 } else {
00729 dataList[dest]=data;
00730
00731 for (int i=0; i<_processCount; i++) {
00732 if(i!=dest) {
00733 recv(i, tag, dataToReceive);
00734 dataList[i]=dataToReceive;
00735 }
00736 }
00737 }
00738 if(tag=="COLLECT:0") tag="COLLECT:1"; else tag="COLLECT:0";
00739 doBegEndLog(tag, LOG_END);
00740 return dataList;
00741 }
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753 template<class T>
00754 vector<T> combine(T &data) {
00755 vector<T> dataList=collect(PROCESS_PARENT,data);
00756 cout << id() << " size=" << dataList.size() << endl;
00757
00758 broadcast(PROCESS_PARENT, &dataList[0], dataList.size());
00759 cout << id() << " list=" << dataList[0] << dataList[1]<< dataList[2]<< endl;
00760 return dataList;
00761 }
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771 void barrier() {
00772 int dummy;
00773 broadcast(PROCESS_PARENT,dummy);
00774 collect(PROCESS_PARENT,dummy);
00775 }
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785 template<class T>
00786 T add(T &item) {
00787 vector<T> dataList;
00788 T total=0;
00789 dataList=collect(PROCESS_PARENT,item);
00790 if(_processID==PROCESS_PARENT)
00791 for (int i=0; i<dataList.size(); i++) {
00792 total += dataList[i];
00793 }
00794 broadcast(PROCESS_PARENT,total);
00795 return total;
00796 }
00797
00798
00799
00800
00801
00802
00803
00804
00805
00806
00807 static int parse_argv_nprocs(int argc, char** argv) {
00808 int n=1;
00809 for(int i=1; i<argc; i++)
00810 if(strncmp(argv[i],"-PSIM_NPROCS=",13)==0) {
00811 sscanf(argv[i]+13,"%i",&n);
00812 break;
00813 }
00814 return n;
00815 }
00816
00817 static string parse_argv_logfile(int argc, char** argv) {
00818 for(int i=1; i<argc; i++)
00819 if(strncmp(argv[i],"-PSIM_LOGFILE=",14)==0) {
00820 return string(argv[i]+14);
00821 }
00822 return string("");
00823 }
00824
00825 static int parse_argv_verbatim(int argc, char** argv) {
00826 int n=1;
00827 for(int i=1; i<argc; i++)
00828 if(strncmp(argv[i],"-PSIM_VERBATIM=",15)==0) {
00829 sscanf(argv[i]+15,"%i",&n);
00830 break;
00831 }
00832 return n;
00833 }
00834 };
00835
00836