00001 #include <vector>
00002 #include "armci_impl.h"
00003 
00004 using namespace std;
00005 
// Reduction lookup table. armciNodeInit() allocates it as _ARMCI_NUM_REDN_OPS
// rows of ARMCI_NUM_DATATYPES ints each.
// NOTE(review): presumably filled in by the
// _ARMCI_REGISTER_POLYMORPHIC_REDUCTION calls -- confirm against the macro.
int **_armciRednLookupTable;
00007 
00008 
00009 
00010 
00011 extern "C" void armciLibStart(void) {
00012   int argc=CkGetArgc();
00013   char **argv=CkGetArgv();
00014   ARMCI_Main_cpp(argc, argv);
00015 }
00016 
// Generate the reduction functions through macros defined elsewhere. For the
// polymorphic variants the first argument names the reduction and the second
// is the per-element statement that folds value[i] into ret[i].
// NOTE(review): the absmax/absmin reductions registered in armciNodeInit()
// presumably come from _ARMCI_GENERATE_ABS_REDUCTION() -- confirm in the header.
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(sum,ret[i]+=value[i];)
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(product,ret[i]*=value[i];)
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)
_ARMCI_GENERATE_POLYMORPHIC_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
_ARMCI_GENERATE_ABS_REDUCTION()
00022 
// Index returned by TCHARM_Register_thread_function() for armciLibStart;
// stays -1 until armciNodeInit() has run (which asserts on that).
static int armciLibStart_idx = -1;

