00001 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 #include "elements.h"
00018 #include "ckheap.h"
00019 #include "GreedyCommLB.h"
00020 #include "manager.h"
00021 
00022 extern int quietModeRequested;
00023 
00024 CreateLBFunc_Def(GreedyCommLB, "Greedy algorithm which takes communication graph into account")
00025 
00026 void GreedyCommLB::init()
00027 {
00028     lbname = (char*)"GreedyCommLB";
00029     alpha = _lb_args.alpha();
00030     beta = _lb_args.beta();
00031     manager_init();
00032 }
00033 
00034 GreedyCommLB::GreedyCommLB(const CkLBOptions &opt): CBase_GreedyCommLB(opt)
00035 {
00036     init();
00037     if (CkMyPe() == 0 && !quietModeRequested)
00038     CkPrintf("CharmLB> GreedyCommLB created.\n");
00039 }
00040 
00041 GreedyCommLB::GreedyCommLB(CkMigrateMessage *m):CBase_GreedyCommLB(m) {
00042     init();
00043 }
00044 
00045 bool GreedyCommLB::QueryBalanceNow(int _step)
00046 {
00047     
00048     return true;
00049 }
00050 
00051 
00052 void GreedyCommLB::alloc(int pe,int id,double load){
00053     
00054     assigned_array[id] = 1;
00055     processors[pe].load += load;
00056 }
00057 
00058 
00059 double GreedyCommLB::compute_com(LDStats* stats, int id, int pe){
00060     int j,com_data=0,com_msg=0;
00061     double total_time;
00062     graph * ptr;
00063     
00064     ptr = object_graph[id].next;
00065     
00066     for(j=0;(j<2*nobj)&&(ptr != NULL);j++,ptr=ptr->next){
00067     int destObj = ptr->id;
00068     if(assigned_array[destObj] == 0)  
00069         continue;
00070         if (stats->to_proc[destObj] == pe)    
00071         continue;
00072     com_data += ptr->data;
00073     com_msg += ptr->nmsg;
00074     }
00075     
00076     total_time = alpha*com_msg + beta*com_data;
00077     return total_time;
00078 }
00079 
00080 
00081 void GreedyCommLB::update(LDStats* stats, int id, int pe){
00082     graph * ptr = object_graph[id].next;
00083     
00084     for(int j=0;(j<2*nobj)&&(ptr != NULL);j++,ptr=ptr->next){
00085     int destObj = ptr->id;
00086     if(assigned_array[destObj] == 0)  
00087         continue;
00088     int destPe = stats->to_proc[destObj];
00089         if (destPe == pe)                     
00090         continue;
00091     int com_data = ptr->data;
00092     int com_msg = ptr->nmsg;
00093         double com_time = alpha*com_msg + beta*com_data;
00094         processors[destPe].load += com_time;
00095     }
00096 }
00097 
00098 
00099 
00100 void GreedyCommLB::add_graph(int x, int y, int data, int nmsg){
00101     graph * ptr, *temp;
00102     
00103     ptr = &(object_graph[x]);  
00104     
00105     temp = new graph;
00106     
00107     temp->id = y;
00108     temp->data = data;
00109     temp->nmsg = nmsg;
00110     temp->next = ptr->next;
00111     
00112     ptr->next = temp;
00113     
00114     ptr = &(object_graph[y]);  
00115 
00116     temp = new graph;
00117     
00118     temp->id = x;
00119     temp->data = data;
00120     temp->nmsg = nmsg;
00121     temp->next = ptr->next;
00122     
00123     ptr->next = temp;
00124 }
00125   
00126 static void init_data(int *assign, graph * object_graph, int l, int b){
00127     for(int obj=0;obj < b;obj++)
00128     assign[obj] = 0;
00129 
00130     for(int j=0;j<b;j++){
00131     object_graph[j].data = 0;
00132     object_graph[j].nmsg = 0;
00133     object_graph[j].next = NULL;
00134     }
00135 }
00136 
00137 void GreedyCommLB::work(LDStats* stats)
00138 {
00139     int pe,obj,com;
00140     ObjectRecord *x;
00141     int i;
00142     
00143     if (_lb_args.debug()) CkPrintf("In GreedyCommLB strategy\n",CkMyPe());
00144     npe = stats->nprocs();
00145     nobj = stats->n_objs;
00146 
00147     
00148     
00149     nmigobj = stats->n_migrateobjs;
00150 
00151     stats->makeCommHash();
00152 
00153     assigned_array = new int[nobj];
00154 
00155     object_graph = new graph[nobj];
00156 
00157     init_data(assigned_array,object_graph,npe,nobj);
00158 
00159 #define MAXDOUBLE   1e10;
00160 
00161     
00162     processors = new processorInfo[npe];
00163     for (int p=0; p<npe; p++) {
00164       processors[p].Id = p;
00165       processors[p].backgroundLoad = stats->procs[p].bg_walltime;
00166       processors[p].computeLoad = 0;
00167       processors[p].pe_speed = stats->procs[p].pe_speed;
00168       if (!stats->procs[p].available) {
00169         processors[p].load = MAXDOUBLE;
00170       }
00171       else {
00172         processors[p].load = 0;
00173         if (!_lb_args.ignoreBgLoad())
00174           processors[p].load = processors[p].backgroundLoad;
00175       }
00176     }
00177 
00178 
00179     
00180     for(com =0; com< stats->n_comm;com++) {
00181          int xcoord=0,ycoord=0;
00182      LDCommData &commData = stats->commData[com];
00183      if((!commData.from_proc())&&(commData.recv_type()==LD_OBJ_MSG))
00184      {
00185         xcoord = stats->getHash(commData.sender);
00186         ycoord = stats->getHash(commData.receiver.get_destObj());
00187         if((xcoord == -1)||(ycoord == -1))
00188         {
00189             if (_lb_args.ignoreBgLoad() || stats->complete_flag==0) continue;
00190             else CkAbort("Error in search\n");
00191         }
00192         add_graph(xcoord,ycoord,commData.bytes, commData.messages);
00193      }
00194          else if (commData.recv_type()==LD_OBJLIST_MSG) {
00195         int nobjs;
00196         const LDObjKey *objs = commData.receiver.get_destObjs(nobjs);
00197         xcoord = stats->getHash(commData.sender);
00198         for (int i=0; i<nobjs; i++) {
00199           ycoord = stats->getHash(objs[i]);
00200           if((xcoord == -1)||(ycoord == -1))
00201           {
00202             if (_lb_args.migObjOnly()) continue;
00203             else CkAbort("Error in search\n");
00204           }
00205 
00206           add_graph(xcoord,ycoord,commData.bytes, commData.messages);
00207         }
00208          }
00209     }
00210 
00211     
00212     
00213     ObjectHeap maxh(nmigobj+1);
00214     for(obj=0; obj < stats->n_objs; obj++) {
00215       LDObjData &objData = stats->objData[obj];
00216       int onpe = stats->from_proc[obj];
00217       if (!objData.migratable) {
00218         if (!stats->procs[onpe].available) {
00219       CmiAbort("Load balancer is not be able to move a nonmigratable object out of an unavailable processor.\n");
00220         }
00221         alloc(onpe, obj, objData.wallTime);
00222     update(stats, obj, onpe);        
00223       }
00224       else {
00225         x = new ObjectRecord;
00226         x->id = obj;
00227         x->pos = obj;
00228         x->val = objData.wallTime;
00229         x->pe = onpe;
00230         maxh.insert(x);
00231       }
00232     }
00233 
00234     minHeap *lightProcessors = new minHeap(npe);
00235     for (i=0; i<npe; i++)
00236       if (stats->procs[i].available)
00237         lightProcessors->insert((InfoRecord *) &(processors[i]));
00238 
00239     int id,maxid,minpe=0;
00240     double temp,total_time,min_temp;
00241     
00242     
00243 
00244     double *pe_comm = new double[npe];
00245     for (int i=0; i<npe; i++) pe_comm[i] = 0.0;
00246 
00247     for(id = 0;id<nmigobj;id++){
00248     x  = maxh.deleteMax();
00249 
00250     maxid = x->id;
00251 
00252         processorInfo *donor = (processorInfo *) lightProcessors->deleteMin();
00253     CmiAssert(donor);
00254     int first_avail_pe = donor->Id;
00255     temp = compute_com(stats, maxid, first_avail_pe);
00256     min_temp = temp;
00257     
00258     total_time = temp + donor->load; 
00259     minpe = first_avail_pe;
00260     
00261         
00262     
00263     
00264         CkVec<int> commPes;
00265         graph * ptr = object_graph[maxid].next;
00266     
00267     
00268     double commload = 0.0;          
00269         for(int com=0;(com<2*nobj)&&(ptr != NULL);com++,ptr=ptr->next){
00270       int destObj = ptr->id;
00271       if(assigned_array[destObj] == 0)  
00272         continue;
00273       int destPe = stats->to_proc[destObj];
00274       if(stats->procs[destPe].available == 0) continue;
00275       
00276       double cload = alpha*ptr->nmsg + beta*ptr->data;
00277       pe_comm[destPe] += cload;
00278       commload += cload;
00279 
00280           int exist = 0;
00281       for (int pp=0; pp<commPes.size(); pp++)
00282             if (destPe == commPes[pp]) { exist=1; break; }    
00283       if (!exist) commPes.push_back(destPe);
00284         }
00285 
00286     int k;
00287     for(k = 0; k < commPes.size(); k++){
00288         pe = commPes[k];
00289             processorInfo *commpe = (processorInfo *) &processors[pe];
00290         
00291         temp = commload - pe_comm[pe];
00292         
00293         
00294         if(total_time > (temp + commpe->load)){
00295         minpe = pe;
00296         total_time = temp + commpe->load;
00297         min_temp = temp;
00298         }
00299     }
00300     
00301     
00302     
00303         stats->assign(maxid, minpe);
00304     
00305     alloc(minpe, maxid, x->val + min_temp);
00306 
00307     
00308     update(stats, maxid, minpe);
00309 
00310         
00311     lightProcessors->insert(donor);
00312     for(k = 0; k < commPes.size(); k++) {
00313         pe = commPes[k];
00314             processorInfo *commpe = (processorInfo *) &processors[pe];
00315             lightProcessors->update(commpe);
00316         pe_comm[pe] = 0.0;          
00317         }
00318 
00319     delete x;
00320     }
00321     
00322     
00323     delete [] pe_comm;
00324 
00325     delete [] processors;
00326     delete [] assigned_array;
00327 
00328     delete lightProcessors;
00329 
00330     for(int oindex= 0; oindex < nobj; oindex++){
00331       graph * ptr = &object_graph[oindex];
00332       ptr = ptr->next;
00333       
00334       while(ptr != NULL){
00335     graph *cur = ptr;
00336     ptr = ptr->next;
00337     delete cur;
00338       }
00339     }
00340     delete [] object_graph;
00341 
00342 }
00343 
00344 #include "GreedyCommLB.def.h"
00345