00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 #include "liveViz.h"
00026 #include "liveViz_impl.h"
00027 #include "ImageData.h"
00028 
00029 
00030  CProxy_liveVizPollArray myGroupProxy;
00031 CkReduction::reducerType poll_image_combine_reducer;
00032 extern CkCallback clientGetImageCallback;
00033 
00034 
00035 
00036 class liveVizPollArray : public CBase_liveVizPollArray {
00037 private:
00038   CkMsgQ<liveVizRequestMsg> requestQueue; 
00039   CkQ<imageUnit> imageQueue;      
00040 
00041   void init(){
00042     CkAssert(requestQueue.length()==0); 
00043     CkAssert(imageQueue.length()==0);
00044     CkPrintf("init()\n");
00045   }
00046 
00047 public:
00048 
00049   liveVizPollArray() {init();}
00050 
00051   
00052   void request(liveVizRequestMsg *msg) {
00053     CkAssert(CkMyPe() == 0);
00054     if(imageQueue.length()>0){
00055       imageUnit image = imageQueue.deq();
00056       msg->req.wid = image.wid;
00057       msg->req.ht = image.ht;
00058       CkPrintf("wid=%d ht=%d\n", image.wid, image.ht);
00059       liveViz0Deposit(msg->req, image.imageData);
00060       delete msg;
00061     }
00062     else {
00063       requestQueue.enq(msg);
00064     }
00065   }
00066   
00067   
00068   void liveVizPoll0Deposit(int wid, int ht, int bpp, int len, byte * imageData) {
00069     
00070     if(requestQueue.length()>0){
00071       liveVizRequestMsg *msg = requestQueue.deq();
00072       msg->req.wid = wid;
00073       msg->req.ht = ht;
00074       CkPrintf("wid=%d ht=%d\n", wid, ht);
00075       liveViz0Deposit(msg->req, imageData);   
00076     }
00077     
00078     else{
00079       const imageUnit newimage(bpp,wid,ht,imageData);
00080       imageQueue.enq(newimage);
00081     }
00082   }
00083   
00084 
00085 };
00086 
00087 
00088 
00089 
00098 CkReductionMsg *imagePollCombineReducer(int nMsg,CkReductionMsg **msgs)
00099 {
00100   if (nMsg==1) { 
00101     CkReductionMsg *ret=msgs[0];
00102     msgs[0]=NULL; 
00103     return ret;
00104   }
00105 
00106   CkPrintf("Using hard-coded bpp=3\n");
00107   int bpp=3;
00108   ImageData imageData (bpp);
00109   
00110   CkReductionMsg* msg = CkReductionMsg::buildNew(imageData.CombineImageDataSize (nMsg,
00111                                                                                  msgs),
00112                                                  NULL, poll_image_combine_reducer);
00113   
00114   imageData.CombineImageData(nMsg, msgs, (byte*)(msg->getData()));
00115   return msg;
00116 }
00117 
00118 
00119 
00120 
00121 
00122 
00123 
00124 void vizPollReductionHandler(void *r_msg)
00125 {
00126   CkReductionMsg *msg = (CkReductionMsg*)r_msg;
00127   imageHeader *hdr=(imageHeader *)msg->getData();
00128   byte *srcData=sizeof(imageHeader)+(byte *)msg->getData();
00129   CkPrintf("Using hard-coded bpp=3\n");
00130   int bpp=3;
00131   CkRect destRect(0,0,hdr->req.wid,hdr->req.ht);
00132   int len = hdr->r.wid() * hdr->r.ht() * bpp;
00133   if (destRect==hdr->r) { 
00134     myGroupProxy[0].liveVizPoll0Deposit(hdr->r.wid(), hdr->r.ht(), bpp, len, srcData);
00135   }
00136   else { 
00137     CkImage src(hdr->r.wid(),hdr->r.ht(),bpp,srcData);
00138     CkAllocImage dest(hdr->req.wid,hdr->req.ht,bpp);
00139     dest.clear();
00140     dest.put(hdr->r.l,hdr->r.t,src);
00141     myGroupProxy[0].liveVizPoll0Deposit(hdr->r.wid(), hdr->r.ht(), bpp, len, dest.getData());
00142   }
00143   delete msg;
00144 }
00145 
00146 
00147 
00148 
00149 
00150 void liveVizPollDeposit(ArrayElement *client,
00151                         int startx, int starty, 
00152                         int sizex, int sizey,             
00153                         int imagewidth, int imageheight,  
00154                         const byte * src,
00155                         liveVizCombine_t combine_reducer_type, 
00156                         int bytes_per_pixel
00157                         )
00158 {
00159   
00160   CkPrintf("%d: liveVizPollDeposit> Deposited image at (%d,%d), (%d x %d) pixels\n",CkMyPe(),startx,starty,sizex,sizey);
00161   
00162   ImageData imageData (bytes_per_pixel);
00163  
00164   
00165   poll_image_combine_reducer=CkReduction::addReducer(imagePollCombineReducer, false, "imagePollCombineReducer");
00166   CkReductionMsg* msg = CkReductionMsg::buildNew(imageData.GetBuffSize (startx,
00167                                                                         starty,
00168                                                                         sizex,
00169                                                                         sizey,
00170                                                                         imagewidth,
00171                                                                         imageheight,
00172                                                                         src),
00173                                                  NULL, poll_image_combine_reducer);
00174   imageData.WriteHeader(combine_reducer_type,NULL,(byte*)(msg->getData()));
00175   imageData.AddImage (imagewidth, (byte*)(msg->getData()));
00176   
00177   
00178   msg->setCallback(CkCallback(vizPollReductionHandler));
00179   client->contribute(msg);
00180 }
00181 
00182 
00183 
00184 
00185 
00186 
00187 void liveVizPollInit()
00188 {
00189   if (CkMyPe()!=0) CkAbort("liveVizInit must be called only on processor 0!");
00190 
00191   myGroupProxy = CProxy_liveVizPollArray::ckNew();
00192  
00193   
00194   CkCallback myCB(CkIndex_liveVizPollArray::request(0), myGroupProxy[0]);
00195   clientGetImageCallback=myCB;
00196 
00197   liveViz0PollInit();
00198 
00199 #ifdef DEBUG_PRINTS
00200   CkPrintf("Done with liveVizPollInit\n");
00201 #endif
00202 
00203 }
00204 
00205 
00206 #include "liveVizPoll.def.h"
00207