00001 
00005 
00006 #ifndef CENTRALLB_H
00007 #define CENTRALLB_H
00008 
00009 #include "BaseLB.h"
00010 #include "CentralLB.decl.h"
00011 
00012 #include <vector>
00013 #include "pup_stl.h"
00014 #include "manager.h"
00015 extern CkGroupID loadbalancer;
00016 
00017 void CreateCentralLB();
00018 
00019 class CLBStatsMsg;
00020 class LBSimulation;
00021 
00023 typedef LBMigrateMsg  CLBMigrateMsg;
00024 
00025 class LBInfo
00026 {
00027 public:
00028   LBRealType *peLoads;  
00029   LBRealType *objLoads;     
00030   LBRealType *comLoads;     
00031   LBRealType *bgLoads;  
00032   int    numPes;
00033   int    msgCount;  
00034   CmiUInt8  msgBytes;   
00035   LBRealType minObjLoad, maxObjLoad;
00036   LBInfo(): peLoads(NULL), objLoads(NULL), comLoads(NULL), 
00037             bgLoads(NULL), numPes(0), msgCount(0),
00038             msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
00039   LBInfo(LBRealType *pl, int count): peLoads(pl), objLoads(NULL), 
00040             comLoads(NULL), bgLoads(NULL), numPes(count), msgCount(0),
00041             msgBytes(0), minObjLoad(0.0), maxObjLoad(0.0) {}
00042   LBInfo(int count);
00043   ~LBInfo();
00044   void getInfo(BaseLB::LDStats* stats, int count, int considerComm);
00045   void clear();
00046   void print();
00047   void getSummary(LBRealType &maxLoad, LBRealType &maxCpuLoad, LBRealType &totalLoad);
00048 };
00049 
/**
 * Small record describing one node's position in a spanning tree.
 * All members are public; the constructor and both calc* routines are
 * defined out of line.
 */
class SpanningTree
{
public:
  int arity;        ///< branching factor of the tree
  int parent;       ///< index of this node's parent
  int numChildren;  ///< number of children of this node

  SpanningTree();

