00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 #include "adio.h"
00009 #include "adio_extern.h"
00010 
00011 #ifdef USE_DBG_LOGGING
00012   #define RDCOLL_DEBUG 1
00013 #endif
00014 #ifdef AGGREGATION_PROFILE
00015 #include "mpe.h"
00016 #endif
00017 
00018 
00019 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00020                 datatype, int nprocs,
00021                 int myrank, ADIOI_Access
00022                 *others_req, ADIO_Offset *offset_list,
00023                 ADIO_Offset *len_list, int contig_access_count, 
00024                 ADIO_Offset
00025                 min_st_offset, ADIO_Offset fd_size,
00026                 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00027                 int *buf_idx, int *error_code);
00028 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00029                   *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
00030                   *len_list, int *send_size, int *recv_size,
00031                   int *count, int *start_pos, 
00032                   int *partial_send, 
00033                   int *recd_from_proc, int nprocs, 
00034                   int myrank, int
00035                   buftype_is_contig, int contig_access_count,
00036                   ADIO_Offset min_st_offset, 
00037                   ADIO_Offset fd_size,
00038                   ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
00039                   ADIOI_Access *others_req, 
00040                   int iter, 
00041                   MPI_Aint buftype_extent, int *buf_idx);
00042 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00043                    *flat_buf, char **recv_buf, ADIO_Offset 
00044                    *offset_list, ADIO_Offset *len_list, 
00045                    unsigned *recv_size, 
00046                    MPI_Request *requests, MPI_Status *statuses,
00047                    int *recd_from_proc, int nprocs,
00048                    int contig_access_count, 
00049                    ADIO_Offset min_st_offset, 
00050                    ADIO_Offset fd_size, ADIO_Offset *fd_start, 
00051                    ADIO_Offset *fd_end,
00052                    MPI_Aint buftype_extent);
00053 
00054 
00055 void ADIOI_GEN_ReadStridedColl(ADIO_File fd, void *buf, int count,
00056                    MPI_Datatype datatype, int file_ptr_type,
00057                    ADIO_Offset offset, ADIO_Status *status, int
00058                    *error_code)
00059 {
00060 
00061 
00062 
00063 
00064 
00065 
00066     ADIOI_Access *my_req; 
00067     
00068 
00069     
00070     ADIOI_Access *others_req;
00071     
00072 
00073 
00074     int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00075     int contig_access_count=0, interleave_count = 0, buftype_is_contig;
00076     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00077     ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
00078     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
00079     *fd_end = NULL, *end_offsets = NULL;
00080     ADIO_Offset *len_list = NULL;
00081     int *buf_idx = NULL;
00082 
00083 #ifdef HAVE_STATUS_SET_BYTES
00084     int bufsize, size;
00085 #endif
00086 
00087     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
00088         ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype, 
00089             file_ptr_type, offset, status, error_code);
00090         return;
00091     }
00092 
00093 
00094     MPI_Comm_size(fd->comm, &nprocs);
00095     MPI_Comm_rank(fd->comm, &myrank);
00096 
00097     
00098     nprocs_for_coll = fd->hints->cb_nodes;
00099     orig_fp = fd->fp_ind;
00100 
00101     
00102     if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
00103     
00104 
00105 
00106     
00107 
00108 
00109     ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00110                   &offset_list, &len_list, &start_offset,
00111                   &end_offset, &contig_access_count); 
00112     
00113 #ifdef RDCOLL_DEBUG
00114     for (i=0; i<contig_access_count; i++) {
00115           DBG_FPRINTF(stderr, "rank %d  off %lld  len %lld\n", 
00116                   myrank, offset_list[i], len_list[i]);
00117           }
00118 #endif
00119 
00120     
00121 
00122  
00123     
00124     st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00125     end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00126 
00127     MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00128               ADIO_OFFSET, fd->comm);
00129     MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00130               ADIO_OFFSET, fd->comm);
00131 
00132     
00133     for (i=1; i<nprocs; i++)
00134         if ((st_offsets[i] < end_offsets[i-1]) && 
00135                 (st_offsets[i] <= end_offsets[i]))
00136                 interleave_count++;
00137     
00138 
00139     }
00140 
00141     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00142 
00143     if (fd->hints->cb_read == ADIOI_HINT_DISABLE
00144     || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO))) 
00145     {
00146     
00147     if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
00148         ADIOI_Free(offset_list);
00149         ADIOI_Free(len_list);
00150         ADIOI_Free(st_offsets);
00151         ADIOI_Free(end_offsets);
00152     }
00153 
00154     fd->fp_ind = orig_fp;
00155     ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00156 
00157     if (buftype_is_contig && filetype_is_contig) {
00158         if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00159         off = fd->disp + (fd->etype_size) * offset;
00160         ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
00161                        off, status, error_code);
00162         }
00163         else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00164                        0, status, error_code);
00165     }
00166     else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
00167                        offset, status, error_code);
00168 
00169     return;
00170     }
00171 
00172     
00173 
00174 
00175 
00176 
00177 
00178 
00179 
00180 
00181 
00182 
00183 
00184 
00185 
00186 
00187 
00188     ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00189                 nprocs_for_coll, &min_st_offset,
00190                 &fd_start, &fd_end, 
00191                 fd->hints->min_fdomain_size, &fd_size,
00192                 fd->hints->striping_unit);
00193 
00194     
00195 
00196 
00197 
00198 
00199 
00200 
00201 
00202 
00203 
00204 
00205 
00206     ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00207               min_st_offset, fd_start, fd_end, fd_size,
00208               nprocs, &count_my_req_procs, 
00209               &count_my_req_per_proc, &my_req,
00210               &buf_idx);
00211 
00212     
00213 
00214 
00215 
00216 
00217 
00218 
00219     ADIOI_Calc_others_req(fd, count_my_req_procs, 
00220               count_my_req_per_proc, my_req, 
00221               nprocs, myrank, &count_others_req_procs, 
00222               &others_req); 
00223 
00224     
00225 
00226 
00227     ADIOI_Free(count_my_req_per_proc);
00228     for (i=0; i<nprocs; i++) {
00229     if (my_req[i].count) {
00230         ADIOI_Free(my_req[i].offsets);
00231         ADIOI_Free(my_req[i].lens);
00232     }
00233     }
00234     ADIOI_Free(my_req);
00235 
00236 
00237     
00238 
00239 
00240     ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
00241                         others_req, offset_list,
00242             len_list, contig_access_count, min_st_offset,
00243             fd_size, fd_start, fd_end, buf_idx, error_code);
00244 
00245     if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00246 
00247     
00248     for (i=0; i<nprocs; i++) {
00249     if (others_req[i].count) {
00250         ADIOI_Free(others_req[i].offsets);
00251         ADIOI_Free(others_req[i].lens);
00252         ADIOI_Free(others_req[i].mem_ptrs);
00253     }
00254     }
00255     ADIOI_Free(others_req);
00256 
00257     ADIOI_Free(buf_idx);
00258     ADIOI_Free(offset_list);
00259     ADIOI_Free(len_list);
00260     ADIOI_Free(st_offsets);
00261     ADIOI_Free(end_offsets);
00262     ADIOI_Free(fd_start);
00263     ADIOI_Free(fd_end);
00264 
00265 #ifdef HAVE_STATUS_SET_BYTES
00266     MPI_Type_size(datatype, &size);
00267     bufsize = size * count;
00268     MPIR_Status_set_bytes(status, datatype, bufsize);
00269 
00270 
00271 
00272 #endif
00273 
00274     fd->fp_sys_posn = -1;   
00275 }
00276 
00277 void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
00278                 datatype, int file_ptr_type, ADIO_Offset
00279                 offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
00280                 **len_list_ptr, ADIO_Offset *start_offset_ptr,
00281                 ADIO_Offset *end_offset_ptr, int
00282                *contig_access_count_ptr)
00283 {
00284     int filetype_size, etype_size;
00285     unsigned buftype_size;
00286     int i, j, k;
00287     ADIO_Offset i_offset;
00288     ADIO_Offset frd_size=0, old_frd_size=0;
00289     int st_index=0;
00290     ADIO_Offset n_filetypes, etype_in_filetype;
00291     ADIO_Offset abs_off_in_filetype=0;
00292     ADIO_Offset bufsize;
00293     ADIO_Offset sum, n_etypes_in_filetype, size_in_filetype;
00294     int contig_access_count, filetype_is_contig;
00295     ADIO_Offset *len_list;
00296     MPI_Aint filetype_extent, filetype_lb;
00297     ADIOI_Flatlist_node *flat_file;
00298     ADIO_Offset *offset_list, off, end_offset=0, disp;
00299 
00300 #ifdef AGGREGATION_PROFILE
00301     MPE_Log_event (5028, 0, NULL);
00302 #endif
00303     
00304 
00305 
00306 
00307     ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00308 
00309     MPI_Type_size(fd->filetype, &filetype_size);
00310     MPI_Type_extent(fd->filetype, &filetype_extent);
00311     MPI_Type_lb(fd->filetype, &filetype_lb);
00312     MPI_Type_size(datatype, (int*)&buftype_size);
00313     etype_size = fd->etype_size;
00314 
00315     if ( ! filetype_size ) {
00316     *contig_access_count_ptr = 0;
00317     *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00318     *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00319         
00320 
00321     offset_list = *offset_list_ptr;
00322     len_list = *len_list_ptr;
00323         offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind : 
00324                  fd->disp + (ADIO_Offset)etype_size * offset;
00325     len_list[0] = 0;
00326     *start_offset_ptr = offset_list[0];
00327     *end_offset_ptr = offset_list[0] + len_list[0] - 1;
00328     
00329     return;
00330     }
00331 
00332     if (filetype_is_contig) {
00333     *contig_access_count_ptr = 1;        
00334     *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00335     *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
00336         
00337 
00338     offset_list = *offset_list_ptr;
00339     len_list = *len_list_ptr;
00340         offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind : 
00341                  fd->disp + (ADIO_Offset)etype_size * offset;
00342     len_list[0] = (ADIO_Offset)bufcount * (ADIO_Offset)buftype_size;
00343     *start_offset_ptr = offset_list[0];
00344     *end_offset_ptr = offset_list[0] + len_list[0] - 1;
00345 
00346     
00347     if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = *end_offset_ptr + 1;
00348     }
00349 
00350     else {
00351 
00352        
00353    
00354        
00355     flat_file = CtvAccess(ADIOI_Flatlist);
00356     while (flat_file->type != fd->filetype) flat_file = flat_file->next;
00357     disp = fd->disp;
00358 
00359 #ifdef RDCOLL_DEBUG 
00360         {
00361             int ii;
00362             DBG_FPRINTF(stderr, "flattened %3d : ", flat_file->count );
00363             for (ii=0; ii<flat_file->count; ii++) {
00364                 DBG_FPRINTF(stderr, "%16qd:%-16qd", flat_file->indices[ii], flat_file->blocklens[ii] );
00365             }
00366             DBG_FPRINTF(stderr, "\n" );
00367         }
00368 #endif
00369     if (file_ptr_type == ADIO_INDIVIDUAL) {
00370            
00371             offset       = fd->fp_ind - disp;
00372             n_filetypes  = (offset - flat_file->indices[0]) / filetype_extent;
00373              offset     -= (ADIO_Offset)n_filetypes * filetype_extent;
00374             
00375  
00376             
00377             for (i=0; i<flat_file->count; i++) {
00378                 ADIO_Offset dist;
00379                 if (flat_file->blocklens[i] == 0) continue;
00380                 dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
00381                 
00382         if (dist == 0) {
00383             i++;
00384             offset   = flat_file->indices[i];
00385             frd_size = flat_file->blocklens[i];
00386             break;
00387         }
00388         if (dist > 0) {
00389                     frd_size = dist;
00390             break;
00391         }
00392         }
00393             st_index = i;  
00394             offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
00395         }
00396     else {
00397         n_etypes_in_filetype = filetype_size/etype_size;
00398         n_filetypes = offset / n_etypes_in_filetype;
00399         etype_in_filetype = offset % n_etypes_in_filetype;
00400         size_in_filetype = etype_in_filetype * etype_size;
00401  
00402         sum = 0;
00403         for (i=0; i<flat_file->count; i++) {
00404         sum += flat_file->blocklens[i];
00405         if (sum > size_in_filetype) {
00406             st_index = i;
00407             frd_size = sum - size_in_filetype;
00408             abs_off_in_filetype = flat_file->indices[i] +
00409             size_in_filetype - (sum - flat_file->blocklens[i]);
00410             break;
00411         }
00412         }
00413 
00414         
00415         offset = disp + n_filetypes* (ADIO_Offset)filetype_extent + 
00416         abs_off_in_filetype;
00417     }
00418 
00419          
00420 
00421     old_frd_size = frd_size;
00422     contig_access_count = i_offset = 0;
00423     j = st_index;
00424     bufsize = (ADIO_Offset)buftype_size * (ADIO_Offset)bufcount;
00425     frd_size = ADIOI_MIN(frd_size, bufsize);
00426     while (i_offset < bufsize) {
00427         if (frd_size) contig_access_count++;
00428         i_offset += frd_size;
00429         j = (j + 1) % flat_file->count;
00430         frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
00431     }
00432 
00433         
00434 
00435     *offset_list_ptr = (ADIO_Offset *)
00436              ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));  
00437     *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
00438         
00439 
00440     offset_list = *offset_list_ptr;
00441     len_list = *len_list_ptr;
00442 
00443       
00444 
00445     *start_offset_ptr = offset; 
00446 
00447     i_offset = k = 0;
00448     j = st_index;
00449     off = offset;
00450     frd_size = ADIOI_MIN(old_frd_size, bufsize);
00451     while (i_offset < bufsize) {
00452         if (frd_size) {
00453         offset_list[k] = off;
00454         len_list[k] = frd_size;
00455         k++;
00456         }
00457         i_offset += frd_size;
00458         end_offset = off + frd_size - 1;
00459 
00460      
00461 
00462 
00463         if (off + frd_size < disp + flat_file->indices[j] +
00464         flat_file->blocklens[j] + 
00465          n_filetypes* (ADIO_Offset)filetype_extent)
00466         {
00467         off += frd_size;
00468         
00469 
00470 
00471         }
00472         else {
00473         j = (j+1) % flat_file->count;
00474                 n_filetypes += (j == 0) ? 1 : 0;
00475                 while (flat_file->blocklens[j]==0) {
00476             j = (j+1) % flat_file->count;
00477                     n_filetypes += (j == 0) ? 1 : 0;
00478                     
00479 
00480         }
00481         off = disp + flat_file->indices[j] + 
00482              n_filetypes* (ADIO_Offset)filetype_extent;
00483         frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
00484         }
00485     }
00486 
00487     
00488     if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
00489 
00490     *contig_access_count_ptr = contig_access_count;
00491      *end_offset_ptr = end_offset;
00492     }
00493 #ifdef AGGREGATION_PROFILE
00494     MPE_Log_event (5029, 0, NULL);
00495 #endif
00496 }
00497 
00498 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
00499              datatype, int nprocs,
00500              int myrank, ADIOI_Access
00501              *others_req, ADIO_Offset *offset_list,
00502              ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
00503                          min_st_offset, ADIO_Offset fd_size,
00504              ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00505                          int *buf_idx, int *error_code)
00506 {
00507 
00508 
00509 
00510 
00511 
00512 
00513 
00514 
00515 
00516 
00517     int i, j, m, ntimes, max_ntimes, buftype_is_contig;
00518     ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
00519     char *read_buf = NULL, *tmp_buf;
00520     int *curr_offlen_ptr, *count, *send_size, *recv_size;
00521     int *partial_send, *recd_from_proc, *start_pos;
00522     
00523     ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
00524     int req_len, flag, rank;
00525     MPI_Status status;
00526     ADIOI_Flatlist_node *flat_buf=NULL;
00527     MPI_Aint buftype_extent;
00528     int coll_bufsize;
00529 
00530     *error_code = MPI_SUCCESS;  
00531     
00532     
00533 
00534 
00535 
00536 
00537 
00538     coll_bufsize = fd->hints->cb_buffer_size;
00539 
00540     
00541     for (i=0; i < nprocs; i++) {
00542     if (others_req[i].count) {
00543         st_loc = others_req[i].offsets[0];
00544         end_loc = others_req[i].offsets[0];
00545         break;
00546     }
00547     }
00548 
00549     
00550     for (i=0; i < nprocs; i++)
00551     for (j=0; j<others_req[i].count; j++) {
00552         st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00553         end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00554                       + others_req[i].lens[j] - 1));
00555     }
00556 
00557     
00558 
00559 
00560 
00561 
00562     if ((st_loc==-1) && (end_loc==-1)) {
00563     
00564     ntimes = 0;
00565     }
00566     else {
00567     
00568     ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00569     }
00570 
00571     MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm); 
00572 
00573     if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);
00574 
00575     curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); 
00576     
00577 
00578     count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00579     
00580 
00581 
00582     partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00583     
00584 
00585 
00586 
00587     send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00588     
00589 
00590     recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00591     
00592 
00593 
00594     recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00595     
00596 
00597 
00598     start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00599     
00600 
00601 
00602     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00603     if (!buftype_is_contig) {
00604     ADIOI_Flatten_datatype(datatype);
00605     flat_buf = CtvAccess(ADIOI_Flatlist);
00606         while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00607     }
00608     MPI_Type_extent(datatype, &buftype_extent);
00609 
00610     done = 0;
00611     off = st_loc;
00612     for_curr_iter = for_next_iter = 0;
00613 
00614     MPI_Comm_rank(fd->comm, &rank);
00615 
00616     for (m=0; m<ntimes; m++) {
00617        
00618        
00619 
00620 
00621        
00622 
00623 
00624 
00625 
00626 
00627 
00628 
00629 
00630 
00631 
00632 
00633 
00634 
00635 
00636 
00637 
00638 
00639 
00640 
00641 
00642  
00643 
00644           
00645 
00646 
00647 
00648 
00649 
00650 
00651 
00652 
00653 
00654     size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done); 
00655     real_off = off - for_curr_iter;
00656     real_size = size + for_curr_iter;
00657 
00658     for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00659     for_next_iter = 0;
00660 
00661     for (i=0; i<nprocs; i++) {
00662 #ifdef RDCOLL_DEBUG
00663         DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count); 
00664 #endif
00665         if (others_req[i].count) {
00666         start_pos[i] = curr_offlen_ptr[i];
00667         for (j=curr_offlen_ptr[i]; j<others_req[i].count;
00668              j++) {
00669             if (partial_send[i]) {
00670             
00671 
00672             req_off = others_req[i].offsets[j] +
00673                 partial_send[i]; 
00674                         req_len = others_req[i].lens[j] -
00675                 partial_send[i];
00676             partial_send[i] = 0;
00677             
00678             others_req[i].offsets[j] = req_off;
00679             others_req[i].lens[j] = req_len;
00680             }
00681             else {
00682             req_off = others_req[i].offsets[j];
00683                         req_len = others_req[i].lens[j];
00684             }
00685             if (req_off < real_off + real_size) {
00686             count[i]++;
00687       ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIR_Upint)(read_buf+req_off-real_off));
00688             MPI_Address(read_buf+req_off-real_off, 
00689                                &(others_req[i].mem_ptrs[j]));
00690       ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
00691             send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off, 
00692                                       (ADIO_Offset)(unsigned)req_len)); 
00693 
00694             if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
00695                 partial_send[i] = (int) (real_off + real_size - req_off);
00696                 if ((j+1 < others_req[i].count) && 
00697                                  (others_req[i].offsets[j+1] < 
00698                                      real_off+real_size)) { 
00699                 
00700 
00701                 for_next_iter = ADIOI_MAX(for_next_iter,
00702                       real_off + real_size - others_req[i].offsets[j+1]); 
00703                 
00704 
00705                 }
00706                 break;
00707             }
00708             }
00709             else break;
00710         }
00711         curr_offlen_ptr[i] = j;
00712         }
00713     }
00714 
00715     flag = 0;
00716     for (i=0; i<nprocs; i++)
00717         if (count[i]) flag = 1;
00718 
00719     if (flag) {
00720       ADIOI_Assert(size == (int)size);
00721         ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
00722                 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
00723         if (*error_code != MPI_SUCCESS) return;
00724     }
00725     
00726     for_curr_iter = for_next_iter;
00727     
00728     ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00729                 send_size, recv_size, count, 
00730                     start_pos, partial_send, recd_from_proc, nprocs,
00731                 myrank, 
00732                 buftype_is_contig, contig_access_count,
00733                 min_st_offset, fd_size, fd_start, fd_end,
00734                 others_req, 
00735                             m, buftype_extent, buf_idx); 
00736 
00737 
00738     if (for_next_iter) {
00739         tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
00740       ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIR_Upint)(read_buf+real_size-for_next_iter));
00741       ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
00742         memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
00743         ADIOI_Free(read_buf);
00744         read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
00745         memcpy(read_buf, tmp_buf, for_next_iter);
00746         ADIOI_Free(tmp_buf);
00747     }
00748 
00749     off += size;
00750     done += size;
00751     }
00752 
00753     for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
00754     for (m=ntimes; m<max_ntimes; m++) 
00755 
00756     ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
00757                 send_size, recv_size, count, 
00758                 start_pos, partial_send, recd_from_proc, nprocs,
00759                 myrank, 
00760                 buftype_is_contig, contig_access_count,
00761                 min_st_offset, fd_size, fd_start, fd_end,
00762                 others_req, m,
00763                             buftype_extent, buf_idx); 
00764 
00765     if (ntimes) ADIOI_Free(read_buf);
00766     ADIOI_Free(curr_offlen_ptr);
00767     ADIOI_Free(count);
00768     ADIOI_Free(partial_send);
00769     ADIOI_Free(send_size);
00770     ADIOI_Free(recv_size);
00771     ADIOI_Free(recd_from_proc);
00772     ADIOI_Free(start_pos);
00773 }
00774 
00775 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00776              *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
00777                          *len_list, int *send_size, int *recv_size,
00778              int *count, int *start_pos, int *partial_send, 
00779              int *recd_from_proc, int nprocs, 
00780              int myrank, int
00781              buftype_is_contig, int contig_access_count,
00782              ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00783              ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
00784              ADIOI_Access *others_req, 
00785                          int iter, MPI_Aint buftype_extent, int *buf_idx)
00786 {
00787     int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
00788     char **recv_buf = NULL; 
00789     MPI_Request *requests;
00790     MPI_Datatype send_type;
00791     MPI_Status *statuses;
00792 
00793 
00794 
00795 
00796     MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
00797 
00798     nprocs_recv = 0;
00799     for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
00800 
00801     nprocs_send = 0;
00802     for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
00803 
00804     requests = (MPI_Request *)
00805     ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
00806 
00807 
00808 
00809 
00810 
00811 #ifdef AGGREGATION_PROFILE
00812     MPE_Log_event (5032, 0, NULL);
00813 #endif
00814 
00815     if (buftype_is_contig) {
00816     j = 0;
00817     for (i=0; i < nprocs; i++) 
00818         if (recv_size[i]) {
00819         MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i], 
00820           MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
00821         j++;
00822         buf_idx[i] += recv_size[i];
00823         }
00824     }
00825     else {
00826 
00827     recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
00828     for (i=0; i < nprocs; i++) 
00829         if (recv_size[i]) recv_buf[i] = 
00830                                   (char *) ADIOI_Malloc(recv_size[i]);
00831 
00832         j = 0;
00833         for (i=0; i < nprocs; i++) 
00834         if (recv_size[i]) {
00835             MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i, 
00836                   myrank+i+100*iter, fd->comm, requests+j);
00837             j++;
00838 #ifdef RDCOLL_DEBUG
00839             DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n", 
00840                myrank, recv_size[i], myrank+i+100*iter); 
00841 #endif
00842         }
00843     }
00844 
00845 
00846 
00847     j = 0;
00848     for (i=0; i<nprocs; i++) {
00849     if (send_size[i]) {
00850 
00851         if (partial_send[i]) {
00852         k = start_pos[i] + count[i] - 1;
00853         tmp = others_req[i].lens[k];
00854         others_req[i].lens[k] = partial_send[i];
00855         }
00856         MPI_Type_hindexed(count[i], 
00857                  &(others_req[i].lens[start_pos[i]]),
00858                 &(others_req[i].mem_ptrs[start_pos[i]]), 
00859              MPI_BYTE, &send_type);
00860         
00861         MPI_Type_commit(&send_type);
00862         MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
00863               fd->comm, requests+nprocs_recv+j);
00864         MPI_Type_free(&send_type);
00865         if (partial_send[i]) others_req[i].lens[k] = tmp;
00866         j++;
00867     }
00868     }
00869 
00870     statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00871                                      sizeof(MPI_Status)); 
00872      
00873 
00874     
00875     if (nprocs_recv) {
00876 #ifdef NEEDS_MPI_TEST
00877     j = 0;
00878     while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
00879 #else
00880     MPI_Waitall(nprocs_recv, requests, statuses);
00881 #endif
00882 
00883     
00884     if (!buftype_is_contig) 
00885         ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
00886                    offset_list, len_list, (unsigned*)recv_size, 
00887                    requests, statuses, recd_from_proc, 
00888                    nprocs, contig_access_count,
00889                    min_st_offset, fd_size, fd_start, fd_end,
00890                    buftype_extent);
00891     }
00892 
00893     
00894     MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
00895 
00896     ADIOI_Free(statuses);
00897     ADIOI_Free(requests);
00898 
00899     if (!buftype_is_contig) {
00900     for (i=0; i < nprocs; i++) 
00901         if (recv_size[i]) ADIOI_Free(recv_buf[i]);
00902     ADIOI_Free(recv_buf);
00903     }
00904 #ifdef AGGREGATION_PROFILE
00905     MPE_Log_event (5033, 0, NULL);
00906 #endif
00907 }
00908 
00909 #define ADIOI_BUF_INCR \
00910 { \
00911     while (buf_incr) { \
00912     size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
00913     user_buf_idx += size_in_buf; \
00914     flat_buf_sz -= size_in_buf; \
00915     if (!flat_buf_sz) { \
00916             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00917             else { \
00918                 flat_buf_idx = 0; \
00919                 n_buftypes++; \
00920             } \
00921             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00922                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00923         flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00924     } \
00925     buf_incr -= size_in_buf; \
00926     } \
00927 }
00928 
00929 
00930 #define ADIOI_BUF_COPY \
00931 { \
00932     while (size) { \
00933     size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
00934   ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
00935   ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
00936     memcpy(((char *) buf) + user_buf_idx, \
00937            &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
00938     recv_buf_idx[p] += size_in_buf;  \
00939     user_buf_idx += size_in_buf; \
00940     flat_buf_sz -= size_in_buf; \
00941     if (!flat_buf_sz) { \
00942             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00943             else { \
00944                 flat_buf_idx = 0; \
00945                 n_buftypes++; \
00946             } \
00947             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00948                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
00949         flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00950     } \
00951     size -= size_in_buf; \
00952     buf_incr -= size_in_buf; \
00953     } \
00954     ADIOI_BUF_INCR \
00955 }
00956 
00957 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00958                    *flat_buf, char **recv_buf, ADIO_Offset 
00959                    *offset_list, ADIO_Offset *len_list, 
00960                    unsigned *recv_size, 
00961                    MPI_Request *requests, MPI_Status *statuses,
00962                    int *recd_from_proc, int nprocs,
00963                    int contig_access_count, 
00964                    ADIO_Offset min_st_offset, 
00965                    ADIO_Offset fd_size, ADIO_Offset *fd_start, 
00966                    ADIO_Offset *fd_end,
00967                    MPI_Aint buftype_extent)
00968 {
00969 
00970 
00971 
00972     int i, p, flat_buf_idx;
00973     ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
00974     int n_buftypes;
00975     ADIO_Offset off, len, rem_len, user_buf_idx;
00976     
00977     unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
00978 
00979     ADIOI_UNREFERENCED_ARG(requests);
00980     ADIOI_UNREFERENCED_ARG(statuses);
00981 
00982 
00983 
00984 
00985 
00986 
00987 
00988     curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00989     done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00990     recv_buf_idx   = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
00991 
00992     for (i=0; i < nprocs; i++) {
00993     recv_buf_idx[i] = curr_from_proc[i] = 0;
00994     done_from_proc[i] = recd_from_proc[i];
00995     }
00996 
00997     user_buf_idx = flat_buf->indices[0];
00998     flat_buf_idx = 0;
00999     n_buftypes = 0;
01000     flat_buf_sz = flat_buf->blocklens[0];
01001 
01002     
01003 
01004 
01005 
01006     for (i=0; i<contig_access_count; i++) { 
01007     off     = offset_list[i];
01008     rem_len = len_list[i];
01009 
01010     
01011     while (rem_len != 0) {
01012         len = rem_len;
01013         
01014 
01015 
01016 
01017         p = ADIOI_Calc_aggregator(fd,
01018                       off,
01019                       min_st_offset,
01020                       &len,
01021                       fd_size,
01022                       fd_start,
01023                       fd_end);
01024 
01025         if (recv_buf_idx[p] < recv_size[p]) {
01026         if (curr_from_proc[p]+len > done_from_proc[p]) {
01027             if (done_from_proc[p] > curr_from_proc[p]) {
01028             size = ADIOI_MIN(curr_from_proc[p] + len - 
01029                   done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
01030             buf_incr = done_from_proc[p] - curr_from_proc[p];
01031             ADIOI_BUF_INCR
01032             buf_incr = curr_from_proc[p]+len-done_from_proc[p];
01033       ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
01034             curr_from_proc[p] = done_from_proc[p] + size;
01035             ADIOI_BUF_COPY
01036             }
01037             else {
01038             size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
01039             buf_incr = len;
01040       ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
01041             curr_from_proc[p] += (unsigned) size;
01042             ADIOI_BUF_COPY
01043             }
01044         }
01045         else {
01046         ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
01047             curr_from_proc[p] += (unsigned) len;
01048             buf_incr = len;
01049             ADIOI_BUF_INCR
01050         }
01051         }
01052         else {
01053         buf_incr = len;
01054         ADIOI_BUF_INCR
01055         }
01056         off     += len;
01057         rem_len -= len;
01058     }
01059     }
01060     for (i=0; i < nprocs; i++) 
01061     if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
01062 
01063     ADIOI_Free(curr_from_proc);
01064     ADIOI_Free(done_from_proc);
01065     ADIOI_Free(recv_buf_idx);
01066 }