00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 #ifndef _CKREDUCTION_H
00019 #define _CKREDUCTION_H
00020 
00021 #include "CkReduction.decl.h"
00022 
// Constants for the pipelined all-reduce path; they exist only when the build
// defines _PIPELINED_ALLREDUCE_.
//   FRAG_SIZE      - AllreduceMgr (later in this header) sizes its reassembly
//                    buffer as FRAG_SIZE * nFrags and places fragment k at
//                    byte offset k * FRAG_SIZE.
//   FRAG_THRESHOLD - not referenced in this header.
//                    NOTE(review): presumably the payload size above which a
//                    contribution is fragmented - confirm at the use site.
#if defined(_PIPELINED_ALLREDUCE_)
#  define FRAG_SIZE 131072
#  define FRAG_THRESHOLD 131072
#endif

// Constants for the message-logging / causal fault-tolerance builds.
//   MAX_INT           - fallback returned by
//                       CkReductionMgr::numberReductionMessages() when its
//                       totalCount is zero.
//   _MLOG_REDUCE_P2P_ - defined as 0 here; it gates the extra per-processor
//                       counters in CkReductionMsg and CkReductionMgr.
#if defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)
#  define MAX_INT 5000000
#  define _MLOG_REDUCE_P2P_ 0
#endif
00032 
00033 
00034 
00035 class CkGroupCallbackMsg:public CMessage_CkGroupCallbackMsg {
00036 public:
00037     typedef void (*callbackType)(void *param);
00038     CkGroupCallbackMsg(callbackType Ncallback,void *Nparam)
00039         {callback=Ncallback;param=Nparam;}
00040     void call(void) {(*callback)(param);}
00041 private:
00042     callbackType callback;
00043     void *param;
00044 };
00045 
00046 class CkGroupInitCallback : public IrrGroup {
00047 public:
00048     CkGroupInitCallback(void);
00049     CkGroupInitCallback(CkMigrateMessage *m):IrrGroup(m) {}
00050     void callMeBack(CkGroupCallbackMsg *m);
00051     void pup(PUP::er& p){ IrrGroup::pup(p); }
00052 };
00053 
00054 
00055 class CkGroupReadyCallback : public IrrGroup {
00056 private:
00057   bool _isReady;
00058   CkQ<CkGroupCallbackMsg *> _msgs;
00059   void callBuffered(void);
00060 public:
00061     CkGroupReadyCallback(void);
00062     CkGroupReadyCallback(CkMigrateMessage *m):IrrGroup(m) {}
00063     void callMeBack(CkGroupCallbackMsg *m);
00064     bool isReady(void) { return _isReady; }
00065 protected:
00066     void setReady(void) {_isReady = true; callBuffered(); }
00067     void setNotReady(void) {_isReady = false; }
00068 };
00069 
00070 class CkReductionNumberMsg:public CMessage_CkReductionNumberMsg {
00071 public:
00072   int num;
00073   CkReductionNumberMsg(int n) {num=n;}
00074 };
00075 
00076 
00077 class CkReductionInactiveMsg:public CMessage_CkReductionInactiveMsg {
00078   public:
00079     int id, redno;
00080     CkReductionInactiveMsg(int i, int r) {id=i; redno = r;}
00081 };
00082 
00083 
00085 class contributorInfo {
00086 public:
00087     int redNo;
00088     contributorInfo() {redNo=0;}
00089     inline void pup(PUP::er& p) { 
00090         p((char *)this, sizeof(contributorInfo));
00091     }
00092 };
00093 PUPbytes(contributorInfo)
00094 
00095 class countAdjustment {
00096 public:
00097   int gcount;
00098   int lcount;
00099   countAdjustment(int ignored=0) {(void)ignored; gcount=0; lcount=0;}
00100   inline void pup(PUP::er& p) { 
00101     p((char *)this, sizeof(countAdjustment));
00102   }
00103 };
00104 PUPbytes(countAdjustment)
00105 
00106 
// Forward declaration only: the class is named in friend declarations inside
// CkReduction and CkReductionMsg further down in this header.
namespace ck {
namespace impl {
class XArraySectionReducer;
}  // namespace impl
}  // namespace ck
00110 
00111 
00112 
00113 class CkReduction {
00114 public:
00115     
00116 
00117 
00118         
00119 
00120 
00121 
00122 
00123 
00124 
00125 
00126   
00127     typedef enum {
00128     
00129         invalid=0,
00130                 nop,
00131     
00132         sum_char,sum_short,sum_int,sum_long,sum_long_long,
00133                 sum_uchar,sum_ushort,sum_uint,sum_ulong,
00134                 sum_ulong_long,sum_float,sum_double,
00135 
00136     
00137         product_char,product_short,product_int,product_long,product_long_long,
00138                 product_uchar,product_ushort,product_uint,product_ulong,
00139                 product_ulong_long,product_float,product_double,
00140 
00141     
00142         max_char,max_short,max_int,max_long,max_long_long,
00143                 max_uchar,max_ushort,max_uint,max_ulong,
00144                 max_ulong_long,max_float,max_double,
00145 
00146     
00147         min_char,min_short,min_int,min_long,min_long_long,
00148                 min_uchar,min_ushort,min_uint,min_ulong,
00149                 min_ulong_long,min_float,min_double,
00150 
00151     
00152     
00153         logical_and, 
00154                 logical_and_int,logical_and_bool,
00155 
00156     
00157     
00158         logical_or, 
00159                 logical_or_int,logical_or_bool,
00160 
00161     
00162     
00163     
00164                 logical_xor_int,logical_xor_bool,
00165 
00166                 
00167                 bitvec_and, 
00168                 bitvec_and_int,bitvec_and_bool,
00169 
00170                 
00171                 bitvec_or, 
00172                 bitvec_or_int,bitvec_or_bool,
00173 
00174                 
00175                 bitvec_xor, 
00176                 bitvec_xor_int,bitvec_xor_bool,
00177 
00178     
00179         random,
00180 
00181     
00182         concat,
00183 
00184     
00185     
00186         set,
00187 
00188         
00189         statistics,
00190 
00191         
00192         tuple,
00193 
00194         
00195         external_py
00196     } reducerType;
00197 
00198     
00199     
00200     class setElement {
00201     public:
00202             int dataSize;
00203             char data[1];
00204         
00205         
00206         setElement *next(void);
00207     };
00208 
00209     
00210     struct statisticsElement {
00211         int count;
00212         double mean;
00213         double m2;
00214         statisticsElement(double initialValue);
00215         double variance() const { return count > 1 ? m2 / (double(count) - 1.0) : 0.0; }
00216         double stddev() const { return sqrt(variance()); }
00217     };
00218 
00219     struct tupleElement {
00220         size_t dataSize;
00221         char* data;
00222         CkReduction::reducerType reducer;
00223         bool owns_data;
00224         tupleElement();
00225         tupleElement(size_t dataSize, void* data, CkReduction::reducerType reducer);
00226         tupleElement(CkReduction::tupleElement&& rhs_move);
00227         tupleElement& operator=(CkReduction::tupleElement&& rhs_move);
00228         ~tupleElement();
00229 
00230         inline void* getData(void) { return data; }
00231         void pup(PUP::er &p);
00232     };
00233 
00234 
00235     
00236     
00237     
00238     
00239     typedef CkReductionMsg *(*reducerFn)(int nMsg,CkReductionMsg **msgs);
00240 
00241   struct reducerStruct {
00242     reducerFn fn;
00243     bool streamable;
00244 #if CMK_ERROR_CHECKING
00245     const char *name; 
00246 #endif
00247     reducerStruct(reducerFn f=NULL, bool s=false, const char *n=NULL) : fn(f), streamable(s)
00248 #if CMK_ERROR_CHECKING
00249                   ,name(n)
00250 #endif
00251     {}
00252   };
00253 
00254     
00255     
00256     static reducerType addReducer(reducerFn fn, bool streamable=false, const char* name=NULL);
00257 
00258 private:
00259     friend class CkReductionMgr;
00260     friend class CkNodeReductionMgr;
00261     friend class CkMulticastMgr;
00262     friend class ck::impl::XArraySectionReducer;
00263 
00264 
00265     
00266     static std::vector<reducerStruct>& reducerTable();
00267     static std::vector<reducerStruct> initReducerTable();
00268 
00269     
00270     
00271     static CkReductionMsg* tupleReduction_fn(int nMsgs, CkReductionMsg** msgs);
00272 
00273     
00274     CkReduction();
00275 };
00276 PUPbytes(CkReduction::reducerType)
00277 
00278 
00279 
00280         
00281 
00282 
00283 
00284 
00285 
00286 
00287 
00288 struct CkReductionTypesExt {
00289     
00290     int nop = CkReduction::nop;
00291     
00292     int sum_char = CkReduction::sum_char;
00293     int sum_short = CkReduction::sum_short;
00294     int sum_int = CkReduction::sum_int;
00295     int sum_long = CkReduction::sum_long;
00296     int sum_long_long = CkReduction::sum_long_long;
00297     int sum_uchar = CkReduction::sum_uchar;
00298     int sum_ushort = CkReduction::sum_ushort;
00299     int sum_uint = CkReduction::sum_uint;
00300     int sum_ulong = CkReduction::sum_ulong;
00301     int sum_ulong_long = CkReduction::sum_ulong_long;
00302     int sum_float = CkReduction::sum_float;
00303     int sum_double = CkReduction::sum_double;
00304     
00305     int product_char = CkReduction::product_char;
00306     int product_short = CkReduction::product_short;
00307     int product_int = CkReduction::product_int;
00308     int product_long = CkReduction::product_long;
00309     int product_long_long = CkReduction::product_long_long;
00310     int product_uchar = CkReduction::product_uchar;
00311     int product_ushort = CkReduction::product_ushort;
00312     int product_uint = CkReduction::product_uint;
00313     int product_ulong = CkReduction::product_ulong;
00314     int product_ulong_long = CkReduction::product_ulong_long;
00315     int product_float = CkReduction::product_float;
00316     int product_double = CkReduction::product_double;
00317     
00318     int max_char = CkReduction::max_char;
00319     int max_short = CkReduction::max_short;
00320     int max_int = CkReduction::max_int;
00321     int max_long = CkReduction::max_long;
00322     int max_long_long = CkReduction::max_long_long;
00323     int max_uchar = CkReduction::max_uchar;
00324     int max_ushort = CkReduction::max_ushort;
00325     int max_uint = CkReduction::max_uint;
00326     int max_ulong = CkReduction::max_ulong;
00327     int max_ulong_long = CkReduction::max_ulong_long;
00328     int max_float = CkReduction::max_float;
00329     int max_double = CkReduction::max_double;
00330     
00331     int min_char = CkReduction::min_char;
00332     int min_short = CkReduction::min_short;
00333     int min_int = CkReduction::min_int;
00334     int min_long = CkReduction::min_long;
00335     int min_long_long = CkReduction::min_long_long;
00336     int min_uchar = CkReduction::min_uchar;
00337     int min_ushort = CkReduction::min_ushort;
00338     int min_uint = CkReduction::min_uint;
00339     int min_ulong = CkReduction::min_ulong;
00340     int min_ulong_long = CkReduction::min_ulong_long;
00341     int min_float = CkReduction::min_float;
00342     int min_double = CkReduction::min_double;
00343     
00344     int external_py = CkReduction::external_py;
00345 };
00346 
00347 extern "C" CkReductionTypesExt charm_reducers;
00348 
00349 
00350 
00351 class CkReductionMsg : public CMessage_CkReductionMsg
00352 {
00353     friend class CkReduction;
00354     friend class CkReductionMgr;
00355     friend class CkNodeReductionMgr;
00356     friend class CkMulticastMgr;
00357 #ifdef _PIPELINED_ALLREDUCE_
00358     friend class ArrayElement;
00359     friend class AllreduceMgr;
00360 #endif
00361     friend class ck::impl::XArraySectionReducer;
00362 public:
00363 
00364 
00365     
00366     
00367     static CkReductionMsg *buildNew(int NdataSize,const void *srcData,
00368         CkReduction::reducerType reducer=CkReduction::invalid,
00369                 CkReductionMsg *buf = NULL);
00370 
00371     inline int getLength() const {return dataSize;}
00372     inline int getSize() const {return dataSize;}
00373     inline void *getData() {return data;}
00374     inline const void *getData() const {return data;}
00375 
00376     inline int getGcount() const {return gcount;}
00377     inline CkReduction::reducerType getReducer() const {return reducer;}
00378     inline int getRedNo() const {return redNo;}
00379 
00380     inline CMK_REFNUM_TYPE getUserFlag() const {return userFlag;}
00381     inline void setUserFlag(CMK_REFNUM_TYPE f) { userFlag=f;}
00382 
00383     inline void setCallback(const CkCallback &cb) { callback=cb; }
00384 
00385     
00386     
00387     inline bool isFromUser() const {return sourceFlag==-1;}
00388 
00389     inline bool isMigratableContributor() const {return migratableContributor;}
00390     inline void setMigratableContributor(bool _mig){ migratableContributor = _mig;}
00391 
00392     
00393     static CkReductionMsg* buildFromTuple(CkReduction::tupleElement* reductions, int num_reductions);
00394     void toTuple(CkReduction::tupleElement** out_reductions, int* num_reductions);
00395 
00396     ~CkReductionMsg() {}
00397 
00398 
00399     
00400     static void *alloc(int msgnum, size_t size, int *reqSize, int priobits, GroupDepNum groupDepNum=GroupDepNum{});
00401     static void *pack(CkReductionMsg *);
00402     static CkReductionMsg *unpack(void *in);
00403 
00404 private:
00405     inline int nSources() const {return std::abs(sourceFlag);}
00406 
00407     
00408     CkReductionMsg() {}
00409 
00410 private:
00411     int dataSize;
00412     int sourceFlag;
00413 
00414 
00415 
00416 
00417 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_ )
00418     int sourceProcessorCount;
00419 #endif
00420     int fromPE;
00421     int redNo;
00422     int gcount;
00423     CkReduction::reducerType reducer;
00424     CMK_REFNUM_TYPE userFlag; 
00425     bool migratableContributor; 
00426         
00427         int8_t rebuilt;          
00428         int8_t nFrags;
00429         int8_t fragNo;      
00430                          
00431         CkSectionInfo sid;   
00432     CkCallback callback; 
00433 #if CMK_BIGSIM_CHARM
00434 public:
00435     
00436     void *event; 
00437     int eventPe; 
00438 private:
00439         void *log;
00440 #endif
00441     void *data;
00442     double dataStorage;
00443 };
00444 
00445 
// Declares the contribute() overload set inside a class body (NodeGroup and
// Group expand it):
//   - raw bytes + reducer type, with and without a callback
//   - std::vector<T> + reducer type + callback; forwards to the raw-bytes
//     overload with sizeof(T)*data.size() bytes
//   - a ready-made CkReductionMsg
//   - no data at all, with and without a callback
// userFlag defaults to (CMK_REFNUM_TYPE)-1 wherever it appears.
#define CK_REDUCTION_CONTRIBUTE_METHODS_DECL                                   \
  void contribute(int dataSize, const void *data,                              \
                  CkReduction::reducerType type,                               \
                  CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);               \
  void contribute(int dataSize, const void *data,                              \
                  CkReduction::reducerType type, const CkCallback &cb,         \
                  CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);               \
  template <typename T>                                                        \
  void contribute(const std::vector<T> &data, CkReduction::reducerType type,   \
                  const CkCallback &cb,                                        \
                  CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1)                \
  { contribute(sizeof(T)*data.size(), data.data(), type, cb, userFlag); }      \
  void contribute(CkReductionMsg *msg);                                        \
  void contribute(const CkCallback &cb,                                        \
                  CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);               \
  void contribute(CMK_REFNUM_TYPE userFlag=(CMK_REFNUM_TYPE)-1);