  /// Computes `parent`; @p n is presumably the node count - TODO confirm.
  void calcParent(int n);
  /// Computes `numChildren`; @p n is presumably the node count - TODO confirm.
  void calcNumChildren(int n);
};
00063 
00064 class CentralLB : public CBase_CentralLB
00065 {
00066 private:
00067   CLBStatsMsg *statsMsg;
00068   int count_msgs;
00069   void initLB(const CkLBOptions &);
00070 public:
00071   CkMarshalledCLBStatsMessage bufMsg;
00072   SpanningTree st;
00073   CentralLB(const CkLBOptions & opt) : CBase_CentralLB(opt), concurrent(false) { initLB(opt);
00074 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00075         lbDecisionCount= resumeCount=0;
00076 #endif
00077 #if CMK_SHRINK_EXPAND
00078         manager_init();
00079 #endif
00080   }
00081   CentralLB(CkMigrateMessage *m) : CBase_CentralLB(m), concurrent(false) {
00082 #if CMK_SHRINK_EXPAND
00083         manager_init();
00084 #endif
00085   }
00086 #if defined(TEMP_LDB) 
00087     float getTemp(int);
00088       FILE* logFD;
00089     int physicalCoresPerNode;
00090     int logicalCoresPerNode,numSockets;
00091     int logicalCoresPerChip;
00092 #endif
00093 
00094   virtual ~CentralLB();
00095 
00096   void pup(PUP::er &p);
00097 
00098   void turnOn();
00099   void turnOff();
00100 
00101   void SetPESpeed(int);
00102   int GetPESpeed();
00103   inline void setConcurrent(bool c) { concurrent = c; }
00104 
00105   static void staticAtSync(void*);
00106   void AtSync(void); 
00107   void ProcessAtSync(void); 
00108                             
00109   void SendStats();
00110   void ReceiveCounts(int *counts, int n);
00111   void ReceiveStats(CkMarshalledCLBStatsMessage &&msg); 
00112   void ReceiveStatsViaTree(CkMarshalledCLBStatsMessage &&msg); 
00113   void ReceiveStatsFromRoot(CkMarshalledCLBStatsMessage &&msg);
00114   
00115   void depositData(CLBStatsMsg *m);
00116   void LoadBalance(void); 
00117   void t_LoadBalance(void); 
00118   void ApplyDecision(void);
00119   void ResumeClients(int);                      
00120 
00121   void ResumeClients(); 
00122   void InitiateScatter(LBMigrateMsg *msg);
00123   void ScatterMigrationResults(LBScatterMsg *);
00124   void ReceiveMigration(LBScatterMsg *);
00125   void ReceiveMigration(LBMigrateMsg *);    
00126   void ProcessMigrationDecision();
00127   void ProcessReceiveMigration();
00128 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00129     void ReceiveDummyMigration(int _step);
00130 #endif
00131   void MissMigrate(int waitForBarrier);
00132 
00133   
00134   void CheckForRealloc ();
00135   void ResumeFromReallocCheckpoint();
00136   void MigrationDoneImpl (int );
00137   void WillIbekilled(std::vector<char> avail, int);
00138   void StartCleanup();
00139 
00140   
00141   static void staticPredictorOn(void* data, void* model);
00142   static void staticPredictorOnWin(void* data, void* model, int wind);
00143   static void staticPredictorOff(void* data);
00144   static void staticChangePredictor(void* data, void* model);
00145 
00146   
00147   inline void StartLB() { thisProxy.ProcessAtSync(); }
00148   static void staticStartLB(void* data);
00149 
00150   
00151   static void staticMigrated(void* me, LDObjHandle h, int waitBarrier=1);
00152   void Migrated(int waitBarrier=1);
00153 
00154   void MigrationDone(int balancing);  
00155   void CheckMigrationComplete();      
00156 
00157   
00158   void FuturePredictor(LDStats* stats);
00159 
00160   struct FutureModel {
00161     int n_stats;    
00162     int cur_stats;   
00163     int start_stats; 
00164     LDStats *collection;
00165     int n_objs;     
00166     LBPredictorFunction *predictor;
00167     double **parameters;
00168     bool *model_valid;
00169 
00170     FutureModel(): n_stats(0), cur_stats(0), start_stats(0), collection(NULL),
00171      n_objs(0), parameters(NULL) {predictor = new DefaultFunction();}
00172 
00173     FutureModel(int n): n_stats(n), cur_stats(0), start_stats(0), n_objs(0),
00174      parameters(NULL) {
00175       collection = new LDStats[n];
00176       
00177       predictor = new DefaultFunction();
00178     }
00179 
00180     FutureModel(int n, LBPredictorFunction *myfunc): n_stats(n), cur_stats(0), start_stats(0), n_objs(0), parameters(NULL) {
00181       collection = new LDStats[n];
00182       
00183       predictor = myfunc;
00184     }
00185 
00186     ~FutureModel() {
00187       delete[] collection;
00188       for (int i=0;i<n_objs;++i) delete[] parameters[i];
00189       delete[] parameters;
00190       delete predictor;
00191     }
00192 
00193     void changePredictor(LBPredictorFunction *new_predictor) {
00194       delete predictor;
00195       int i;
00196       
00197       predictor = new_predictor;
00198       for (i=0;i<n_objs;++i) delete[] parameters[i];
00199       for (i=0;i<n_objs;++i) {
00200     parameters[i] = new double[new_predictor->num_params];
00201     model_valid[i] = false;
00202       }
00203     }
00204   };
00205 
00206   
00207   
00208   void predictorOn(LBPredictorFunction *pred) {
00209     predictorOn(pred, _lb_predict_window);
00210   }
00211   void predictorOn(LBPredictorFunction *pred, int window_size) {
00212     if (predicted_model) {
00213       PredictorPrintf("Predictor already allocated");
00214     } else {
00215       _lb_predict_window = window_size;
00216       if (pred) predicted_model = new FutureModel(window_size, pred);
00217       else predicted_model = new FutureModel(window_size);
00218       _lb_predict = true;
00219     }
00220     PredictorPrintf("Predictor turned on, window size %d\n",window_size);
00221   }
00222 
00223   
00224   void predictorOff() {
00225     if (predicted_model) delete predicted_model;
00226     predicted_model = 0;
00227     _lb_predict = false;
00228     PredictorPrintf("Predictor turned off\n");
00229   }
00230 
00231   
00232   
00233   void changePredictor(LBPredictorFunction *new_predictor) {
00234     if (predicted_model) {
00235       predicted_model->changePredictor(new_predictor);
00236       PredictorPrintf("Predictor model changed\n");
00237     }
00238   }
00239   
00240 
00241   LBMigrateMsg* callStrategy(LDStats* stats,int count){
00242     return Strategy(stats);
00243   };
00244 
00245   int cur_ld_balancer;
00246 
00247   void readStatsMsgs(const char* filename);
00248   void writeStatsMsgs(const char* filename);
00249 
00250   void removeCommDataOfDeletedObjs(LDStats* stats);
00251   void preprocess(LDStats* stats);
00252   virtual LBMigrateMsg* Strategy(LDStats* stats);
00253   virtual void work(LDStats* stats);
00254     virtual void changeFreq(int n);
00255   virtual LBMigrateMsg * createMigrateMsg(LDStats* stats);
00256   virtual LBMigrateMsg * extractMigrateMsg(LBMigrateMsg *m, int p);
00257 
00258   
00259   virtual LBMigrateMsg* Strategy(LDStats* stats, int nprocs) {
00260     return Strategy(stats);
00261   }
00262 
00263 protected:
00264   virtual bool QueryBalanceNow(int) { return true; };  
00265   virtual bool QueryDumpData() { return false; };  
00266   virtual void LoadbalanceDone(int balancing) {}
00267 
00268   void simulationRead();
00269   void simulationWrite();
00270   void findSimResults(LDStats* stats, int count, 
00271                       LBMigrateMsg* msg, LBSimulation* simResults);
00272   void removeNonMigratable(LDStats* statsDataList, int count);
00273   void loadbalance_with_thread() { use_thread = true; }
00274 
00275   bool concurrent;
00276 private:  
00277   int myspeed;
00278   int stats_msg_count;
00279   CLBStatsMsg **statsMsgsList;
00280   LDStats *statsData;
00281   int migrates_completed;
00282   int migrates_expected;
00283   int future_migrates_completed;
00284   int future_migrates_expected;
00285   int lbdone;
00286   double start_lb_time;
00287   double strat_start_time;
00288   LBMigrateMsg   *storedMigrateMsg;
00289   LBScatterMsg   *storedScatterMsg;
00290   bool  reduction_started;
00291   bool  use_thread;
00292 
00293   FutureModel *predicted_model;
00294 
00295   void BuildStatsMsg();
00296   void buildStats();
00297   void printStrategyStats(LBMigrateMsg *msg);
00298 
00299 public:
00300   int useMem();
00301 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00302     int savedBalancing;
00303     void endMigrationDone(int balancing);
00304     int lbDecisionCount ,resumeCount;
00305 #endif
00306 };
00307 
#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
// Free-function hooks that exist only in message-logging / causal
// fault-tolerance builds. Each takes the balancer as an opaque pointer.
void resumeCentralLbAfterChkpt(void *lb);
void resumeAfterRestoreParallelRecovery(void *_lb);
#endif
00312 
00313 
00314 
00315 
00316 class CLBStatsMsg {
00317 public:
00318 #if defined(TEMP_LDB)
00319     float pe_temp;
00320 #endif
00321 
00322   int from_pe;
00323   int pe_speed;
00324   LBRealType total_walltime;
00325   LBRealType idletime;
00326   LBRealType bg_walltime;
00327 #if CMK_LB_CPUTIMER
00328   LBRealType total_cputime;
00329   LBRealType bg_cputime;
00330 #endif
00331   int n_objs;
00332   LDObjData *objData;
00333   int n_comm;
00334   LDCommData *commData;
00335 
00336   char * avail_vector;
00337   int next_lb;
00338 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
00339     int step;
00340 #endif
00341 
00342 public:
00343   CLBStatsMsg(int osz, int csz);
00344   CLBStatsMsg(): from_pe(0), pe_speed(0), total_walltime(0.0), idletime(0.0),
00345          bg_walltime(0.0), n_objs(0), objData(NULL), n_comm(0),
00346 #if defined(TEMP_LDB)
00347         pe_temp(1.0),
00348 #endif
00349 
00350 #if CMK_LB_CPUTIMER
00351          total_cputime(0.0), bg_cputime(0.0),
00352 #endif
00353          commData(NULL), avail_vector(NULL), next_lb(0) {}
00354   ~CLBStatsMsg();
00355   void pup(PUP::er &p);
00356 }; 
00357 
00358 
00359 
00360 void getLoadInfo(BaseLB::LDStats* stats, int count, LBInfo &info, int considerComm);
00361 
00362 #endif 
00363