00001 
00002 #include "pose.h"
00003 
00005 eventQueue::eventQueue()
00006 {
00007   lastLoggedVT = 0;
00008   Event *e;
00009   eqh = new EqHeap();  
00010   largest = POSE_UnsetTS;
00011   mem_usage = 0;
00012   eventCount = 0;
00013   tsOfLastInserted = 0;
00014   
00015   e = new Event();
00016   e->timestamp = POSE_UnsetTS;
00017   e->done = -1;
00018   e->fnIdx = -99;
00019   e->msg = NULL;
00020   e->commitBfr = NULL;
00021   e->spawnedList = NULL;
00022   e->commitBfrLen = 0;
00023   e->next = e->prev = NULL;
00024   frontPtr = e;
00025   
00026   e = new Event();
00027   e->timestamp=POSE_UnsetTS;
00028   e->done = -1;
00029   e->fnIdx = -100;
00030   e->msg = NULL;
00031   e->commitBfr = NULL;
00032   e->spawnedList = NULL;
00033   e->commitBfrLen = 0;
00034   e->next = e->prev = NULL;
00035   currentPtr = backPtr = e; 
00036   
00037   frontPtr->next = backPtr;
00038   backPtr->prev = frontPtr;
00039   RBevent = NULL;
00040   
00041   recentAvgEventSparsity = 1;
00042   sparsityStartTime = 0;
00043   sparsityCalcCount = 0;
00044   tsDiffCount = 0;
00045   lastCommittedTS = 0;
00046   for (int i = 0; i < DIFFS_TO_STORE; i++) {
00047     tsCommitDiffs[i] = 0;
00048   }
00049 #ifdef MEM_TEMPORAL
00050   localTimePool = (TimePool *)CkLocalBranch(TempMemID);
00051 #endif
00052 #ifdef EQ_SANITIZE
00053   sanitize();
00054 #endif
00055 }
00056 
00058 eventQueue::~eventQueue()
00059 {
00060   Event *tmp1 = frontPtr, *tmp2 = frontPtr->next;
00061   while (tmp2) {
00062     free(tmp1);
00063     tmp1 = tmp2;
00064     tmp2 = tmp1->next;
00065   }
00066   free(tmp1);
00067   delete eqh;
00068 }
00069 
00071 void eventQueue::InsertEvent(Event *e)
00072 {
00073   tsOfLastInserted = e->timestamp;
00074   if(pose_config.deterministic)
00075     {
00076       InsertEventDeterministic(e);
00077     }
00078   else
00079     {
00080 #ifdef EQ_SANITIZE
00081       sanitize();
00082 #endif
00083       Event *tmp = backPtr->prev; 
00084 
00085       if (e->timestamp > largest) largest = e->timestamp;
00086       eventCount++;
00087       
00088       
00089       
00090       
00091       if ((tmp->timestamp < e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID < e->evID)) && (currentPtr != backPtr))
00092     eqh->InsertEvent(e); 
00093       else { 
00094     if ((currentPtr != backPtr) && (currentPtr->timestamp > e->timestamp))
00095       tmp = currentPtr; 
00096     while (tmp->timestamp > e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID > e->evID)) 
00097       tmp = tmp->prev;
00098     
00099     e->prev = tmp;
00100     e->next = tmp->next;
00101     e->next->prev = e;
00102     tmp->next = e;
00103     
00104     if ((currentPtr->prev == e) && (currentPtr->done < 1))
00105       currentPtr = currentPtr->prev;
00106     else if ((currentPtr == backPtr) || (e->timestamp < currentPtr->timestamp || (e->timestamp == currentPtr->timestamp && e->evID < currentPtr->evID)))
00107       SetRBevent(e);
00108       }
00109 #ifdef EQ_SANITIZE
00110       sanitize();
00111 #endif
00112     }
00113 }
00114 
00116 
00118 void eventQueue::InsertEventDeterministic(Event *e)
00119 {
00120   Event *tmp = backPtr->prev; 
00121   if (e->timestamp > largest) largest = e->timestamp;
00122   eventCount++;
00123   
00124   
00125   
00126   
00127   if ((tmp->timestamp < e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID < e->evID)) && (currentPtr != backPtr))
00128     eqh->InsertEvent(e); 
00129   else { 
00130     if ((currentPtr != backPtr) && (currentPtr->timestamp > e->timestamp))
00131       tmp = currentPtr; 
00132     while (tmp->timestamp > e->timestamp || (tmp->timestamp == e->timestamp && tmp->evID > e->evID)) 
00133       tmp = tmp->prev;
00134     
00135     if (tmp->timestamp == e->timestamp) 
00136       while ((tmp->timestamp == e->timestamp) && (e->evID < tmp->evID))
00137     tmp = tmp->prev;
00138     
00139     e->prev = tmp;
00140     e->next = tmp->next;
00141     e->next->prev = e;
00142     tmp->next = e;
00143     
00144     if ((currentPtr->prev == e) && (currentPtr->done < 1))
00145       currentPtr = currentPtr->prev;
00146     else if ((currentPtr == backPtr) || 
00147          ((e->timestamp < currentPtr->timestamp) ||
00148           ((e->timestamp == currentPtr->timestamp) && 
00149            (e->evID < currentPtr->evID))))
00150       SetRBevent(e);
00151   }
00152 #ifdef EQ_SANITIZE
00153   sanitize();
00154 #endif
00155 }
00156 
00157 void eventQueue::CommitStatsHelper(sim *obj, Event *commitPtr) {
00158 #if !CMK_TRACE_DISABLED
00159   localStat *localStats = (localStat *)CkLocalBranch(theLocalStats);
00160   if (pose_config.stats) {
00161     localStats->Commit();
00162   }
00163 
00164   if (pose_config.dop) {
00165     
00166     
00167     if (lastLoggedVT >= commitPtr->svt) {
00168       commitPtr->svt = commitPtr->evt = -1;
00169     } else {
00170       lastLoggedVT = commitPtr->evt;
00171     }
00172     localStats->WriteDopData(commitPtr->srt, commitPtr->ert, commitPtr->svt, commitPtr->evt);
00173     localStats->SetMaximums(commitPtr->evt, commitPtr->ert);
00174   }
00175 #endif
00176 
00177   
00178   if (obj->myStrat->STRAT_T == ADAPT5_T) {
00179 
00180     sparsityCalcCount++;
00181     
00182     if (sparsityCalcCount >= EVQ_SPARSE_CALC_PERIOD) {
00183       recentAvgEventSparsity = (int)((commitPtr->timestamp - sparsityStartTime) / sparsityCalcCount);
00184       if (recentAvgEventSparsity < 1) {
00185     recentAvgEventSparsity = 1;
00186       }
00187       ((adapt5 *)obj->myStrat)->setRecentAvgEventSparsity(recentAvgEventSparsity);
00188       sparsityCalcCount = 0;
00189       sparsityStartTime = commitPtr->timestamp;
00190     }
00191 
00192     
00193     POSE_TimeType diff = commitPtr->timestamp - lastCommittedTS;
00194     lastCommittedTS = commitPtr->timestamp;
00195     for (int i = 0; i < DIFFS_TO_STORE; i++) {
00196       if (diff > tsCommitDiffs[i]) {
00197     
00198     for (int j = DIFFS_TO_STORE - 2; j > i; j--) {
00199       tsCommitDiffs[j+1] = tsCommitDiffs[j];
00200     }
00201     if (i < DIFFS_TO_STORE - 1) {
00202       
00203       tsCommitDiffs[i+1] = tsCommitDiffs[i];
00204     }
00205     tsCommitDiffs[i] = diff;
00206     break;
00207       }
00208     }
00209     tsDiffCount++;
00210 
00211     
00212     if (tsDiffCount >= TS_DIFF_WIN_SIZE) {
00213       POSE_TimeType totalDiff = 0;
00214       for (int i = HIGHEST_DIFFS_TO_IGNORE; i < DIFFS_TO_STORE; i++) {
00215     totalDiff += tsCommitDiffs[i];
00216       }
00217       POSE_TimeType avgDiff = totalDiff / NUM_DIFFS_TO_AVERAGE;
00218       ((adapt5 *)obj->myStrat)->setTimeLeash(DIFFS_TO_STORE * avgDiff);
00219       tsDiffCount = 0;
00220       for (int i = 0; i < DIFFS_TO_STORE; i++) {
00221     tsCommitDiffs[i] = 0;
00222       }
00223     }
00224 
00225   }
00226 
00227 }
00228 
00230 void eventQueue::CommitEvents(sim *obj, POSE_TimeType ts)
00231 {
00232 #ifdef EQ_SANITIZE
00233   sanitize();
00234 #endif
00235   Event *target = frontPtr->next, *commitPtr = frontPtr->next;
00236   if (ts == POSE_endtime) {
00237     CommitAll(obj);
00238 #ifdef MEM_TEMPORAL
00239     localTimePool->set_min_time(ts);
00240     localTimePool->empty_recycle_bin();
00241 #endif
00242     return;
00243   }
00244 
00245   
00246   if (obj->objID->usesAntimethods()) {
00247     while ((commitPtr->timestamp < ts) && (commitPtr != backPtr) 
00248        && (commitPtr != currentPtr)) {
00249       CmiAssert(commitPtr->done == 1); 
00250       obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg); 
00251       obj->basicStats[0]++;
00252       CommitStatsHelper(obj, commitPtr);
00253       if (commitPtr->commitBfrLen > 0)  { 
00254     CkPrintf("%s", commitPtr->commitBfr);
00255     if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00256       }
00257       commitPtr = commitPtr->next;
00258     }
00259   }
00260   else {
00261     while ((target != backPtr) && (target->timestamp < ts) && 
00262        (target != currentPtr)) { 
00263       while (commitPtr != target) { 
00264     CmiAssert(commitPtr->done == 1); 
00265     obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg);
00266     obj->basicStats[0]++;
00267     CommitStatsHelper(obj, commitPtr);
00268     if (commitPtr->commitBfrLen > 0)  { 
00269       CkPrintf("%s", commitPtr->commitBfr);
00270       if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00271     }
00272     commitPtr = commitPtr->next;
00273       }
00274       
00275       target = target->next;
00276 #ifdef MEM_TEMPORAL      
00277       while (!target->serialCPdata && (target->timestamp <ts) && (target != backPtr))
00278 #else
00279       while (!target->cpData && (target->timestamp <ts) && (target != backPtr))
00280 #endif
00281     target = target->next;
00282     }
00283   }
00284   
00285   Event *link = commitPtr;
00286   commitPtr = commitPtr->prev;
00287   while (commitPtr != frontPtr) {
00288 #ifdef MEM_TEMPORAL
00289     if (commitPtr->serialCPdata) {
00290       localTimePool->tmp_free(commitPtr->timestamp, commitPtr->serialCPdata);
00291 #else
00292     if (commitPtr->cpData) {
00293       delete commitPtr->cpData;
00294 #endif
00295     }
00296     commitPtr = commitPtr->prev;
00297     delete commitPtr->next;
00298     mem_usage--;
00299   }
00300   frontPtr->next = link;
00301   link->prev = frontPtr;
00302 #ifdef EQ_SANITIZE
00303   sanitize();
00304 #endif
00305 }
00306 
00308 void eventQueue::CommitAll(sim *obj)
00309 {
00310 #ifdef EQ_SANITIZE
00311   sanitize();
00312 #endif
00313   Event *commitPtr = frontPtr->next;
00314   
00315   
00316   while (commitPtr != backPtr) {
00317     if (commitPtr->done) {
00318       obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg);
00319       obj->basicStats[0]++;
00320       CommitStatsHelper(obj, commitPtr);
00321       if (commitPtr->commitBfrLen > 0)  { 
00322     CkPrintf("%s", commitPtr->commitBfr);
00323     if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00324       }
00325     }
00326     commitPtr = commitPtr->next;
00327   }
00328 
00329   
00330   
00331   Event *link = commitPtr;
00332   commitPtr = commitPtr->prev;
00333   while (commitPtr != frontPtr) {
00334 #ifdef MEM_TEMPORAL
00335     if (commitPtr->serialCPdata) {
00336       localTimePool->tmp_free(commitPtr->timestamp, commitPtr->serialCPdata);
00337     }
00338 #else
00339     if (commitPtr->cpData) {
00340       delete commitPtr->cpData;
00341     }
00342 #endif
00343     commitPtr = commitPtr->prev;
00344     mem_usage--;
00345     delete commitPtr->next;
00346   }
00347   frontPtr->next = link;
00348   link->prev = frontPtr; 
00349 #ifdef EQ_SANITIZE
00350   sanitize();
00351 #endif
00352 }
00353 
00355 void eventQueue::CommitDoneEvents(sim *obj) {
00356 #ifdef EQ_SANITIZE
00357   sanitize();
00358 #endif
00359   Event *commitPtr = frontPtr->next;
00360   
00361   
00362   while (commitPtr != backPtr) {
00363     if (commitPtr->done) {
00364       obj->ResolveCommitFn(commitPtr->fnIdx, commitPtr->msg);
00365       obj->basicStats[0]++;
00366       CommitStatsHelper(obj, commitPtr);
00367       if (commitPtr->commitBfrLen > 0)  { 
00368     CkPrintf("%s", commitPtr->commitBfr);
00369     if (commitPtr->commitErr) CmiAbort("Commit ERROR");
00370       }
00371     }
00372     commitPtr = commitPtr->next;
00373   }
00374 
00375   
00376   Event *link = commitPtr;
00377   commitPtr = commitPtr->prev;
00378   while (commitPtr != frontPtr) {
00379     if (commitPtr->done == 1) {
00380 #ifdef MEM_TEMPORAL
00381       if (commitPtr->serialCPdata) {
00382     localTimePool->tmp_free(commitPtr->timestamp, commitPtr->serialCPdata);
00383       }
00384 #else
00385       if (commitPtr->cpData) {
00386     delete commitPtr->cpData;
00387       }
00388 #endif
00389     }
00390     commitPtr = commitPtr->prev;
00391     if (commitPtr->next->done == 1) {
00392       mem_usage--;
00393       delete commitPtr->next;
00394     } else {
00395       link = commitPtr->next;
00396     }
00397   }
00398   frontPtr->next = link;
00399   link->prev = frontPtr; 
00400 #ifdef EQ_SANITIZE
00401   sanitize();
00402 #endif
00403 }
00404 
00406 void eventQueue::SetCurrentPtr(Event *e) { 
00407   Event *tmp = e;
00408   
00409   CmiAssert((e->done == 0) || (e->done == -1)); 
00410   CmiAssert(currentPtr->done != 2);
00411   currentPtr = e; 
00412   if ((currentPtr == backPtr) && (eqh->top)) { 
00413     
00414     tmp = eqh->GetAndRemoveTopEvent();
00415     tmp->next = currentPtr;
00416     tmp->prev = currentPtr->prev;
00417     currentPtr->prev->next = tmp;
00418     currentPtr->prev = tmp;
00419     currentPtr = tmp;
00420   }
00421 }
00422 
00424 void eventQueue::DeleteEvent(Event *ev) 
00425 {
00426 #ifdef EQ_SANITIZE
00427   sanitize();
00428 #endif
00429   Event *tmp;
00430   CmiAssert(ev != currentPtr);
00431   CmiAssert(ev->spawnedList == NULL);
00432   CmiAssert(ev != frontPtr);
00433   CmiAssert(ev != backPtr);
00434   
00435   if (RBevent == ev) {
00436     RBevent = NULL;
00437     tmp = ev->next;
00438     while ((tmp != currentPtr) && (tmp != backPtr) && (tmp->done == 1))
00439       tmp = tmp->next;
00440     if ((tmp != currentPtr) && (tmp != backPtr) && (tmp->done == 0))
00441       RBevent = tmp;
00442   }
00443   
00444   ev->prev->next = ev->next;
00445   ev->next->prev = ev->prev;
00446   POSE_TimeType ts = ev->timestamp;
00447   if (!ev->done) eventCount--;
00448   else mem_usage--;
00449   delete ev; 
00450   if (ts == largest) FindLargest();
00451 #ifdef EQ_SANITIZE
00452   sanitize();
00453 #endif
00454 }
00455 
00457 void eventQueue::dump()
00458 {
00459   Event *e = frontPtr;
00460   CkPrintf("[EVENTQUEUE: \n");
00461   while (e) {
00462 #if USE_LONG_TIMESTAMPS
00463     CkPrintf("%lld[", e->timestamp); e->evID.dump(); CkPrintf(".%d", e->done); CkPrintf("]");
00464 #else
00465     CkPrintf("%d[", e->timestamp); e->evID.dump(); CkPrintf("]");
00466 #endif
00467     if (e == frontPtr) CkPrintf("(FP)");
00468     if (e == currentPtr) CkPrintf("(CP)");
00469     if (e == backPtr) CkPrintf("(BP)");
00470     CkPrintf(" ");
00471     e = e->next;
00472   }
00473   CkPrintf("\n");
00474   eqh->dump();
00475   CkPrintf("end EVENTQUEUE]\n");
00476 }
00477 
00479 char *eventQueue::dumpString() {
00480   Event *e = frontPtr;
00481   char *str= new char[PVT_DEBUG_BUFFER_LINE_LENGTH];
00482   char *tempStr= new char[PVT_DEBUG_BUFFER_LINE_LENGTH];
00483   snprintf(str, PVT_DEBUG_BUFFER_LINE_LENGTH, "[EVQ: ");
00484   while (e) {
00485 #if USE_LONG_TIMESTAMPS
00486     snprintf(tempStr, PVT_DEBUG_BUFFER_LINE_LENGTH, "%lld[%u.%d.%d]", e->timestamp, e->evID.id, e->evID.getPE(), e->done);
00487 #else
00488     sprintf(tempStr, PVT_DEBUG_BUFFER_LINE_LENGTH, "%d[%u.%d.%d]", e->timestamp, e->evID.id, e->evID.getPE(), e->done);
00489 #endif
00490     strncat(str, tempStr, 32);
00491 
00492     if (e == frontPtr) strncat(str, "(FP)", PVT_DEBUG_BUFFER_LINE_LENGTH);
00493     if (e == currentPtr) strncat(str, "(CP)", PVT_DEBUG_BUFFER_LINE_LENGTH);
00494     if (e == backPtr) strncat(str, "(BP)", PVT_DEBUG_BUFFER_LINE_LENGTH);
00495     strncat(str, " ", PVT_DEBUG_BUFFER_LINE_LENGTH);
00496     e = e->next;
00497   }
00498   char *eqs=eqh->dumpString();
00499   strncat(str, eqs, 32);
00500   delete [] tempStr;
00501   delete [] eqs;
00502   strncat(str, "end EVQ]", PVT_DEBUG_BUFFER_LINE_LENGTH);
00503   return str;
00504 }
00505 
00507 void eventQueue::pup(PUP::er &p) 
00508 {
00509   p|tsOfLastInserted; p|recentAvgEventSparsity;
00510   p|sparsityStartTime; p|sparsityCalcCount;
00511   Event *tmp;
00512   int i;
00513   int countlist = 0;
00514   if (p.isUnpacking()) { 
00515     p(countlist); 
00516     tmp = frontPtr; 
00517     for (i=0; i<countlist; i++) { 
00518       tmp->next = new Event;
00519       tmp->next->prev = tmp;
00520       tmp->next->next = NULL;
00521       tmp = tmp->next;
00522       tmp->pup(p);
00523     }
00524     tmp->next = backPtr; 
00525     backPtr->prev = tmp;
00526     currentPtr = backPtr; 
00527     if ((countlist > 0) && (backPtr->prev != frontPtr))  
00528       while ((currentPtr->prev->done == 0) && 
00529          (currentPtr->prev != frontPtr))
00530     currentPtr = currentPtr->prev;
00531     eqh->pup(p); 
00532   }
00533   else { 
00534     tmp = frontPtr->next;
00535     while (tmp != backPtr) { 
00536       countlist++;
00537       tmp = tmp->next;
00538     }
00539     p(countlist); 
00540     tmp = frontPtr->next;
00541     for (i=0; i<countlist; i++) { 
00542       tmp->pup(p);
00543       tmp = tmp->next;
00544     }
00545     eqh->pup(p); 
00546   }
00547 }
00548 
00550 void eventQueue::sanitize()
00551 {
00552   
00553   CmiAssert(frontPtr != NULL);
00554   CmiAssert(frontPtr->timestamp == POSE_UnsetTS);
00555   CmiAssert(frontPtr->done == -1);
00556   CmiAssert(frontPtr->fnIdx == -99);
00557   CmiAssert(frontPtr->msg == NULL);
00558   CmiAssert(frontPtr->commitBfr == NULL);
00559   CmiAssert(frontPtr->spawnedList == NULL);
00560   CmiAssert(frontPtr->next != NULL);
00561   CmiAssert(frontPtr->prev == NULL);
00562   CmiAssert(frontPtr->commitBfrLen == 0);
00563   CmiAssert(backPtr != NULL);
00564   CmiAssert(backPtr->timestamp == POSE_UnsetTS);
00565   CmiAssert(backPtr->done == -1);
00566   CmiAssert(backPtr->fnIdx == -100);
00567   CmiAssert(backPtr->msg == NULL);
00568   CmiAssert(backPtr->commitBfr == NULL);
00569   CmiAssert(backPtr->spawnedList == NULL);
00570   CmiAssert(backPtr->next == NULL);
00571   CmiAssert(backPtr->prev != NULL);
00572   CmiAssert(backPtr->commitBfrLen == 0);
00573 
00574   
00575   Event *tmp = frontPtr->next;
00576   while (tmp != backPtr) {
00577     CmiAssert(tmp->next != NULL);
00578     tmp->sanitize();
00579     tmp = tmp->next;
00580   }
00581 
00582   
00583   tmp = backPtr->prev;
00584   while (tmp != frontPtr) {
00585     CmiAssert(tmp->prev != NULL);
00586     tmp->sanitize();
00587     tmp = tmp->prev;
00588   }
00589 
00590   
00591   CmiAssert(currentPtr != NULL);
00592   
00593   
00594   tmp = currentPtr;
00595   while (tmp != backPtr) {
00596     CmiAssert(tmp->next != NULL);
00597     CmiAssert((tmp->next == backPtr) || 
00598           (tmp->timestamp <= tmp->next->timestamp));
00599     tmp = tmp->next;
00600   } 
00601   
00602   while (tmp != currentPtr) {
00603     CmiAssert(tmp->prev != NULL);
00604     tmp = tmp->prev;
00605   } 
00606   
00607   while (tmp != frontPtr) {
00608     CmiAssert(tmp->prev != NULL);
00609     tmp = tmp->prev;
00610   } 
00611   
00612   while (tmp != currentPtr) {
00613     CmiAssert(tmp->next != NULL);
00614     tmp = tmp->next;
00615   } 
00616 
00617   
00618   if ((frontPtr->next != backPtr) && (frontPtr->next->done))
00619 #ifdef MEM_TEMPORAL
00620     CmiAssert(frontPtr->next->serialCPdata);
00621 #else
00622     CmiAssert(frontPtr->next->cpData);
00623 #endif
00624 
00625   
00626   tmp = frontPtr->next;
00627   while ((tmp != currentPtr) && (tmp->done == 1))
00628     tmp = tmp->next;
00629   if (tmp == currentPtr) CmiAssert(RBevent == NULL);
00630   else CmiAssert((RBevent == NULL) || (tmp == RBevent));
00631 
00632   
00633   eqh->sanitize();
00634 }