// Declares barrier(cb) inside a class body (Group expands it).
#define CK_BARRIER_CONTRIBUTE_METHODS_DECL \
  void barrier(const CkCallback &cb);
00461 
00462 
00468 class CkNodeReductionMgr : public IrrGroup {
00469 public:
00470     CProxy_CkNodeReductionMgr thisProxy;
00471 public:
00472     CkNodeReductionMgr(void);
00473     CkNodeReductionMgr(CkMigrateMessage *m) : IrrGroup(m) {
00474           storedCallback = NULL;
00475         }
00476         ~CkNodeReductionMgr();
00477 
00478     typedef CkReductionClientFn clientFn;
00479 
00484     void ckSetReductionClient(CkCallback *cb);
00485 
00486 
00487 
00488 
00489     void contribute(contributorInfo *ci,CkReductionMsg *msg);
00490     void contributeWithCounter(contributorInfo *ci,CkReductionMsg *m,int count);
00491 
00492     
00493     void RecvMsg(CkReductionMsg *m);
00494     void doRecvMsg(CkReductionMsg *m);
00495     void LateMigrantMsg(CkReductionMsg *m);
00496 
00497     virtual void flushStates(); 
00498 
00499     virtual int getTotalGCount(){return 0;};
00500 
00501 private:
00502 
00503     
00504     CkCallback *storedCallback;
00505 
00506     int redNo;
00507     bool inProgress;
00508     bool creating;
00509     bool startRequested;
00510     int gcount;
00511     int lcount;
00512 
00513     
00514     int nContrib,nRemote;
00515     
00516     CkMsgQ<CkReductionMsg> msgs;
00517     
00518     CkMsgQ<CkReductionMsg> futureMsgs;
00519     
00520     CkMsgQ<CkReductionMsg> futureRemoteMsgs;
00521     
00522     CkMsgQ<CkReductionMsg> futureLateMigrantMsgs;
00523     
00524     
00525     CmiNodeLock lockEverything;
00526 
00527     bool interrupt; 
00528 
00529     
00530     std::vector<int> kids;
00531     
00532 
00533     void startReduction(int number,int srcPE);
00534     void doAddContribution(CkReductionMsg *m);
00535     void finishReduction(void);
00536 protected:  
00537     void addContribution(CkReductionMsg *m);
00538 
00539 private:
00540 
00541 
00542 
00543     unsigned upperSize;
00544     unsigned label;
00545     int parent;
00546     int numKids;
00547 
00548     void init_BinomialTree();
00549 
00550     void init_TopoTree();
00551     void init_BinaryTree();
00552     enum {TREE_WID=2};
00553     int treeRoot(void);
00554     bool hasParent(void);
00555     int treeParent(void);
00556     int firstKid(void);
00557     int treeKids(void);
00558 
00559     
00560     bool isPast(int num) const {return (bool)(num<redNo);}
00561     bool isPresent(int num) const {return (bool)(num==redNo);}
00562     bool isFuture(int num) const {return (bool)(num>redNo);}
00563 
00564 #if CMK_FAULT_EVAC
00565     bool oldleaf;
00566     bool blocked;
00567     int newParent;
00568     int additionalGCount,newAdditionalGCount; 
00569     std::vector<int> newKids;
00570     CkMsgQ<CkReductionMsg> bufferedMsgs;
00571     CkMsgQ<CkReductionMsg> bufferedRemoteMsgs;
00572     enum {OLDPARENT,OLDCHILDREN,NEWPARENT,LEAFPARENT};
00573     int numModificationReplies;
00574     int maxModificationRedNo;
00575     int tempModificationRedNo;
00576     bool readyDeletion;
00577     bool killed;
00578 #endif
00579     
00580 
00581  public:
00582     virtual void pup(PUP::er &p);
00583 #if CMK_FAULT_EVAC
00584     virtual void evacuate();
00585     virtual void doneEvacuate();
00586     void DeleteChild(int deletedChild);
00587     void DeleteNewChild(int deletedChild);
00588     void collectMaxRedNo(int maxRedNo);
00589     void unblockNode(int maxRedNo);
00590     void modifyTree(int code,int size,int *data);
00591 private:    
00592     int findMaxRedNo();
00593     void updateTree();
00594     void clearBlockedMsgs();
00595 #endif
00596 };
00597 
00598 
00599 
00600 
00601 class NodeGroup : public CkNodeReductionMgr {
00602   protected:
00603     contributorInfo reductionInfo;
00604   public:
00605     CmiNodeLock __nodelock;
00606     const int thisIndex;
00607     NodeGroup();
00608     NodeGroup(CkMigrateMessage* m):CkNodeReductionMgr(m),thisIndex(CkMyNode()) { __nodelock=CmiCreateLock(); }
00609     
00610     ~NodeGroup();
00611     inline const CkGroupID &ckGetGroupID(void) const {return thisgroup;}
00612     inline CkGroupID CkGetNodeGroupID(void) const {return thisgroup;}
00613     virtual bool isNodeGroup() { return true; }
00614 
00615     virtual void pup(PUP::er &p);
00616     virtual void flushStates() {
00617         CkNodeReductionMgr::flushStates();
00618         reductionInfo.redNo = 0;
00619     }
00620 
00621     CK_REDUCTION_CONTRIBUTE_METHODS_DECL
00622     void contributeWithCounter(CkReductionMsg *msg,int count);
00623 };
00624 
00625 
00626 class CkReductionMgr : public CkGroupInitCallback {
00627 public:
00628         CProxy_CkReductionMgr thisProxy;
00629 
00630 public:
00631     CkReductionMgr();
00632     CkReductionMgr(CkMigrateMessage *m);
00633         ~CkReductionMgr();
00634 
00635     typedef CkReductionClientFn clientFn;
00636 
00641     void ckSetReductionClient(CkCallback *cb);
00642 
00643 
00644 
00645 
00646 
00647     
00648     
00649     
00650     void creatingContributors(void);
00651     void doneCreatingContributors(void);
00652     
00653     void contributorStamped(contributorInfo *ci);
00654     void contributorCreated(contributorInfo *ci);
00655     void contributorDied(contributorInfo *ci);
00656     
00657     void contributorLeaving(contributorInfo *ci);
00658     
00659     void contributorArriving(contributorInfo *ci);
00660 
00661 
00662 
00663 
00664     void contribute(contributorInfo *ci,CkReductionMsg *msg);
00665 
00666 
00667     
00668     void ReductionStarting(CkReductionNumberMsg *m);
00669     
00670     void LateMigrantMsg(CkReductionMsg *m);
00671     
00672     void MigrantDied(CkReductionNumberMsg *m);
00673 
00674     void RecvMsg(CkReductionMsg *m);
00675   void AddToInactiveList(CkReductionInactiveMsg *m);
00676 
00677 
00678         void barrier(CkReductionMsg * msg);
00679         void Barrier_RecvMsg(CkReductionMsg *m);
00680         void addBarrier(CkReductionMsg *m);
00681         void finishBarrier(void);
00682 
00683     virtual bool isReductionMgr(void){ return true; }
00684     virtual void flushStates();
00685     
00686 
00687 
00688 
00689 
00690 
00691 
00692 
00693     int getGCount(){return gcount;};
00694 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00695     void decGCount(){gcount--;}
00696     void incNumImmigrantRecObjs(){
00697         numImmigrantRecObjs++;
00698     }
00699     void decNumImmigrantRecObjs(){
00700         numImmigrantRecObjs--;
00701     }
00702     void incNumEmigrantRecObjs(){
00703         numEmigrantRecObjs++;
00704     }
00705     void decNumEmigrantRecObjs(){
00706         numEmigrantRecObjs--;
00707     }
00708 
00709 #endif
00710 
00711         
00712     static CkReductionMsg *reduceMessages(CkMsgQ<CkReductionMsg> &msgs);
00713 
00714 private:
00715 
00716 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00717     int numImmigrantRecObjs;
00718     int numEmigrantRecObjs;
00719 #endif
00720 
00721 
00722     
00723     CkCallback storedCallback;
00724 
00725     int redNo;
00726     int completedRedNo;
00727     bool inProgress;
00728     bool creating;
00729     bool startRequested;
00730     int gcount;
00731     int lcount;
00732     int maxStartRequest; 
00733 
00734     
00735     int nContrib,nRemote;
00736   
00737   bool is_inactive;
00738 
00739         
00740         CkCallback barrier_storedCallback;
00741         int barrier_gCount;
00742         int barrier_nSource;
00743         int barrier_nContrib,barrier_nRemote;
00744 
00745     
00746     CkMsgQ<CkReductionMsg> msgs;
00747 
00748     
00749     CkMsgQ<CkReductionMsg> futureMsgs;
00750     
00751     CkMsgQ<CkReductionMsg> futureRemoteMsgs;
00752 
00753     CkMsgQ<CkReductionMsg> finalMsgs;
00754   std::map<int, int> inactiveList;
00755 
00756 
00757     void startReduction(int number,int srcPE);
00758     void addContribution(CkReductionMsg *m);
00759     void finishReduction(void);
00760   void checkIsActive();
00761   void informParentInactive();
00762   void checkAndAddToInactiveList(int id, int red_no);
00763   void checkAndRemoveFromInactiveList(int id, int red_no);
00764   void sendReductionStartingToKids(int red_no);
00765 
00766 
00767     unsigned upperSize;
00768     unsigned label;
00769     int parent;
00770     int numKids;
00771     
00772     std::vector<int> newKids;
00773     std::vector<int> kids;
00774     void init_BinomialTree();
00775 
00776     void init_TopoTree();
00777     void init_BinaryTree();
00778     enum {TREE_WID=2};
00779     int treeRoot(void);
00780 
00781     
00782     bool isPast(int num) const {return (bool)(num<redNo);}
00783     bool isPresent(int num) const {return (bool)(num==redNo);}
00784     bool isFuture(int num) const {return (bool)(num>redNo);}
00785 
00786 
00787     
00788     
00789     std::vector<countAdjustment> adjVec;
00790     
00791     countAdjustment &adj(int number);
00792 
00793 protected:
00794     bool hasParent(void);
00795     int treeParent(void);
00796     int firstKid(void);
00797     int treeKids(void);
00798 
00799     
00800     bool disableNotifyChildrenStart;
00801     void resetCountersWhenFlushingStates() { gcount = lcount = 0; }
00802         bool isDestroying;
00803 
00804 
00805 public:
00806 #if (defined(_FAULT_MLOG_) && _MLOG_REDUCE_P2P_)
00807     int *perProcessorCounts;
00808     int processorCount;
00809     int totalCount;
00810     int numberReductionMessages(){
00811             if(totalCount != 0){
00812                 return totalCount;
00813             }else{
00814                 return MAX_INT;
00815             }
00816     }
00817 #endif
00818     virtual void pup(PUP::er &p);
00819     static bool isIrreducible(){ return false;}
00820     void contributeViaMessage(CkReductionMsg *m);
00821 };
00822 
00823 
00824 
00825 
00826 
00827 
00828 
00829 
00830 
00831 
// Expands to the call CkIndex_<me>::redn_wrapper_<method>(NULL); both names
// are produced by token pasting.
// NOTE(review): presumably the generated wrapper returns the entry-method
// index used to target a reduction at <me>::<method> - confirm against the
// generated .decl.h.
#define CkReductionTarget(me, method) CkIndex_##me::redn_wrapper_##method(NULL)
00834 
// Defines the non-template contribute() overloads declared by
// CK_REDUCTION_CONTRIBUTE_METHODS_DECL for class `me`.
//   me         - class whose members are being defined
//   myRednMgr  - expression yielding a pointer to the reduction manager
//   myRednInfo - the contributorInfo object; its address is passed on
//   migratable - value given to setMigratableContributor() on every message
// Each overload builds a message with CkReductionMsg::buildNew (or takes the
// caller's), stamps userFlag / callback where applicable, and passes it to
// (myRednMgr)->contribute(&(myRednInfo), msg). The overloads without data
// use a zero-length CkReduction::nop message.
// myRednMgr and myRednInfo are parenthesised in the expansion, so a cast or
// conditional expression passed as an argument keeps its intended grouping
// against `->` and unary `&`.
#define CK_REDUCTION_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
    CMK_REFNUM_TYPE userFlag)\
{\
    CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
    msg->setUserFlag(userFlag);\
    msg->setMigratableContributor(migratable);\
    (myRednMgr)->contribute(&(myRednInfo),msg);\
}\
void me::contribute(int dataSize,const void *data,CkReduction::reducerType type,\
    const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
{\
    CkReductionMsg *msg=CkReductionMsg::buildNew(dataSize,data,type);\
    msg->setUserFlag(userFlag);\
    msg->setCallback(cb);\
    msg->setMigratableContributor(migratable);\
    (myRednMgr)->contribute(&(myRednInfo),msg);\
}\
void me::contribute(CkReductionMsg *msg) \
    {\
    msg->setMigratableContributor(migratable);\
    (myRednMgr)->contribute(&(myRednInfo),msg);\
    }\
void me::contribute(const CkCallback &cb,CMK_REFNUM_TYPE userFlag)\
{\
    CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
    msg->setUserFlag(userFlag);\
    msg->setCallback(cb);\
    msg->setMigratableContributor(migratable);\
    (myRednMgr)->contribute(&(myRednInfo),msg);\
}\
void me::contribute(CMK_REFNUM_TYPE userFlag)\
{\
    CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
    msg->setUserFlag(userFlag);\
    msg->setMigratableContributor(migratable);\
    (myRednMgr)->contribute(&(myRednInfo),msg);\
}
00873 
// Defines me::barrier(cb), declared by CK_BARRIER_CONTRIBUTE_METHODS_DECL.
// It builds a zero-length CkReduction::nop message, attaches `cb` and the
// `migratable` flag, and passes it to (myRednMgr)->barrier(msg).
// myRednInfo is accepted so the parameter list matches
// CK_REDUCTION_CONTRIBUTE_METHODS_DEF, but the expansion does not use it.
// myRednMgr is parenthesised so a cast or conditional expression passed as
// the argument keeps its intended grouping against `->`.
#define CK_BARRIER_CONTRIBUTE_METHODS_DEF(me,myRednMgr,myRednInfo,migratable) \
void me::barrier(const CkCallback &cb)\
{\
    CkReductionMsg *msg=CkReductionMsg::buildNew(0,NULL,CkReduction::nop);\
    msg->setCallback(cb);\
    msg->setMigratableContributor(migratable);\
    (myRednMgr)->barrier(msg);\
}
00882 
00883 
00884 
00885 class Group : public CkReductionMgr
00886 {
00887     contributorInfo reductionInfo;
00888  public:
00889     const int thisIndex;
00890     Group();
00891     Group(CkMigrateMessage *msg);
00892     virtual bool isNodeGroup() { return false; }
00893     virtual void pup(PUP::er &p);
00894     virtual void flushStates() {
00895         CkReductionMgr::flushStates();
00896         reductionInfo.redNo = 0;
00897     }
00898     virtual void CkAddThreadListeners(CthThread tid, void *msg);
00899 
00900     int getRedNo() const { return reductionInfo.redNo; }
00901 
00902     CK_REDUCTION_CONTRIBUTE_METHODS_DECL
00903         CK_BARRIER_CONTRIBUTE_METHODS_DECL
00904 };
00905 
#ifdef _PIPELINED_ALLREDUCE_
/// Reassembles a reduction result that arrives as several fragment messages
/// and, once every fragment is in, sends the rebuilt message through `cb`.
/// The misspelt member names ("recieve"/"Recieved") are kept as they are
/// because they are public.
class AllreduceMgr
{
public:
    /// Start with no fragments, zero accumulated bytes and no buffer.
    /// `data` is initialised to null so it never holds an indeterminate
    /// pointer between rounds.
    AllreduceMgr() : size(0), data(nullptr), fragsRecieved(0) {}
    friend class ArrayElement;

    /// Accept one fragment.
    /// The first fragment of a round allocates FRAG_SIZE * msg->nFrags
    /// bytes; every fragment is copied to offset msg->fragNo * FRAG_SIZE.
    /// When the number of fragments received equals msg->nFrags, the first
    /// `size` bytes are turned into a new message, sent through `cb`, and
    /// the state is reset for the next round.
    /// `msg` itself is not freed here.
    /// NOTE(review): msg->dataSize is not checked against FRAG_SIZE and
    /// msg->fragNo is not checked against msg->nFrags; callers must keep
    /// both within range.
    void allreduce_recieve(CkReductionMsg* msg)
    {
        fragsRecieved++;
        if(fragsRecieved==1)
        {
            data = new char[FRAG_SIZE*msg->nFrags];
        }
        memcpy(data+msg->fragNo*FRAG_SIZE, msg->data, msg->dataSize);
        size += msg->dataSize;

        if(fragsRecieved==msg->nFrags) {
            // buildNew is expected to copy the bytes, since the buffer is
            // released right after the send.
            CkReductionMsg* ret = CkReductionMsg::buildNew(size, data);
            cb.send(ret);
            fragsRecieved=0; size=0;
            delete [] data;
            data = nullptr;  // do not keep a dangling pointer between rounds
        }
    }

    CkCallback cb;      // receives the reassembled message
    int size;           // bytes accumulated in the current round
    char* data;         // reassembly buffer; null outside a round
    int fragsRecieved;  // fragments seen in the current round
};
#endif // _PIPELINED_ALLREDUCE_
00940 
00941 #endif //_CKREDUCTION_H