00001 
00017 #include "charm++.h"
00018 #include "ck.h"
00019 #include "ckarray.h"
00020 #include "ckfutures.h"
00021 #include <stdlib.h>
00022 
/*
 * One slot of the future table.
 *   ready   - false when the slot is handed out by createFuture(), set to
 *             true by setFuture() once a value has been stored.
 *   value   - the pointer stored by setFuture(); 0 until then.
 *   waiters - head of a chain of threads (linked with CthSetNext/CthGetNext)
 *             that suspended in CkWaitFutureID() on this slot.
 *   next    - index of the next unused slot while this slot sits on the
 *             free list (-1 terminates the list); reset to 0 when allocated.
 */
00023 typedef struct Future_s {
00024   bool ready;
00025   void *value;
00026   CthThread waiters;
00027   int next; 
00028 } Future;
00029 
/*
 * Bookkeeping for the table of futures.
 *   array    - heap-allocated slot array (grown with realloc in createFuture()).
 *   max      - number of slots currently allocated in `array`.
 *   freelist - index of the first unused slot, or -1 when none are left.
 */
00030 typedef struct {
00031   Future *array;
00032   int max;
00033   int freelist;
00034 }
00035 FutureState;
00036 
00037 class CkSema {
00038   private:
00039     CkQ<void*> msgs;
00040     CkQ<CthThread> waiters;
00041   public:
00042     void *wait(void) {
00043       void *retmsg = msgs.deq();
00044       if(retmsg==0) {
00045         waiters.enq(CthSelf());
00046         CthSuspend();
00047         retmsg = msgs.deq();
00048       }
00049       return retmsg;
00050     }
00051     void waitN(int n, void *marray[]) {
00052       while (1) {
00053         if(msgs.length()<n) {
00054           waiters.enq(CthSelf());
00055           CthSuspend();
00056           continue;
00057         }
00058         for(int i=0;i<n;i++)
00059           marray[i] = msgs.deq();
00060         return;
00061       }
00062     }
00063     void signal(void *msg)
00064     {
00065       msgs.enq(msg);
00066       if(!waiters.isEmpty())
00067         CthAwaken(waiters.deq());
00068       return;
00069     }
00070 };
00071 
00072 class CkSemaPool {
00073   private:
00074     std::vector<CkSema*> pool;
00075     CkQ<int> freelist;
00076   public:
00077     int getNew(void) {
00078       int idx;
00079       if(freelist.isEmpty()) {
00080         idx = pool.size();
00081         pool.push_back(new CkSema());
00082       } else {
00083         idx = freelist.deq();
00084         pool[idx] = new CkSema();
00085       }
00086       return idx;
00087     }
00088     void release(int idx) {
00089       CkSema * sem = pool[idx];
00090       delete sem;
00091       freelist.enq(idx);
00092     }
00093     void _check(int idx) {
00094 #if CMK_ERROR_CHECKING
00095       if(pool[idx]==0) {
00096           CkAbort("ERROR! operation attempted on invalid semaphore\n");
00097       }
00098 #endif
00099     }
00100     void *wait(int idx) { 
00101       _check(idx);
00102       return pool[idx]->wait(); 
00103     }
00104     void waitN(int idx, int n, void *marray[]) { 
00105       _check(idx);
00106       pool[idx]->waitN(n, marray); 
00107     }
00108     void signal(int idx, void *msg) { 
00109       _check(idx);
00110       pool[idx]->signal(msg); 
00111     }
00112 };
00113 
/*
 * File-local Cpv variables (presumably one instance per PE -- confirm against
 * the Converse Cpv macros):
 *   futurestate - the future table used by createFuture()/setFuture()/etc.
 *   semapool    - pointer to the semaphore pool, allocated in
 *                 _futuresModuleInit().
 */
