00001 
00002 
00003 
00004 
00005 
00006 
00007 #include "ampiimpl.h"
00008 
00009 
00010 
00011 
00012 
// Status codes returned by win_obj member functions. The ampi:: wrappers
// below return MPI_* codes to user code instead.
#define WIN_SUCCESS 0
#define WIN_ERROR   (-1)

// Message-size thresholds (in bytes) compared against orgtotalsize in the
// AMPI_RDMA_IMPL branches below to decide whether to send origin data with
// CkSendBuffer (zero-copy) instead of a regular copy. Defined elsewhere.
extern int AMPI_RDMA_THRESHOLD;
extern int AMPI_SMP_RDMA_THRESHOLD;
00018 
00019 win_obj::win_obj() noexcept {
00020   baseAddr = NULL;
00021   comm = MPI_COMM_NULL;
00022   initflag = false;
00023 }
00024 
00025 win_obj::win_obj(const char *name, void *base, MPI_Aint size, int disp_unit,
00026                  MPI_Comm comm) noexcept {
00027   create(name, base, size, disp_unit, comm);
00028   owner = -1;  
00029 }
00030 
00031 void win_obj::setName(const char *src) noexcept {
00032   CkDDT_SetName(winName, src);
00033 }
00034 
00035 void win_obj::getName(char *name, int *len) noexcept {
00036   int length = *len = winName.size();
00037   memcpy(name, winName.data(), length);
00038   name[length] = '\0';
00039 }
00040 
00041 win_obj::~win_obj() noexcept {
00042   free();
00043 }
00044 
00045 
00046 
// Serialization hook for win_obj. The field-by-field code below is disabled
// with #if 0, so this currently packs/unpacks nothing.
// NOTE(review): if re-enabled, the unpack side allocates a fresh buffer for
// baseAddr (new char[size+1]) rather than pointing back at a user buffer;
// confirm that is the intended behavior before turning it back on.
void win_obj::pup(PUP::er &p) noexcept {
#if 0
  p|winSize;
  p|disp_unit;
  p|comm;
  p|initflag;

  p|winName;
  p|keyvals;

  int size = 0;
  if(baseAddr) size = winSize;
  p|size;
  if(p.isUnpacking()) baseAddr = new char[size+1];
  p(baseAddr, size);
#endif
}
00064 
00065 int win_obj::create(const char *name, void *base, MPI_Aint size, int disp_unit, MPI_Comm comm) noexcept {
00066   if (name) setName(name);
00067   baseAddr = base;
00068   winSize = size*disp_unit;
00069   this->disp_unit = disp_unit;
00070   this->comm = comm;
00071   
00072   initflag = true;
00073   return WIN_SUCCESS;
00074 }
00075 
00076 int win_obj::free() noexcept {
00077   
00078   initflag = false;
00079   return WIN_SUCCESS;
00080 }
00081 
00082 
00083 
00084 
00085 int win_obj::put(void *orgaddr, int orgcnt, int orgunit, MPI_Aint targdisp,
00086                  int targcnt, int targunit) noexcept {
00087   if(!initflag) {
00088     CkAbort("Put to non-existing MPI_Win\n");
00089     return WIN_ERROR;
00090   }
00091   int totalsize = targdisp+targcnt*targunit;
00092   if(totalsize > (winSize)){
00093     CkAbort("Put size exceeds MPI_Win size\n");
00094     return WIN_ERROR;
00095   }
00096 
00097   return WIN_SUCCESS;
00098 }
00099 
00100 int win_obj::get(void *orgaddr, int orgcnt, int orgunit, MPI_Aint targdisp,
00101                  int targcnt, int targunit) noexcept {
00102   if(!initflag) {
00103     CkAbort("Get from non-existing MPI_Win\n");
00104     return WIN_ERROR;
00105   }
00106   int totalsize = targdisp+targcnt*targunit;
00107   if(totalsize > (winSize)){
00108     CkAbort("Get size exceeds MPI_Win size\n");
00109     return WIN_ERROR;
00110   }
00111   
00112 
00113   return WIN_SUCCESS;
00114 }
00115 
00116 int win_obj::iget(int orgcnt, int orgunit, MPI_Aint targdisp,
00117                   int targcnt, int targunit) noexcept {
00118   if(!initflag) {
00119     CkAbort("Get from non-existing MPI_Win\n");
00120     return WIN_ERROR;
00121   }
00122 
00123   if((targdisp+targcnt*targunit) > (winSize)){
00124     CkAbort("Get size exceeds MPI_Win size\n");
00125     return WIN_ERROR;
00126   }
00127   
00128 
00129   return WIN_SUCCESS;
00130 }
00131 
00132 int win_obj::accumulate(void *orgaddr, int count, MPI_Aint targdisp, MPI_Datatype targtype,
00133                         MPI_Op op, ampiParent* pptr) noexcept
00134 {
00135   
00136   CkAssert(pptr != NULL);
00137   pptr->applyOp(targtype, op, count, orgaddr, (void*)((char*)baseAddr+disp_unit*targdisp));
00138   return WIN_SUCCESS;
00139 }
00140 
00141 int win_obj::fence() noexcept {
00142   return WIN_SUCCESS;
00143 }
00144 
00145 int win_obj::lock(int requestRank, int lock_type) noexcept {
00146   owner = requestRank;
00147   return WIN_SUCCESS;
00148 }
00149 
00150 int win_obj::unlock(int requestRank) noexcept {
00151   if (owner != requestRank){
00152     CkPrintf("    ERROR: Can't unlock a lock which you don't own.\n");
00153     return WIN_ERROR;
00154   }
00155   owner = -1;
00156 
00157   
00158   dequeue();
00159 
00160   return WIN_SUCCESS;
00161 }
00162 
00163 void win_obj::dequeue() noexcept {
00164   lockQueueEntry *lq = lockQueue.deq();
00165   delete lq;
00166 }
00167 
00168 void win_obj::enqueue(int requestRank, int lock_type) noexcept {
00169   lockQueueEntry *lq = new lockQueueEntry(requestRank, lock_type);
00170   lockQueue.enq(lq);
00171 }
00172 
00173 bool win_obj::emptyQueue() noexcept {
00174   return (lockQueue.length()==0);
00175 }
00176 
00177 void win_obj::lockTopQueue() noexcept {
00178   lockQueueEntry *lq = lockQueue.deq();
00179   lock(lq->requestRank, lq->lock_type);
00180   lockQueue.insert(0, lq);
00181 }
00182 
00183 
00184 int win_obj::wait() noexcept {
00185   return -1;
00186 }
00187 
00188 int win_obj::post() noexcept {
00189   return -1;
00190 }
00191 
00192 int win_obj::start() noexcept {
00193   return -1;
00194 }
00195 
00196 int win_obj::complete() noexcept {
00197   return -1;
00198 }
00199 
00200 int ampiParent::addWinStruct(WinStruct* win) noexcept {
00201   winStructList.push_back(win);
00202   return winStructList.size()-1;
00203 }
00204 
00205 WinStruct *ampiParent::getWinStruct(MPI_Win win) const noexcept {
00206   return winStructList[(int)win];
00207 }
00208 
// Unregister a window's WinStruct. Currently a no-op.
// NOTE(review): the WinStruct allocated with `new` in createWinInstance is
// never deleted and its winStructList slot is never cleared here; confirm
// whether this leak is intentional.
void ampiParent::removeWinStruct(WinStruct *win) noexcept {}
00210 
// Origin side of MPI_Put: send orgcnt elements of orgtype from orgaddr to the
// window `win` on `rank`, to be stored at displacement targdisp as targcnt
// elements of targtype. Always returns MPI_SUCCESS (range errors abort on the
// target side inside win_obj::put).
int ampi::winPut(const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
                 MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, WinStruct *win) noexcept {
  CkDDT_DataType *ddt = getDDT()->getType(orgtype);
  int orgtotalsize = ddt->getSize(orgcnt);
  AMPI_DEBUG("    Rank[%d:%d] invoke Remote put at [%d]\n", thisIndex, myRank, rank);

  if (ddt->isContig()) {
    // Contiguous origin data can be sent as-is.
    ampi *destPtr = thisProxy[rank].ckLocal();
    if (destPtr != NULL) {
      // Target rank is local: call its handler directly (plain function call).
      destPtr->winRemotePut(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
                            targcnt, targtype, win->index);
    }
#if AMPI_RDMA_IMPL
    else if (orgtotalsize >= AMPI_RDMA_THRESHOLD ||
            (orgtotalsize >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(thisProxy, rank)))
    {
      // Large message: send orgaddr zero-copy with CkSendBuffer and block in
      // ampiReq->wait() until the send request completes, so orgaddr is not
      // reused early. completedSendCB is tagged with `req` via setRefnum and
      // presumably marks the request complete; the request is then freed.
      AmpiRequestList& reqs = getReqs();
      SendReq* ampiReq = parent->reqPool.newReq<SendReq>(orgtype, myComm.getComm(), getDDT());
      MPI_Request req = reqs.insert(ampiReq);
      CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true);
      completedSendCB.setRefnum(req);
      thisProxy[rank].winRemotePut(orgtotalsize, CkSendBuffer(orgaddr, completedSendCB), orgcnt, orgtype,
                                   targdisp, targcnt, targtype, win->index);
      ampiReq->wait(MPI_STATUS_IGNORE);
      reqs.free(parent->reqPool, req, getDDT());
    }
#endif
    else {
      // Remote target, regular (copying) entry-method send.
      thisProxy[rank].winRemotePut(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
                                   targcnt, targtype, win->index);
    }
  }
  else {
    // Non-contiguous origin: pack into a temporary contiguous buffer first.
    vector<char> sorgaddr(orgtotalsize);
    int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
    ddt->serialize((char*)orgaddr, sorgaddr.data(), orgcnt, orgsize, PACK);
    thisProxy[rank].winRemotePut(orgtotalsize, sorgaddr.data(), orgcnt, orgtype, targdisp,
                                 targcnt, targtype, win->index);
  }

  return MPI_SUCCESS;
}
00253 
00254 void ampi::winRemotePut(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
00255                         MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, int winIndex) noexcept {
00256   win_obj *winobj = winObjects[winIndex];
00257   CkDDT_DataType *tddt = getDDT()->getType(targtype);
00258   int targunit = tddt->getSize();
00259   int orgunit = getDDT()->getSize(orgtype);
00260 
00261   winobj->put(sorgaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00262   char* targaddr = ((char*)(winobj->baseAddr)) + winobj->disp_unit*targdisp;
00263   int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00264   tddt->serialize(targaddr, (char*)sorgaddr, targcnt, targsize, UNPACK);
00265 }
00266 
00267 int ampi::winGet(void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00268                  MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00269                  WinStruct *win) noexcept {
00270   
00271   AMPI_DEBUG("    Rank[%d:%d] invoke Remote get at [%d]\n", thisIndex, myRank, rank);
00272   CkDDT_DataType *orgddt  = getDDT()->getType(orgtype);
00273   CkDDT_DataType *targddt = getDDT()->getType(targtype);
00274   int orgtotalsize  = orgddt->getSize(orgcnt);
00275   int targtotalsize = targddt->getSize(targcnt);
00276 
00277   
00278   
00279   if (orgddt->isContig() || targddt->isContig()) {
00280     ampi *destPtr = thisProxy[rank].ckLocal();
00281     if (destPtr != NULL) {
00282       char* targdata = destPtr->winLocalGet(orgcnt, orgtype, targdisp, targcnt, targtype, win->index);
00283       if (orgddt->isContig()) {
00284         int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
00285         orgddt->serialize((char*)orgaddr, targdata, orgcnt, orgsize, UNPACK);
00286       } else {
00287         int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00288         targddt->serialize((char*)orgaddr, targdata, targcnt, targsize, PACK);
00289       }
00290       return MPI_SUCCESS;
00291     }
00292   }
00293 
00294   AmpiMsg* msg = thisProxy[rank].winRemoteGet(orgcnt, orgtype, targdisp, targcnt, targtype, win->index);
00295 
00296   
00297   int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
00298   orgddt->serialize((char*)orgaddr, msg->getData(), orgcnt, orgsize, UNPACK);
00299   AMPI_DEBUG("    Rank[%d] got win  [%d] \n", thisIndex, *(int*)msg->getData());
00300   AMPI_DEBUG("    Rank[%d] got win  [%d] , size %d\n", thisIndex, *(int*)orgaddr, orgcnt);
00301 
00302   delete msg;
00303   return MPI_SUCCESS;
00304 }
00305 
00306 char* ampi::winLocalGet(int orgcnt, MPI_Datatype orgtype, MPI_Aint targdisp, int targcnt,
00307                         MPI_Datatype targtype, int winIndex) noexcept {
00308   AMPI_DEBUG("    LocalGet invoked at Rank[%d:%d]\n", thisIndex, myRank);
00309 
00310   win_obj *winobj = winObjects[winIndex];
00311   CkDDT_DataType *tddt = getDDT()->getType(targtype);
00312   int targunit = tddt->getSize();
00313   int targtotalsize = winobj->disp_unit*targcnt;
00314   int orgunit = getDDT()->getSize(orgtype);
00315   char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00316 
00317   winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00318 
00319   AMPI_DEBUG("    Rank[%d] local get win  [%d] \n", thisIndex, *(int*)(targaddr));
00320   return targaddr;
00321 }
00322 
00323 AmpiMsg* ampi::winRemoteGet(int orgcnt, MPI_Datatype orgtype, MPI_Aint targdisp, int targcnt,
00324                             MPI_Datatype targtype, int winIndex) noexcept {
00325   AMPI_DEBUG("    RemoteGet invoked at Rank[%d:%d]\n", thisIndex, myRank);
00326 
00327   win_obj *winobj = winObjects[winIndex];
00328   CkDDT_DataType *tddt = getDDT()->getType(targtype);
00329   int targunit = tddt->getSize();
00330   int targtotalsize = winobj->disp_unit*targcnt;
00331   int orgunit = getDDT()->getSize(orgtype);
00332   char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00333 
00334   winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00335 
00336   AMPI_DEBUG("    Rank[%d] get win  [%d] \n", thisIndex, *(int*)(targaddr));
00337   AmpiMsg *msg = new (targtotalsize, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, targtotalsize);
00338   int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00339   tddt->serialize(targaddr, msg->getData(), targcnt, targsize, PACK);
00340   return msg;
00341 }
00342 
00343 int ampi::winIget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype, int rank,
00344                   MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00345                   WinStruct *win, MPI_Request *req) noexcept {
00346   
00347   AMPI_DEBUG("    Rank[%d:%d] request Remote iget at [%d]\n", thisIndex, myRank, rank);
00348   *req = thisProxy[rank].winRemoteIget(orgdisp, orgcnt, orgtype, targdisp, targcnt, targtype, win->index);
00349   return MPI_SUCCESS;
00350 }
00351 
00352 AmpiMsg* ampi::winRemoteIget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype,
00353                              MPI_Aint targdisp, int targcnt,
00354                              MPI_Datatype targtype, int winIndex) noexcept {
00355   AMPI_DEBUG("    RemoteIget invoked at Rank[%d:%d]\n", thisIndex, myRank);
00356   win_obj *winobj = winObjects[winIndex];
00357   CkDDT_DataType *tddt = getDDT()->getType(targtype);
00358   int targunit = tddt->getSize();
00359   int targtotalsize = winobj->disp_unit*targcnt;
00360   int orgunit = getDDT()->getSize(orgtype);
00361 
00362   winobj->iget(orgcnt, orgunit, targdisp, targcnt, targunit);
00363 
00364   AmpiMsg *msg = new (targtotalsize, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, targtotalsize);
00365 
00366   char* targaddr = (char*)(winobj->baseAddr) + targdisp*winobj->disp_unit;
00367   AMPI_DEBUG("    Rank[%d] iget win  [%d] \n", thisIndex, *(int*)(targaddr));
00368   int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00369   tddt->serialize(targaddr, msg->getData(), targcnt, targsize, PACK);
00370   AMPI_DEBUG("    Rank[%d] copy win  [%d] \n", thisIndex, *(int*)msg->getData());
00371   return msg;
00372 }
00373 
00374 int ampi::winIgetWait(MPI_Request *request, MPI_Status *status) noexcept {
00375   
00376   AMPI_DEBUG("    [%d] Iget Waiting\n", thisIndex, *request);
00377   status->msg = (AmpiMsg*)CkWaitReleaseFuture(*request);
00378   AMPI_DEBUG("    [%d] Iget Waiting [%d] awaken\n", thisIndex, *request);
00379   return MPI_SUCCESS;
00380 }
00381 
00382 int ampi::winIgetFree(MPI_Request *request, MPI_Status *status) noexcept {
00383   AMPI_DEBUG("    [%d] : Iget [%d] frees buffer\n", thisIndex, *request);
00384 
00385   void *data = NULL;
00386   AMPI_Iget_data(&data, *status);
00387   if(!data) {
00388     AMPI_DEBUG("    [%d] Iget [%d] attempt to free NULL buffer \n", thisIndex, *request);
00389     return ampiErrhandler("AMPI_Iget_free", MPI_ERR_BUFFER);
00390   }
00391   else {
00392     delete (status->msg);
00393     return MPI_SUCCESS;
00394   }
00395 }
00396 
// Origin side of MPI_Accumulate: send orgcnt elements of orgtype from orgaddr
// to window `win` on `rank`, where they are combined with `op` into the
// window at displacement targdisp. Always returns MPI_SUCCESS.
int ampi::winAccumulate(const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
                        MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
                        MPI_Op op, WinStruct *win) noexcept {
  CkDDT_DataType *ddt = getDDT()->getType(orgtype);
  int orgtotalsize = ddt->getSize(orgcnt);
  AMPI_DEBUG("    Rank[%d:%d] invoke Remote accumulate at [%d]\n", thisIndex, myRank, rank);

  if (ddt->isContig()) {
    // Contiguous origin data can be sent as-is.
    ampi *destPtr = thisProxy[rank].ckLocal();
    if (destPtr != NULL) {
      // Target rank is local: call its handler directly.
      destPtr->winRemoteAccumulate(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
                                   targcnt, targtype, op, win->index);
    }
#if AMPI_RDMA_IMPL
    else if (orgtotalsize >= AMPI_RDMA_THRESHOLD ||
            (orgtotalsize >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(thisProxy, rank)))
    {
      // Large message: zero-copy send via CkSendBuffer, then block in
      // ampiReq->wait() until the send request completes (completedSendCB is
      // tagged with `req` and presumably completes it) before freeing it.
      AmpiRequestList& reqs = getReqs();
      SendReq* ampiReq = parent->reqPool.newReq<SendReq>(orgtype, myComm.getComm(), getDDT());
      MPI_Request req = reqs.insert(ampiReq);
      CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true);
      completedSendCB.setRefnum(req);
      thisProxy[rank].winRemoteAccumulate(orgtotalsize, CkSendBuffer(orgaddr, completedSendCB), orgcnt,
                                          orgtype, targdisp, targcnt, targtype,  op, win->index);
      ampiReq->wait(MPI_STATUS_IGNORE);
      reqs.free(parent->reqPool, req, getDDT());
    }
#endif
    else {
      // Remote target, regular (copying) entry-method send.
      thisProxy[rank].winRemoteAccumulate(orgtotalsize, (char*)orgaddr, orgcnt, orgtype,
                                          targdisp, targcnt, targtype,  op, win->index);
    }
  }
  else {
    // Non-contiguous origin: pack into a temporary contiguous buffer first.
    vector<char> sorgaddr(orgtotalsize);
    int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
    ddt->serialize((char*)orgaddr, sorgaddr.data(), orgcnt, orgsize, PACK);
    thisProxy[rank].winRemoteAccumulate(orgtotalsize, sorgaddr.data(), orgcnt, orgtype,
                                        targdisp, targcnt, targtype,  op, win->index);
  }

  return MPI_SUCCESS;
}
00440 
00441 void ampi::winRemoteAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt,
00442                                MPI_Datatype orgtype, MPI_Aint targdisp,
00443                                int targcnt, MPI_Datatype targtype, MPI_Op op,
00444                                int winIndex) noexcept {
00445   win_obj *winobj = winObjects[winIndex];
00446   CkDDT_DataType *ddt = getDDT()->getType(targtype);
00447   if (ddt->isContig()) {
00448     winobj->accumulate(sorgaddr, targcnt, targdisp, targtype, op, parent);
00449   }
00450   else {
00451     vector<char> getdata(orgtotalsize);
00452     int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00453     ddt->serialize(getdata.data(), sorgaddr, targcnt, targsize, UNPACK);
00454     winobj->accumulate(getdata.data(), targcnt, targdisp, targtype, op, parent);
00455   }
00456 }
00457 
// Origin side of MPI_Get_accumulate: send orgcnt elements of orgtype to window
// `win` on `rank`. The target returns the window contents at targdisp (as they
// were before accumulating), and those are unpacked into rescnt elements of
// restype at resaddr; the origin data is then combined into the window with `op`.
// Always returns MPI_SUCCESS.
int ampi::winGetAccumulate(const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
                           void *resaddr, int rescnt, MPI_Datatype restype, int rank,
                           MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
                           MPI_Op op, WinStruct *win) noexcept {
  CkDDT_DataType *orgddt = getDDT()->getType(orgtype);
  CkDDT_DataType *resddt = getDDT()->getType(restype);
  int orgtotalsize = orgddt->getSize(orgcnt);
  AMPI_DEBUG("    Rank[%d:%d] invoke Remote get at [%d]\n", thisIndex, myRank, rank);

  AmpiMsg *msg;
  if (orgddt->isContig()) {
    ampi *destPtr = thisProxy[rank].ckLocal();
    if (destPtr != NULL) {
      // Target rank is local: it writes the fetched values straight into
      // resaddr, so no reply message is needed.
      destPtr->winLocalGetAccumulate(orgtotalsize, (char*)orgaddr, orgcnt, orgtype, targdisp,
                                     targcnt, targtype, op, (char*)resaddr, win->index);
      return MPI_SUCCESS;
    }
#if AMPI_RDMA_IMPL
    else if (orgtotalsize >= AMPI_RDMA_THRESHOLD ||
            (orgtotalsize >= AMPI_SMP_RDMA_THRESHOLD && destLikelyWithinProcess(thisProxy, rank)))
    {
      // Large message: zero-copy send of orgaddr, then block until the send
      // request completes (completedSendCB is tagged with `req`).
      AmpiRequestList& reqs = getReqs();
      SendReq* ampiReq = parent->reqPool.newReq<SendReq>(orgtype, myComm.getComm(), getDDT());
      MPI_Request req = reqs.insert(ampiReq);
      CkCallback completedSendCB(CkIndex_ampi::completedRdmaSend(NULL), thisProxy[thisIndex], true);
      completedSendCB.setRefnum(req);
      msg = thisProxy[rank].winRemoteGetAccumulate(orgtotalsize, CkSendBuffer(orgaddr, completedSendCB), orgcnt,
                                                   orgtype, targdisp, targcnt, targtype, op, win->index);
      ampiReq->wait(MPI_STATUS_IGNORE);
      reqs.free(parent->reqPool, req, getDDT());
    }
#endif
    else {
      // Remote target: send orgaddr wrapped in a CkSendBuffer with no completion callback.
      msg = thisProxy[rank].winRemoteGetAccumulate(orgtotalsize, CkSendBuffer(orgaddr), orgcnt, orgtype, targdisp,
                                                   targcnt, targtype, op, win->index);
    }
  }
  else {
    // Non-contiguous origin: pack into a temporary contiguous buffer first.
    vector<char> sorgaddr(orgtotalsize);
    int orgsize = getDDT()->getType(orgtype)->getSize(orgcnt);
    orgddt->serialize((char*)orgaddr, sorgaddr.data(), orgcnt, orgsize, PACK);
    msg = thisProxy[rank].winRemoteGetAccumulate(orgtotalsize, sorgaddr.data(), orgcnt, orgtype, targdisp,
                                                 targcnt, targtype, op, win->index);
  }

  // Unpack the fetched (pre-accumulate) target values into the result buffer.
  int ressize = getDDT()->getType(restype)->getSize(rescnt);
  resddt->serialize((char*)resaddr, msg->getData(), rescnt, ressize, UNPACK);
  delete msg;

  return MPI_SUCCESS;
}
00509 
00510 void ampi::winLocalGetAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
00511                                  MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Op op,
00512                                  char *resaddr, int winIndex) noexcept {
00513   win_obj *winobj = winObjects[winIndex];
00514   CkDDT_DataType *tddt = getDDT()->getType(targtype);
00515   int targunit = tddt->getSize();
00516   int targtotalsize = winobj->disp_unit*targcnt;
00517   int orgunit = getDDT()->getSize(orgtype);
00518   char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00519 
00520   
00521   winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00522   int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00523   tddt->serialize(targaddr, resaddr, targcnt, targsize, PACK);
00524 
00525   
00526   if (tddt->isContig()) {
00527     winobj->accumulate(sorgaddr, targcnt, targdisp, targtype, op, parent);
00528   }
00529   else {
00530     vector<char> getdata(orgtotalsize);
00531     int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00532     tddt->serialize(getdata.data(), sorgaddr, targcnt, targsize, UNPACK);
00533     winobj->accumulate(getdata.data(), targcnt, targdisp, targtype, op, parent);
00534   }
00535 }
00536 
00537 AmpiMsg* ampi::winRemoteGetAccumulate(int orgtotalsize, char* sorgaddr, int orgcnt, MPI_Datatype orgtype,
00538                                       MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Op op,
00539                                       int winIndex) noexcept {
00540   win_obj *winobj = winObjects[winIndex];
00541   CkDDT_DataType *tddt = getDDT()->getType(targtype);
00542   int targunit = tddt->getSize();
00543   int targtotalsize = winobj->disp_unit*targcnt;
00544   int orgunit = getDDT()->getSize(orgtype);
00545   char* targaddr = (char*)(winobj->baseAddr) + winobj->disp_unit*targdisp;
00546 
00547   
00548   winobj->get(targaddr, orgcnt, orgunit, targdisp, targcnt, targunit);
00549   AmpiMsg *msg = new (targtotalsize, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, targtotalsize);
00550   int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00551   tddt->serialize(targaddr, msg->getData(), targcnt, targsize, PACK);
00552 
00553   
00554   if (tddt->isContig()) {
00555     winobj->accumulate(sorgaddr, targcnt, targdisp, targtype, op, parent);
00556   }
00557   else {
00558     vector<char> getdata(orgtotalsize);
00559     int targsize = getDDT()->getType(targtype)->getSize(targcnt);
00560     tddt->serialize(getdata.data(), sorgaddr, targcnt, targsize, UNPACK);
00561     winobj->accumulate(getdata.data(), targcnt, targdisp, targtype, op, parent);
00562   }
00563 
00564   return msg;
00565 }
00566 
00567 int ampi::winCompareAndSwap(const void *orgaddr, const void *compaddr, void *resaddr, MPI_Datatype type,
00568                             int rank, MPI_Aint targdisp, WinStruct *win) noexcept {
00569   CkDDT_DataType *ddt = getDDT()->getType(type);
00570 
00571   if (ddt->isContig()) {
00572     ampi *destPtr = thisProxy[rank].ckLocal();
00573     if (destPtr != NULL) {
00574       char* targaddr = destPtr->winLocalCompareAndSwap(ddt->getSize(), (char*)orgaddr,
00575                                                        (char*)compaddr, type, targdisp, win->index);
00576       int targsize = getDDT()->getType(type)->getSize(1);
00577       ddt->serialize((char*)resaddr, targaddr, 1, targsize, PACK);
00578       return MPI_SUCCESS;
00579     }
00580   }
00581 
00582   AmpiMsg* msg = thisProxy[rank].winRemoteCompareAndSwap(getDDT()->getType(type)->getSize(1), (char*)orgaddr,
00583                                                          (char*)compaddr, type, targdisp, win->index);
00584   int ressize = getDDT()->getType(type)->getSize(1);
00585   ddt->serialize((char*)resaddr, msg->getData(), 1, ressize, PACK);
00586 
00587   delete msg;
00588   return MPI_SUCCESS;
00589 }
00590 
00591 char* ampi::winLocalCompareAndSwap(int size, char* sorgaddr, char* compaddr, MPI_Datatype type,
00592                                    MPI_Aint targdisp, int winIndex) noexcept {
00593   win_obj *winobj = winObjects[winIndex];
00594   winobj->put(sorgaddr, 1, size, targdisp, 1, size);
00595 
00596   CkDDT_DataType *ddt = getDDT()->getType(type);
00597   char* targaddr = ((char*)(winobj->baseAddr)) + ddt->getSize(targdisp);
00598 
00599   if (*targaddr == *compaddr) {
00600     int size = ddt->getSize(1);
00601     ddt->serialize(targaddr, (char*)sorgaddr, 1, size, UNPACK);
00602   }
00603 
00604   return targaddr;
00605 }
00606 
00607 AmpiMsg* ampi::winRemoteCompareAndSwap(int size, char* sorgaddr, char* compaddr, MPI_Datatype type,
00608                                        MPI_Aint targdisp, int winIndex) noexcept {
00609   win_obj *winobj = winObjects[winIndex];
00610   winobj->put(sorgaddr, 1, size, targdisp, 1, size);
00611 
00612   CkDDT_DataType *ddt = getDDT()->getType(type);
00613   char* targaddr = ((char*)(winobj->baseAddr)) + ddt->getSize(targdisp);
00614 
00615   AmpiMsg *msg = new (size, 0) AmpiMsg(0, 0, MPI_RMA_TAG, thisIndex, size);
00616   ddt->serialize(targaddr, msg->getData(), 1, msg->getLength(), PACK);
00617 
00618   if (*targaddr == *compaddr) {
00619     ddt->serialize(targaddr, (char*)sorgaddr, 1, ddt->getSize(1), UNPACK);
00620   }
00621 
00622   return msg;
00623 }
00624 
00625 int ampi::winLock(int lock_type, int rank, WinStruct *win) noexcept {
00626   AMPI_DEBUG("    [%d] Lock: invoke Remote lock at [%d]\n", thisIndex, rank);
00627   thisProxy[rank].winRemoteLock(lock_type, win->index, thisIndex);
00628   return MPI_SUCCESS;
00629 }
00630 
00631 void ampi::winRemoteLock(int lock_type, int winIndex, int requestRank) noexcept {
00632   AMPI_DEBUG("    [%d] RemoteLock: invoked \n", thisIndex);
00633   win_obj *winobj = winObjects[winIndex];
00634 
00635   
00636   if(winobj->owner > -1 && !(winobj->emptyQueue()))  {
00637   
00638     winobj->enqueue(requestRank, lock_type);
00639     AMPI_DEBUG("    [%d] RemoteLock: queue lock from [%d] \n", thisIndex, requestRank);
00640   }
00641   
00642   else {
00643     winobj->lock(requestRank, lock_type);
00644     winobj->enqueue(requestRank, lock_type);
00645     AMPI_DEBUG("    [%d] RemoteLock: give lock to [%d] \n", thisIndex, requestRank);
00646   }
00647 }
00648 
00649 int ampi::winUnlock(int rank, WinStruct *win) noexcept {
00650   AMPI_DEBUG("    [%d] Unlock: invoke Remote lock at [%d]\n", thisIndex, rank);
00651   thisProxy[rank].winRemoteUnlock(win->index, thisIndex);
00652   return MPI_SUCCESS;
00653 }
00654 
00655 void ampi::winRemoteUnlock(int winIndex, int requestRank) noexcept {
00656   AMPI_DEBUG("    [%d] RemoteUnlock: invoked \n", thisIndex);
00657   win_obj *winobj = winObjects[winIndex];
00658   winobj->unlock(requestRank);
00659   AMPI_DEBUG("    [%d] RemoteUnlock: [%d] release lock\n", thisIndex, requestRank);
00660 
00661   
00662   if(!(winobj->emptyQueue())) {
00663     AMPI_DEBUG("    [%d] RemoteUnlock: queue non-empty, give lock to \n", thisIndex);
00664     winobj->lockTopQueue();
00665   }
00666 }
00667 
00668 MPI_Win ampi::createWinInstance(void *base, MPI_Aint size, int disp_unit, MPI_Info info) noexcept {
00669   AMPI_DEBUG("     Creating win obj {%d, %p}\n ", myComm.getComm(), base);
00670   win_obj *newobj = new win_obj((char*)(NULL), base, size, disp_unit, myComm.getComm());
00671   winObjects.push_back(newobj);
00672   WinStruct *newwin = new WinStruct(myComm.getComm(),winObjects.size()-1);
00673   AMPI_DEBUG("     Creating MPI_WIN at (%p) with {%d, %d}\n", &newwin, myComm.getComm(), winObjects.size()-1);
00674   return (parent->addWinStruct(newwin));
00675 }
00676 
00677 int ampi::deleteWinInstance(MPI_Win win) noexcept {
00678   WinStruct *winStruct = parent->getWinStruct(win);
00679   win_obj *winobj = winObjects[winStruct->index];
00680   parent->removeWinStruct(winStruct); 
00681   winobj->free();
00682   return MPI_SUCCESS;
00683 }
00684 
00685 int ampi::winGetGroup(WinStruct *win, MPI_Group *group) const noexcept {
00686    *group = parent->comm2group(win->comm);
00687    return MPI_SUCCESS;
00688 }
00689 
00690 void ampi::winSetName(WinStruct *win, const char *name) noexcept {
00691   win_obj *winobj = winObjects[win->index];
00692   winobj->setName(name);
00693 }
00694 
00695 void ampi::winGetName(WinStruct *win, char *name, int *length) const noexcept {
00696   win_obj *winobj = winObjects[win->index];
00697   winobj->getName(name, length);
00698 }
00699 
00700 win_obj* ampi::getWinObjInstance(WinStruct *win) const noexcept {
00701   return winObjects[win->index];
00702 }
00703 
00704 
00705 
00706 
00707 
00708 
00709 
00710 
00711 
00712 
00713 
00714 
00715 
00716 
00717 
00718 
00719 
00720 
00721 
00722 
00723 
// MPI_Win_create: create a window over `base` on communicator `comm`, store the
// handle in *newwin, and record the MPI_WIN_BASE / MPI_WIN_SIZE /
// MPI_WIN_DISP_UNIT attributes. Ends with a barrier on the ampi instance for
// `comm`. Returns MPI_SUCCESS.
AMPI_API_IMPL(int, MPI_Win_create, void *base, MPI_Aint size, int disp_unit,
                                   MPI_Info info, MPI_Comm comm, MPI_Win *newwin)
{
  AMPI_API("AMPI_Win_create");
  ampiParent *parent = getAmpiParent();
  ampi *ptr = getAmpiInstance(comm);
  *newwin = ptr->createWinInstance(base, size, disp_unit, info);
  
  WinStruct *winStruct = parent->getWinStruct(*newwin);
  vector<int>& keyvals = ptr->getWinObjInstance(winStruct)->getKeyvals();
  // NOTE(review): these pass the addresses of this function's by-value
  // parameters; if setAttr stores the pointer rather than copying the value,
  // later attribute lookups would dangle. Confirm setAttr's semantics.
  parent->setAttr(*newwin, keyvals, MPI_WIN_BASE, &base);
  parent->setAttr(*newwin, keyvals, MPI_WIN_SIZE, &size);
  parent->setAttr(*newwin, keyvals, MPI_WIN_DISP_UNIT, &disp_unit);
  ptr->barrier(); 
  return MPI_SUCCESS;
}
00740 
00741 
00742 
00743 
00744 
00745 
00746 
00747 AMPI_API_IMPL(int, MPI_Win_free, MPI_Win *win)
00748 {
00749   AMPI_API("AMPI_Win_free");
00750   if(win==NULL) { return ampiErrhandler("AMPI_Win_free", MPI_ERR_WIN); }
00751 
00752   WinStruct *winStruct = getAmpiParent()->getWinStruct(*win);
00753   ampi *ptr = getAmpiInstance(winStruct->comm);
00754   ptr->deleteWinInstance(*win);
00755   
00756   ptr->barrier();
00757   *win = MPI_WIN_NULL;
00758   return MPI_SUCCESS;
00759 }
00760 
00761 
00762 
00763 
00764 
00765 
00766 AMPI_API_IMPL(int, MPI_Put, const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00767                             MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Win win)
00768 {
00769   AMPI_API("AMPI_Put");
00770   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00771   handle_MPI_BOTTOM((void*&)orgaddr, orgtype);
00772   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00773   ampi *ptr = getAmpiInstance(winStruct->comm);
00774   return ptr->winPut(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00775 }
00776 
00777 
00778 
00779 
00780 
00781 
00782 AMPI_API_IMPL(int, MPI_Get, void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00783                             MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00784                             MPI_Win win)
00785 {
00786   AMPI_API("AMPI_Get");
00787   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00788   handle_MPI_BOTTOM(orgaddr, orgtype);
00789   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00790   ampi *ptr = getAmpiInstance(winStruct->comm);
00791   
00792   return  ptr->winGet(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00793 }
00794 
00795 
00796 
00797 
00798 
00799 
00800 
00801 
00802 
00803 
00804 
00805 AMPI_API_IMPL(int, MPI_Accumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
00806                                    int rank, MPI_Aint targdisp, int targcnt,
00807                                    MPI_Datatype targtype, MPI_Op op, MPI_Win win)
00808 {
00809   AMPI_API("AMPI_Accumulate");
00810   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00811   handle_MPI_BOTTOM((void*&)orgaddr, orgtype);
00812   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00813   ampi *ptr = getAmpiInstance(winStruct->comm);
00814   return ptr->winAccumulate(orgaddr, orgcnt, orgtype, rank,
00815                             targdisp, targcnt, targtype, op, winStruct);
00816 }
00817 
00818 
00819 
00820 
00821 
00822 
00823 
00824 
00825 AMPI_API_IMPL(int, MPI_Get_accumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
00826                                        void *resaddr, int rescnt, MPI_Datatype restype,
00827                                        int rank, MPI_Aint targdisp, int targcnt,
00828                                        MPI_Datatype targtype, MPI_Op op, MPI_Win win)
00829 {
00830   AMPI_API("AMPI_Get_accumulate");
00831   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00832   handle_MPI_BOTTOM((void*&)orgaddr, orgtype);
00833   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00834   ampi *ptr = getAmpiInstance(winStruct->comm);
00835   return ptr->winGetAccumulate(orgaddr, orgcnt, orgtype, resaddr, rescnt, restype,
00836                                rank, targdisp, targcnt, targtype, op, winStruct);
00837 }
00838 
00839 
00840 
00841 
00842 
00843 
00844 
00845 
00846 
00847 AMPI_API_IMPL(int, MPI_Rput, const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00848                              MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Win win,
00849                              MPI_Request *request)
00850 {
00851   AMPI_API("AMPI_Rput");
00852   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00853   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00854   ampi *ptr = getAmpiInstance(winStruct->comm);
00855   *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00856   return ptr->winPut(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00857 }
00858 
00859 
00860 
00861 
00862 
00863 
00864 
00865 
00866 AMPI_API_IMPL(int, MPI_Rget, void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00867                              MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00868                              MPI_Win win, MPI_Request *request)
00869 {
00870   AMPI_API("AMPI_Rget");
00871   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00872   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00873   ampi *ptr = getAmpiInstance(winStruct->comm);
00874   *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00875   return ptr->winGet(orgaddr, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct);
00876 }
00877 
00878 
00879 
00880 
00881 
00882 
00883 
00884 
00885 
00886 AMPI_API_IMPL(int, MPI_Raccumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype, int rank,
00887                                     MPI_Aint targdisp, int targcnt, MPI_Datatype targtype,
00888                                     MPI_Op op, MPI_Win win, MPI_Request *request)
00889 {
00890   AMPI_API("AMPI_Raccumulate");
00891   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00892   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00893   ampi *ptr = getAmpiInstance(winStruct->comm);
00894   *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00895   return ptr->winAccumulate(orgaddr, orgcnt, orgtype, rank,
00896                             targdisp, targcnt, targtype, op, winStruct);
00897 }
00898 
00899 
00900 
00901 
00902 
00903 
00904 
00905 
00906 
00907 AMPI_API_IMPL(int, MPI_Rget_accumulate, const void *orgaddr, int orgcnt, MPI_Datatype orgtype,
00908                                         void *resaddr, int rescnt, MPI_Datatype restype,
00909                                         int rank, MPI_Aint targdisp, int targcnt,
00910                                         MPI_Datatype targtype, MPI_Op op, MPI_Win win,
00911                                         MPI_Request *request)
00912 {
00913   AMPI_API("AMPI_Rget_accumulate");
00914   if (targtype > AMPI_MAX_PREDEFINED_TYPE) {CkAbort("AMPI does not currently support RMA with derived datatypes.");}
00915   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00916   ampi *ptr = getAmpiInstance(winStruct->comm);
00917   *request = ptr->postReq(getAmpiParent()->reqPool.newReq<SendReq>(orgtype, winStruct->comm, ptr->getDDT(), AMPI_REQ_COMPLETED));
00918   return ptr->winGetAccumulate(orgaddr, orgcnt, orgtype, resaddr, rescnt, restype,
00919                                rank, targdisp, targcnt, targtype, op, winStruct);
00920 }
00921 
00922 
00923 
00924 
00925 
00926 
00927 AMPI_API_IMPL(int, MPI_Fetch_and_op, const void *orgaddr, void *resaddr, MPI_Datatype type,
00928                                      int rank, MPI_Aint targdisp, MPI_Op op, MPI_Win win)
00929 {
00930   AMPI_API("AMPI_Fetch_and_op");
00931   #if AMPI_ERROR_CHECKING
00932     if (type > AMPI_MAX_PREDEFINED_TYPE)
00933     {
00934       return ampiErrhandler("AMPI_Fetch_and_op", MPI_ERR_UNSUPPORTED_OPERATION);
00935     }
00936   #endif
00937   handle_MPI_BOTTOM((void*&)orgaddr, type);
00938   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00939   ampi *ptr = getAmpiInstance(winStruct->comm);
00940   
00941   return ptr->winGetAccumulate(orgaddr, 1, type, resaddr, 1, type,
00942                                rank, targdisp, 1, type, op, winStruct);
00943 }
00944 
00945 
00946 
00947 
00948 
00949 
00950 AMPI_API_IMPL(int, MPI_Compare_and_swap, const void *orgaddr, const void *compaddr, void *resaddr,
00951                                          MPI_Datatype type, int rank, MPI_Aint targdisp, MPI_Win win)
00952 {
00953   AMPI_API("AMPI_Compare_and_swap");
00954   #if AMPI_ERROR_CHECKING
00955     if (type > AMPI_MAX_PREDEFINED_TYPE)
00956     {
00957       return ampiErrhandler("AMPI_Compare_and_swap", MPI_ERR_UNSUPPORTED_OPERATION);
00958     }
00959   #endif
00960   handle_MPI_BOTTOM((void*&)orgaddr, type);
00961   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00962   ampi *ptr = getAmpiInstance(winStruct->comm);
00963   return ptr->winCompareAndSwap(orgaddr, compaddr, resaddr, type, rank, targdisp, winStruct);
00964 }
00965 
00966 
00967 
00968 
00969 
00970 
00971 
00972 
00973 
00974 
00975 AMPI_API_IMPL(int, MPI_Win_fence, int assertion, MPI_Win win)
00976 {
00977   AMPI_API("AMPI_Win_fence");
00978   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
00979   MPI_Comm comm = winStruct->comm;
00980   ampi *ptr = getAmpiInstance(comm);
00981 
00982   
00983   ptr->barrier();
00984 
00985   
00986   
00987   return MPI_SUCCESS;
00988 }
00989 
00990 
00991 
00992 
00993 
00994 
00995 
00996 
00997 
00998 
00999 AMPI_API_IMPL(int, MPI_Win_lock, int lock_type, int rank, int assertion, MPI_Win win)
01000 {
01001   AMPI_API("AMPI_Win_lock");
01002   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01003   ampi *ptr = getAmpiInstance(winStruct->comm);
01004 
01005   
01006   
01007   ptr->winLock(lock_type, rank, winStruct);
01008   return MPI_SUCCESS;
01009 }
01010 
01011 
01012 
01013 
01014 
01015 
01016 
01017 
01018 
01019   
01020 AMPI_API_IMPL(int, MPI_Win_unlock, int rank, MPI_Win win)
01021 {
01022   AMPI_API("AMPI_Win_unlock");
01023   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01024   ampi *ptr = getAmpiInstance(winStruct->comm);
01025 
01026   
01027   
01028   ptr->winUnlock(rank, winStruct);
01029   return MPI_SUCCESS;
01030 }
01031 
01032 
01033 
01034 
01035 
01036 
01037 
01038 
01039 
01040 
01041 
01042 
01043 
01044 AMPI_API_IMPL(int, MPI_Win_post, MPI_Group group, int assertion, MPI_Win win)
01045 {
01046   AMPI_API("AMPI_Win_post");
01047 
01048   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01049   if (winStruct->isInEpoch()) {
01050     return ampiErrhandler("AMPI_Win_post", MPI_ERR_RMA_SYNC);
01051   } else {
01052     winStruct->setInEpoch(true);
01053   }
01054 
01055   int parentGroupSize;
01056   MPI_Group parentGroup;
01057   MPI_Comm_group(winStruct->comm, &parentGroup);
01058   MPI_Group_size(parentGroup, &parentGroupSize);
01059 
01060   std::vector<int> parentGroupRanks(parentGroupSize);
01061   std::vector<int> subsetGroupRanks(parentGroupSize);
01062   for (int i=0; i<parentGroupSize; i++) {
01063     parentGroupRanks[i] = i;
01064   }
01065   MPI_Group_translate_ranks(parentGroup, parentGroupSize, parentGroupRanks.data(), group, subsetGroupRanks.data());
01066 
01067   ampi *ptr = getAmpiInstance(winStruct->comm);
01068   int actualRanks = 0;
01069   for (int i=0; i<subsetGroupRanks.size(); i++) { 
01070     if (subsetGroupRanks[i] != MPI_UNDEFINED) {
01071       subsetGroupRanks[actualRanks++] = i;
01072       ptr->send(MPI_EPOCH_START_TAG, ptr->getRank(), NULL, 0, MPI_INT, subsetGroupRanks[actualRanks-1], winStruct->comm);
01073     }
01074   }
01075 
01076   for (int i=actualRanks; i<subsetGroupRanks.size(); i++) {
01077     subsetGroupRanks.pop_back();
01078   }
01079   winStruct->setExposureRankList(subsetGroupRanks);
01080   std::vector<MPI_Request> &reqList = winStruct->getRequestList();
01081   reqList.resize(subsetGroupRanks.size()); 
01082 
01083   return MPI_SUCCESS;
01084 }
01085 
01086 AMPI_API_IMPL(int, MPI_Win_wait, MPI_Win win)
01087 {
01088   AMPI_API("AMPI_Win_wait");
01089 
01090   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01091   if (!winStruct->isInEpoch()) {
01092     return ampiErrhandler("AMPI_Win_wait", MPI_ERR_RMA_SYNC);
01093   }
01094 
01095   ampi* ptr = getAmpiInstance(winStruct->comm);
01096   const std::vector<int> &exposureRankList = winStruct->getExposureRankList();
01097   std::vector<MPI_Request> &requestList = winStruct->getRequestList();
01098 
01099   
01100   if (!winStruct->AreRecvsPosted()) {
01101     for (int i=0; i<exposureRankList.size(); i++) {
01102       ptr->irecv(NULL, 0, MPI_INT, exposureRankList[i], MPI_EPOCH_END_TAG, winStruct->comm, &requestList[i]);
01103     }
01104   }
01105 
01106   MPI_Waitall(requestList.size(), requestList.data(), MPI_STATUSES_IGNORE);
01107   winStruct->clearEpochExposure();
01108 
01109   return MPI_SUCCESS;
01110 }
01111 
01112 AMPI_API_IMPL(int, MPI_Win_start, MPI_Group group, int assertion, MPI_Win win)
01113 {
01114   AMPI_API("AMPI_Win_start");
01115 
01116   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01117   if (winStruct->isInEpoch()) {
01118     return ampiErrhandler("AMPI_Win_start", MPI_ERR_RMA_SYNC);
01119   } else {
01120     winStruct->setInEpoch(true);
01121   }
01122 
01123   int parentGroupSize;
01124   MPI_Group parentGroup;
01125   MPI_Comm_group(winStruct->comm, &parentGroup);
01126   MPI_Group_size(parentGroup, &parentGroupSize);
01127 
01128   std::vector<int> subsetGroupRanks(parentGroupSize);
01129   std::vector<int> parentGroupRanks(parentGroupSize);
01130   for (int i=0; i<parentGroupSize; i++) {
01131     parentGroupRanks[i] = i;
01132   }
01133 
01134   MPI_Group_translate_ranks(parentGroup, parentGroupSize, parentGroupRanks.data(), group, subsetGroupRanks.data());
01135 
01136   ampi *ptr = getAmpiInstance(winStruct->comm);
01137   int actualRanks = 0;
01138   for (int i=0; i<subsetGroupRanks.size(); i++) {
01139     if (subsetGroupRanks[i] != MPI_UNDEFINED) {
01140       subsetGroupRanks[actualRanks++] = i;
01141       ptr->recv(MPI_EPOCH_START_TAG, subsetGroupRanks[actualRanks-1], NULL, 0, MPI_INT, winStruct->comm, MPI_STATUS_IGNORE);
01142     }
01143   }
01144 
01145   for (int i=actualRanks; i<subsetGroupRanks.size(); i++) {
01146     subsetGroupRanks.pop_back();
01147   }
01148   winStruct->setAccessRankList(subsetGroupRanks);
01149 
01150   return MPI_SUCCESS;
01151 }
01152 
01153 AMPI_API_IMPL(int, MPI_Win_complete, MPI_Win win)
01154 {
01155   AMPI_API("AMPI_Win_complete");
01156 
01157   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01158   if (!winStruct->isInEpoch()) {
01159     return ampiErrhandler("AMPI_Win_complete", MPI_ERR_RMA_SYNC);
01160   } else {
01161     winStruct->setInEpoch(true);
01162   }
01163 
01164   std::vector<int> &accessGroupRanks = winStruct->getAccessRankList();
01165   ampi *ptr = getAmpiInstance(winStruct->comm);
01166 
01167   for (int i=0; i<accessGroupRanks.size(); i++) {
01168     ptr->send(MPI_EPOCH_END_TAG, ptr->getRank(), NULL, 0, MPI_INT, accessGroupRanks[i], winStruct->comm);
01169   }
01170   winStruct->clearEpochAccess();
01171 
01172   return MPI_SUCCESS;
01173 }
01174 
01175 AMPI_API_IMPL(int, MPI_Win_test, MPI_Win win, int *flag)
01176 {
01177   AMPI_API("AMPI_Win_test");
01178 
01179   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01180   if (!winStruct->isInEpoch()) {
01181     return ampiErrhandler("AMPI_Win_test", MPI_ERR_RMA_SYNC);
01182   }
01183 
01184   std::vector<MPI_Request> &reqList = winStruct->getRequestList();
01185   const std::vector<int> &exposureRankList = winStruct->getExposureRankList();
01186   ampi* ptr = getAmpiInstance(winStruct->comm);
01187 
01188   
01189   if (!winStruct->AreRecvsPosted()) {
01190     for (int i=0; i<reqList.size(); i++) {
01191       ptr->irecv(NULL, 0, MPI_INT, exposureRankList[i], MPI_EPOCH_END_TAG, winStruct->comm, reqList.data());
01192     }
01193     winStruct->setAreRecvsPosted(true);
01194   }
01195 
01196   MPI_Testall(reqList.size(), reqList.data(), flag, MPI_STATUSES_IGNORE);
01197   if (*flag) {
01198     winStruct->clearEpochExposure();
01199   }
01200 
01201   return MPI_SUCCESS;
01202 }
01203 
01204 
01205 CLINKAGE
01206 int AMPI_Iget(MPI_Aint orgdisp, int orgcnt, MPI_Datatype orgtype, int rank,
01207               MPI_Aint targdisp, int targcnt, MPI_Datatype targtype, MPI_Win win,
01208               MPI_Request *request) {
01209   AMPI_API("AMPI_Iget");
01210   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01211   ampi *ptr = getAmpiInstance(winStruct->comm);
01212   
01213   return  ptr->winIget(orgdisp, orgcnt, orgtype, rank, targdisp, targcnt, targtype, winStruct,
01214                request);
01215 }
01216 
01217 CLINKAGE
01218 int AMPI_Iget_wait(MPI_Request *request, MPI_Status *status, MPI_Win win) {
01219   AMPI_API("AMPI_Iget_wait");
01220   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01221   ampi *ptr = getAmpiInstance(winStruct->comm);
01222   
01223   return  ptr->winIgetWait(request,status);
01224 }
01225 
01226 CLINKAGE
01227 int AMPI_Iget_free(MPI_Request *request, MPI_Status *status, MPI_Win win) {
01228   AMPI_API("AMPI_Iget_free");
01229   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01230   ampi *ptr = getAmpiInstance(winStruct->comm);
01231   
01232   return  ptr->winIgetFree(request, status);
01233 }
01234 
01235 CLINKAGE
01236 int AMPI_Iget_data(void *data, MPI_Status status) {
01237   *((char**)data) = ((AmpiMsg*)status.msg)->data;
01238   return MPI_SUCCESS;
01239 }
01240 
01241 
01242 
01243 
01244 
01245 
01246 
01247 
01248 
01249 
01250 
01251 
01252 
01253 AMPI_API_IMPL(int, MPI_Alloc_mem, MPI_Aint size, MPI_Info info, void *baseptr)
01254 {
01255   
01256   *(void **)baseptr = malloc(size);
01257   return MPI_SUCCESS;
01258 }
01259 
01260 
01261 
01262 
01263 
01264 AMPI_API_IMPL(int, MPI_Free_mem, void *baseptr)
01265 {
01266   
01267   free(baseptr);
01268   return MPI_SUCCESS;
01269 }
01270 
01271 AMPI_API_IMPL(int, MPI_Win_get_group, MPI_Win win, MPI_Group *group)
01272 {
01273   AMPI_API("AMPI_Win_get_group");
01274   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01275   ampi *ptr = getAmpiInstance(winStruct->comm);
01276   ptr->winGetGroup(winStruct, group);
01277   return MPI_SUCCESS;
01278 }
01279 
01280 AMPI_API_IMPL(int, MPI_Win_delete_attr, MPI_Win win, int key)
01281 {
01282   AMPI_API("AMPI_Win_delete_attr");
01283   ampiParent *parent = getAmpiParent();
01284   WinStruct *winStruct = parent->getWinStruct(win);
01285   vector<int>& keyvals = getAmpiInstance(winStruct->comm)->getWinObjInstance(winStruct)->getKeyvals();
01286   return parent->deleteAttr(win, keyvals, key);
01287 }
01288 
01289 AMPI_API_IMPL(int, MPI_Win_get_attr, MPI_Win win, int key, void* value, int* flag)
01290 {
01291   AMPI_API("AMPI_Win_get_attr");
01292   ampiParent *parent = getAmpiParent();
01293   WinStruct *winStruct = parent->getWinStruct(win);
01294   vector<int>& keyvals = getAmpiInstance(winStruct->comm)->getWinObjInstance(winStruct)->getKeyvals();
01295   return parent->getAttr(win, keyvals, key, value, flag);
01296 }
01297 
01298 AMPI_API_IMPL(int, MPI_Win_set_attr, MPI_Win win, int key, void* value)
01299 {
01300   AMPI_API("AMPI_Win_set_attr");
01301   ampiParent *parent = getAmpiParent();
01302   WinStruct *winStruct = parent->getWinStruct(win);
01303   vector<int>& keyvals = getAmpiInstance(winStruct->comm)->getWinObjInstance(winStruct)->getKeyvals();
01304   return parent->setAttr(win, keyvals, key, value);
01305 }
01306 
01307 AMPI_API_IMPL(int, MPI_Win_set_name, MPI_Win win, const char *name)
01308 {
01309   AMPI_API("AMPI_Win_set_name");
01310   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01311   ampi *ptr = getAmpiInstance(winStruct->comm);
01312   ptr->winSetName(winStruct, name);
01313   return MPI_SUCCESS;
01314 }
01315 
01316 AMPI_API_IMPL(int, MPI_Win_set_info, MPI_Win win, MPI_Info info)
01317 {
01318   AMPI_API("AMPI_Win_set_info");
01319   
01320   return MPI_SUCCESS;
01321 }
01322 
01323 AMPI_API_IMPL(int, MPI_Win_get_info, MPI_Win win, MPI_Info *info)
01324 {
01325   AMPI_API("AMPI_Win_get_info");
01326   
01327   *info = MPI_INFO_NULL;
01328   return MPI_SUCCESS;
01329 }
01330 
01331 AMPI_API_IMPL(int, MPI_Win_create_errhandler, MPI_Win_errhandler_function *win_errhandler_fn,
01332                                               MPI_Errhandler *errhandler)
01333 {
01334   AMPI_API("AMPI_Win_create_errhandler");
01335   return MPI_SUCCESS;
01336 }
01337 
01338 AMPI_API_IMPL(int, MPI_Win_call_errhandler, MPI_Win win, int errorcode)
01339 {
01340   AMPI_API("AMPI_Win_call_errhandler");
01341   CkPrintf("WARNING: AMPI does not support MPI_Win_call_errhandler (errorcode = %d)\n", errorcode);
01342   return MPI_SUCCESS;
01343 }
01344 
01345 AMPI_API_IMPL(int, MPI_Win_get_errhandler, MPI_Win win, MPI_Errhandler *errhandler)
01346 {
01347   AMPI_API("AMPI_Win_get_errhandler");
01348   return MPI_SUCCESS;
01349 }
01350 
01351 AMPI_API_IMPL(int, MPI_Win_set_errhandler, MPI_Win win, MPI_Errhandler errhandler)
01352 {
01353   AMPI_API("AMPI_Win_set_errhandler");
01354   return MPI_SUCCESS;
01355 }
01356 
01357 int MPI_win_null_copy_fn(MPI_Win win, int keyval, void *extra_state,
01358                          void *attr_in, void *attr_out, int *flag){
01359   (*flag) = 0;
01360   return MPI_SUCCESS;
01361 }
01362 
01363 int MPI_win_dup_fn(MPI_Win win, int keyval, void *extra_state,
01364                    void *attr_in, void *attr_out, int *flag){
01365   (*(void **)attr_out) = attr_in;
01366   (*flag) = 1;
01367   return MPI_SUCCESS;
01368 }
01369 
01370 int MPI_win_null_delete_fn(MPI_Win win, int keyval, void *attr, void *extra_state){
01371   return MPI_SUCCESS;
01372 }
01373 
01374 AMPI_API_IMPL(int, MPI_Win_create_keyval, MPI_Win_copy_attr_function *copy_fn,
01375                                           MPI_Win_delete_attr_function *delete_fn,
01376                                           int *keyval, void *extra_state)
01377 {
01378   AMPI_API("AMPI_Win_create_keyval");
01379   return getAmpiParent()->createKeyval(copy_fn,delete_fn,keyval,extra_state);
01380 }
01381 
01382 AMPI_API_IMPL(int, MPI_Win_free_keyval, int *keyval)
01383 {
01384   AMPI_API("AMPI_Win_free_keyval");
01385   ampiParent *parent = getAmpiParent();
01386   ampiCommStruct& cs = *(ampiCommStruct*)&getAmpiInstance(MPI_COMM_WORLD)->comm2CommStruct(MPI_COMM_WORLD);
01387   return getAmpiParent()->freeUserKeyval(MPI_COMM_WORLD, cs.getKeyvals(), keyval);
01388 }
01389 
01390 AMPI_API_IMPL(int, MPI_Win_get_name, MPI_Win win, char *name, int *length)
01391 {
01392   AMPI_API("AMPI_Win_get_name");
01393   WinStruct *winStruct = getAmpiParent()->getWinStruct(win);
01394   ampi *ptr = getAmpiInstance(winStruct->comm);
01395   ptr->winGetName(winStruct, name, length);
01396   return MPI_SUCCESS;
01397 }