00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 #include <limits>
00051 
00052 #include "charm++.h"
00053 #include "ck.h"
00054 
00055 #include "pathHistory.h"
00056 
00057 #if CMK_DEBUG_REDUCTIONS
00058 
00059 
00060 #define DEBR(x) CkPrintf x
00061 #define AA "Red PE%d Node%d #%d (%d,%d) Group %d> "
00062 #define AB ,CkMyPe(),CkMyNode(),redNo,nRemote,nContrib,thisgroup.idx
00063 
00064 #define DEBN(x) CkPrintf x
00065 #define AAN "Red Node%d "
00066 #define ABN ,CkMyNode()
00067 
00068 
00069 #define RED_DEB(x) //CkPrintf x
00070 #define DEBREVAC(x) CkPrintf x
00071 #define DEB_TUPLE(x) CkPrintf x
00072 #else
00073 
00074 #define DEBR(x) // CkPrintf x
00075 #define DEBRMLOG(x) CkPrintf x
00076 #define AA
00077 #define AB
00078 #define DEBN(x) //CkPrintf x
00079 #define RED_DEB(x) //CkPrintf x
00080 #define DEBREVAC(x) //CkPrintf x
00081 #define DEB_TUPLE(x) //CkPrintf x
00082 #endif
00083 
00084 #ifndef INT_MAX
00085 #define INT_MAX 2147483647
00086 #endif
00087 
00088 extern bool _inrestart;
00089 
00090 CkReductionTypesExt charm_reducers;
00091 extern int (*PyReductionExt)(char**, int*, int, char**);
00092 
00093 Group::Group():thisIndex(CkMyPe())
00094 {
00095     if (_inrestart) CmiAbort("A Group object did not call the migratable constructor of its base class!");
00096 
00097     creatingContributors();
00098     contributorStamped(&reductionInfo);
00099     contributorCreated(&reductionInfo);
00100     doneCreatingContributors();
00101 }
00102 
00103 Group::Group(CkMigrateMessage *msg):CkReductionMgr(msg),thisIndex(CkMyPe())
00104 {
00105     creatingContributors();
00106     contributorStamped(&reductionInfo);
00107     contributorCreated(&reductionInfo);
00108     doneCreatingContributors();
00109 }
00110 
// Macro-generated members for Group.  The macro bodies are defined outside
// this file; the notes below describe only the arguments passed here.
// NOTE(review): presumably this expands to Group's contribute() overloads,
// routed through `this` viewed as a CkReductionMgr and using reductionInfo
// as the contributor record -- confirm against the macro definition.
00111 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(Group,
00112                     ((CkReductionMgr *)this),
00113                     reductionInfo,false)
// NOTE(review): presumably defines the reduction-client setters on
// CProxy_Group, resolving the manager via CkLocalBranch(_ck_gid) -- confirm.
00114 CK_REDUCTION_CLIENT_DEF(CProxy_Group,(CkReductionMgr *)CkLocalBranch(_ck_gid))
00115 
// NOTE(review): presumably defines Group's barrier() entry points with the
// same manager/contributor arguments as above -- confirm.
00116 CK_BARRIER_CONTRIBUTE_METHODS_DEF(Group,
00117                                    ((CkReductionMgr *)this),
00118                                    reductionInfo,false)
00119 
00120 
00121 
00122 CkGroupInitCallback::CkGroupInitCallback(void) {}
00123 
00124 
00125 
00126 
00127 void CkGroupInitCallback::callMeBack(CkGroupCallbackMsg *m)
00128 {
00129   m->call();
00130   delete m;
00131 }
00132 
00133 
00134 
00135 
00136 
00137 CkGroupReadyCallback::CkGroupReadyCallback(void)
00138 {
00139   _isReady = false;
00140 }
00141 void
00142 CkGroupReadyCallback::callBuffered(void)
00143 {
00144   int n = _msgs.length();
00145   for(int i=0;i<n;i++)
00146   {
00147     CkGroupCallbackMsg *msg = _msgs.deq();
00148     msg->call();
00149     delete msg;
00150   }
00151 }
00152 void
00153 CkGroupReadyCallback::callMeBack(CkGroupCallbackMsg *msg)
00154 {
00155   if(_isReady) {
00156     msg->call();
00157     delete msg;
00158   } else {
00159     _msgs.enq(msg);
00160   }
00161 }
00162 
00163 CkReductionClientBundle::CkReductionClientBundle(CkReductionClientFn fn_,void *param_)
00164     :CkCallback(callbackCfn,(void *)this),fn(fn_),param(param_) {}
00165 void CkReductionClientBundle::callbackCfn(void *thisPtr,void *reductionMsg)
00166 {
00167     CkReductionClientBundle *b=(CkReductionClientBundle *)thisPtr;
00168     CkReductionMsg *m=(CkReductionMsg *)reductionMsg;
00169     b->fn(b->param,m->getSize(),m->getData());
00170     delete m;
00171 }
00172 
00174 
00175 
00176 
00177 
00178 
00179 
00180 
00181 CkReductionMgr::CkReductionMgr()
00182   :
00183   thisProxy(thisgroup),
00184   isDestroying(false)
00185 { 
00186 #ifdef BINOMIAL_TREE
00187   init_BinomialTree();
00188 #elif CMK_BIGSIM_CHARM
00189   init_BinaryTree();
00190 #else
00191   init_TopoTree();
00192 #endif
00193   redNo=0;
00194   completedRedNo = -1;
00195   inProgress=false;
00196   creating=false;
00197   startRequested=false;
00198   gcount=lcount=0;
00199   nContrib=nRemote=0;
00200   is_inactive = false;
00201   maxStartRequest=0;
00202 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00203     numImmigrantRecObjs = 0;
00204     numEmigrantRecObjs = 0;
00205 #endif
00206   disableNotifyChildrenStart = false;
00207 
00208   barrier_gCount=0;
00209   barrier_nSource=0;
00210   barrier_nContrib=barrier_nRemote=0;
00211 
00212   DEBR((AA "In reductionMgr constructor at %d \n" AB,this));
00213 }
00214 
00215 CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
00216                                                     , isDestroying(false)
00217 {
00218   numKids = -1;
00219   redNo=0;
00220   completedRedNo = -1;
00221   inProgress=false;
00222   creating=false;
00223   startRequested=false;
00224   gcount=lcount=0;
00225   nContrib=nRemote=0;
00226   is_inactive = false;
00227   maxStartRequest=0;
00228   DEBR((AA "In reductionMgr migratable constructor at %d \n" AB,this));
00229 
00230   barrier_gCount=0;
00231   barrier_nSource=0;
00232   barrier_nContrib=barrier_nRemote=0;
00233 
00234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00235   numImmigrantRecObjs = 0;
00236   numEmigrantRecObjs = 0;
00237 #endif
00238 
00239 }
00240 
00241 CkReductionMgr::~CkReductionMgr()
00242 {
00243 }
00244 
00245 void CkReductionMgr::flushStates()
00246 {
00247   
00248   redNo=0;
00249   completedRedNo = -1;
00250   inProgress=false;
00251   creating=false;
00252   startRequested=false;
00253   nContrib=nRemote=0;
00254   maxStartRequest=0;
00255 
00256   while (!msgs.isEmpty()) { delete msgs.deq(); }
00257   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
00258   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
00259   while (!finalMsgs.isEmpty()) delete finalMsgs.deq();
00260 
00261   adjVec.clear();
00262 
00263 }
00264 
00266 
00267 
00268 void CkReductionMgr::ckSetReductionClient(CkCallback *cb)
00269 {
00270   DEBR((AA "Setting reductionClient in ReductionMgr groupid %d\n" AB,thisgroup.idx));
00271 
00272   if (CkMyPe()!=0)
00273       CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");  
00274   storedCallback=*cb;
00275 }
00276 
00278 
00279 
00280 
00281 void CkReductionMgr::creatingContributors(void)
00282 {
00283   DEBR((AA "Creating contributors...\n" AB));
00284   creating=true;
00285 }
00286 void CkReductionMgr::doneCreatingContributors(void)
00287 {
00288   DEBR((AA "Done creating contributors...\n" AB));
00289   creating=false;
00290   checkIsActive();
00291   if (startRequested) startReduction(redNo,CkMyPe());
00292   finishReduction();
00293 }
00294 
00295 
00296 void CkReductionMgr::contributorStamped(contributorInfo *ci)
00297 {
00298   DEBR((AA "Contributor %p stamped\n" AB,ci));
00299   
00300   gcount++;
00301   if (inProgress)
00302   {
00303     ci->redNo=redNo+1;
00304     adj(redNo).gcount--;
00305   } else
00306     ci->redNo=redNo;
00307 }
00308 
00309 
00310 void CkReductionMgr::contributorCreated(contributorInfo *ci)
00311 {
00312   DEBR((AA "Contributor %p created in grp %d\n" AB,ci,thisgroup.idx));
00313   
00314   lcount++;
00315   
00316   for (int r=redNo;r<ci->redNo;r++)
00317     adj(r).lcount--;
00318   checkIsActive();
00319 }
00320 
00321 
00322 
00323 
00324 
00325 
00326 
00327 void CkReductionMgr::contributorDied(contributorInfo *ci)
00328 {
00329 #if CMK_MEM_CHECKPOINT
00330   
00331   if (CkInRestarting()) return;
00332 #endif
00333 
00334   if (isDestroying) return;
00335 
00336   DEBR((AA "Contributor %p(%d) died\n" AB,ci,ci->redNo));
00337   
00338   gcount--;
00339 
00340   if (ci->redNo<redNo)
00341   {
00342   
00343     DEBR((AA "Dying guy %p must have been migrating-- he's at #%d!\n" AB,ci,ci->redNo));
00344     for (int r=ci->redNo;r<redNo;r++)
00345       thisProxy[0].MigrantDied(new CkReductionNumberMsg(r));
00346   }
00347 
00348   
00349   int r;
00350   for (r=redNo;r<ci->redNo;r++)
00351   {
00352     DEBR((AA "Dead guy %p left contribution for #%d\n" AB,ci,r));
00353     adj(r).gcount++;
00354   }
00355 
00356   lcount--;
00357   
00358   for (r=redNo;r<ci->redNo;r++)
00359     adj(r).lcount++;
00360 
00361   
00362   
00363   if (ci->redNo <= redNo) {
00364     checkIsActive();
00365   }
00366   finishReduction();
00367 }
00368 
00369 
00370 void CkReductionMgr::contributorLeaving(contributorInfo *ci)
00371 {
00372   DEBR((AA "Contributor %p(%d) migrating away\n" AB,ci,ci->redNo));
00373   lcount--;
00374   
00375   for (int r=redNo;r<ci->redNo;r++)
00376     adj(r).lcount++;
00377 
00378   
00379   if (ci->redNo <= redNo) {
00380     checkIsActive();
00381   }
00382   finishReduction();
00383 }
00384 
00385 
00386 void CkReductionMgr::contributorArriving(contributorInfo *ci)
00387 {
00388   DEBR((AA "Contributor %p(%d) migrating in\n" AB,ci,ci->redNo));
00389   lcount++;
00390 #if CMK_MEM_CHECKPOINT
00391   
00392   
00393   if (CkInRestarting()) return;
00394 #endif
00395   
00396   for (int r=redNo;r<ci->redNo;r++)
00397     adj(r).lcount--;
00398 
00399   
00400   if (ci->redNo == redNo) {
00401     checkIsActive();
00402   }
00403 }
00404 
00405 
00406 
00407 
00408 void CkReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
00409 {
00410 #if CMK_BIGSIM_CHARM
00411   _TRACE_BG_TLINE_END(&(m->log));
00412 #endif
00413   DEBR((AA "Contributor %p contributed for %d in grp %d ismigratable %d \n" AB,ci,ci->redNo,thisgroup.idx,m->isMigratableContributor()));
00414   m->redNo=ci->redNo++;
00415   m->sourceFlag=-1;
00416   m->gcount=0;
00417 
00418 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00419 
00420     
00421     if(CpvAccess(_currentObj)->mlogData->immigrantRecFlag){
00422         
00423         
00424         envelope *env = UsrToEnv(m);
00425         env->flags = env->flags | CK_BYPASS_DET_MLOG;
00426         thisProxy[CpvAccess(_currentObj)->mlogData->immigrantSourcePE].contributeViaMessage(m);
00427         return;
00428     }
00429 
00430     Chare *oldObj = CpvAccess(_currentObj);
00431     CpvAccess(_currentObj) = this;
00432 
00433     
00434     addContribution(m);
00435 
00436     CpvAccess(_currentObj) = oldObj;
00437 #else
00438   addContribution(m);
00439 #endif
00440 }
00441 
00442 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00443 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){
00444     
00445     
00446     
00447     envelope *env = UsrToEnv(m);
00448     env->flags = env->flags & ~CK_BYPASS_DET_MLOG;
00449 
00450     
00451     addContribution(m);
00452 }
00453 #else
00454 void CkReductionMgr::contributeViaMessage(CkReductionMsg *m){}
00455 #endif
00456 
00457 void CkReductionMgr::checkIsActive() {
00458 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
00459   return;
00460 #endif
00461 
00462   
00463   std::map<int, int>::iterator it;
00464   int c_inactive = 0;
00465   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
00466     if (it->second <= redNo) {
00467       DEBR((AA "Kid %d is inactive from redNo %d\n" AB, it->first, it->second));
00468       c_inactive++;
00469     }
00470   }
00471   DEBR((AA "CheckIsActive redNo %d, kids %d(inactive %d), lcount %d\n" AB, redNo,
00472     numKids, c_inactive, lcount));
00473 
00474   if(numKids == c_inactive && lcount == 0) {
00475     if(!is_inactive) {
00476       informParentInactive();
00477     }
00478     is_inactive = true;
00479   } else if(is_inactive) {
00480     is_inactive = false;
00481   }
00482 }
00483 
00484 
00485 
00486 
00487 void CkReductionMgr::checkAndAddToInactiveList(int id, int red_no) {
00488   
00489   
00490   
00491   if (inProgress && redNo == red_no) {
00492     thisProxy[id].ReductionStarting(new CkReductionNumberMsg(red_no));
00493   }
00494 
00495   std::map<int, int>::iterator it;
00496   it = inactiveList.find(id);
00497   if (it == inactiveList.end()) {
00498     inactiveList.insert(std::pair<int, int>(id, red_no));
00499   } else {
00500     it->second = red_no;
00501   }
00502   
00503   if (redNo == red_no) {
00504     checkIsActive();
00505   }
00506 }
00507 
00508 
00509 
00510 
00511 
00512 void CkReductionMgr::checkAndRemoveFromInactiveList(int id, int red_no) {
00513   std::map<int, int>::iterator it;
00514   it = inactiveList.find(id);
00515   if (it == inactiveList.end()) {
00516     return;
00517   }
00518   if (it->second <= red_no) {
00519     inactiveList.erase(it);
00520     DEBR((AA "Parent removing kid %d from inactivelist red_no %d\n" AB,
00521       id, red_no));
00522   }
00523 }
00524 
00525 
00526 void CkReductionMgr::informParentInactive() {
00527   if (hasParent()) {
00528     DEBR((AA "Inform parent to add to inactivelist red_no %d\n" AB, redNo));
00529     thisProxy[treeParent()].AddToInactiveList(
00530       new CkReductionInactiveMsg(CkMyPe(), redNo));
00531   }
00532 }
00533 
00534 
00535 
00536 
00537 
00538 void CkReductionMgr::sendReductionStartingToKids(int red_no) {
00539 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
00540   for (int k=0;k<treeKids();k++)
00541   {
00542     DEBR((AA "Asking child PE %d to start #%d\n" AB,kids[k],redNo));
00543     thisProxy[kids[k]].ReductionStarting(new CkReductionNumberMsg(redNo));
00544   }
00545 #else
00546   std::map<int, int>::iterator it;
00547   for (it = inactiveList.begin(); it != inactiveList.end(); it++) {
00548     if (it->second <= red_no) {
00549       DEBR((AA "Parent sending reductionstarting to inactive kid %d\n" AB,
00550         it->first));
00551       thisProxy[it->first].ReductionStarting(new CkReductionNumberMsg(red_no));
00552     }
00553   }
00554 #endif
00555 }
00556 
00557 
00559 
00560 void CkReductionMgr::ReductionStarting(CkReductionNumberMsg *m)
00561 {
00562  if(CkMyPe()==0){
00563     
00564     
00565     
00566  }
00567  DEBR((AA " Group ReductionStarting called for redNo %d\n" AB,m->num));
00568  int srcPE = (UsrToEnv(m))->getSrcPe();
00569   if (isPresent(m->num) && !inProgress)
00570   {
00571     DEBR((AA "Starting reduction #%d at parent's request\n" AB,m->num));
00572     startReduction(m->num,srcPE);
00573     finishReduction();
00574   } else if (isFuture(m->num)){
00575 
00576       DEBR((AA "Asked to startfuture Reduction %d \n" AB,m->num));
00577       if(maxStartRequest < m->num){
00578           maxStartRequest = m->num;
00579       }
00580  
00581       
00582     }
00583   else 
00584     DEBR((AA "Ignoring parent's late request to start #%d\n" AB,m->num));
00585   delete m;
00586 }
00587 
00588 
00589 
00590 void CkReductionMgr::LateMigrantMsg(CkReductionMsg *m)
00591 {
00592 #if CMK_BIGSIM_CHARM
00593   _TRACE_BG_TLINE_END(&(m->log));
00594 #endif
00595   addContribution(m);
00596 }
00597 
00598 
00599 void CkReductionMgr::MigrantDied(CkReductionNumberMsg *m)
00600 {
00601   if (CkMyPe() != 0 || m->num < completedRedNo) CkAbort("Late MigrantDied message recv'd!\n");
00602   DEBR((AA "Migrant died before contributing to #%d\n" AB,m->num));
00603  
00604   adj(m->num).gcount--;
00605   finishReduction();
00606   delete m;
00607 }
00608 
00610 void CkReductionMgr::startReduction(int number,int srcPE)
00611 {
00612   if (isFuture(number)){  return;}
00613   if (isPast(number)) {return;}
00614   if (inProgress){
00615     DEBR((AA "This reduction is already in progress\n" AB));
00616     return;
00617   }
00618   if (creating) 
00619   {
00620     DEBR((AA "Postponing start request #%d until we're done creating\n" AB,redNo));
00621     startRequested=true;
00622     return;
00623   }
00624 
00625 
00626   DEBR((AA "Starting reduction #%d  %d %d \n" AB,redNo,completedRedNo,number));
00627   inProgress=true;
00628  
00629 
00630 #if CMK_FAULT_EVAC
00631   if(!CmiNodeAlive(CkMyPe())){
00632     return;
00633   }
00634 #endif
00635 
00636   
00637   
00638   
00639   if(disableNotifyChildrenStart) return;
00640  
00641   
00642   sendReductionStartingToKids(redNo);
00643 }   
00644 
00645 
00646 void CkReductionMgr::addContribution(CkReductionMsg *m)
00647 {
00648   if (isPast(m->redNo))
00649   {
00650 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00651         CmiAbort("this version should not have late migrations");
00652 #else
00653     
00654     DEBR((AA "Migrant gives late contribution for #%d!\n" AB,m->redNo));
00655     
00656     
00657     thisProxy[0].LateMigrantMsg(m);
00658 #endif
00659   }
00660   else if (isFuture(m->redNo)) {
00661     DEBR((AA "Contributor gives early contribution-- for #%d\n" AB,m->redNo));
00662     futureMsgs.enq(m);
00663   } else {
00664     DEBR((AA "Recv'd local contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
00665    
00666     startReduction(m->redNo,CkMyPe());
00667     msgs.enq(m);
00668     nContrib++;
00669     finishReduction();
00670   }
00671 }
00672 
00676 void CkReductionMgr::finishReduction(void)
00677 {
00678   
00679   DEBR((AA "in finishReduction (inProgress=%d) in grp %d\n" AB,inProgress,thisgroup.idx));
00680   if ((!inProgress) || creating){
00681     DEBR((AA "Either not in Progress or creating\n" AB));
00682     return;
00683   }
00684 
00685   bool partialReduction = false;
00686 
00687   
00688 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00689     if (nContrib<(lcount+adj(redNo).lcount) - numImmigrantRecObjs + numEmigrantRecObjs){
00690           if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
00691             partialReduction = true;
00692           }
00693           else {
00694             DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
00695             return; 
00696           }
00697     }
00698 #else
00699   if (nContrib<(lcount+adj(redNo).lcount)){
00700          if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
00701            partialReduction = true;
00702          }
00703          else {
00704            DEBR((AA "Need more local messages %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
00705            return; 
00706          }
00707   }
00708 #endif
00709 
00710   if (nRemote<treeKids()) {
00711     if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
00712       partialReduction = true;
00713     }
00714     else {
00715       DEBR((AA "Need more remote messages %d %d\n" AB,nRemote,treeKids()));
00716       return; 
00717     }
00718   }
00719     
00720  
00721   DEBR((AA "Reducing data... %d %d\n" AB,nContrib,(lcount+adj(redNo).lcount)));
00722 #if CMK_BIGSIM_CHARM
00723   _TRACE_BG_END_EXECUTE(1);
00724   void* _bgParentLog = NULL;
00725   _TRACE_BG_BEGIN_EXECUTE_NOMSG("GroupReduce", &_bgParentLog, 0);
00726 #endif
00727   CkReductionMsg *result=reduceMessages(msgs);
00728   result->fromPE = CkMyPe();
00729   result->redNo=redNo;
00730   DEBR((AA "Reduced gcount=%d; sourceFlag=%d\n" AB,result->gcount,result->sourceFlag));
00731 
00732   if (partialReduction) {
00733     msgs.enq(result);
00734     return;
00735   }
00736 
00737   if (hasParent())
00738   {
00739     DEBR((AA "Passing reduced data up to parent node %d.\n" AB,treeParent()));
00740     DEBR((AA "Message gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
00741 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00742     result->gcount+=gcount+adj(redNo).gcount;
00743 #else
00744     result->gcount+=gcount+adj(redNo).gcount;
00745 #endif
00746     thisProxy[treeParent()].RecvMsg(result);
00747   }
00748   else 
00749   {
00750     DEBR((AA "Final gcount is %d+%d+%d.\n" AB,result->gcount,gcount,adj(redNo).gcount));
00751     int totalElements=result->gcount+gcount+adj(redNo).gcount;
00752     if (totalElements>result->nSources()) 
00753     {
00754       DEBR((AA "Only got %d of %d contributions (c'mon, migrators!)\n" AB,result->nSources(),totalElements));
00755       msgs.enq(result);
00756       return; 
00757     } else if (totalElements<result->nSources()) {
00758       DEBR((AA "Got %d of %d contributions\n" AB,result->nSources(),totalElements));
00759 #if !defined(_FAULT_CAUSAL_)
00760       CkAbort("ERROR! Too many contributions at root!\n");
00761 #endif
00762     }
00763     DEBR((AA "Passing result to client function\n" AB));
00764     CkSetRefNum(result, result->getUserFlag());
00765     if (!result->callback.isInvalid())
00766         result->callback.send(result);
00767     else if (!storedCallback.isInvalid())
00768         storedCallback.send(result);
00769     else
00770         CkAbort("No reduction client!\n"
00771             "You must register a client with either SetReductionClient or during contribute.\n");
00772   }
00773 
00774 
00775   
00776   redNo++;
00777   
00778   
00779   checkIsActive();
00780   
00781   int i;
00782   completedRedNo++;
00783   adjVec.erase(adjVec.begin());
00784 
00785   inProgress=false;
00786   startRequested=false;
00787   nRemote=nContrib=0;
00788 
00789   
00790   int n=futureMsgs.length();
00791   for (i=0;i<n;i++)
00792   {
00793     CkReductionMsg *m=futureMsgs.deq();
00794     if (m!=NULL) 
00795       addContribution(m);
00796   }
00797   n=futureRemoteMsgs.length();
00798   for (i=0;i<n;i++)
00799   {
00800     CkReductionMsg *m=futureRemoteMsgs.deq();
00801     if (m!=NULL) {
00802       RecvMsg(m);
00803     }
00804   }
00805 
00806   if(maxStartRequest >= redNo){
00807       startReduction(redNo,CkMyPe());
00808       finishReduction();
00809   }
00810  
00811 
00812 }
00813 
00814 
00815   void CkReductionMgr::RecvMsg(CkReductionMsg *m)
00816 {
00817 #if CMK_BIGSIM_CHARM
00818   _TRACE_BG_TLINE_END(&m->log);
00819 #endif
00820   if (isPresent(m->redNo)) { 
00821     DEBR((AA "Recv'd remote contribution %d for #%d\n" AB,nRemote,m->redNo));
00822     
00823     
00824     if (m->nSources() > 0) {
00825       checkAndRemoveFromInactiveList(m->fromPE, m->redNo);
00826     }
00827     startReduction(m->redNo, CkMyPe());
00828     msgs.enq(m);
00829     nRemote++;
00830     finishReduction();
00831   }
00832   else if (isFuture(m->redNo)) {
00833     DEBR((AA "Recv'd early remote contribution %d for #%d\n" AB,nRemote,m->redNo));
00834     futureRemoteMsgs.enq(m);
00835   } 
00836   else CkAbort("Recv'd late remote contribution!\n");
00837 }
00838 
00839 void CkReductionMgr::AddToInactiveList(CkReductionInactiveMsg *m) {
00840   int id = m->id;
00841   int last_redno = m->redno;
00842   delete m;
00843 
00844   DEBR((AA "Parent add kid %d to inactive list from redno %d\n" AB,
00845     id, last_redno));
00846   checkAndAddToInactiveList(id, last_redno);
00847 
00848   finishReduction();
00849   if (last_redno <= redNo) {
00850     checkIsActive();
00851   }
00852 }
00853 
00855 
00856 
00857 countAdjustment &CkReductionMgr::adj(int number)
00858 {
00859   number-=completedRedNo;
00860   number--;
00861   if (number<0) CkAbort("Requested adjustment to prior reduction!\n");
00862   
00863   if (adjVec.size() <= number) { adjVec.resize(number + 1); }
00864   return adjVec[number];
00865 }
00866 
00867 
00868 CkReductionMsg *CkReductionMgr::reduceMessages(CkMsgQ<CkReductionMsg> &msgs)
00869 {
00870   CkReductionMsg *ret=NULL;
00871 
00872   
00873   CkReduction::reducerType r=CkReduction::invalid;
00874   int msgs_gcount=0;
00875   int msgs_nSources=0;
00876   CMK_REFNUM_TYPE msgs_userFlag=(CMK_REFNUM_TYPE)-1;
00877   CkCallback msgs_callback;
00878   int i;
00879   int nMsgs=0;
00880   CkReductionMsg *m;
00881   std::vector<CkReductionMsg *> msgArr(msgs.length());
00882   bool isMigratableContributor;
00883 
00884   
00885   while (NULL!=(m=msgs.deq()))
00886   {
00887     DEBR((AA "***** gcount=%d; sourceFlag=%d ismigratable %d \n" AB,m->gcount,m->nSources(),m->isMigratableContributor()));
00888     msgs_gcount+=m->gcount;
00889     if (m->sourceFlag!=0)
00890     { 
00891       msgs_nSources+=m->nSources();
00892 #if CMK_BIGSIM_CHARM
00893       _TRACE_BG_ADD_BACKWARD_DEP(m->log);
00894 #endif
00895 
00896       
00897       if (nMsgs == 0 || m->reducer != CkReduction::nop) {
00898         msgArr[nMsgs++]=m;
00899         if (!m->callback.isInvalid()) {
00900 #if CMK_ERROR_CHECKING
00901           if(nMsgs > 1 && !(msgs_callback == m->callback)) {
00902             CkPrintf("Mismatched callback details: reducers (%s, %s); callback types (%s, %s)\n",
00903                      CkReduction::reducerTable()[r].name, CkReduction::reducerTable()[m->reducer].name,
00904                      CkCallback::typeName(msgs_callback.type), CkCallback::typeName(m->callback.type));
00905             CkAbort("mis-matched client callbacks in reduction messages\n");
00906           }
00907 #endif
00908           msgs_callback=m->callback;
00909         }
00910         r=m->reducer;
00911         if (m->userFlag!=(CMK_REFNUM_TYPE)-1)
00912           msgs_userFlag=m->userFlag;
00913     isMigratableContributor=m->isMigratableContributor();
00914       } else {
00915 #if CMK_ERROR_CHECKING
00916         if(!(msgs_callback == m->callback)) {
00917           CkPrintf("Mismatched callback details: reducers (%s, %s); callback types (%s, %s)\n",
00918                      CkReduction::reducerTable()[r].name, CkReduction::reducerTable()[m->reducer].name,
00919                      CkCallback::typeName(msgs_callback.type), CkCallback::typeName(m->callback.type));
00920           CkAbort("mis-matched client callbacks in reduction messages\n");
00921         }
00922 #endif
00923         delete m;
00924       }
00925     }
00926     else
00927     { 
00928       delete m;
00929     }
00930   }
00931 
00932   if (nMsgs==0||r==CkReduction::invalid)
00933   
00934     ret=CkReductionMsg::buildNew(0,NULL);
00935   else
00936   {
00937         
00938     if(nMsgs == 1 &&
00939        msgArr[0]->reducer != CkReduction::set &&
00940        msgArr[0]->reducer != CkReduction::tuple) {
00941       ret = msgArr[0];
00942     }else{
00943       if (msgArr[0]->reducer == CkReduction::nop) {
00944         
00945         
00946         
00947         delete msgArr[0];
00948         msgArr[0] = msgArr[nMsgs - 1];
00949         nMsgs--;
00950       }
00951       CkReduction::reducerFn f=CkReduction::reducerTable()[r].fn;
00952       ret=(*f)(nMsgs,msgArr.data());
00953     }
00954     ret->reducer=r;
00955   }
00956 
00957 #if USE_CRITICAL_PATH_HEADER_ARRAY
00958 #if CRITICAL_PATH_DEBUG > 3
00959   CkPrintf("[%d] combining critical path information from messages in reduceMessages(). numMsgs=%d\n", CkMyPe(), nMsgs);
00960 #endif
00961   MergeablePathHistory path(CkpvAccess(currentlyExecutingPath));
00962   path.updateMax(UsrToEnv(ret));
00963   
00964   for (i=0;i<nMsgs;i++){
00965     if (msgArr[i]!=ret){
00966       
00967       path.updateMax(UsrToEnv(msgArr[i]));
00968     } else {
00969       
00970     }
00971   }
00972 #if CRITICAL_PATH_DEBUG > 3
00973   CkPrintf("[%d] result path = %lf\n", CkMyPe(), path.getTime() );
00974 #endif
00975   
00976   PathHistoryTableEntry tableEntry(path);
00977   tableEntry.addToTableAndEnvelope(UsrToEnv(ret));
00978   
00979 #endif
00980 
00981     
00982   for (i=0;i<nMsgs;i++) if (msgArr[i]!=ret) delete msgArr[i];
00983 
00984   
00985   ret->gcount=msgs_gcount;
00986   ret->userFlag=msgs_userFlag;
00987   ret->callback=msgs_callback;
00988   ret->sourceFlag=msgs_nSources;
00989   ret->setMigratableContributor(isMigratableContributor);
00990 
00991   return ret;
00992 }
00993 
00994 
00995 
00996 
00997 
00998 
00999 
01000 void CkReductionMgr::pup(PUP::er &p)
01001 {
01002 
01003 
01004   CkGroupInitCallback::pup(p);
01005   p(redNo);
01006   p(completedRedNo);
01007   p(inProgress); p(creating); p(startRequested);
01008   p(nContrib); p(nRemote); p(disableNotifyChildrenStart);
01009   p|msgs;
01010   p|futureMsgs;
01011   p|futureRemoteMsgs;
01012   p|finalMsgs;
01013   p|adjVec;
01014   p|storedCallback;
01015     
01016   if (storedCallback.type == CkCallback::callCFn && storedCallback.d.cfn.fn == CkReductionClientBundle::callbackCfn) 
01017   {
01018     CkReductionClientBundle *bd;
01019     if (p.isUnpacking()) 
01020       bd = new CkReductionClientBundle;
01021     else
01022       bd = (CkReductionClientBundle *)storedCallback.d.cfn.param;
01023     p|*bd;
01024     if (p.isUnpacking()) storedCallback.d.cfn.param = bd;
01025   }
01026 
01027   
01028   
01029   
01030   
01031   
01032   
01033 
01034 
01035 
01036 
01037 
01038   if(p.isUnpacking()){
01039     thisProxy = thisgroup;
01040     maxStartRequest=0;
01041 #ifdef BINOMIAL_TREE
01042     init_BinomialTree();
01043 #elif CMK_BIGSIM_CHARM
01044     init_BinaryTree();
01045 #else
01046     init_TopoTree();
01047 #endif
01048     is_inactive = false;
01049     checkIsActive();
01050   }
01051 
01052   DEBR(("[%d,%d] pupping _____________  gcount = %d \n",CkMyNode(),CkMyPe(),gcount));
01053 }
01054 
01055 void CkReductionMgr::init_BinaryTree(){
01056   if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
01057     parent = CkNodeFirst(CkMyNode());
01058     numKids = 0;
01059   } else {
01060     int parentNode = (CkMyNode()-1)/TREE_WID;
01061     parent = CkMyNode() > 0 ? CkNodeFirst(parentNode) : -1;
01062     
01063     int firstKid = CkMyNode()*TREE_WID+1;
01064     numKids=CkNumNodes()-firstKid;
01065     if (numKids > TREE_WID) numKids = TREE_WID;
01066     if (numKids < 0) numKids = 0;
01067     for (int i = 0; i < numKids; i++) {
01068       kids.push_back(CkNodeFirst(firstKid+i));
01069 #if CMK_FAULT_EVAC
01070       newKids.push_back(CkNodeFirst(firstKid+i));
01071 #endif
01072     }
01073 
01074     
01075     numKids += CkNodeSize(CkMyNode())-1;
01076     for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
01077       kids.push_back(CkMyPe()+i);
01078 #if CMK_FAULT_EVAC
01079       newKids.push_back(CkMyPe()+i);
01080 #endif
01081     }
01082   }
01083 }
01084 
01085 void CkReductionMgr::init_TopoTree() {
01086   if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
01087     parent = CkNodeFirst(CkMyNode());
01088     numKids = 0;
01089   } else {
01090     if (_topoTree == NULL) CkAbort("CkReductionMgr:: topo tree has not been calculated\n");
01091 
01092     CmiSpanningTreeInfo &t = *_topoTree;
01093     if (t.parent != -1) parent = CkNodeFirst(t.parent);
01094     else parent = -1;
01095     numKids = t.child_count;
01096     for (int i=0; i < numKids; i++) {
01097       int child = CkNodeFirst(t.children[i]);
01098       kids.push_back(child);
01099 #if CMK_FAULT_EVAC
01100       newKids.push_back(child);
01101 #endif
01102     }
01103 
01104     
01105     numKids += CkNodeSize(CkMyNode())-1;
01106     for (int i = 1; i < CkNodeSize(CkMyNode()); i++) {
01107       kids.push_back(CkMyPe()+i);
01108 #if CMK_FAULT_EVAC
01109       newKids.push_back(CkMyPe()+i);
01110 #endif
01111     }
01112   }
01113 }
01114 
01115 void CkReductionMgr::init_BinomialTree(){
01116   if (CkNodeSize(CkMyNode()) > 1 && CkNodeFirst(CkMyNode()) != CkMyPe()) {
01117     parent = CkNodeFirst(CkMyNode());
01118     numKids = 0;
01119   } else {
01120     int depth = (int)ceil((log((double)CkNumNodes())/log((double)2)));
01121     upperSize = (unsigned) 1 << depth;
01122     label = upperSize-CkNodeFirst(CkMyNode())-1;
01123     int p=label;
01124     int count=0;
01125     while( p > 0){
01126       if(p % 2 == 0)
01127         break;
01128       else{
01129         p = p/2;
01130         count++;
01131       }
01132     }
01133     parent = label + (1<<count);
01134     parent = upperSize - 1 - parent;
01135     int temp;
01136     if(count != 0){
01137       numKids = 0;
01138       for(int i=0;i<count;i++){
01139         temp = label - (1<<i);
01140         temp = upperSize - 1 - temp;
01141         if(temp <= CkNumPes()-1){
01142           kids.push_back(temp);
01143           numKids++;
01144         }
01145       }
01146     }else{
01147       numKids = 0;
01148     }
01149   }
01150 }
01151 
01152 
/// Returns the PE number used as the root of the reduction tree (always 0).
int CkReductionMgr::treeRoot(void)
{
  return 0;
}
01157 bool CkReductionMgr::hasParent(void) 
01158 {
01159   return (bool)(CkMyPe()!=treeRoot());
01160 }
/// Returns the stored parent PE of this PE in the reduction tree.
int CkReductionMgr::treeParent(void) 
{
  return parent;
}
/// Returns the stored number of children of this PE in the reduction tree.
int CkReductionMgr::treeKids(void)
{
  return numKids;
}
01169 
01170 
01171 
01172 
01173 
01174 void CkReductionMgr::barrier(CkReductionMsg *m)
01175 {
01176   barrier_nContrib++;
01177   barrier_nSource++;
01178   if(!m->callback.isInvalid())
01179       barrier_storedCallback=m->callback;
01180   finishBarrier();
01181   delete m;
01182 }
01183 
01184 void CkReductionMgr::finishBarrier(void)
01185 {
01186        if(barrier_nContrib<lcount){
01187                DEBR(("[%d] current contrib:%d,lcount:%d\n",CkMyPe(),barrier_nContrib,lcount));
01188                return;
01189        }
01190        if(barrier_nRemote<treeKids()){
01191                DEBR(("[%d] current remote:%d,kids:%d\n",CkMyPe(),barrier_nRemote,treeKids()));
01192                return;
01193        }
01194        CkReductionMsg * result = CkReductionMsg::buildNew(0,NULL);
01195        result->callback=barrier_storedCallback;
01196        result->sourceFlag=barrier_nSource;
01197        result->gcount=barrier_gCount;
01198        if(hasParent())
01199        {
01200                DEBR(("[%d]send to parent:%d\n",CkMyPe(),treeParent()));
01201                result->gcount+=gcount;
01202                thisProxy[treeParent()].Barrier_RecvMsg(result);
01203        }
01204        else{
01205                int totalElements=result->gcount+gcount;
01206                DEBR(("[%d]root,totalElements:%d,source:%d\n",CkMyPe(),totalElements,result->nSources()));
01207                if(totalElements<result->nSources()){
01208                        CkAbort("ERROR! Too many contributions at barrier root\n");
01209                }
01210                CkSetRefNum(result,result->getUserFlag());
01211                if(!result->callback.isInvalid())
01212                        result->callback.send(result);
01213                else if(!barrier_storedCallback.isInvalid())
01214                                barrier_storedCallback.send(result);
01215                else 
01216                        CkAbort("No reduction client!\n");
01217        }
01218        barrier_nRemote=barrier_nContrib=0;
01219        barrier_gCount=0;
01220        barrier_nSource=0;
01221 }
01222 
01223 void CkReductionMgr::Barrier_RecvMsg(CkReductionMsg *m)
01224 {
01225        barrier_nRemote++;
01226        barrier_gCount+=m->gcount;
01227        barrier_nSource+=m->nSources();
01228        if(!m->callback.isInvalid())
01229                barrier_storedCallback=m->callback;
01230        finishBarrier();
01231 }
01232 
01233 
01234 
01236 
01238 
01239 
01240 
01241 
// Size of the fixed part of a CkReductionMsg; CkReductionMsg::alloc adds the
// requested payload size to this to get the total allocation.
// NOTE(review): the sizeof(double) subtracted here presumably corresponds to
// the dataStorage member that marks where the payload starts -- confirm
// against the class declaration.
#define ARM_DATASTART (sizeof(CkReductionMsg)-sizeof(double))
01243 
01244 
01245 
01246 CkReductionMsg *CkReductionMsg::buildNew(int NdataSize,const void *srcData,
01247     CkReduction::reducerType reducer, CkReductionMsg *buf)
01248 {
01249   int len[1] = { NdataSize };
01250   CkReductionMsg *ret = buf ? buf : new(len,0) CkReductionMsg();
01251 
01252   ret->dataSize=NdataSize;
01253   if (srcData!=NULL && !buf)
01254     memcpy(ret->data,srcData,NdataSize);
01255   ret->userFlag=(CMK_REFNUM_TYPE)-1;
01256   ret->reducer=reducer;
01257   ret->sourceFlag=std::numeric_limits<int>::min();
01258   ret->gcount=0;
01259   ret->migratableContributor = true;
01260 #if CMK_BIGSIM_CHARM
01261   ret->log = NULL;
01262 #endif
01263   return ret;
01264 }
01265 
01266 
01267 void *
01268 CkReductionMsg::alloc(int msgnum,size_t size,int *sz,int priobits,GroupDepNum groupDepNum)
01269 {
01270   int totalsize=ARM_DATASTART+(*sz);
01271   DEBR(("CkReductionMsg::Allocating %d store; %d bytes total\n",*sz,totalsize));
01272   CkReductionMsg *ret = (CkReductionMsg *)
01273     CkAllocMsg(msgnum,totalsize,priobits,groupDepNum);
01274   ret->data=(void *)(&ret->dataStorage);
01275   return (void *) ret;
01276 }
01277 
01278 void *
01279 CkReductionMsg::pack(CkReductionMsg* in)
01280 {
01281   DEBR(("CkReductionMsg::pack %d %d %d %d\n",in->sourceFlag,in->redNo,in->gcount,in->dataSize));
01282   
01283   in->data = NULL;
01284   return (void*) in;
01285 }
01286 
01287 CkReductionMsg* CkReductionMsg::unpack(void *in)
01288 {
01289   CkReductionMsg *ret = (CkReductionMsg *)in;
01290   DEBR(("CkReductionMsg::unpack %d %d %d %d\n",ret->sourceFlag,ret->redNo,ret->gcount,ret->dataSize));
01291   
01292   ret->data=(void *)(&ret->dataStorage);
01293   return ret;
01294 }
01295 
01296 
01299 
01300 
01301 
01302 
01303 
01304 
01305 
01306 
01307 
01308 
01309 
01310 
01311 
01313 
01314 
01315 
01316 
01317 
/// Reducer registered for the invalid type: it aborts with a hint that a
/// custom reducer index was probably left uninitialised.  The trailing
/// return only satisfies the function signature.
static CkReductionMsg *invalid_reducer_fn(int nMsg,CkReductionMsg **msg)
{
    CkAbort("Called the invalid reducer type 0.  This probably\n"
        "means you forgot to initialize your custom reducer index.\n");
    return NULL;
}
01324 
01325 static CkReductionMsg *nop_fn(int nMsg,CkReductionMsg **msg)
01326 {
01327   return CkReductionMsg::buildNew(0,NULL, CkReduction::invalid, msg[0]);
01328 }
01329 
// Generates a static reducer function `name` that treats every message's
// payload as an array of `dataType`.  The element count is taken from
// msg[0]; for each further message the statement(s) in `loop` are run per
// element with `ret` (msg[0]'s data), `value` (the current message's data)
// and index `i` in scope.  The result reuses msg[0] as the output buffer.
// `typeStr` is the printf conversion used only by the RED_DEB trace.
#define SIMPLE_REDUCTION(name,dataType,typeStr,loop) \
static CkReductionMsg *name(int nMsg,CkReductionMsg **msg)\
{\
  RED_DEB(("/ PE_%d: " #name " invoked on %d messages\n",CkMyPe(),nMsg));\
  int m,i;\
  int nElem=msg[0]->getLength()/sizeof(dataType);\
  dataType *ret=(dataType *)(msg[0]->getData());\
  for (m=1;m<nMsg;m++)\
  {\
    dataType *value=(dataType *)(msg[m]->getData());\
    for (i=0;i<nElem;i++)\
    {\
      RED_DEB(("|\tmsg%d (from %d) [%d]=" typeStr "\n",m,msg[m]->sourceFlag,i,value[i]));\
      loop\
    }\
  }\
  RED_DEB(("\\ PE_%d: " #name " finished\n",CkMyPe()));\
  return CkReductionMsg::buildNew(nElem*sizeof(dataType),(void *)ret, CkReduction::invalid, msg[0]);\
}
01349 
01350 
// Instantiates SIMPLE_REDUCTION once per built-in arithmetic type, producing
// nameBase##_char_fn, nameBase##_short_fn, ... nameBase##_double_fn, all
// sharing the same per-element `loop` body.  The string argument is the
// printf conversion used by the debug trace for that element type.
// (The short variant previously used "%h", which is a length modifier with
// no conversion specifier; it is now "%hd".)
#define SIMPLE_POLYMORPH_REDUCTION(nameBase,loop) \
  SIMPLE_REDUCTION(nameBase##_char_fn,char,"%c",loop) \
  SIMPLE_REDUCTION(nameBase##_short_fn,short,"%hd",loop) \
  SIMPLE_REDUCTION(nameBase##_int_fn,int,"%d",loop) \
  SIMPLE_REDUCTION(nameBase##_long_fn,long,"%ld",loop) \
  SIMPLE_REDUCTION(nameBase##_long_long_fn,long long,"%lld",loop) \
  SIMPLE_REDUCTION(nameBase##_uchar_fn,unsigned char,"%c",loop) \
  SIMPLE_REDUCTION(nameBase##_ushort_fn,unsigned short,"%hu",loop) \
  SIMPLE_REDUCTION(nameBase##_uint_fn,unsigned int,"%u",loop) \
  SIMPLE_REDUCTION(nameBase##_ulong_fn,unsigned long,"%lu",loop) \
  SIMPLE_REDUCTION(nameBase##_ulong_long_fn,unsigned long long,"%llu",loop) \
  SIMPLE_REDUCTION(nameBase##_float_fn,float,"%f",loop) \
  SIMPLE_REDUCTION(nameBase##_double_fn,double,"%f",loop)
01364 
01365 
// Element-wise sum across all messages, one function per arithmetic type.
SIMPLE_POLYMORPH_REDUCTION(sum,ret[i]+=value[i];)

// Element-wise product across all messages.
SIMPLE_POLYMORPH_REDUCTION(product,ret[i]*=value[i];)

// Element-wise maximum: keep the larger of the running and incoming value.
SIMPLE_POLYMORPH_REDUCTION(max,if (ret[i]<value[i]) ret[i]=value[i];)

// Element-wise minimum: keep the smaller of the running and incoming value.
SIMPLE_POLYMORPH_REDUCTION(min,if (ret[i]>value[i]) ret[i]=value[i];)
01376 
01377 
01378 
01379 
// Logical AND over ints: any zero input forces the result to 0; the running
// value is then normalised to 0/1.
SIMPLE_REDUCTION(logical_and_fn,int,"%d",
        if (value[i]==0)
     ret[i]=0;
  ret[i]=!!ret[i];
)

// Same logic as logical_and_fn, defined under the explicitly typed name.
SIMPLE_REDUCTION(logical_and_int_fn,int,"%d",
        if (value[i]==0)
     ret[i]=0;
  ret[i]=!!ret[i];
)

// Logical AND over bools: any false input forces the result to false.
SIMPLE_REDUCTION(logical_and_bool_fn,bool,"%d",
  if (!value[i]) ret[i]=false;
)

// Logical OR over ints: any non-zero input forces the result to 1; the
// running value is then normalised to 0/1.
SIMPLE_REDUCTION(logical_or_fn,int,"%d",
  if (value[i]!=0)
           ret[i]=1;
  ret[i]=!!ret[i];
)

// Same logic as logical_or_fn, defined under the explicitly typed name.
SIMPLE_REDUCTION(logical_or_int_fn,int,"%d",
  if (value[i]!=0)
           ret[i]=1;
  ret[i]=!!ret[i];
)

// Logical OR over bools: any true input forces the result to true.
SIMPLE_REDUCTION(logical_or_bool_fn,bool,"%d",
  if (value[i]) ret[i]=true;
)

// Logical XOR over ints: compares the truthiness of the running and
// incoming values.
SIMPLE_REDUCTION(logical_xor_int_fn,int,"%d",
  ret[i] = (!ret[i] != !value[i]);
)

// Logical XOR over bools.
SIMPLE_REDUCTION(logical_xor_bool_fn,bool,"%d",
  ret[i] = (ret[i] != value[i]);
)
01433 
// Bitwise AND of each element (untyped name, int and bool variants).
SIMPLE_REDUCTION(bitvec_and_fn,int,"%d",ret[i]&=value[i];)
SIMPLE_REDUCTION(bitvec_and_int_fn,int,"%d",ret[i]&=value[i];)
SIMPLE_REDUCTION(bitvec_and_bool_fn,bool,"%d",ret[i]&=value[i];)

// Bitwise OR of each element.
SIMPLE_REDUCTION(bitvec_or_fn,int,"%d",ret[i]|=value[i];)
SIMPLE_REDUCTION(bitvec_or_int_fn,int,"%d",ret[i]|=value[i];)
SIMPLE_REDUCTION(bitvec_or_bool_fn,bool,"%d",ret[i]|=value[i];)

// Bitwise XOR of each element.
SIMPLE_REDUCTION(bitvec_xor_fn,int,"%d",ret[i]^=value[i];)
SIMPLE_REDUCTION(bitvec_xor_int_fn,int,"%d",ret[i]^=value[i];)
SIMPLE_REDUCTION(bitvec_xor_bool_fn,bool,"%d",ret[i]^=value[i];)
01445 
01446 
01447 static CkReductionMsg *random_fn(int nMsg,CkReductionMsg **msg) {
01448   int idx = (int)(CrnDrand()*(nMsg-1) + 0.5);
01449   return CkReductionMsg::buildNew(msg[idx]->getLength(),
01450                                   (void *)msg[idx]->getData(),
01451                                   CkReduction::random, msg[idx]);
01452 }
01453 
01455 
01456 
01457 
01458 
01459 static CkReductionMsg *concat_fn(int nMsg,CkReductionMsg **msg)
01460 {
01461   RED_DEB(("/ PE_%d: reduction_concat invoked on %d messages\n",CkMyPe(),nMsg));
01462   
01463   int i,retSize=0;
01464   for (i=0;i<nMsg;i++)
01465       retSize+=msg[i]->getSize();
01466 
01467   RED_DEB(("|- concat'd reduction message will be %d bytes\n",retSize));
01468 
01469   
01470   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
01471 
01472   
01473   char *cur=(char *)(ret->getData());
01474   for (i=0;i<nMsg;i++) {
01475     int messageBytes=msg[i]->getSize();
01476     memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
01477     cur+=messageBytes;
01478   }
01479   RED_DEB(("\\ PE_%d: reduction_concat finished-- %d messages combined\n",CkMyPe(),nMsg));
01480   return ret;
01481 }
01482 
01484 
01485 
01486 
01487 
01488 
01489 
01490 
01491 
// Alignment unit (in bytes) used when laying out set-reduction elements.
static const int alignSize=sizeof(double);

/// Rounds x up to the next multiple of alignSize.
static int SET_ALIGN(int x)
{
  const int mask = alignSize - 1;
  return (x + mask) & ~mask;
}

/// Bytes occupied by one set element holding dataSize payload bytes: an int
/// size header plus the payload, rounded up to the alignment unit.
static int SET_SIZE(int dataSize)
{
  return SET_ALIGN(sizeof(int) + dataSize);
}
01498 
01499 
01500 static CkReduction::setElement *SET_NEXT(CkReduction::setElement *cur)
01501 {
01502   char *next=((char *)cur)+SET_SIZE(cur->dataSize);
01503   return (CkReduction::setElement *)next;
01504 }
01505 
01506 
01507 
01508 static CkReductionMsg *set_fn(int nMsg,CkReductionMsg **msg)
01509 {
01510   RED_DEB(("/ PE_%d: reduction_set invoked on %d messages\n",CkMyPe(),nMsg));
01511   
01512   int i,retSize=0;
01513   for (i=0;i<nMsg;i++) {
01514     if (!msg[i]->isFromUser())
01515     
01516       retSize+=(msg[i]->getSize()-sizeof(int));
01517     else 
01518       retSize+=SET_SIZE(msg[i]->getSize());
01519   }
01520   retSize+=sizeof(int);
01521 
01522   RED_DEB(("|- composite set reduction message will be %d bytes\n",retSize));
01523 
01524   
01525   CkReductionMsg *ret=CkReductionMsg::buildNew(retSize,NULL);
01526 
01527   
01528   CkReduction::setElement *cur=(CkReduction::setElement *)(ret->getData());
01529   for (i=0;i<nMsg;i++)
01530     if (!msg[i]->isFromUser())
01531     {
01532                         int messageBytes=msg[i]->getSize()-sizeof(int);
01533                         RED_DEB(("|\tc msg[%d] is %d bytes\n",i,msg[i]->getSize()));
01534                         memcpy((void *)cur,(void *)msg[i]->getData(),messageBytes);
01535                         cur=(CkReduction::setElement *)(((char *)cur)+messageBytes);
01536     }
01537     else 
01538     {
01539       RED_DEB(("|\tu msg[%d] is %d bytes\n",i,msg[i]->getSize()));
01540       cur->dataSize=msg[i]->getSize();
01541       memcpy((void *)cur->data,(void *)msg[i]->getData(),msg[i]->getSize());
01542       cur=SET_NEXT(cur);
01543     }
01544   cur->dataSize=-1;
01545   RED_DEB(("\\ PE_%d: reduction_set finished-- %d messages combined\n",CkMyPe(),nMsg));
01546   return ret;
01547 }
01548 
01549 
01550 
01551 
01552 
01553 CkReduction::setElement *CkReduction::setElement::next(void)
01554 {
01555   CkReduction::setElement *n=SET_NEXT(this);
01556   if (n->dataSize==-1)
01557     return NULL;
01558   else
01559     return n;
01560 }
01561 
01562 
01564 
/// Builds the statistics record for a single sample: count 1, mean equal to
/// the sample, and m2 (the accumulator combined by statistics_fn) zero.
CkReduction::statisticsElement::statisticsElement(double initialValue)
  : count(1)
  , mean(initialValue)
  , m2(0.0)
{}
01570 
01571 
01572 
01573 
01574 
01575 
01576 static CkReductionMsg* statistics_fn(int nMsgs, CkReductionMsg** msg)
01577 {
01578   int nElem = msg[0]->getLength() / sizeof(CkReduction::statisticsElement);
01579   CkReduction::statisticsElement* ret = (CkReduction::statisticsElement*)(msg[0]->getData());
01580   for (int m = 1; m < nMsgs; m++)
01581   {
01582     CkReduction::statisticsElement* value = (CkReduction::statisticsElement*)(msg[m]->getData());
01583     for (int i = 0; i < nElem; i++)
01584     {
01585       double a_count = ret[i].count;
01586       ret[i].count += value[i].count;
01587       double delta = value[i].mean - ret[i].mean;
01588       ret[i].mean += delta * value[i].count / ret[i].count;
01589       ret[i].m2 += value[i].m2 + delta * delta * value[i].count * a_count / ret[i].count;
01590     }
01591   }
01592   return CkReductionMsg::buildNew(
01593     nElem*sizeof(CkReduction::statisticsElement),
01594     (void *)ret,
01595     CkReduction::invalid,
01596     msg[0]);
01597 }
01598 
01600 
/// Default constructor: an empty element with no data, the invalid reducer,
/// and no ownership of any buffer.
CkReduction::tupleElement::tupleElement()
  : dataSize(0)
  , data(NULL)
  , reducer(CkReduction::invalid)
  , owns_data(false)
{}
/// Wraps caller-provided data without taking ownership: the pointer is
/// stored as-is and owns_data stays false, so the destructor will not free
/// it.
CkReduction::tupleElement::tupleElement(size_t dataSize_, void* data_, CkReduction::reducerType reducer_)
  : dataSize(dataSize_)
  , data((char*)data_)
  , reducer(reducer_)
  , owns_data(false)
{
}
01614 CkReduction::tupleElement::tupleElement(CkReduction::tupleElement&& rhs_move)
01615   : dataSize(rhs_move.dataSize)
01616   , data(rhs_move.data)
01617   , reducer(rhs_move.reducer)
01618   , owns_data(rhs_move.owns_data)
01619 {
01620   rhs_move.dataSize = 0;
01621   rhs_move.data = 0;
01622   rhs_move.reducer = CkReduction::invalid;
01623   rhs_move.owns_data = false;
01624 }
01625 CkReduction::tupleElement& CkReduction::tupleElement::operator=(CkReduction::tupleElement&& rhs_move)
01626 {
01627   if (owns_data)
01628     delete[] data;
01629   dataSize = rhs_move.dataSize;
01630   data = rhs_move.data;
01631   reducer = rhs_move.reducer;
01632   owns_data = rhs_move.owns_data;
01633   rhs_move.dataSize = 0;
01634   rhs_move.data = 0;
01635   rhs_move.reducer = CkReduction::invalid;
01636   rhs_move.owns_data = false;
01637   return *this;
01638 }
01639 CkReduction::tupleElement::~tupleElement()
01640 {
01641   if (owns_data)
01642     delete[] data;
01643 }
01644 
01645 void CkReduction::tupleElement::pup(PUP::er &p) {
01646   p|dataSize;
01647   
01648   
01649   
01650   if (p.isUnpacking()) {
01651     data = new char[dataSize];
01652     owns_data = true;
01653   }
01654   PUParray(p, data, dataSize);
01655   if (p.isUnpacking()){
01656     int temp;
01657     p|temp;
01658     reducer=(CkReduction::reducerType)temp;
01659   } else {
01660     int temp=(int)reducer;
01661     p|temp;
01662   }
01663 }
01664 
01665 CkReductionMsg* CkReductionMsg::buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions)
01666 {
01667   PUP::sizer ps;
01668   ps|num_reductions;
01669   PUParray(ps, reductions, num_reductions);
01670 
01671   CkReductionMsg* msg = CkReductionMsg::buildNew(ps.size(), NULL, CkReduction::tuple);
01672   PUP::toMem p(msg->data);
01673   p|num_reductions;
01674   PUParray(p, reductions, num_reductions);
01675   if (p.size() != ps.size()) CmiAbort("Size mismatch packing CkReduction::tupleElement::tupleToBuffer\n");
01676   return msg;
01677 }
01678 
01679 void CkReductionMsg::toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions)
01680 {
01681   PUP::fromMem p(this->getData());
01682   p|(*num_reductions);
01683   *out_reductions = new CkReduction::tupleElement[*num_reductions];
01684   PUParray(p, *out_reductions, *num_reductions);
01685 }
01686 
01687 
/**
 * Tuple reducer: each input message holds several independent reductions
 * packed as tupleElements.  For every reduction slot, the matching element
 * of each message is presented to that slot's registered reducer as a
 * stand-in CkReductionMsg, and the per-slot results are packed into one
 * output tuple message.
 *
 * Aborts if the input messages do not all contain the same number of
 * reductions.
 */
CkReductionMsg* CkReduction::tupleReduction_fn(int num_messages, CkReductionMsg** messages)
{
  // Unpacked elements of each message; each array is allocated by toTuple()
  // and released with delete[] at the end of this function.
  std::vector<CkReduction::tupleElement*> tuple_data(num_messages);
  int num_reductions = 0;
  for (int message_idx = 0; message_idx < num_messages; ++message_idx)
  {
    int itr_num_reductions = 0;
    messages[message_idx]->toTuple(&tuple_data[message_idx], &itr_num_reductions);

    // The first non-zero count becomes the reference; later ones must match.
    if (num_reductions == 0)
      num_reductions = itr_num_reductions;
    else if (num_reductions != itr_num_reductions)
      CmiAbort("num_reductions mismatch in CkReduction::tupleReduction");
  }

  DEB_TUPLE(("tupleReduction {\n  num_messages=%d,\n  num_reductions=%d,\n  length=%d\n",
           num_messages, num_reductions, messages[0]->getLength()));

  std::vector<CkReduction::tupleElement> return_data(num_reductions);
  // Raw storage for one stand-in CkReductionMsg per (reduction, message)
  // pair.  These are never constructed as real messages; only the fields
  // assigned below are populated.
  std::vector<char> simulated_messages_buffer(sizeof(CkReductionMsg) * num_reductions * num_messages);
  std::vector<CkReductionMsg*> simulated_messages(num_messages);

  // Results that a reducer allocated freshly (rather than returning its
  // first input) are collected here and deleted once the output is packed.

  std::vector<CkReductionMsg *> msgs_to_delete;
  msgs_to_delete.reserve(num_reductions);
  for (int reduction_idx = 0; reduction_idx < num_reductions; ++reduction_idx)
  {
    DEB_TUPLE(("  reduction_idx=%d {\n", reduction_idx));
    CkReduction::reducerType reducerType = CkReduction::invalid;
    for (int message_idx = 0; message_idx < num_messages; ++message_idx)
    {
      CkReduction::tupleElement* reductions = (CkReduction::tupleElement*)(tuple_data[message_idx]);
      CkReduction::tupleElement& element = reductions[reduction_idx];
      DEB_TUPLE(("    msg %d, sf=%d, length=%d : { dataSize=%d, data=%p, reducer=%d },\n",
                 message_idx, messages[message_idx]->sourceFlag, messages[message_idx]->getLength(), element.dataSize, element.data, element.reducer));

      // The reducer of the last message's element is the one used below.
      reducerType = element.reducer;

      // Fill the stand-in message: payload fields come from the tuple
      // element, bookkeeping fields from the real enclosing message.
      size_t sim_idx = (reduction_idx * num_messages + message_idx) * sizeof(CkReductionMsg);
      CkReductionMsg& simulated_message = *(CkReductionMsg*)&simulated_messages_buffer[sim_idx];
      simulated_message.dataSize = element.dataSize;
      simulated_message.data = element.data;
      simulated_message.reducer = element.reducer;
      simulated_message.sourceFlag = messages[message_idx]->sourceFlag;
      simulated_message.userFlag = messages[message_idx]->userFlag;
      simulated_message.gcount = messages[message_idx]->gcount;
      simulated_message.migratableContributor = messages[message_idx]->migratableContributor;
#if CMK_BIGSIM_CHARM
      simulated_message.log = NULL;
#endif
      simulated_messages[message_idx] = &simulated_message;
    }

    // Run this slot's reducer over the stand-in messages.
    const auto& reducerFp = CkReduction::reducerTable()[reducerType].fn;
    CkReductionMsg* result = reducerFp(num_messages, simulated_messages.data());
    DEB_TUPLE(("    result_len=%d\n  },\n", result->getLength()));
    return_data[reduction_idx] = CkReduction::tupleElement(result->getLength(), result->getData(), reducerType);
    // A result equal to simulated_messages[0] lives inside
    // simulated_messages_buffer and must not be deleted; anything else was
    // allocated by the reducer and is queued for deletion.
    if (result != simulated_messages[0]) {
      msgs_to_delete.push_back(result);
    }
  }

  // return_data only references result payloads, so the output must be
  // packed before those results are freed below.
  CkReductionMsg* retval = CkReductionMsg::buildFromTuple(return_data.data(), num_reductions);
  DEB_TUPLE(("} tupleReduction msg_size=%d\n", retval->getSize()));

  for (auto data : tuple_data) delete[] data;
  for (auto msg : msgs_to_delete) delete msg;

  return retval;
}
01766 
01767 
01769 static CkReductionMsg *external_py(int nMsgs, CkReductionMsg **msg)
01770 {
01771     
01772     std::vector<char*> msg_data(nMsgs);
01773     std::vector<int> msg_sizes(nMsgs);
01774 
01775     for (int i = 0; i < nMsgs; i++)
01776     {
01777         msg_data[i] = (char*)(msg[i]->getData());
01778         msg_sizes[i] = msg[i]->getSize();
01779         
01780     }
01781 
01782     
01783     
01784     char* reduction_result;
01785     int reduction_result_size = PyReductionExt(msg_data.data(), msg_sizes.data(), nMsgs, &reduction_result);
01786     
01787 
01788     return CkReductionMsg::buildNew(reduction_result_size, reduction_result);
01789 }
01790 
01791 
01792 
/// Empty constructor; it performs no initialisation.
CkReduction::CkReduction() {} 
01795 
01796 
01797 
01798 CkReduction::reducerType CkReduction::addReducer(reducerFn fn, bool streamable, const char* name)
01799 {
01800   CkAssert(CmiMyRank() == 0);
01801   reducerType index = (reducerType)reducerTable().size();
01802   reducerTable().emplace_back(fn, streamable, name);
01803   return index;
01804 }
01805 
01806 
01807 
01808 
01809 
01810 
01811 
01812 std::vector<CkReduction::reducerStruct> CkReduction::initReducerTable()
01813 {
01814   std::vector<CkReduction::reducerStruct> vec;
01815 
01816   vec.emplace_back(invalid_reducer_fn, true, "CkReduction::invalid");
01817   vec.emplace_back(nop_fn, true, "CkReduction::nop");
01818   
01819   vec.emplace_back(sum_char_fn, true, "CkReduction::sum_char");
01820   vec.emplace_back(sum_short_fn, true, "CkReduction::sum_short");
01821   vec.emplace_back(sum_int_fn, true, "CkReduction::sum_int");
01822   vec.emplace_back(sum_long_fn, true, "CkReduction::sum_long");
01823   vec.emplace_back(sum_long_long_fn, true, "CkReduction::sum_long_long");
01824   vec.emplace_back(sum_uchar_fn, true, "CkReduction::sum_uchar");
01825   vec.emplace_back(sum_ushort_fn, true, "CkReduction::sum_ushort");
01826   vec.emplace_back(sum_uint_fn, true, "CkReduction::sum_uint");
01827   vec.emplace_back(sum_ulong_fn, true, "CkReduction::sum_ulong");
01828   vec.emplace_back(sum_ulong_long_fn, true, "CkReduction::sum_ulong_long");
01829   vec.emplace_back(sum_float_fn, true, "CkReduction::sum_float");
01830   vec.emplace_back(sum_double_fn, true, "CkReduction::sum_double");
01831 
01832   
01833   vec.emplace_back(product_char_fn, true, "CkReduction::product_char");
01834   vec.emplace_back(product_short_fn, true, "CkReduction::product_short");
01835   vec.emplace_back(product_int_fn, true, "CkReduction::product_int");
01836   vec.emplace_back(product_long_fn, true, "CkReduction::product_long");
01837   vec.emplace_back(product_long_long_fn, true, "CkReduction::product_long_long");
01838   vec.emplace_back(product_uchar_fn, true, "CkReduction::product_uchar");
01839   vec.emplace_back(product_ushort_fn, true, "CkReduction::product_ushort");
01840   vec.emplace_back(product_uint_fn, true, "CkReduction::product_uint");
01841   vec.emplace_back(product_ulong_fn, true, "CkReduction::product_ulong");
01842   vec.emplace_back(product_ulong_long_fn, true, "CkReduction::product_ulong_long");
01843   vec.emplace_back(product_float_fn, true, "CkReduction::product_float");
01844   vec.emplace_back(product_double_fn, true, "CkReduction::product_double");
01845 
01846   
01847   vec.emplace_back(max_char_fn, true, "CkReduction::max_char");
01848   vec.emplace_back(max_short_fn, true, "CkReduction::max_short");
01849   vec.emplace_back(max_int_fn, true, "CkReduction::max_int");
01850   vec.emplace_back(max_long_fn, true, "CkReduction::max_long");
01851   vec.emplace_back(max_long_long_fn, true, "CkReduction::max_long_long");
01852   vec.emplace_back(max_uchar_fn, true, "CkReduction::max_uchar");
01853   vec.emplace_back(max_ushort_fn, true, "CkReduction::max_ushort");
01854   vec.emplace_back(max_uint_fn, true, "CkReduction::max_uint");
01855   vec.emplace_back(max_ulong_fn, true, "CkReduction::max_ulong");
01856   vec.emplace_back(max_ulong_long_fn, true, "CkReduction::max_ulong_long");
01857   vec.emplace_back(max_float_fn, true, "CkReduction::max_float");
01858   vec.emplace_back(max_double_fn, true, "CkReduction::max_double");
01859 
01860   
01861   vec.emplace_back(min_char_fn, true, "CkReduction::min_char");
01862   vec.emplace_back(min_short_fn, true, "CkReduction::min_short");
01863   vec.emplace_back(min_int_fn, true, "CkReduction::min_int");
01864   vec.emplace_back(min_long_fn, true, "CkReduction::min_long");
01865   vec.emplace_back(min_long_long_fn, true, "CkReduction::min_long_long");
01866   vec.emplace_back(min_uchar_fn, true, "CkReduction::min_uchar");
01867   vec.emplace_back(min_ushort_fn, true, "CkReduction::min_ushort");
01868   vec.emplace_back(min_uint_fn, true, "CkReduction::min_uint");
01869   vec.emplace_back(min_ulong_fn, true, "CkReduction::min_ulong");
01870   vec.emplace_back(min_ulong_long_fn, true, "CkReduction::min_ulong_long");
01871   vec.emplace_back(min_float_fn, true, "CkReduction::min_float");
01872   vec.emplace_back(min_double_fn, true, "CkReduction::min_double");
01873 
01874   
01875   
01876     
01877   vec.emplace_back(logical_and_fn, true, "CkReduction::logical_and");
01878   vec.emplace_back(logical_and_int_fn, true, "CkReduction::logical_and_int");
01879   vec.emplace_back(logical_and_bool_fn, true, "CkReduction::logical_and_bool");
01880 
01881   
01882   
01883     
01884   vec.emplace_back(logical_or_fn, true, "CkReduction::logical_or");
01885   vec.emplace_back(logical_or_int_fn, true, "CkReduction::logical_or_int");
01886   vec.emplace_back(logical_or_bool_fn, true, "CkReduction::logical_or_bool");
01887 
01888   
01889   
01890   
01891   vec.emplace_back(logical_xor_int_fn, true, "CkReduction::logical_xor_int");
01892   vec.emplace_back(logical_xor_bool_fn, true, "CkReduction::logical_xor_bool");
01893 
01894   
01895     
01896   vec.emplace_back(bitvec_and_fn, true, "CkReduction::bitvec_and");
01897   vec.emplace_back(bitvec_and_int_fn, true, "CkReduction::bitvec_and_int");
01898   vec.emplace_back(bitvec_and_bool_fn, true, "CkReduction::bitvec_and_bool");
01899 
01900   
01901     
01902   vec.emplace_back(bitvec_or_fn, true, "CkReduction::bitvec_or");
01903   vec.emplace_back(bitvec_or_int_fn, true, "CkReduction::bitvec_or_int");
01904   vec.emplace_back(bitvec_or_bool_fn, true, "CkReduction::bitvec_or_bool");
01905 
01906   
01907   vec.emplace_back(bitvec_xor_fn, true, "CkReduction::bitvec_xor");
01908   vec.emplace_back(bitvec_xor_int_fn, true, "CkReduction::bitvec_xor_int");
01909   vec.emplace_back(bitvec_xor_bool_fn, true, "CkReduction::bitvec_xor_bool");
01910 
01911   
01912   vec.emplace_back(random_fn, true, "CkReduction::random");
01913 
01914   
01915   
01916   
01917   vec.emplace_back(concat_fn, false, "CkReduction::concat");
01918 
01919   
01920   
01921   
01922   
01923   vec.emplace_back(set_fn, false, "CkReduction::set");
01924 
01925   
01926   vec.emplace_back(statistics_fn, true, "CkReduction::statistics");
01927 
01928   
01929   vec.emplace_back(CkReduction::tupleReduction_fn, false, "CkReduction::tuple");
01930 
01931   
01932   vec.emplace_back(CkReduction::reducerStruct(::external_py, false, "CkReduction::custom_python"));
01933 
01934   return vec;
01935 }
01936 
01937 
01938 
01939 std::vector<CkReduction::reducerStruct>& CkReduction::reducerTable()
01940 {
01941   static std::vector<CkReduction::reducerStruct> table = initReducerTable();
01942   return table;
01943 }
01944 
01945 
// Kind of object issuing an external contribution; stored in one byte.
// CkExtContributeTo dispatches on this value and currently handles only
// `array` and `group`.
typedef enum : uint8_t {
    array=0,
    group,
    nodegroup
} extContributorType;
01951 
01952 
// Parameter bundle describing one contribution made through the external
// CkExtContribute* entry points.
// NOTE(review): this struct is presumably shared with non-C++ callers, so
// field order and types should be treated as a fixed layout -- confirm.
struct CkExtContributeInfo
{
    int cbEpIdx;    // entry-point index used to build the target CkCallback
    int fid;        // applied to the callback via setRefnum() when > 0
    void* data;     // contribution payload (set to NULL for nop reductions)
    int numelems;   // not read in this file; presumably the element count -- TODO confirm
    int dataSize;   // payload size in bytes passed to contribute() (set to 0 for nop)
    CkReduction::reducerType redtype;   // reducer to apply
    int id;                                 // group/array ID (becomes CkGroupID::idx) of the contributor
    int *idx;                               // array index of the contributor (array contributors only)
    int ndims;                              // number of dimensions in idx
    extContributorType contributorType;     // which kind of object is contributing
};
01966 
// Looks up the local object (of type T) that is making the contribution
// described by contribute_params.  Only the explicit specialisations are
// defined.
template <typename T>
T* getExtContributor(CkExtContributeInfo* contribute_params);
01969 
01970 template <>
01971 ArrayElement* getExtContributor<ArrayElement>(CkExtContributeInfo* contribute_params)
01972 {
01973     CkGroupID gId;
01974     gId.idx = contribute_params->id;
01975     CkArrayIndex arrIndex(contribute_params->ndims, contribute_params->idx);
01976     CProxyElement_ArrayBase meProxy = CProxyElement_ArrayBase(gId, arrIndex);
01977     return meProxy.ckLocal();
01978 }
01979 
01980 template <>
01981 Group* getExtContributor<Group>(CkExtContributeInfo* contribute_params)
01982 {
01983     CkGroupID gId;
01984     gId.idx = contribute_params->id;
01985     return (Group*)CkLocalBranch(gId);
01986 }
01987 
01988 
01989 
// C-linkage declarations of the external contribution entry points defined
// below.  Each *To{Chare,Array,Group} variant builds the matching callback
// and forwards to CkExtContributeTo.
extern "C" {
void CkExtContributeTo(CkExtContributeInfo* contribute_params, CkCallback& cb);
void CkExtContributeToChare(CkExtContributeInfo* contribute_params, int onPE, void* objPtr);
void CkExtContributeToArray(CkExtContributeInfo* contribute_params, int aid, int* idx, int ndims);
void CkExtContributeToGroup(CkExtContributeInfo* contribute_params, int gid, int pe);
}
01996 
01997 
01998 template <class T>
01999 void CkExtContribute(CkExtContributeInfo* contribute_params, CkCallback& cb)
02000 {
02001     T* me = getExtContributor<T>(contribute_params);
02002 
02003     if (contribute_params->redtype == CkReduction::nop) {
02004         contribute_params->dataSize = 0;
02005         contribute_params->data = NULL;
02006     }
02007 
02008     me->contribute(contribute_params->dataSize, contribute_params->data, contribute_params->redtype, cb);
02009 }
02010 
02011 void CkExtContributeTo(CkExtContributeInfo* contribute_params, CkCallback& cb)
02012 {
02013 #if CMK_CHARMPY
02014     cb.isCkExtReductionCb = true;
02015 
02016     switch (contribute_params->contributorType) {
02017         case extContributorType::array :
02018             CkExtContribute<ArrayElement>(contribute_params, cb);
02019             break;
02020         case extContributorType::group :
02021             CkExtContribute<Group>(contribute_params, cb);
02022             break;
02023         default : CkAbort("Invalid external contributor type!\n");
02024     }
02025 #else
02026     CkAbort("charm4py support must be enabled to use CkExtContributeTo");
02027 #endif
02028 }
02029 
02030 
02031 void CkExtContributeToChare(CkExtContributeInfo* contribute_params, int onPE, void* objPtr)
02032 {
02033     CkChareID targetChareID;
02034     targetChareID.onPE = onPE;
02035     targetChareID.objPtr = objPtr;
02036 
02037     CkCallback cb(contribute_params->cbEpIdx, targetChareID);
02038     if (contribute_params->fid > 0) cb.setRefnum(contribute_params->fid);
02039     CkExtContributeTo(contribute_params, cb);
02040 }
02041 
02042 
02043 void CkExtContributeToArray(CkExtContributeInfo* contribute_params, int aid, int* idx, int ndims)
02044 {
02045     CkCallback cb;
02046     CkGroupID gId;
02047     gId.idx = aid;
02048 
02049     CkArrayID arrayId(gId);
02050 
02051     if (ndims > 0) {
02052         
02053         CkArrayIndex arrIndex(ndims, idx);
02054         cb = CkCallback(contribute_params->cbEpIdx, arrIndex, arrayId);
02055     }
02056     else {
02057         
02058         cb = CkCallback(contribute_params->cbEpIdx, arrayId);
02059     }
02060     if (contribute_params->fid > 0) cb.setRefnum(contribute_params->fid);
02061 
02062     CkExtContributeTo(contribute_params, cb);
02063 }
02064 
02065 
02066 void CkExtContributeToGroup(CkExtContributeInfo* contribute_params, int gid, int pe)
02067 {
02068     CkCallback cb;
02069     CkGroupID groupId;
02070     groupId.idx = gid;
02071 
02072     if (pe == -1) {
02073         
02074         cb = CkCallback(contribute_params->cbEpIdx, groupId);
02075     }
02076     else {
02077         
02078         cb = CkCallback(contribute_params->cbEpIdx, pe, groupId);
02079     }
02080     if (contribute_params->fid > 0) cb.setRefnum(contribute_params->fid);
02081 
02082     CkExtContributeTo(contribute_params, cb);
02083 }
02084 
02085 
02086 
02104 NodeGroup::NodeGroup(void):thisIndex(CkMyNode()) {
02105   __nodelock=CmiCreateLock();
02106 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02107     mlogData->objID.type = TypeNodeGroup;
02108     mlogData->objID.data.group.onPE = CkMyNode();
02109 #endif
02110 
02111 }
02112 NodeGroup::~NodeGroup() {
02113   CmiDestroyLock(__nodelock);
02114   CkpvAccess(_destroyingNodeGroup) = true;
02115 }
02116 void NodeGroup::pup(PUP::er &p)
02117 {
02118   CkNodeReductionMgr::pup(p);
02119   p|reductionInfo;
02120 }
02121 
02122 
02123 
02124 void CProxy_NodeGroup::ckSetReductionClient(CkCallback *cb) const {
02125   DEBR(("in CksetReductionClient for CProxy_NodeGroup %d\n",CkLocalNodeBranch(_ck_gid)));
02126  ((CkNodeReductionMgr *)CkLocalNodeBranch(_ck_gid))->ckSetReductionClient(cb);
02127   
02128  }
02129 
// Expands CK_REDUCTION_CONTRIBUTE_METHODS_DEF (defined outside this chunk) for
// NodeGroup, passing this object viewed as a CkNodeReductionMgr and its
// reductionInfo record. Presumably this generates NodeGroup's contribute()
// overloads - confirm against the macro definition.
// NOTE(review): the meaning of the trailing `false` argument is not visible here.
02130 CK_REDUCTION_CONTRIBUTE_METHODS_DEF(NodeGroup,
02131                     ((CkNodeReductionMgr *)this),
02132                     reductionInfo,false)
02133 
02134 
02135  
02136 void NodeGroup::contributeWithCounter(CkReductionMsg *msg,int count)
02137     {((CkNodeReductionMgr *)this)->contributeWithCounter(&reductionInfo,msg,count);}
02138 
02139 
02140 
02141 
02142 
02143 CkNodeReductionMgr::CkNodeReductionMgr()
02144   : thisProxy(thisgroup)
02145 {
02146 #ifdef BINOMIAL_TREE
02147   init_BinomialTree();
02148 #elif CMK_BIGSIM_CHARM
02149   init_BinaryTree();
02150 #else
02151   init_TopoTree();
02152 #endif
02153   storedCallback=NULL;
02154   redNo=0;
02155   inProgress=false;
02156   
02157   startRequested=false;
02158   gcount=CkNumNodes();
02159   lcount=1;
02160   nContrib=nRemote=0;
02161   lockEverything = CmiCreateLock();
02162 
02163 
02164   creating=false;
02165   interrupt = false;
02166   DEBR((AA "In NodereductionMgr constructor at %d \n" AB,this));
02167 #if CMK_FAULT_EVAC
02168     blocked = false;
02169     maxModificationRedNo = INT_MAX;
02170     killed=false;
02171     additionalGCount = newAdditionalGCount = 0;
02172 #endif
02173 }
02174 
02175 CkNodeReductionMgr::~CkNodeReductionMgr()
02176 {
02177   CmiDestroyLock(lockEverything);
02178 }
02179 
02180 void CkNodeReductionMgr::flushStates()
02181 {
02182  if(CkMyRank() == 0){
02183   
02184   redNo=0;
02185   inProgress=false;
02186 
02187   startRequested=false;
02188   gcount=CkNumNodes();
02189   lcount=1;
02190   nContrib=nRemote=0;
02191 
02192   creating=false;
02193   interrupt = false;
02194   while (!msgs.isEmpty()) { delete msgs.deq(); }
02195   while (!futureMsgs.isEmpty()) delete futureMsgs.deq();
02196   while (!futureRemoteMsgs.isEmpty()) delete futureRemoteMsgs.deq();
02197   while (!futureLateMigrantMsgs.isEmpty()) delete futureLateMigrantMsgs.deq();
02198   }
02199 }
02200 
02202 
02203 
02204 void CkNodeReductionMgr::ckSetReductionClient(CkCallback *cb)
02205 {
02206   DEBR((AA "Setting reductionClient in NodeReductionMgr %d at %d\n" AB,cb,this));
02207   if(cb->isInvalid()){
02208     DEBR((AA "Invalid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
02209   }else{
02210     DEBR((AA "Valid Callback passed to setReductionClient in nodeReductionMgr\n" AB));
02211   }
02212 
02213   if (CkMyNode()!=0)
02214       CkError("WARNING: ckSetReductionClient should only be called from processor zero!\n");
02215   delete storedCallback;
02216   storedCallback=cb;
02217 }
02218 
02219 
02220 
02221 void CkNodeReductionMgr::contribute(contributorInfo *ci,CkReductionMsg *m)
02222 {
02223 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02224     Chare *oldObj =CpvAccess(_currentObj);
02225     CpvAccess(_currentObj) = this;
02226 #endif
02227 
02228   m->redNo=ci->redNo++;
02229   m->sourceFlag=-1;
02230   m->gcount=0;
02231   DEBR(("[%d,%d] NodeGroup %d> localContribute called for redNo %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo));
02232   addContribution(m);
02233 
02234 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02235     CpvAccess(_currentObj) = oldObj;
02236 #endif
02237 }
02238 
02239 
02240 void CkNodeReductionMgr::contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count)
02241 {
02242 #if CMK_BIGSIM_CHARM
02243   _TRACE_BG_TLINE_END(&m->log);
02244 #endif
02245 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02246     Chare *oldObj =CpvAccess(_currentObj);
02247     CpvAccess(_currentObj) = this;
02248 #endif
02249   m->redNo=ci->redNo++;
02250   m->gcount=count;
02251   DEBR(("[%d,%d] contributewithCounter started for %d at %0.6f{{{\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
02252   addContribution(m);
02253   DEBR(("[%d,%d] }}}contributewithCounter finished for %d at %0.6f\n",CkMyNode(),CkMyPe(),m->redNo,CmiWallTimer()));
02254 
02255 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
02256     CpvAccess(_currentObj) = oldObj;
02257 #endif
02258 }
02259 
02260 
02262 
02263 void CkNodeReductionMgr::doRecvMsg(CkReductionMsg *m){
02264     DEBR(("[%d,%d] doRecvMsg called for  %d at %.6f[[[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02265 #if CMK_FAULT_EVAC
02266     if(blocked){
02267         DEBR(("[%d] This node is blocked, so remote message is being buffered as no %d\n",CkMyNode(),bufferedRemoteMsgs.length()));
02268         bufferedRemoteMsgs.enq(m);
02269         return;
02270     }
02271 #endif
02272     
02273     if (isPresent(m->redNo)) { 
02274         
02275         startReduction(m->redNo,CkMyNode());
02276         msgs.enq(m);
02277         nRemote++;
02278         finishReduction();
02279     }
02280     else {
02281         if (isFuture(m->redNo)) {
02282                
02283             futureRemoteMsgs.enq(m);
02284         }else{
02285            CkPrintf("BIG Problem Present %d Mesg RedNo %d \n",redNo,m->redNo);  
02286            CkAbort("Recv'd late remote contribution!\n");
02287         }
02288     }
02289     DEBR(("[%d,%d]]]]] doRecvMsg called for  %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02290 }
02291 
02292 
02293 void CkNodeReductionMgr::RecvMsg(CkReductionMsg *m)
02294 {
02295 #if CMK_BIGSIM_CHARM
02296   _TRACE_BG_TLINE_END(&m->log);
02297 #endif
02298 #ifndef CMK_CPV_IS_SMP
02299 #if CMK_IMMEDIATE_MSG
02300     if(interrupt == true){
02301         
02302         CpvAccess(_qd)->process(-1);
02303         CmiDelayImmediate();
02304         return;
02305     }
02306 #endif  
02307 #endif
02308    interrupt = true;
02309    CmiLock(lockEverything);   
02310    DEBR(("[%d,%d] Recv'd REMOTE contribution for %d at %.6f[[[\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02311    doRecvMsg(m);
02312    CmiUnlock(lockEverything);    
02313    interrupt = false;
02314    DEBR(("[%d,%d] ]]]]]]Recv'd REMOTE contribution for %d at %.6f\n",CkMyNode(),CkMyPe(),m->redNo,CkWallTimer()));
02315 }
02316 
02317 void CkNodeReductionMgr::startReduction(int number,int srcNode)
02318 {
02319     if (isFuture(number)) CkAbort("Can't start reductions out of order!\n");
02320     if (isPast(number)) CkAbort("Can't restart reduction that's already finished!\n");
02321     if (inProgress){
02322         DEBR((AA "This Node reduction is already in progress\n" AB));
02323         return;
02324     }
02325     if (creating) 
02326     {
02327         DEBR((AA " Node Postponing start request #%d until we're done creating\n" AB,redNo));
02328         startRequested=true;
02329         return;
02330     }
02331     
02332     
02333     DEBR((AA "Starting Node reduction #%d on %p srcNode %d\n" AB,redNo,this,srcNode));
02334     inProgress=true;
02335 }
02336 
02337 void CkNodeReductionMgr::doAddContribution(CkReductionMsg *m){
02338 #if CMK_FAULT_EVAC
02339     if(blocked){
02340         DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
02341         bufferedMsgs.enq(m);
02342         return;
02343     }
02344 #endif
02345     
02346     if (isFuture(m->redNo)) {
02347         DEBR((AA "Contributor gives early node contribution-- for #%d\n" AB,m->redNo));
02348         futureMsgs.enq(m);
02349     } else {
02350         DEBR((AA "Recv'd local node contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
02351         
02352         startReduction(m->redNo,CkMyNode());
02353         msgs.enq(m);
02354         nContrib++;
02355         finishReduction();
02356     }
02357 }
02358 
02359 
02360 void CkNodeReductionMgr::addContribution(CkReductionMsg *m)
02361 {
02362   interrupt = true;
02363   CmiLock(lockEverything);
02364   doAddContribution(m);
02365   CmiUnlock(lockEverything);
02366   interrupt = false;
02367 }
02368 
02369 void CkNodeReductionMgr::LateMigrantMsg(CkReductionMsg *m){
02370         CmiLock(lockEverything);   
02371 #if CMK_FAULT_EVAC
02372     if(blocked){
02373         DEBR(("[%d] This node is blocked, so local message is being buffered as no %d\n",CkMyNode(),bufferedMsgs.length()));
02374         bufferedMsgs.enq(m);
02375                 CmiUnlock(lockEverything);   
02376         return;
02377     }
02378 #endif
02379     
02380     if (isFuture(m->redNo)) {
02381         DEBR((AA "Latemigrant gives early node contribution-- for #%d\n" AB,m->redNo));
02382 
02383         futureLateMigrantMsgs.enq(m);
02384     } else {
02385         DEBR((AA "Recv'd late migrant contribution %d for #%d at %d\n" AB,nContrib,m->redNo,this));
02386 
02387         msgs.enq(m);
02388         finishReduction();
02389     }
02390         CmiUnlock(lockEverything);   
02391 }
02392 
02393 
02394 
02395 
02396 
/**
 * Try to complete the current node-level reduction.
 *
 * Returns without doing anything when no reduction is in progress, while
 * contributors are being created, or when contributions are still missing
 * and no partial (streamable) reduction is possible. Otherwise the queued
 * messages are combined with CkReductionMgr::reduceMessages() and the result
 * is either
 *   - re-queued (partial reduction, or a root result that is not complete yet),
 *   - sent to the parent node's RecvMsg(), or
 *   - on the root, delivered to the callback carried by the message or, failing
 *     that, the stored callback (aborting when neither exists).
 * After a completed reduction the reduction number advances, the counters are
 * reset, and messages parked for future reductions are replayed.
 */
02400 void CkNodeReductionMgr::finishReduction(void)
02401 {
02402   DEBR((AA "in Nodegrp finishReduction %d treeKids %d \n" AB,inProgress,treeKids()));
02403   
02404   if ((!inProgress) || creating){
02405     DEBR((AA "Either not in Progress or creating\n" AB));
02406     return;
02407   }
02408 
02409   bool partialReduction = false;
02410 
        // Missing local contributions: only proceed if more than one message is
        // queued and the reducer is flagged streamable (partial reduction).
02411   if (nContrib<(lcount)){
02412     if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
02413       partialReduction = true;
02414     }
02415     else {
02416       DEBR((AA "Nodegrp Need more local messages %d %d\n" AB,nContrib,(lcount)));
02417       return;
02418     }
02419   }
        // Same rule for contributions still expected from child nodes.
02420   if (nRemote<treeKids()){
02421     if (msgs.length() > 1 && CkReduction::reducerTable()[msgs.peek()->reducer].streamable) {
02422       partialReduction = true;
02423     }
02424     else {
02425       DEBR((AA "Nodegrp Need more Remote messages %d %d\n" AB,nRemote,treeKids()));
02426       return;
02427     }
02428   }
02429   if (nRemote>treeKids()){
02430 
02431       interrupt = false;
02432        CkAbort("Nodegrp Excess remote reduction message received!\n");
02433   }
02434 
02435   DEBR((AA "Reducing node data...\n" AB));
02436 
02438 #if CMK_BIGSIM_CHARM
02439   _TRACE_BG_END_EXECUTE(1);
02440   void* _bgParentLog = NULL;
02441   _TRACE_BG_BEGIN_EXECUTE_NOMSG("NodeReduce", &_bgParentLog, 0);
02442 #endif
        // Combine everything queued so far and stamp the result with the current
        // reduction number.
02443   CkReductionMsg *result=CkReductionMgr::reduceMessages(msgs);
02444   result->redNo=redNo;
02445   DEBR((AA "Node Reduced gcount=%d; sourceFlag=%d\n" AB,result->gcount,result->sourceFlag));
02446 #if CMK_BIGSIM_CHARM
02447   _TRACE_BG_TLINE_END(&result->log);
02448 #endif
02449 
        // Partial reduction: keep the combined message queued and wait for more.
02450   if (partialReduction) {
02451     msgs.enq(result);
02452     return;
02453   }
02454 
02455   if (hasParent())
02456   {
02457 #if CMK_FAULT_EVAC
02458     if(CmiNodeAlive(CkMyNode()) || killed == false)
02459 #endif
02460   {
02461         DEBR((AA "Passing reduced data up to parent node %d. \n" AB,treeParent()));
02462         DEBR(("[%d,%d] Passing data up to parentNode %d at %.6f for redNo %d with ncontrib %d\n",CkMyNode(),CkMyPe(),treeParent(),CkWallTimer(),redNo,nContrib));
02463 
02464 #if CMK_FAULT_EVAC
02465             result->gcount += additionalGCount;
02466 #endif
02467         thisProxy[treeParent()].RecvMsg(result);
02468     }
02469 
02470   }
02471   else
02472   {
        // Root of the tree. In non-evacuation builds any migratable-contributor
        // result is re-queued here; in evacuation builds only while
        // gcount+additionalGCount differs from sourceFlag.
02473         if(result->isMigratableContributor()
02474 #if CMK_FAULT_EVAC
02475        && result->gcount+additionalGCount != result->sourceFlag
02476 #endif
02477     ){
02478           DEBR(("[%d,%d] NodeGroup %d> Node Reduction %d not done yet gcounts %d sources %d migratable %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,redNo,result->gcount,result->sourceFlag,result->isMigratableContributor()));
02479             msgs.enq(result);
02480             return;
02481         }
02482 #if CMK_FAULT_EVAC
02483         result->gcount += additionalGCount;
02484 #endif
02485 
02489         DEBR(("[%d,%d]------------------- END OF REDUCTION %d with %d remote contributions passed to client function at %.6f\n",CkMyNode(),CkMyPe(),redNo,nRemote,CkWallTimer()));
        // Deliver the result: the message's own callback takes precedence over
        // the stored one.
02490     CkSetRefNum(result, result->getUserFlag());
02491     if (!result->callback.isInvalid()){
02492       DEBR(("[%d,%d] message Callback used \n",CkMyNode(),CkMyPe()));
02493         result->callback.send(result);
02494     }
02495     else if (storedCallback!=NULL){
02496       DEBR(("[%d,%d] stored Callback used \n",CkMyNode(),CkMyPe()));
02497         storedCallback->send(result);
02498     }
02499     else{
02500             DEBR((AA "Invalid Callback \n" AB));
02501         CkAbort("No reduction client!\n"
02502             "You must register a client with either SetReductionClient or during contribute.\n");
02503         }
02504   }
02505 
02506   
02507   
        // This reduction is finished: advance to the next one and reset counters.
02508   redNo++;
02509 #if CMK_FAULT_EVAC
02510     updateTree();
02511 #endif
02512   int i;
02513   inProgress=false;
02514   startRequested=false;
02515   nRemote=nContrib=0;
02516 
02517   
        // Replay local contributions that arrived early. The count is taken up
        // front because doAddContribution() may park messages in futureMsgs again.
02518   int n=futureMsgs.length();
02519 
02520   for (i=0;i<n;i++)
02521   {
02522     interrupt = true;
02523 
02524     CkReductionMsg *m=futureMsgs.deq();
02525 
02526     interrupt = false;
02527     if (m!=NULL){ 
02528       DEBR(("[%d,%d] NodeGroup %d> Mesg with redNo %d might be useful in new reduction %d \n",CkMyNode(),CkMyPe(),thisgroup.idx,m->redNo,redNo));
02529       doAddContribution(m);
02530     }
02531   }
02532 
02533   interrupt = true;
02534 
        // Replay remote contributions that arrived early.
02535   n=futureRemoteMsgs.length();
02536 
02537   interrupt = false;
02538   for (i=0;i<n;i++)
02539   {
02540     interrupt = true;
02541 
02542     CkReductionMsg *m=futureRemoteMsgs.deq();
02543 
02544     interrupt = false;
02545     if (m!=NULL)
02546       doRecvMsg(m);
02547   }
02548   
        // Late-migrant messages for the (new) current reduction join msgs; the
        // rest go back on the future queue.
02549   n = futureLateMigrantMsgs.length();
02550   for(i=0;i<n;i++){
02551     CkReductionMsg *m = futureLateMigrantMsgs.deq();
02552     if(m != NULL){
02553       if(m->redNo == redNo){
02554         msgs.enq(m);
02555       }else{
02556         futureLateMigrantMsgs.enq(m);
02557       }
02558     }
02559   }
02560 }
02561 
02563 
02564 void CkNodeReductionMgr::init_BinaryTree(){
02565     parent = (CkMyNode()-1)/TREE_WID;
02566     int firstkid = CkMyNode()*TREE_WID+1;
02567     numKids=CkNumNodes()-firstkid;
02568   if (numKids>TREE_WID) numKids=TREE_WID;
02569   if (numKids<0) numKids=0;
02570 
02571     for(int i=0;i<numKids;i++){
02572         kids.push_back(firstkid+i);
02573 #if CMK_FAULT_EVAC
02574         newKids.push_back(firstkid+i);
02575 #endif
02576     }
02577 }
02578 
02579 void CkNodeReductionMgr::init_TopoTree() {
02580   if (_topoTree == NULL) CkAbort("CkNodeReductionMgr:: topo tree has not been calculated\n");
02581   CmiSpanningTreeInfo &t = *_topoTree;
02582   parent = t.parent;
02583   numKids = t.child_count;
02584   for (int i=0; i < numKids; i++) {
02585     kids.push_back(t.children[i]);
02586 #if CMK_FAULT_EVAC
02587     newKids.push_back(t.children[i]);
02588 #endif
02589   }
02590 }
02591 
02592 void CkNodeReductionMgr::init_BinomialTree(){
02593     int depth = (int )ceil((log((double )CkNumNodes())/log((double)2)));
02594     
02595     upperSize = (unsigned) 1 << depth;
02596     label = upperSize-CkMyNode()-1;
02597     int p=label;
02598     int count=0;
02599     while( p > 0){
02600         if(p % 2 == 0)
02601             break;
02602         else{
02603             p = p/2;
02604             count++;
02605         }
02606     }
02607     
02608     parent = label + (1<<count);
02609     parent = upperSize -1 -parent;
02610     int temp;
02611     if(count != 0){
02612         numKids = 0;
02613         for(int i=0;i<count;i++){
02614             
02615             temp = label - (1<<i);
02616             temp = upperSize-1-temp;
02617             if(temp <= CkNumNodes()-1){
02618         
02619                 kids.push_back(temp);
02620                 numKids++;
02621             }
02622         }
02623     }else{
02624         numKids = 0;
02625     
02626     }
02627 }
02628 
02629 
02630 int CkNodeReductionMgr::treeRoot(void)
02631 {
02632   return 0;
02633 }
02634 bool CkNodeReductionMgr::hasParent(void) 
02635 {
02636   return (bool)(CkMyNode()!=treeRoot());
02637 }
02638 int CkNodeReductionMgr::treeParent(void) 
02639 {
02640   return parent;
02641 }
02642 
02643 int CkNodeReductionMgr::firstKid(void) 
02644 {
02645   return CkMyNode()*TREE_WID+1;
02646 }
02647 int CkNodeReductionMgr::treeKids(void)
02648 {
02649 #ifdef BINOMIAL_TREE
02650     return numKids;
02651 #else
02652 
02653 
02654 
02655 
02656     return numKids;
02657 #endif
02658 }
02659 
02660 void CkNodeReductionMgr::pup(PUP::er &p)
02661 {
02662 
02663 
02664   IrrGroup::pup(p);
02665   p(redNo);
02666   p(inProgress); p(creating); p(startRequested);
02667   p(lcount);
02668   p(nContrib); p(nRemote);
02669   p(interrupt);
02670   p|msgs;
02671   p|futureMsgs;
02672   p|futureRemoteMsgs;
02673   p|futureLateMigrantMsgs;
02674   p|parent;
02675 
02676 #if CMK_FAULT_EVAC
02677   p|additionalGCount;
02678   p|newAdditionalGCount;
02679 #endif
02680 
02681   if(p.isUnpacking()) {
02682     gcount=CkNumNodes();
02683     thisProxy = thisgroup;
02684     lockEverything = CmiCreateLock();
02685 #ifdef BINOMIAL_TREE
02686     init_BinomialTree();
02687 #elif CMK_BIGSIM_CHARM
02688     init_BinaryTree();
02689 #else
02690     init_TopoTree();
02691 #endif      
02692   }
02693 
02694 #if CMK_FAULT_EVAC
02695   p | blocked;
02696   p | maxModificationRedNo;
02697 #endif
02698 
02699 #if (!defined(_FAULT_MLOG_) && !defined(_FAULT_CAUSAL_))
02700   bool isnull = (storedCallback == NULL);
02701   p | isnull;
02702   if (!isnull) {
02703     if (p.isUnpacking()) {
02704       storedCallback = new CkCallback;
02705     }
02706     p|*storedCallback;
02707   }
02708 #endif
02709 
02710 }
02711 
02712 #if CMK_FAULT_EVAC
02713 
02714 
02715 
02716 
02717 
02718 void CkNodeReductionMgr::evacuate(){
02719     DEBREVAC(("[%d] Evacuate called on nodereductionMgr \n",CkMyNode()));
02720     if(treeKids() == 0){
02721     
02722 
02723 
02724         oldleaf=true;
02725         DEBREVAC(("[%d] Leaf Node marks itself for deletion when evacuation is complete \n",CkMyNode()));
02726         
02727 
02728 
02729 
02730 
02731 
02732 
02733         int data[2];
02734         data[0]=CkMyNode();
02735         data[1]=getTotalGCount()+additionalGCount;
02736         thisProxy[treeParent()].modifyTree(LEAFPARENT,2,data);
02737         newParent = treeParent();
02738     }else{
02739         DEBREVAC(("[%d]%d> Internal Node sends messages to change the redN tree \n",CkMyNode(),thisgroup.idx));
02740         oldleaf= false;
02741     
02742 
02743 
02744 
02745 
02746 
02747         newParent = kids[0];
02748         for(int i=numKids-1;i>=0;i--){
02749             newKids.erase(newKids.begin() + i);
02750         }
02751         
02752 
02753 
02754 
02755         
02756 
02757 
02758         int oldParentData[2];
02759         oldParentData[0] = CkMyNode();
02760         oldParentData[1] = newParent;
02761         thisProxy[parent].modifyTree(OLDPARENT,2,oldParentData);
02762 
02763         
02764 
02765 
02766         int childrenData=newParent;
02767         for(int i=1;i<numKids;i++){
02768             thisProxy[kids[i]].modifyTree(OLDCHILDREN,1,&childrenData);
02769         }
02770         
02771         
02772 
02773 
02774 
02775         std::vector<int> newParentData(numKids+2);
02776         newParentData[0] = CkMyNode();
02777         for(int i=1;i<numKids;i++){
02778             newParentData[i] = kids[i];
02779         }
02780         newParentData[numKids] = parent;
02781         newParentData[numKids+1] = getTotalGCount()+additionalGCount;
02782         thisProxy[newParent].modifyTree(NEWPARENT,numKids+2,newParentData.data());
02783     }
02784     readyDeletion = false;
02785     blocked = true;
02786     numModificationReplies = 0;
02787     tempModificationRedNo = findMaxRedNo();
02788 }
02789 
02790 
02791 
02792 
02793 
02794 
02795 
02796 
02797 
02798 void CkNodeReductionMgr::modifyTree(int code,int size,int *data){
02799     DEBREVAC(("[%d]%d> Received modifyTree request with code %d \n",CkMyNode(),thisgroup.idx,code));
02800     int sender;
02801     newKids = kids;
02802     readyDeletion = false;
02803     newAdditionalGCount = additionalGCount;
02804     switch(code){
02805         case OLDPARENT: 
02806             for(int i=0;i<numKids;i++){
02807                 if(newKids[i] == data[0]){
02808                     newKids[i] = data[1];
02809                     break;
02810                 }
02811             }
02812             sender = data[0];
02813             newParent = parent;
02814             break;
02815         case OLDCHILDREN:
02816             newParent = data[0];
02817             sender = parent;
02818             break;
02819         case NEWPARENT:
02820             for(int i=0;i<size-2;i++){
02821                 newKids.push_back(data[i]);
02822             }
02823             newParent = data[size-2];
02824             newAdditionalGCount += data[size-1];
02825             sender = parent;
02826             break;
02827         case LEAFPARENT:
02828             for(int i=0;i<numKids;i++){
02829                 if(newKids[i] == data[0]){
02830                     newKids.erase(newKids.begin() + i);
02831                     break;
02832                 }
02833             }
02834             sender = data[0];
02835             newParent = parent;
02836             newAdditionalGCount += data[1];
02837             break;
02838     };
02839     blocked = true;
02840     int maxRedNo = findMaxRedNo();
02841     
02842     thisProxy[sender].collectMaxRedNo(maxRedNo);
02843 }
02844 
02845 void CkNodeReductionMgr::collectMaxRedNo(int maxRedNo){
02846     
02847 
02848 
02849 
02850     numModificationReplies++;
02851     if(maxRedNo > tempModificationRedNo){
02852         tempModificationRedNo = maxRedNo;
02853     }
02854     if(numModificationReplies == numKids+1){
02855         maxModificationRedNo = tempModificationRedNo;
02856         
02857 
02858 
02859 
02860         if(maxModificationRedNo == -1){
02861             printf("[%d]%d> This array has not started reductions yet \n",CkMyNode(),thisgroup.idx);
02862         }else{
02863             DEBREVAC(("[%d]%d> maxModificationRedNo for this nodegroup %d \n",CkMyNode(),thisgroup.idx,maxModificationRedNo));
02864         }
02865         thisProxy[parent].unblockNode(maxModificationRedNo);
02866         for(int i=0;i<numKids;i++){
02867             thisProxy[kids[i]].unblockNode(maxModificationRedNo);
02868         }
02869         blocked = false;
02870         updateTree();
02871         clearBlockedMsgs();
02872     }
02873 }
02874 
02875 void CkNodeReductionMgr::unblockNode(int maxRedNo){
02876     maxModificationRedNo = maxRedNo;
02877     updateTree();
02878     blocked = false;
02879     clearBlockedMsgs();
02880 }
02881 
02882 
02883 void CkNodeReductionMgr::clearBlockedMsgs(){
02884     int len = bufferedMsgs.length();
02885     for(int i=0;i<len;i++){
02886         CkReductionMsg *m = bufferedMsgs.deq();
02887         doAddContribution(m);
02888     }
02889     len = bufferedRemoteMsgs.length();
02890     for(int i=0;i<len;i++){
02891         CkReductionMsg *m = bufferedRemoteMsgs.deq();
02892         doRecvMsg(m);
02893     }
02894 
02895 }
02896 
02897 
02898 
02899 
02900 
02901 void CkNodeReductionMgr::updateTree(){
02902     if(redNo > maxModificationRedNo){
02903         parent = newParent;
02904         kids = newKids;
02905         maxModificationRedNo = INT_MAX;
02906         numKids = kids.size();
02907         readyDeletion = true;
02908         additionalGCount = newAdditionalGCount;
02909         DEBREVAC(("[%d]%d> Updating Tree numKids %d -> ",CkMyNode(),thisgroup.idx,numKids));
02910         for(int i=0;i<(int)(newKids.size());i++){
02911             DEBREVAC(("%d ",newKids[i]));
02912         }
02913         DEBREVAC(("\n"));
02914     
02915     }else{
02916         if(maxModificationRedNo != INT_MAX){
02917             DEBREVAC(("[%d]%d> Updating delayed because redNo %d maxModificationRedNo %d \n",CkMyNode(),thisgroup.idx,redNo,maxModificationRedNo));
02918             startReduction(redNo,CkMyNode());
02919             finishReduction();
02920         }   
02921     }
02922 }
02923 
02924 
02925 void CkNodeReductionMgr::doneEvacuate(){
02926     DEBREVAC(("[%d] doneEvacuate called \n",CkMyNode()));
02927 
02928 
02929 
02930 
02931 
02932 
02933 
02934 
02935 
02936 
02937 
02938 
02939 
02940 
02941 
02942 
02943 
02944 
02945 
02946 
02947 
02948 
02949 
02950 
02951 
02952         if(readyDeletion){
02953             thisProxy[treeParent()].DeleteChild(CkMyNode());
02954         }else{
02955             thisProxy[newParent].DeleteNewChild(CkMyNode());
02956         }
02957 
02958 }
02959 
02960 void CkNodeReductionMgr::DeleteChild(int deletedChild){
02961     DEBREVAC(("[%d]%d> Deleting child %d \n",CkMyNode(),thisgroup.idx,deletedChild));
02962     for(int i=0;i<numKids;i++){
02963         if(kids[i] == deletedChild){
02964             kids.erase(kids.begin() + i);
02965             break;
02966         }
02967     }
02968     numKids = kids.size();
02969     finishReduction();
02970 }
02971 
02972 void CkNodeReductionMgr::DeleteNewChild(int deletedChild){
02973     for(int i=0;i<(int)(newKids.size());i++){
02974         if(newKids[i] == deletedChild){
02975             newKids.erase(newKids.begin() + i);
02976             break;
02977         }
02978     }
02979     DEBREVAC(("[%d]%d> Deleting  new child %d readyDeletion %d newKids %d -> ",CkMyNode(),thisgroup.idx,deletedChild,readyDeletion,newKids.size()));
02980     for(int i=0;i<(int)(newKids.size());i++){
02981         DEBREVAC(("%d ",newKids[i]));
02982     }
02983     DEBREVAC(("\n"));
02984     finishReduction();
02985 }
02986 
02987 int CkNodeReductionMgr::findMaxRedNo(){
02988     int max = redNo;
02989     for(int i=0;i<futureRemoteMsgs.length();i++){
02990         if(futureRemoteMsgs[i]->redNo  > max){
02991             max = futureRemoteMsgs[i]->redNo;
02992         }
02993     }
02994     
02995 
02996 
02997  
02998     if(redNo == max && msgs.length() == 0){
02999         DEBREVAC(("[%d] Redn %d has not received any contributions \n",CkMyNode(),max));
03000         max--;
03001     }
03002     return max;
03003 }
03004 #endif //CMK_FAULT_EVAC
03005 
03006 #include "CkReduction.def.h"