00001 #include "converse.h"
00002 #include "queueing.h"
00003 #include "cldb.h"
00004 #include <time.h>
00005 #include <stdlib.h>
00006 #include <math.h>
00007 
/*
 * Load notification hook.  This strategy takes no action when notified;
 * the argument is ignored.
 */
void LoadNotifyFn(int l)
{
  (void)l;
}
00011 
/* Returns the name of this load-balancing strategy ("spray"). */
const char *CldGetStrategy(void)
{
  static const char strategy_name[] = "spray";
  return strategy_name;
}
00016 
/* Delay handed to CcdCallFnAfter before the next load reduction is started. */
#define CYCLE_MILLISECONDS (500)

/* Non-zero enables the CmiPrintf tracing in the reduction/average code. */
#define DEBUGGING_OUTPUT (0)
00019 
/* State kept by the spray load balancer (one instance is declared via Cpv). */
typedef struct
{
  /* Rank of the owning PE, cached from CmiMyPe() at module init. */
  int mype;

  /* Handler indices obtained from CmiRegisterHandler at module init. */
  int EnqueueHandler;
  int ReduceHandler;
  int AverageHandler;
  int HopHandler;

  /* Load value this PE last contributed when it initiated a reduction. */
  double load_reported;
  /* Sum of the contributions received so far in the current reduction. */
  double load_total;
  /* Number of contributions folded into load_total so far. */
  int load_count;

  /* Spanning-tree layout used for the reduction. */
  int spantree_parent;
  int spantree_children;
  int spantree_root;

  /* Number of hop messages that should still be forwarded to another PE. */
  int rebalance;
} peinfo;
00036 
00037 CpvStaticDeclare(peinfo, peinf);
00038 
00039 struct loadmsg {
00040   char core[CmiMsgHeaderSizeBytes];
00041   double load_total;
00042 };
00043 
00044 struct reqmsg {
00045   char core[CmiMsgHeaderSizeBytes];
00046 };
00047 
00048 void CldPropagateLoad(double load);
00049 
/* Returns the current load estimate, which is whatever CldLoad() reports. */
int CldEstimate(void)
{
  int estimate = CldLoad();
  return estimate;
}
00054 
00055 void CldInitiateReduction(void)
00056 {
00057   double load = CldEstimate();
00058   peinfo *pinf = &(CpvAccess(peinf));
00059   pinf->load_reported = load;
00060   CldPropagateLoad(load);
00061 }
00062 
00063 void CldPropagateLoad(double load)
00064 {
00065   struct loadmsg msg;
00066   peinfo *pinf = &(CpvAccess(peinf));
00067   pinf->load_total += load;
00068   pinf->load_count ++;
00069   if (pinf->load_count == pinf->spantree_children + 1) {
00070     msg.load_total   = pinf->load_total;
00071     if (pinf->mype == pinf->spantree_root) {
00072       if (DEBUGGING_OUTPUT) CmiPrintf("---\n");
00073       CmiSetHandler(&msg, pinf->AverageHandler);
00074       CmiSyncBroadcastAll(sizeof(msg), &msg);
00075     } else {
00076       CmiSetHandler(&msg, pinf->ReduceHandler);
00077       CmiSyncSend(pinf->spantree_parent, sizeof(msg), &msg);
00078     }
00079     pinf->load_total = 0;
00080     pinf->load_count = 0;
00081   }
00082 }
00083 
00084 void CldReduceHandler(struct loadmsg *msg)
00085 {
00086   CldPropagateLoad(msg->load_total);
00087   CmiFree(msg);
00088 }
00089 
00090 void CldAverageHandler(struct loadmsg *msg)
00091 {
00092   peinfo *pinf = &(CpvAccess(peinf));
00093   double load = CldEstimate();
00094   double average = (msg->load_total / CmiNumPes());
00095   int rebalance;
00096   if (load < (average+10) * 1.2) rebalance=0;
00097   else rebalance = (int)(load - average);
00098   if (DEBUGGING_OUTPUT)
00099     CmiPrintf("PE %d load=%6d average=%6d rebalance=%d\n", 
00100           CmiMyPe(), CldEstimate(), (int)average, rebalance);
00101   pinf->rebalance = rebalance;
00102   CmiFree(msg);
00103   CcdCallFnAfter((CcdVoidFn)CldInitiateReduction, 0, CYCLE_MILLISECONDS);
00104 }
00105 
00106 void CldEnqueueHandler(char *msg)
00107 {
00108   int len, queueing, priobits; unsigned int *prioptr;
00109   CldInfoFn ifn; CldPackFn pfn;
00110   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00111   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00112   CmiSetHandler(msg, CmiGetXHandler(msg));
00113   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00114 }
00115 
00116 void CldHopHandler(char *msg)
00117 {
00118   peinfo *pinf = &(CpvAccess(peinf));
00119   int len, queueing, priobits; unsigned int *prioptr;
00120   CldInfoFn ifn; CldPackFn pfn; int pe;
00121 
00122   if (pinf->rebalance) {
00123     
00124     do pe = ((CrnRand()&0x7FFFFFFF)%CmiNumPes());
00125     while (pe == pinf->mype);
00126     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
00127     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00128     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
00129       pfn(&msg);
00130       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00131     }
00132     CmiSyncSendAndFree(pe, len, msg);
00133     pinf->rebalance--;
00134   } else {
00135     CmiSetHandler(msg, CmiGetXHandler(msg));
00136     CmiHandleMessage(msg);
00137   }
00138 }
00139 
00140 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
00141 {
00142   int npes, *pes;
00143   int len, queueing, priobits,i; unsigned int *prioptr;
00144   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00145   peinfo *pinf = &(CpvAccess(peinf));
00146   CldPackFn pfn;
00147   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00148   if (pfn) {
00149     pfn(&msg);
00150     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00151   }
00152   CmiSetInfo(msg,infofn);
00153   CmiSetXHandler(msg, CmiGetHandler(msg));
00154   CmiSetHandler(msg, pinf->EnqueueHandler);
00155   CmiLookupGroup(grp, &npes, &pes);
00156   for(i=0;i<npes;i++) {
00157     CmiSyncSend(pes[i], len, msg);
00158   }
00159   CmiFree(msg);
00160 }
00161 
00162 void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn)
00163 {
00164   int len, queueing, priobits,i; unsigned int *prioptr;
00165   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00166   peinfo *pinf = &(CpvAccess(peinf));
00167   CldPackFn pfn;
00168   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00169   if (pfn) {
00170     pfn(&msg);
00171     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00172   }
00173   CmiSetInfo(msg,infofn);
00174   CmiSetXHandler(msg, CmiGetHandler(msg));
00175   CmiSetHandler(msg, pinf->EnqueueHandler);
00176   for(i=0;i<npes;i++) {
00177     CmiSyncSend(pes[i], len, msg);
00178   }
00179   CmiFree(msg);
00180 }
00181 
00182 void CldEnqueue(int pe, void *msg, int infofn)
00183 {
00184   int len, queueing, priobits; unsigned int *prioptr;
00185   CldInfoFn ifn; CldPackFn pfn;
00186   peinfo *pinf = &(CpvAccess(peinf));
00187   ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00188   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00189   if (pe != CLD_ANYWHERE) {
00190     if (pfn && (CmiNodeOf(pe) != CmiMyNode())) {
00191       pfn(&msg);
00192       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00193     }
00194     CmiSetInfo(msg, infofn);
00195     CmiSetXHandler(msg, CmiGetHandler(msg));
00196     CmiSetHandler(msg, pinf->EnqueueHandler);
00197     if (pe==CLD_BROADCAST) CmiSyncBroadcastAndFree(len, msg);
00198     else if (pe==CLD_BROADCAST_ALL) CmiSyncBroadcastAllAndFree(len, msg);
00199     else CmiSyncSendAndFree(pe, len, msg);
00200   } else {
00201     CmiSetInfo(msg, infofn);
00202     CmiSetXHandler(msg, CmiGetHandler(msg));
00203     CmiSetHandler(msg, pinf->HopHandler);
00204     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
00205   }
00206 }
00207 
00208 void CldNodeEnqueue(int node, void *msg, int infofn)
00209 {
00210   int len, queueing, priobits; unsigned int *prioptr;
00211   CldInfoFn ifn; CldPackFn pfn;
00212   peinfo *pinf = &(CpvAccess(peinf));
00213   ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
00214   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00215   if (node != CLD_ANYWHERE) {
00216     if (pfn && (node != CmiMyNode())) {
00217       pfn(&msg);
00218       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
00219     }
00220     CmiSetInfo(msg, infofn);
00221     CmiSetXHandler(msg, CmiGetHandler(msg));
00222     CmiSetHandler(msg, pinf->EnqueueHandler);
00223     if (node==CLD_BROADCAST) CmiSyncNodeBroadcastAndFree(len, msg);
00224     else if (node==CLD_BROADCAST_ALL) CmiSyncNodeBroadcastAllAndFree(len, msg);
00225     else CmiSyncNodeSendAndFree(node, len, msg);
00226   } else {
00227     CmiSetInfo(msg, infofn);
00228     CmiSetXHandler(msg, CmiGetHandler(msg));
00229     CmiSetHandler(msg, pinf->HopHandler);
00230     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
00231   }
00232 }
00233 
00234 void CldModuleInit(char **argv)
00235 {
00236   peinfo *pinf;
00237   CpvInitialize(peinfo, peinf);
00238   
00239   CrnSrand((int) (time(0)+CmiMyPe()));
00240   pinf = &CpvAccess(peinf);
00241   pinf->mype = CmiMyPe();
00242   pinf->EnqueueHandler = CmiRegisterHandler((CmiHandler)CldEnqueueHandler);
00243   pinf->ReduceHandler  = CmiRegisterHandler((CmiHandler)CldReduceHandler);
00244   pinf->AverageHandler = CmiRegisterHandler((CmiHandler)CldAverageHandler);
00245   pinf->HopHandler     = CmiRegisterHandler((CmiHandler)CldHopHandler);
00246   pinf->load_total = 0.0;
00247   pinf->load_count = 0;
00248   pinf->spantree_children = CmiNumSpanTreeChildren(CmiMyPe());
00249   pinf->spantree_parent = CmiSpanTreeParent(CmiMyPe());
00250   pinf->spantree_root = 0;
00251   pinf->rebalance = 0;
00252   CldModuleGeneralInit(argv);
00253   CldInitiateReduction();
00254 }
/* Callback hook; this strategy performs no work here. */
void CldCallback(void)
{
}