00001 
00002 #ifndef SRTABLE_H
00003 #define SRTABLE_H
00004 #include "pose.h"
00005 
/// Dimension of the fixed-size per-bucket arrays declared in SRtable
/// (buckets, end_bucket, sends, recvs, numEntries).
#define MAX_B 10



/// Forward declaration: this header only uses UpdateMsg through a pointer
/// (the return type of SRtable::PackTable).
class UpdateMsg; 
00011 
00013 
00014 class SRentry {
00015  public:
00017   POSE_TimeType timestamp;  
00019   int sends;
00021   int recvs;
00023   SRentry *next;
00025 
00026   SRentry() :timestamp(POSE_UnsetTS), sends(0), recvs(0), next(NULL) 
00027     {
00028     }
00030 
00031   SRentry(POSE_TimeType ts, SRentry *p) :timestamp(ts), sends(0), recvs(0), next(p) 
00032     { 
00033     }
00035 
00037   SRentry(POSE_TimeType ts, int sr, SRentry *p) :timestamp(ts), sends(0), recvs(0), next(p) 
00038     {
00039       if (sr == SEND) { sends = 1; recvs = 0; }
00040       else { sends = 0; recvs = 1; }
00041     }
00043 
00044   SRentry(POSE_TimeType ts, int sr) :timestamp(ts), sends(0), recvs(0), next(NULL) 
00045     {
00046       if (sr == SEND) { sends = 1; recvs = 0; }
00047       else { sends = 0; recvs = 1; }
00048     }
00050 
00052   void pup(PUP::er &p) {
00053     p|timestamp; p|sends; p|recvs;
00054     int nullFlag;
00055     if (next == NULL) {
00056       nullFlag = 1;
00057     } else {
00058       nullFlag = 0;
00059     }
00060     p|nullFlag;
00061     if (p.isUnpacking()) {
00062       if (nullFlag) {
00063     next = NULL;
00064       } else {
00065     next = new SRentry();
00066     next->pup(p);
00067       }
00068     } else {
00069       if (!nullFlag) {
00070     next->pup(p);
00071       }
00072     }
00073   }
00075   inline SRentry& operator=(const SRentry& e) {
00076     timestamp = e.timestamp;
00077     sends = e.sends;
00078     recvs = e.recvs;
00079     return *this;
00080   }
00082   void dump() {
00083     if (next)
00084       CkPrintf("TS:%d #s:%d #r:%d n:!NULL ", timestamp, sends, recvs); 
00085     else CkPrintf("TS:%d #s:%d #r:%d n:NULL ",timestamp, sends, recvs);
00086   }
00088   char *dumpString();
00090   void sanitize() {
00091     CmiAssert(timestamp >= POSE_UnsetTS); 
00092     CmiAssert(sends >= 0);  
00093     CmiAssert(recvs >= 0);  
00094     if (next == NULL) return;   
00095     
00096     POSE_TimeType test_ts = next->timestamp;
00097     int test_sendCount = next->sends;
00098     int test_recvCount = next->recvs;
00099     SRentry *test_next = next->next;
00100     next->timestamp = test_ts;
00101     next->sends = test_sendCount;
00102     next->recvs = test_recvCount;
00103     next->next = test_next;
00104   }
00105 };
00106 
00108 
00109 class SRtable {
00110  public:
00112 
00113   POSE_TimeType offset;
00115 
00116   int b;
00118 
00119   int size_b;
00121 
00122   SRentry *buckets[MAX_B];
00124 
00125   SRentry *end_bucket[MAX_B];
00127   int sends[MAX_B], recvs[MAX_B], ofSends, ofRecvs;
00129 
00130   SRentry *overflow;
00132   SRentry *end_overflow;
00134 
00135   int numEntries[MAX_B];
00137 
00138   int numOverflow;
00139   
00141 
00142   SRtable();
00144   ~SRtable() { FreeTable(); }
00146   void pup(PUP::er &p) {
00147     p|offset; p|b; p|size_b; p|numOverflow;
00148     PUParray(p, sends, MAX_B);
00149     PUParray(p, recvs, MAX_B);
00150     p|ofSends; p|ofRecvs;
00151     PUParray(p, numEntries, MAX_B);
00152 
00153     
00154     int nullFlag;
00155     SRentry *tmp;
00156     for (int i = 0; i < MAX_B; i++) {
00157       if (buckets[i] == NULL) {
00158     nullFlag = 1;
00159       } else {
00160     nullFlag = 0;
00161       }
00162       p|nullFlag;
00163       if (p.isUnpacking()) {  
00164     if (nullFlag) {
00165       buckets[i] = end_bucket[i] = NULL;
00166     } else {
00167       buckets[i] = new SRentry();
00168       buckets[i]->pup(p);
00169     }
00170       } else {  
00171     if (!nullFlag) {
00172       buckets[i]->pup(p);
00173     }
00174       }
00175     }
00176 
00177     
00178     if (overflow == NULL) {
00179       nullFlag = 1;
00180     } else {
00181       nullFlag = 0;
00182     }
00183     p|nullFlag;
00184     if (p.isUnpacking()) {  
00185       if (nullFlag) {
00186     overflow = end_overflow = NULL;
00187       } else {
00188     overflow = new SRentry();
00189     tmp = overflow;
00190     do {
00191       tmp->pup(p);
00192       
00193       
00194       
00195       
00196       
00197       
00198       if (tmp->next) {
00199         tmp->next = new SRentry();
00200       } else {
00201         
00202         
00203         end_overflow = tmp;
00204       }
00205       tmp = tmp->next;
00206     } while (tmp);
00207       }
00208     } else {  
00209       if (!nullFlag) {
00210     tmp = overflow;
00211     while (tmp) {
00212       tmp->pup(p);
00213       tmp = tmp->next;
00214     }
00215       }
00216     }
00217   }
00219   void Initialize();
00221   void Insert(POSE_TimeType ts, int sr) {
00222 #ifdef SR_SANITIZE
00223     sanitize();
00224 #endif
00225     CmiAssert(ts >= offset);
00226     CmiAssert((sr == 0) || (sr == 1));
00227     if (size_b == -1) size_b = 1 + ts/b;
00228     POSE_TimeType destBkt = (ts-offset)/size_b;  
00229     SRentry *e = new SRentry(ts, sr, NULL);
00230     if (destBkt >= b) { 
00231       if (overflow) {
00232     if (end_overflow->timestamp == ts) { 
00233       if (sr == SEND) end_overflow->sends++;
00234       else end_overflow->recvs++;
00235       delete e;
00236     }
00237     else { 
00238       end_overflow->next = e;
00239       end_overflow = e;
00240     }
00241       }
00242       else overflow = end_overflow = e;
00243       if (sr == SEND) ofSends++;
00244       else ofRecvs++;
00245     }
00246     else { 
00247       if (buckets[destBkt]) {
00248     if (end_bucket[destBkt]->timestamp == ts) { 
00249       
00250       if (sr == SEND) end_bucket[destBkt]->sends++;
00251       else end_bucket[destBkt]->recvs++;
00252       delete e;
00253     }
00254     else { 
00255       end_bucket[destBkt]->next = e;
00256       end_bucket[destBkt] = e;
00257     }
00258       }
00259       else buckets[destBkt] = end_bucket[destBkt] = e;
00260       if (sr == SEND) sends[destBkt]++;
00261       else recvs[destBkt]++;
00262     }
00263 #ifdef SR_SANITIZE
00264     sanitize();
00265 #endif
00266   } 
00268   void Insert(SRentry *e) {
00269 #ifdef SR_SANITIZE
00270     sanitize();
00271 #endif
00272     CmiAssert(e != NULL);
00273     CmiAssert(e->timestamp >= offset);
00274     POSE_TimeType destBkt = (e->timestamp-offset)/size_b;
00275     e->next = NULL;
00276     if (destBkt >= b) { 
00277       if (overflow) {
00278     end_overflow->next = e;
00279     end_overflow = e;
00280       }
00281       else overflow = end_overflow = e;
00282       ofSends += e->sends;
00283       ofRecvs += e->recvs;
00284     }
00285     else { 
00286       if (buckets[destBkt]) {
00287     end_bucket[destBkt]->next = e;
00288     end_bucket[destBkt] = e;
00289       }
00290       else buckets[destBkt] = end_bucket[destBkt] = e;
00291       sends[destBkt] += e->sends;
00292       recvs[destBkt] += e->recvs;
00293     }
00294 #ifdef SR_SANITIZE
00295     sanitize();
00296 #endif
00297   }
00299 
00301   void Restructure(POSE_TimeType newGVTest, POSE_TimeType firstTS,int firstSR);
00303   void MapToBuckets(SRentry *bkt, SRentry *endBkt, int *s, int *r);
00305   UpdateMsg *PackTable(POSE_TimeType pvt, POSE_TimeType *maxSRb);
00307   void SortTable();
00309   void CompressAndSortBucket(POSE_TimeType i, int is_overflow);
00311   void FreeTable();
00313   void dump();
00315   char *dumpString();
00317   void sanitize();
00319   void self_test();
00320 };
00321 
00322 #endif