#if CMK_TRACE_ENABLED
#include "register.h" 
// Map from function name to user-event id; filled in by armciNodeInit().
CsvExtern(funcmap*, tcharm_funcmap);
#endif
00029 
00030 void armciNodeInit(void) {
00031 #if CMK_TRACE_ENABLED
00032   TCharm::nodeInit(); 
00033   int funclength = sizeof(funclist)/sizeof(char*);
00034   for (int i=0; i<funclength; i++) {
00035     int event_id = traceRegisterUserEvent(funclist[i], -1);
00036     CsvAccess(tcharm_funcmap)->insert(std::pair<std::string, int>(funclist[i], event_id));
00037   }
00038 
00039   
00040   
00041   for (int i=0; i<_chareTable.size(); i++){
00042     if (strcmp(_chareTable[i]->name, "dummy_thread_chare") == 0)
00043       _chareTable[i]->name = "ARMCI";
00044   }
00045   for (int i=0; i<_entryTable.size(); i++){
00046     if (strcmp(_entryTable[i]->name, "dummy_thread_ep") == 0)
00047       _entryTable[i]->setName("thread");
00048   }
00049 #endif
00050   CmiAssert(armciLibStart_idx == -1);
00051   armciLibStart_idx = TCHARM_Register_thread_function((TCHARM_Thread_data_start_fn)armciLibStart);
00052 
00053   
00054   _armciRednLookupTable = new int*[_ARMCI_NUM_REDN_OPS];
00055   for (int ops=0; ops<_ARMCI_NUM_REDN_OPS; ops++) {
00056     _armciRednLookupTable[ops] = new int[ARMCI_NUM_DATATYPES];
00057   }
00058 
00059   
00060   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(sum,_ARMCI_REDN_OP_SUM);
00061   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(product,_ARMCI_REDN_OP_SUM);
00062   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(max,_ARMCI_REDN_OP_MAX);
00063   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(min,_ARMCI_REDN_OP_MIN);
00064   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmax,_ARMCI_REDN_OP_ABSMAX);
00065   _ARMCI_REGISTER_POLYMORPHIC_REDUCTION(absmin,_ARMCI_REDN_OP_ABSMIN);
00066 }
00067 
00068 
00069 
00070 
00071 static void ArmciDefaultSetup(void) {
00072   
00073   TCHARM_Create(TCHARM_Get_num_chunks(), armciLibStart_idx);
00074 }
00075 
// Thread-private pointer to the ArmciVirtualProcessor serving a thread;
// initialized to NULL in armciProcInit() and set in setupThreadPrivate().
CtvDeclare(ArmciVirtualProcessor *, _armci_ptr);
00077 
00078 
// Per-processor initialization: sets up the thread-private _armci_ptr
// variable (starting out NULL) and installs ArmciDefaultSetup as the TCharm
// fallback setup routine.
void armciProcInit(void) {
  CtvInitialize(ArmciVirtualProcessor, _armci_ptr);
  CtvAccess(_armci_ptr) = NULL;

  // Used when no other setup routine has been provided.
  // NOTE(review): exact fallback semantics are defined by TCharm -- confirm.
  TCHARM_Set_fallback_setup(ArmciDefaultSetup);
}
00086 
// Construct a fresh (non-migrated) element attached to the given TCharm proxy.
ArmciVirtualProcessor::ArmciVirtualProcessor(const CProxy_TCharm &_thr_proxy)
  : TCharmClient1D(_thr_proxy) {
  thisProxy = this;
  tcharmClientInit();
  // Publish this object through the ARMCI semaphore slot of the thread.
  // NOTE(review): presumably consumed by ARMCI initialization -- confirm.
  thread->semaPut(ARMCI_TCHARM_SEMAID,this);
  memBlock = CmiIsomallocBlockListNew();
  // Re-seat thisProxy on the array id; this replaces the assignment above.
  thisProxy = CProxy_ArmciVirtualProcessor(thisArrayID);
  // No address reply is outstanding yet.
  addressReply = NULL;
  
}
00097 
00098 ArmciVirtualProcessor::ArmciVirtualProcessor(CkMigrateMessage *m) 
00099   : TCharmClient1D(m) 
00100 {
00101 
00102   thread = NULL;
00103   addressReply = NULL;
00104 }
00105 
00106 ArmciVirtualProcessor::~ArmciVirtualProcessor()
00107 {
00108   CmiIsomallocBlockListDelete(memBlock);
00109   if (addressReply) {delete addressReply;}
00110 }
00111 
00112 void ArmciVirtualProcessor::setupThreadPrivate(CthThread forThread) {
00113   CtvAccessOther(forThread, _armci_ptr) = this;
00114   armci_nproc = thread->getNumElements();
00115 }
00116 
// Receive the address table message: store it in addressReply and resume the
// thread (requestAddresses() suspends in a loop until addressReply is set).
void ArmciVirtualProcessor::getAddresses(AddressMsg *msg) {
  addressReply = msg;
  thread->resume();
}
00121 
00122 
00123 void ArmciVirtualProcessor::put(pointer src, pointer dst,
00124                    int nbytes, int dst_proc) {
00125 
00126 
00127 
00128 
00129   int hdl = hdlList.size();
00130   Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src, dst);
00131   hdlList.push_back(entry);
00132 
00133   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00134   memcpy(msg->data, src, nbytes);
00135   thisProxy[dst_proc].putData(msg);
00136 
00137 }
00138 
00139 int ArmciVirtualProcessor::nbput(pointer src, pointer dst,
00140                    int nbytes, int dst_proc) {
00141 
00142 
00143 
00144 
00145   int hdl = hdlList.size();
00146   Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src, dst);
00147   hdlList.push_back(entry);
00148 
00149   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00150   memcpy(msg->data, src, nbytes);
00151   thisProxy[dst_proc].putData(msg);
00152   
00153   return hdl;
00154 }
00155 
00156 void ArmciVirtualProcessor::nbput_implicit(pointer src, pointer dst,
00157                       int nbytes, int dst_proc) {
00158   int hdl = hdlList.size();
00159   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes, src, dst);
00160   hdlList.push_back(entry);
00161 
00162   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,thisIndex,hdl);
00163   memcpy(msg->data, src, nbytes);
00164   thisProxy[dst_proc].putData(msg);
00165 }
00166 
// Parameter-marshalled variant of putData: copy nbytes of data into dst and
// acknowledge handle hdl back to element src_proc.
void ArmciVirtualProcessor::putData(pointer dst, int nbytes, char *data,
                    int src_proc, int hdl) {
  memcpy(dst, data, nbytes);
  thisProxy[src_proc].putAck(hdl);
}
00172 
00173 void ArmciVirtualProcessor::putData(ArmciMsg *m) {
00174   memcpy(m->dst, m->data, m->nbytes);
00175   thisProxy[m->src_proc].putAck(m->hdl);
00176   delete m;
00177 }
00178 
00179 void ArmciVirtualProcessor::putAck(int hdl) {
00180   if(hdl != -1) { 
00181     hdlList[hdl]->acked = 1;  
00182     if (hdlList[hdl]->wait == 1) {
00183       hdlList[hdl]->wait = 0;
00184       thread->resume();
00185     }
00186   }
00187   thread->resume();
00188 }
00189 
00190 void ArmciVirtualProcessor::get(pointer src, pointer dst,
00191                    int nbytes, int src_proc) {
00192 
00193 
00194 
00195 
00196   thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, -1);
00197   
00198   thread->suspend();
00199 }
00200 
00201 int ArmciVirtualProcessor::nbget(pointer src, pointer dst,
00202                    int nbytes, int src_proc) {
00203 
00204 
00205 
00206 
00207 
00208   int hdl = hdlList.size();
00209   Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src, dst);
00210   hdlList.push_back(entry);
00211   
00212   thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
00213 
00214   return hdl;
00215 }
00216 
00217 void ArmciVirtualProcessor::nbget_implicit(pointer src, pointer dst,
00218                        int nbytes, int src_proc) {
00219   int hdl = hdlList.size();
00220   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src, dst);
00221   hdlList.push_back(entry);
00222   
00223   thisProxy[src_proc].requestFromGet(src, dst, nbytes, thisIndex, hdl);
00224 }
00225 
00226 void ArmciVirtualProcessor::wait(int hdl){
00227   if(hdl == -1) return;
00228   while (1) {
00229     if(hdlList[hdl]->acked != 0)
00230       break;
00231     else
00232       thread->suspend();
00233   }
00234 }
00235 
00236 
00237 
00238 
00239 
00240 
00241 
00242 
00243 void ArmciVirtualProcessor::waitmulti(vector<int> procs){
00244   for(int i=0;i<procs.size();i++){
00245     wait(procs[i]);
00246   }
00247 }
00248 
00249 void ArmciVirtualProcessor::waitproc(int proc){
00250   vector<int> procs;
00251   for(int i=0;i<hdlList.size();i++){
00252     if((hdlList[i]->acked == 0) && 
00253        (hdlList[i]->proc == proc) && 
00254        ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
00255       hdlList[i]->wait = 1;
00256       procs.push_back(i);
00257     }
00258   }
00259   waitmulti(procs);
00260 }
00261 
00262 void ArmciVirtualProcessor::waitall(){
00263   vector<int> procs;
00264   for(int i=0;i<hdlList.size();i++){
00265     if((hdlList[i]->acked == 0) && 
00266        ((hdlList[i]->op & IMPLICIT_MASK) != 0)) {
00267       hdlList[i]->wait = 1;
00268       procs.push_back(i);
00269     }
00270   }
00271   waitmulti(procs);
00272 }
00273 
00274 void ArmciVirtualProcessor::fence(int proc){
00275   vector<int> procs;
00276   for(int i=0;i<hdlList.size();i++){
00277     if((hdlList[i]->acked == 0) && 
00278        ((hdlList[i]->op & BLOCKING_MASK) != 0) && 
00279        (hdlList[i]->proc == proc))
00280       procs.push_back(i);
00281   }
00282   waitmulti(procs);
00283 }
00284 void ArmciVirtualProcessor::allfence(){
00285   vector<int> procs;
00286   for(int i=0;i<hdlList.size();i++){
00287     if((hdlList[i]->acked == 0) && 
00288        ((hdlList[i]->op & BLOCKING_MASK) != 0))
00289       procs.push_back(i);
00290   }
00291   waitmulti(procs);
00292 }
00293 void ArmciVirtualProcessor::barrier(){
00294   allfence();
00295   CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00296   contribute(0,NULL,CkReduction::sum_int,cb);
00297   thread->suspend();
00298 }
00299 
// Resume the thread. Used as the callback target by barrier() and
// checkpoint().
void ArmciVirtualProcessor::resumeThread(void){
  thread->resume();
}
00303 
00304 int ArmciVirtualProcessor::test(int hdl){
00305   if(hdl == -1) return 1;
00306   return hdlList[hdl]->acked;
00307 }
00308 
00309 void ArmciVirtualProcessor::requestFromGet(pointer src, pointer dst, int nbytes,
00310                        int dst_proc, int hdl) {
00311   ArmciMsg *msg = new (nbytes, 0) ArmciMsg(dst,nbytes,-1,hdl);
00312   memcpy(msg->data, src, nbytes);
00313   thisProxy[dst_proc].putDataFromGet(msg);
00314 }
00315 
00316 
00317 
00318 
00319 void ArmciVirtualProcessor::putDataFromGet(pointer dst, int nbytes, char *data, int hdl) {
00320   memcpy(dst, data, nbytes);
00321   if(hdl != -1) { 
00322     hdlList[hdl]->acked = 1;  
00323     if (hdlList[hdl]->wait == 1) {
00324       hdlList[hdl]->wait = 0;
00325       thread->resume();
00326     }
00327   }
00328   thread->resume();
00329 }
00330 
00331 void ArmciVirtualProcessor::putDataFromGet(ArmciMsg *m) {
00332   memcpy(m->dst, m->data, m->nbytes);
00333   if(m->hdl != -1) { 
00334     hdlList[m->hdl]->acked = 1;  
00335     if (hdlList[m->hdl]->wait == 1) {
00336       hdlList[m->hdl]->wait = 0;
00337       thread->resume();
00338     }
00339   }
00340   delete m;
00341   thread->resume();
00342 }
00343 
00344 void ArmciVirtualProcessor::puts(pointer src_ptr, int src_stride_ar[], 
00345        pointer dst_ptr, int dst_stride_ar[],
00346        int count[], int stride_levels, int dst_proc){
00347   int nbytes = 1;
00348   for(int i=0;i<stride_levels+1;i++) 
00349     nbytes *= count[i];
00350   
00351 
00352 
00353 
00354 
00355 
00356 
00357 
00358   int hdl = hdlList.size();
00359   Armci_Hdl* entry = new Armci_Hdl(ARMCI_BPUT, dst_proc, nbytes, src_ptr, dst_ptr);
00360   hdlList.push_back(entry);
00361   
00362   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00363 
00364   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00365   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00366   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00367   thisProxy[dst_proc].putsData(m);
00368 }
00369 
00370 int ArmciVirtualProcessor::nbputs(pointer src_ptr, int src_stride_ar[], 
00371        pointer dst_ptr, int dst_stride_ar[],
00372        int count[], int stride_levels, int dst_proc){
00373   int nbytes = 1;
00374   for(int i=0;i<stride_levels+1;i++) 
00375     nbytes *= count[i];
00376   
00377 
00378 
00379 
00380 
00381 
00382 
00383 
00384   int hdl = hdlList.size();
00385   Armci_Hdl* entry = new Armci_Hdl(ARMCI_PUT, dst_proc, nbytes, src_ptr, dst_ptr);
00386   hdlList.push_back(entry);
00387  
00388   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00389 
00390   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00391   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00392   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00393   thisProxy[dst_proc].putsData(m);
00394   return hdl;
00395 }
00396 
00397 void ArmciVirtualProcessor::nbputs_implicit(pointer src_ptr, 
00398                         int src_stride_ar[], 
00399                         pointer dst_ptr, 
00400                         int dst_stride_ar[],
00401                         int count[], int stride_levels, 
00402                         int dst_proc){
00403   int nbytes = 1;
00404   for(int i=0;i<stride_levels+1;i++) 
00405     nbytes *= count[i];
00406   int hdl = hdlList.size();
00407   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IPUT, dst_proc, nbytes, 
00408                    src_ptr, dst_ptr);
00409   hdlList.push_back(entry);
00410  
00411   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00412 
00413   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00414   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00415   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00416   thisProxy[dst_proc].putsData(m);
00417 }
00418 
// Parameter-marshalled strided put delivery: scatter the flat data buffer into
// the strided region at dst_ptr (flatten == 0) and acknowledge handle hdl to
// element src_proc. nbytes is not used by this function.
void ArmciVirtualProcessor::putsData(pointer dst_ptr, int dst_stride_ar[], 
        int count[], int stride_levels,
        int nbytes, char *data, int src_proc, int hdl){
  stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
  thisProxy[src_proc].putAck(hdl);
}
00425 
00426 void ArmciVirtualProcessor::putsData(ArmciStridedMsg *m){
00427   stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
00428   thisProxy[m->src_proc].putAck(m->hdl);
00429   delete m;
00430 }
00431 
00432 void ArmciVirtualProcessor::gets(pointer src_ptr, int src_stride_ar[], 
00433        pointer dst_ptr, int dst_stride_ar[],
00434        int count[], int stride_levels, int src_proc){
00435 
00436 
00437 
00438 
00439 
00440 
00441 
00442 
00443 
00444 
00445 
00446   thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar, 
00447                     count, stride_levels, thisIndex, -1);
00448   
00449   thread->suspend();
00450 }
00451 
00452 int ArmciVirtualProcessor::nbgets(pointer src_ptr, int src_stride_ar[], 
00453        pointer dst_ptr, int dst_stride_ar[],
00454        int count[], int stride_levels, int src_proc){
00455   int hdl = hdlList.size();
00456   int nbytes = 1;
00457   for(int i=0;i<stride_levels+1;i++) 
00458     nbytes *= count[i];
00459 
00460 
00461 
00462 
00463 
00464 
00465 
00466 
00467   
00468   Armci_Hdl* entry = new Armci_Hdl(ARMCI_GET, src_proc, nbytes, src_ptr, dst_ptr);
00469   hdlList.push_back(entry);
00470 
00471   thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar, 
00472                     count, stride_levels, thisIndex, hdl);
00473 
00474   return hdl;
00475 }
00476 
00477 void ArmciVirtualProcessor::nbgets_implicit(pointer src_ptr, 
00478                         int src_stride_ar[], 
00479                         pointer dst_ptr, 
00480                         int dst_stride_ar[],
00481                         int count[], int stride_levels, 
00482                         int src_proc) {
00483   int hdl = hdlList.size();
00484   int nbytes = 1;
00485   for(int i=0;i<stride_levels+1;i++) 
00486     nbytes *= count[i];
00487 
00488   Armci_Hdl* entry = new Armci_Hdl(ARMCI_IGET, src_proc, nbytes, src_ptr, dst_ptr);
00489   hdlList.push_back(entry);
00490 
00491   thisProxy[src_proc].requestFromGets(src_ptr, src_stride_ar, dst_ptr, dst_stride_ar, 
00492                     count, stride_levels, thisIndex, hdl);
00493 }
00494 
00495 void ArmciVirtualProcessor::requestFromGets(pointer src_ptr, int src_stride_ar[], 
00496        pointer dst_ptr, int dst_stride_ar[], int count[], int stride_levels, int dst_proc, int hdl){
00497   int nbytes = 1;
00498   for(int i=0;i<stride_levels+1;i++) 
00499     nbytes *= count[i];
00500   
00501   ArmciStridedMsg *m = new (stride_levels,stride_levels+1,nbytes, 0) ArmciStridedMsg(dst_ptr,stride_levels,nbytes,thisIndex,hdl);
00502 
00503   memcpy(m->dst_stride_ar,dst_stride_ar,sizeof(int)*stride_levels);
00504   memcpy(m->count,count,sizeof(int)*(stride_levels+1));
00505   stridedCopy(src_ptr, m->data, src_stride_ar, count, stride_levels, 1);
00506   thisProxy[dst_proc].putDataFromGets(m);
00507 }
00508 void ArmciVirtualProcessor::putDataFromGets(pointer dst_ptr, int dst_stride_ar[], 
00509         int count[], int stride_levels, int nbytes, char *data, int hdl){
00510   stridedCopy(dst_ptr, data, dst_stride_ar, count, stride_levels, 0);
00511   if(hdl != -1) { 
00512     hdlList[hdl]->acked = 1;  
00513     if (hdlList[hdl]->wait == 1) {
00514       hdlList[hdl]->wait = 0;
00515       thread->resume();
00516     }
00517   }
00518   thread->resume();
00519 }
00520 
00521 void ArmciVirtualProcessor::putDataFromGets(ArmciStridedMsg *m){
00522   stridedCopy(m->dst, m->data, m->dst_stride_ar, m->count, m->stride_levels, 0);
00523   if(m->hdl != -1) { 
00524     hdlList[m->hdl]->acked = 1;  
00525     if (hdlList[m->hdl]->wait == 1) {
00526       hdlList[m->hdl]->wait = 0;
00527       thread->resume();
00528     }
00529   }
00530   delete m;
00531   thread->resume();
00532 }
00533 
// Notify element proc: invokes sendNote() there, passing this element's index.
void ArmciVirtualProcessor::notify(int proc){
  thisProxy[proc].sendNote(thisIndex);
}
00537 void ArmciVirtualProcessor::sendNote(int proc){
00538   
00539   
00540   
00541   int hasNote = -1;
00542   for(int i=0;i<noteList.size();i++){
00543     if(noteList[i]->proc == proc){
00544       hasNote = i;
00545       break;
00546     }
00547   }
00548   if(hasNote!=-1){
00549     (noteList[hasNote]->notified)++;
00550   } else {
00551     Armci_Note* newNote = new Armci_Note(proc, 0, 1);
00552     noteList.push_back(newNote);
00553     hasNote = noteList.size() - 1;
00554   }
00555   if(noteList[hasNote]->notified >= noteList[hasNote]->waited){
00556 
00557 
00558 
00559 
00560     thread->resume();
00561   }
00562 }
00563 void ArmciVirtualProcessor::notify_wait(int proc){
00564   
00565   
00566   
00567   int hasNote = -1;
00568   for(int i=0;i<noteList.size();i++){
00569     if(noteList[i]->proc == proc){
00570       hasNote = i;
00571       break;
00572     }
00573   }
00574   if(hasNote!=-1){
00575     (noteList[hasNote]->waited)++;
00576   } else {
00577     Armci_Note* newNote = new Armci_Note(proc, 1, 0);
00578     noteList.push_back(newNote);
00579     hasNote = noteList.size() - 1;
00580   }
00581   if(noteList[hasNote]->notified < noteList[hasNote]->waited){
00582     thread->suspend();
00583   }
00584 }
00585 
// Pack/unpack this element: base-class state first, then the isomalloc block
// list, the proxy, the handle list, the note list and the addressReply
// message pointer.
void ArmciVirtualProcessor::pup(PUP::er &p) {
  TCharmClient1D::pup(p);
  CmiIsomallocBlockListPup(&p, &memBlock);
  p|thisProxy;
  p|hdlList;
  p|noteList;
  CkPupMessage(p, (void **)&addressReply, 1);
}
00594 
00595 
00596 void ArmciVirtualProcessor::requestAddresses(pointer ptr, pointer ptr_arr[], int bytes) {
00597   int thisPE = armci_me;
00598   int numPE = armci_nproc;
00599   
00600   addressReply = NULL;
00601   addressPair *pair = new addressPair;
00602   pair->pe = thisPE;
00603   pair->ptr = ptr;
00604   
00605   CkCallback cb(CkIndex_ArmciVirtualProcessor::mallocClient(NULL),CkArrayIndex1D(0),thisProxy);
00606   contribute(sizeof(addressPair), pair, CkReduction::concat, cb);
00607   
00608   while(addressReply==NULL) thread->suspend();
00609 
00610   
00611   for (int i=0; i<numPE; i++) {
00612     ptr_arr[i] = addressReply->addresses[i];
00613   }
00614   delete addressReply;
00615   addressReply = NULL;
00616 }
00617 
00618 void ArmciVirtualProcessor::stridedCopy(void *base, void *buffer_ptr,
00619           int *stride, int *count, int stride_levels, bool flatten) {
00620   if (stride_levels == 0) {
00621     if (flatten) {
00622       memcpy(buffer_ptr, base, count[stride_levels]);
00623     } else {
00624       memcpy(base, buffer_ptr, count[stride_levels]);
00625     }
00626   } else {
00627     int mystride = 1;
00628     for(int i=0;i<stride_levels;i++)
00629       mystride *= count[i];
00630     for (int i=0; i<count[stride_levels]; i++) {
00631       stridedCopy((void *)((char *)base + stride[stride_levels-1]*i), 
00632         (void *)((char *)buffer_ptr + mystride*i), stride, count, stride_levels-1, flatten);
00633     }
00634   }
00635 }
00636 
00637 
00638 void ArmciVirtualProcessor::mallocClient(CkReductionMsg *msg) {
00639   int numBlocks = msg->getSize()/sizeof(addressPair);
00640   addressPair *dataBlocks = (addressPair *)msg->getData();
00641   AddressMsg *addrmsg = new(numBlocks, 0) AddressMsg;
00642   
00643   for (int i=0; i<numBlocks; i++) {
00644     addrmsg->addresses[dataBlocks[i].pe] = dataBlocks[i].ptr;
00645   }
00646   
00647   thisProxy.getAddresses(addrmsg);
00648   delete msg;
00649 }
00650 
00651 
00652 
00653 
00654 
00655 
00656 
00657 
00658 void ArmciVirtualProcessor::msgBcast(void *buffer, int len, int root) {
00659   int me;
00660   ARMCI_Myid(&me);
00661   if (me == root) {
00662     thisProxy.recvMsgBcast(len, (char *)buffer, root);
00663   } else {
00664     
00665     collectiveTmpBufferPtr = buffer;
00666     thread->suspend();
00667   }
00668 }
00669 
00670 
00671 void ArmciVirtualProcessor::recvMsgBcast(int len, char *buffer, int root) {
00672   int me;
00673   ARMCI_Myid(&me);
00674   if (me != root) {
00675     
00676     
00677     
00678     collectiveTmpBufferPtr = memcpy(collectiveTmpBufferPtr, buffer, len);
00679     collectiveTmpBufferPtr = NULL;
00680     thread->resume();
00681   }
00682 }
00683 
00684 
00685 
00686 
00687 void ArmciVirtualProcessor::msgGop(void *x, int n, char *op, int type) {
00688   CkReduction::reducerType reducer;
00689   if (strcmp(op,"+") == 0) {
00690   } else if (strcmp(op,"*") == 0) {
00691   } else if (strcmp(op,"min") == 0) {
00692   } else if (strcmp(op,"max") == 0) {
00693   } else if (strcmp(op,"absmin") == 0) {
00694   } else if (strcmp(op,"absmax") == 0) {
00695   } else {
00696     CkPrintf("Operator %s not supported\n",op);
00697     CmiAbort("ARMCI ERROR: msgGop - Unknown operator\n");
00698   }
00699   switch (type) {
00700   case ARMCI_INT:
00701     
00702     break;
00703   case ARMCI_LONG:
00704     break;
00705   case ARMCI_LONG_LONG:
00706     break;
00707   case ARMCI_FLOAT:
00708     break;
00709   case ARMCI_DOUBLE:
00710     break;
00711   default:
00712     CkPrintf("ARMCI Type %d not supported\n", type);
00713     CmiAbort("ARMCI ERROR: msgGop - Unknown type\n");
00714   }
00715 }
00716 
00717 
00718 class ckptClientStruct {
00719 public:
00720   const char *dname;
00721   ArmciVirtualProcessor *vp;
00722   ckptClientStruct(const char *s, ArmciVirtualProcessor *p): dname(s), vp(p) {}
00723 };
00724 
00725 static void checkpointClient(void *param,void *msg)
00726 {       
00727   ckptClientStruct *client = (ckptClientStruct*)param;
00728   const char *dname = client->dname;
00729   ArmciVirtualProcessor *vp = client->vp;
00730   vp->checkpoint(strlen(dname), dname);
00731   delete client;
00732 }               
00733                 
00734 void ArmciVirtualProcessor::startCheckpoint(const char* dname){
00735   if (thisIndex==0) {
00736     ckptClientStruct *clientData = new ckptClientStruct(dname, this);
00737     CkCallback cb(checkpointClient, clientData);
00738     contribute(0, NULL, CkReduction::sum_int, cb);
00739   } else {
00740     contribute(0, NULL, CkReduction::sum_int);
00741   }
00742   thread->suspend();
00743 }
00744 void ArmciVirtualProcessor::checkpoint(int len, const char* dname){
00745   if (len == 0) { 
00746     CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00747     CkStartMemCheckpoint(cb);
00748   } else {
00749     char dirname[256];
00750     strncpy(dirname,dname,len);
00751     dirname[len]='\0';
00752     CkCallback cb(CkIndex_ArmciVirtualProcessor::resumeThread(),thisProxy);
00753     CkStartCheckpoint(dirname,cb);
00754   }
00755 }
00756 
00757 #include "armci.def.h"
00758