00114 CpvStaticDeclare(FutureState, futurestate);
00115 CpvStaticDeclare(CkSemaPool*, semapool);
00116 
00117 static void addedFutures(int lo, int hi)
00118 {
00119   int i;
00120   FutureState *fs = &(CpvAccess(futurestate));
00121   Future *array = fs->array;
00122 
00123   for (i=lo; i<hi; i++)
00124     array[i].next = i+1;
00125   array[hi-1].next = fs->freelist;
00126   fs->freelist = lo;
00127 }
00128 
00129 static 
00130 inline
00131 int createFuture(void)
00132 {
00133   FutureState *fs = &(CpvAccess(futurestate));
00134   Future *fut; int handle, origsize;
00135 
00136   
00137   if (fs->freelist == -1) {
00138     origsize = fs->max;
00139     fs->max = fs->max * 2;
00140     fs->array = (Future*)realloc(fs->array, sizeof(Future)*(fs->max));
00141     _MEMCHECK(fs->array);
00142     addedFutures(origsize, fs->max);
00143   }
00144   handle = fs->freelist;
00145   fut = fs->array + handle;
00146   fs->freelist = fut->next;
00147   fut->ready = false;
00148   fut->value = 0;
00149   fut->waiters = 0;
00150   fut->next = 0;
00151   return handle;
00152 }
00153 
00154 CkFuture CkCreateFuture(void)
00155 {
00156   CkFuture fut;
00157   fut.id = createFuture();
00158   fut.pe = CkMyPe();
00159   return fut;
00160 }
00161 
00162 void CkReleaseFutureID(CkFutureID handle)
00163 {
00164   FutureState *fs = &(CpvAccess(futurestate));
00165   Future *fut = (fs->array)+handle;
00166   fut->next = fs->freelist;
00167   fs->freelist = handle;
00168 }
00169 
00170 int CkProbeFutureID(CkFutureID handle)
00171 {
00172   FutureState *fs = &(CpvAccess(futurestate));
00173   Future *fut = (fs->array)+handle;
00174   return (int)(fut->ready);
00175 }
00176 
00177 void *CkWaitFutureID(CkFutureID handle)
00178 {
00179   CthThread self = CthSelf();
00180   FutureState *fs = &(CpvAccess(futurestate));
00181   Future *fut = (fs->array)+handle;
00182   void *value;
00183 
00184   if (!(fut->ready)) {
00185     CthSetNext(self, fut->waiters);
00186     fut->waiters = self;
00187     while (!(fut->ready)) { CthSuspend(); fut = (fs->array)+handle; }
00188   }
00189   fut = (fs->array)+handle;
00190   value = fut->value;
00191 #if CMK_ERROR_CHECKING
00192   if (value==NULL) 
00193     CkAbort("ERROR! CkWaitFuture would have to return NULL!\n"
00194     "This can happen when a thread that calls a sync method "
00195     "gets a CthAwaken call *before* the sync method returns.");
00196 #endif
00197   return value;
00198 }
00199 
00200 void CkReleaseFuture(CkFuture fut)
00201 {
00202   CkReleaseFutureID(fut.id);
00203 }
00204 
00205 int CkProbeFuture(CkFuture fut)
00206 {
00207   return CkProbeFutureID(fut.id);
00208 }
00209 
00210 void *CkWaitFuture(CkFuture fut)
00211 {
00212   return CkWaitFutureID(fut.id);
00213 }
00214 
00215 void CkWaitVoidFuture(CkFutureID handle)
00216 {
00217   CkFreeMsg(CkWaitFutureID(handle));
00218 }
00219 
00220 static void setFuture(CkFutureID handle, void *pointer)
00221 {
00222   CthThread t;
00223   FutureState *fs = &(CpvAccess(futurestate));
00224   Future *fut = (fs->array)+handle;
00225   fut->ready = true;
00226 #if CMK_ERROR_CHECKING
00227   if (pointer==NULL) CkAbort("setFuture called with NULL!");
00228 #endif
00229   fut->value = pointer;
00230   for (t=fut->waiters; t; t=CthGetNext(t))
00231     CthAwaken(t);
00232   fut->waiters = 0;
00233 }
00234 
00235 void _futuresModuleInit(void)
00236 {
00237   CpvInitialize(FutureState, futurestate);
00238   CpvInitialize(CkSemaPool *, semapool);
00239   CpvAccess(futurestate).array = (Future *)malloc(10*sizeof(Future));
00240   _MEMCHECK(CpvAccess(futurestate).array);
00241   CpvAccess(futurestate).max   = 10;
00242   CpvAccess(futurestate).freelist = -1;
00243   addedFutures(0,10);
00244   CpvAccess(semapool) = new CkSemaPool();
00245 }
00246 
/* Group ID of the FutureBOC group; assigned in FutureMain's constructor and
 * used by CkSendToFutureID() and CkSemaSignal() to build a proxy. */
00247 CkGroupID _fbocID;
00248 
/*
 * Message type passed to FutureBOC's constructor and used as the declared
 * parameter type of FutureBOC::SetFuture/SetSema (callers in this file cast
 * arbitrary message pointers to it). The field `x` is not read anywhere in
 * this file.
 */
00249 class FutureInitMsg : public CMessage_FutureInitMsg {
00250   public: int x ;
00251 };
00252 
00253 class  FutureMain : public Chare {
00254   public:
00255     FutureMain(CkArgMsg *m) {
00256       _fbocID = CProxy_FutureBOC::ckNew(new FutureInitMsg);
00257       delete m;
00258     }
00259     FutureMain(CkMigrateMessage *m) {}
00260 };
00261 
00262 extern "C" 
00263 CkFutureID CkRemoteBranchCallAsync(int ep, void *m, CkGroupID group, int PE)
00264 { 
00265   CkFutureID ret=CkCreateAttachedFuture(m);
00266   CkSendMsgBranch(ep, m, PE, group);
00267   return ret;
00268 }
00269 
00270 extern "C" 
00271 void *CkRemoteBranchCall(int ep, void *m, CkGroupID group, int PE)
00272 { 
00273   CkFutureID i = CkRemoteBranchCallAsync(ep, m, group, PE);  
00274   return CkWaitReleaseFuture(i);
00275 }
00276 
00277 extern "C" 
00278 CkFutureID CkRemoteNodeBranchCallAsync(int ep, void *m, CkGroupID group, int node)
00279 { 
00280   CkFutureID ret=CkCreateAttachedFuture(m);
00281   CkSendMsgNodeBranch(ep, m, node, group);
00282   return ret;
00283 }
00284 
00285 extern "C" 
00286 void *CkRemoteNodeBranchCall(int ep, void *m, CkGroupID group, int node)
00287 { 
00288   CkFutureID i = CkRemoteNodeBranchCallAsync(ep, m, group, node);
00289   return CkWaitReleaseFuture(i);
00290 }
00291 
00292 extern "C" 
00293 CkFutureID CkRemoteCallAsync(int ep, void *m, const CkChareID *ID)
00294 { 
00295   CkFutureID ret=CkCreateAttachedFuture(m);
00296   CkSendMsg(ep, m, ID);
00297   return ret;
00298 }
00299 
00300 extern "C" 
00301 void *CkRemoteCall(int ep, void *m, const CkChareID *ID)
00302 { 
00303   CkFutureID i = CkRemoteCallAsync(ep, m, ID);
00304   return CkWaitReleaseFuture(i);
00305 }
00306 
00307 
00308 CkFutureID CkCreateAttachedFuture(void *msg)
00309 {
00310   CkFutureID ret=createFuture();
00311   UsrToEnv(msg)->setRef(ret);
00312   return ret;
00313 }
00314 
00315 CkFutureID CkCreateAttachedFutureSend(void *msg, int ep,
00316 CkArrayID id, CkArrayIndex idx,
00317 void(*fptr)(CkArrayID,CkArrayIndex,void*,int,int),int size)
00318 {
00319 CkFutureID ret=createFuture();
00320 UsrToEnv(msg)->setRef(ret);
00321 #if IGET_FLOWCONTROL
00322 if (TheIGetControlClass.iget_request(ret,msg,ep,id,idx,fptr,size))
00323 #endif
00324 (fptr)(id,idx,msg,ep,0);
00325 return ret;
00326 }
00327 
00328 
00329 
00330 
00331 
00332 
00333 
00334 
00335 
00336 
00337 
00338 
00339 
00340 
00341 void *CkWaitReleaseFuture(CkFutureID futNum)
00342 {
00343 #if IGET_FLOWCONTROL
00344   TheIGetControlClass.iget_resend(futNum);
00345 #endif
00346   void *result=CkWaitFutureID(futNum);
00347   CkReleaseFutureID(futNum);
00348 #if IGET_FLOWCONTROL
00349   TheIGetControlClass.iget_free(1);
00350 
00351 #endif
00352   return result;
00353 }
00354 
00355 class FutureBOC: public IrrGroup {
00356 public:
00357   FutureBOC(void){ }
00358   FutureBOC(FutureInitMsg *m) { delete m; }
00359   FutureBOC(CkMigrateMessage *m) { }
00360   void SetFuture(FutureInitMsg * m) { 
00361 #if CMK_ERROR_CHECKING
00362     if (m==NULL) CkAbort("FutureBOC::SetFuture called with NULL!");
00363 #endif
00364     int key;
00365     key = UsrToEnv((void *)m)->getRef();
00366     setFuture( key, m);
00367   }
00368   void SetSema(FutureInitMsg *m) {
00369 #if CMK_ERROR_CHECKING
00370     if (m==NULL) CkAbort("FutureBOC::SetSema called with NULL!");
00371 #endif
00372     int idx;
00373     idx = UsrToEnv((void *)m)->getRef();
00374     CpvAccess(semapool)->signal(idx,(void*)m);
00375   }
00376 };
00377 
00378 extern "C" 
00379 void CkSendToFutureID(CkFutureID futNum, void *m, int PE)
00380 {
00381   UsrToEnv(m)->setRef(futNum);
00382   CProxy_FutureBOC fBOC(_fbocID);
00383   fBOC[PE].SetFuture((FutureInitMsg *)m);
00384 }
00385 
00386 void  CkSendToFuture(CkFuture fut, void *msg)
00387 {
00388   CkSendToFutureID(fut.id, msg, fut.pe);
00389 }
00390 
00391 CkSemaID CkSemaCreate(void)
00392 {
00393   CkSemaID id;
00394   id.pe = CkMyPe();
00395   id.idx = CpvAccess(semapool)->getNew();
00396   return id;
00397 }
00398 
00399 void *CkSemaWait(CkSemaID id)
00400 {
00401 #if CMK_ERROR_CHECKING
00402   if(id.pe != CkMyPe()) {
00403     CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
00404   }
00405 #endif
00406   return CpvAccess(semapool)->wait(id.idx);
00407 }
00408 
00409 void CkSemaWaitN(CkSemaID id, int n, void *marray[])
00410 {
00411 #if CMK_ERROR_CHECKING
00412   if(id.pe != CkMyPe()) {
00413     CkAbort("ERROR: Waiting on nonlocal semaphore! Aborting..\n");
00414   }
00415 #endif
00416   CpvAccess(semapool)->waitN(id.idx, n, marray);
00417 }
00418 
00419 void CkSemaSignal(CkSemaID id, void *m)
00420 {
00421   UsrToEnv(m)->setRef(id.idx);
00422   CProxy_FutureBOC fBOC(_fbocID);
00423   fBOC[id.pe].SetSema((FutureInitMsg *)m);
00424 }
00425 
00426 void CkSemaDestroy(CkSemaID id)
00427 {
00428 #if CMK_ERROR_CHECKING
00429   if(id.pe != CkMyPe()) {
00430     CkAbort("ERROR: destroying a nonlocal semaphore! Aborting..\n");
00431   }
00432 #endif
00433   CpvAccess(semapool)->release(id.idx);
00434 }
00435 
00436 
00438 #include "CkFutures.def.h"
00439