00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 #include "adio.h"
00009 #include "adio_extern.h"
00010 
00011 #ifdef AGGREGATION_PROFILE
00012 #include "mpe.h"
00013 #endif
00014 
00015 
00016 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00017                          datatype, int nprocs, int myrank,
00018              ADIOI_Access
00019                          *others_req, ADIO_Offset *offset_list,
00020                          ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
00021                          min_st_offset, ADIO_Offset fd_size,
00022                          ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00023                          int *buf_idx, int *error_code);
00024 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00025                          ADIOI_Flatlist_node *flat_buf, ADIO_Offset 
00026                          *offset_list, ADIO_Offset *len_list, int *send_size, 
00027                          int *recv_size, ADIO_Offset off, int size,
00028                          int *count, int *start_pos, int *partial_recv, 
00029                          int *sent_to_proc, int nprocs, 
00030                          int myrank, int
00031                          buftype_is_contig, int contig_access_count,
00032                          ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00033                          ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
00034                          ADIOI_Access *others_req, 
00035                          int *send_buf_idx, int *curr_to_proc,
00036                          int *done_to_proc, int *hole, int iter, 
00037                          MPI_Aint buftype_extent, int *buf_idx, int *error_code);
00038 static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00039                            *flat_buf, char **send_buf, ADIO_Offset 
00040                            *offset_list, ADIO_Offset *len_list, int *send_size, 
00041                            MPI_Request *requests, int *sent_to_proc, 
00042                            int nprocs, int myrank, 
00043                            int contig_access_count, ADIO_Offset
00044                            min_st_offset, ADIO_Offset fd_size,
00045                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
00046                            int *send_buf_idx, int *curr_to_proc, 
00047                            int *done_to_proc, int iter, 
00048                            MPI_Aint buftype_extent);
00049 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
00050                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00051                       int nprocs, int nprocs_recv, int total_elements);
00052 
00053 
00054 void ADIOI_GEN_WriteStridedColl(ADIO_File fd, void *buf, int count,
00055                        MPI_Datatype datatype, int file_ptr_type,
00056                        ADIO_Offset offset, ADIO_Status *status, int
00057                        *error_code)
00058 {
00059 
00060 
00061 
00062 
00063 
00064 
00065     ADIOI_Access *my_req; 
00066     
00067 
00068     
00069     ADIOI_Access *others_req;
00070     
00071 
00072 
00073     int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
00074     int contig_access_count=0, interleave_count = 0, buftype_is_contig;
00075     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00076     ADIO_Offset orig_fp, start_offset, end_offset, fd_size, min_st_offset, off;
00077     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
00078     *fd_end = NULL, *end_offsets = NULL;
00079     int *buf_idx = NULL;
00080     ADIO_Offset *len_list = NULL;
00081     int old_error, tmp_error;
00082 
00083     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) { 
00084     ADIOI_IOStridedColl (fd, buf, count, ADIOI_WRITE, datatype, 
00085             file_ptr_type, offset, status, error_code);
00086     return;
00087     }
00088 
00089     MPI_Comm_size(fd->comm, &nprocs);
00090     MPI_Comm_rank(fd->comm, &myrank);
00091 
00092 
00093 
00094 
00095     nprocs_for_coll = fd->hints->cb_nodes;
00096     orig_fp = fd->fp_ind;
00097 
00098     
00099     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00100     
00101 
00102 
00103     
00104 
00105 
00106     ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00107                   &offset_list, &len_list, &start_offset,
00108                   &end_offset, &contig_access_count); 
00109 
00110     
00111 
00112  
00113     
00114     st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00115     end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
00116 
00117     MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00118               ADIO_OFFSET, fd->comm);
00119     MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00120               ADIO_OFFSET, fd->comm);
00121 
00122     
00123     for (i=1; i<nprocs; i++)
00124         if ((st_offsets[i] < end_offsets[i-1]) && 
00125                 (st_offsets[i] <= end_offsets[i]))
00126                 interleave_count++;
00127     
00128 
00129     }
00130 
00131     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00132 
00133     if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
00134     (!interleave_count && (fd->hints->cb_write == ADIOI_HINT_AUTO)))
00135     {
00136     
00137     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00138         ADIOI_Free(offset_list);
00139         ADIOI_Free(len_list);
00140         ADIOI_Free(st_offsets);
00141         ADIOI_Free(end_offsets);
00142     }
00143 
00144     fd->fp_ind = orig_fp;
00145         ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00146 
00147         if (buftype_is_contig && filetype_is_contig) {
00148             if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00149                 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
00150                 ADIO_WriteContig(fd, buf, count, datatype,
00151                  ADIO_EXPLICIT_OFFSET,
00152                  off, status, error_code);
00153             }
00154             else ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00155                   0, status, error_code);
00156         }
00157     else ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00158                    offset, status, error_code);
00159 
00160     return;
00161     }
00162 
00163 
00164 
00165 
00166 
00167     ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
00168                 nprocs_for_coll, &min_st_offset,
00169                 &fd_start, &fd_end, 
00170                 fd->hints->min_fdomain_size, &fd_size,
00171                 fd->hints->striping_unit);   
00172 
00173 
00174 
00175 
00176 
00177     ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00178               min_st_offset, fd_start, fd_end, fd_size,
00179               nprocs, &count_my_req_procs, 
00180               &count_my_req_per_proc, &my_req,
00181               &buf_idx); 
00182 
00183 
00184 
00185 
00186 
00187 
00188 
00189 
00190     ADIOI_Calc_others_req(fd, count_my_req_procs, 
00191               count_my_req_per_proc, my_req, 
00192               nprocs, myrank,
00193               &count_others_req_procs, &others_req); 
00194     
00195     ADIOI_Free(count_my_req_per_proc);
00196     for (i=0; i < nprocs; i++) {
00197     if (my_req[i].count) {
00198         ADIOI_Free(my_req[i].offsets);
00199         ADIOI_Free(my_req[i].lens);
00200     }
00201     }
00202     ADIOI_Free(my_req);
00203 
00204 
00205     ADIOI_Exch_and_write(fd, buf, datatype, nprocs, myrank,
00206                         others_req, offset_list,
00207             len_list, contig_access_count, min_st_offset,
00208             fd_size, fd_start, fd_end, buf_idx, error_code);
00209 
00210     
00211 
00212 
00213 
00214 
00215 
00216 
00217 
00218 
00219 
00220 
00221     old_error = *error_code;
00222     if (*error_code != MPI_SUCCESS) *error_code = MPI_ERR_IO;
00223 
00224      
00225 
00226 #ifdef ADIOI_MPE_LOGGING
00227     MPE_Log_event( ADIOI_MPE_postwrite_a, 0, NULL );
00228 #endif
00229     if (fd->hints->cb_nodes == 1) 
00230         MPI_Bcast(error_code, 1, MPI_INT, 
00231                 fd->hints->ranklist[0], fd->comm);
00232     else {
00233         tmp_error = *error_code;
00234         MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT, 
00235                 MPI_MAX, fd->comm);
00236     }
00237 #ifdef ADIOI_MPE_LOGGING
00238     MPE_Log_event( ADIOI_MPE_postwrite_b, 0, NULL );
00239 #endif
00240 #ifdef AGGREGATION_PROFILE
00241     MPE_Log_event (5012, 0, NULL);
00242 #endif
00243 
00244     if ( (old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO) )
00245         *error_code = old_error;
00246 
00247 
00248     if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
00249 
00250 
00251 
00252     for (i=0; i<nprocs; i++) {
00253     if (others_req[i].count) {
00254         ADIOI_Free(others_req[i].offsets);
00255         ADIOI_Free(others_req[i].lens);
00256         ADIOI_Free(others_req[i].mem_ptrs);
00257     }
00258     }
00259     ADIOI_Free(others_req);
00260 
00261     ADIOI_Free(buf_idx);
00262     ADIOI_Free(offset_list);
00263     ADIOI_Free(len_list);
00264     ADIOI_Free(st_offsets);
00265     ADIOI_Free(end_offsets);
00266     ADIOI_Free(fd_start);
00267     ADIOI_Free(fd_end);
00268 
00269 #ifdef HAVE_STATUS_SET_BYTES
00270     if (status) {
00271       int bufsize, size;
00272       
00273       MPI_Type_size(datatype, &size);
00274       bufsize = size * count;
00275       MPIR_Status_set_bytes(status, datatype, bufsize);
00276     }
00277 
00278 
00279 #endif
00280 
00281     fd->fp_sys_posn = -1;   
00282 #ifdef AGGREGATION_PROFILE
00283     MPE_Log_event (5013, 0, NULL);
00284 #endif
00285 }
00286 
00287 
00288 
00289 
00290 
00291 
00292 static void ADIOI_Exch_and_write(ADIO_File fd, void *buf, MPI_Datatype
00293                  datatype, int nprocs, 
00294                  int myrank,
00295                  ADIOI_Access
00296                  *others_req, ADIO_Offset *offset_list,
00297                  ADIO_Offset *len_list, int contig_access_count,
00298                  ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00299                  ADIO_Offset *fd_start, ADIO_Offset *fd_end,
00300                  int *buf_idx, int *error_code)
00301 {
00302 
00303 
00304 
00305 
00306 
00307 
00308 
00309 
00310 
00311     
00312     ADIO_Offset size=0;
00313     int hole, i, j, m, ntimes, max_ntimes, buftype_is_contig;
00314     ADIO_Offset st_loc=-1, end_loc=-1, off, done, req_off;
00315     char *write_buf=NULL;
00316     int *curr_offlen_ptr, *count, *send_size, req_len, *recv_size;
00317     int *partial_recv, *sent_to_proc, *start_pos, flag;
00318     int *send_buf_idx, *curr_to_proc, *done_to_proc;
00319     MPI_Status status;
00320     ADIOI_Flatlist_node *flat_buf=NULL;
00321     MPI_Aint buftype_extent;
00322     int info_flag, coll_bufsize;
00323     char *value;
00324     static char myname[] = "ADIOI_EXCH_AND_WRITE";
00325 
00326     *error_code = MPI_SUCCESS;  
00327     
00328 
00329 
00330 
00331 
00332 
00333     value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
00334     ADIOI_Info_get(fd->info, "cb_buffer_size", MPI_MAX_INFO_VAL, value, 
00335                  &info_flag);
00336     coll_bufsize = atoi(value);
00337     ADIOI_Free(value);
00338 
00339 
00340     for (i=0; i < nprocs; i++) {
00341     if (others_req[i].count) {
00342         st_loc = others_req[i].offsets[0];
00343         end_loc = others_req[i].offsets[0];
00344         break;
00345     }
00346     }
00347 
00348     for (i=0; i < nprocs; i++)
00349     for (j=0; j < others_req[i].count; j++) {
00350         st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00351         end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
00352                        + others_req[i].lens[j] - 1));
00353     }
00354 
00355 
00356 
00357     ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
00358 
00359     if ((st_loc==-1) && (end_loc==-1)) {
00360     ntimes = 0; 
00361     }
00362 
00363     MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX,
00364           fd->comm); 
00365 
00366     if (ntimes) write_buf = (char *) ADIOI_Malloc(coll_bufsize);
00367 
00368     curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int)); 
00369     
00370 
00371     count = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00372     
00373 
00374 
00375     partial_recv = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00376     
00377 
00378 
00379 
00380     send_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00381     
00382 
00383 
00384     recv_size = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00385     
00386 
00387     sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00388     
00389 
00390 
00391     send_buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00392     curr_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00393     done_to_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00394     
00395 
00396     start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00397     
00398 
00399 
00400     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00401     if (!buftype_is_contig) {
00402     ADIOI_Flatten_datatype(datatype);
00403     flat_buf = CtvAccess(ADIOI_Flatlist);
00404         while (flat_buf->type != datatype) flat_buf = flat_buf->next;
00405     }
00406     MPI_Type_extent(datatype, &buftype_extent);
00407 
00408 
00409 
00410 
00411 
00412 
00413 
00414 
00415     
00416 
00417 
00418 
00419 
00420     done = 0;
00421     off = st_loc;
00422 
00423     for (m=0; m < ntimes; m++) {
00424        
00425 
00426 
00427        
00428 
00429 
00430 
00431 
00432           
00433 
00434 
00435 
00436 
00437 
00438 
00439     
00440 
00441     for (i=0; i < nprocs; i++) count[i] = recv_size[i] = 0;
00442 
00443     size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done); 
00444 
00445     for (i=0; i < nprocs; i++) {
00446         if (others_req[i].count) {
00447         start_pos[i] = curr_offlen_ptr[i];
00448         for (j=curr_offlen_ptr[i]; j<others_req[i].count; j++) {
00449             if (partial_recv[i]) {
00450             
00451 
00452             req_off = others_req[i].offsets[j] +
00453                 partial_recv[i]; 
00454                         req_len = others_req[i].lens[j] -
00455                 partial_recv[i];
00456             partial_recv[i] = 0;
00457             
00458             others_req[i].offsets[j] = req_off;
00459             others_req[i].lens[j] = req_len;
00460             }
00461             else {
00462             req_off = others_req[i].offsets[j];
00463                         req_len = others_req[i].lens[j];
00464             }
00465             if (req_off < off + size) {
00466             count[i]++;
00467       ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIR_Upint)(write_buf+req_off-off));
00468             MPI_Address(write_buf+req_off-off, 
00469                                &(others_req[i].mem_ptrs[j]));
00470       ADIOI_Assert((off + size - req_off) == (int)(off + size - req_off));
00471             recv_size[i] += (int)(ADIOI_MIN(off + size - req_off, 
00472                                       (unsigned)req_len));
00473 
00474             if (off+size-req_off < (unsigned)req_len)
00475             {
00476                 partial_recv[i] = (int) (off + size - req_off);
00477 
00478                 
00479                 if ((j+1 < others_req[i].count) && 
00480                                  (others_req[i].offsets[j+1] < off+size))
00481                 { 
00482                 *error_code = MPIO_Err_create_code(MPI_SUCCESS,
00483                                    MPIR_ERR_RECOVERABLE,
00484                                    myname,
00485                                    __LINE__,
00486                                    MPI_ERR_ARG,
00487                                    "Filetype specifies overlapping write regions (which is illegal according to the MPI-2 specification)", 0);
00488                 
00489 
00490 
00491                 }
00492                 
00493                 break;
00494             }
00495             }
00496             else break;
00497         }
00498         curr_offlen_ptr[i] = j;
00499         }
00500     }
00501     
00502     ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list, 
00503                             len_list, send_size, recv_size, off, size, count, 
00504                             start_pos, partial_recv, 
00505                             sent_to_proc, nprocs, myrank, 
00506                 buftype_is_contig, contig_access_count,
00507                 min_st_offset, fd_size, fd_start, fd_end,
00508                 others_req, send_buf_idx, curr_to_proc,
00509                             done_to_proc, &hole, m, buftype_extent, buf_idx,
00510                 error_code); 
00511         if (*error_code != MPI_SUCCESS) return;
00512 
00513     flag = 0;
00514     for (i=0; i<nprocs; i++)
00515         if (count[i]) flag = 1;
00516 
00517     if (flag) {
00518       ADIOI_Assert(size == (int)size);
00519         ADIO_WriteContig(fd, write_buf, (int)size, MPI_BYTE, ADIO_EXPLICIT_OFFSET, 
00520                         off, &status, error_code);
00521         if (*error_code != MPI_SUCCESS) return;
00522     }
00523 
00524     off += size;
00525     done += size;
00526     }
00527 
00528     for (i=0; i<nprocs; i++) count[i] = recv_size[i] = 0;
00529     for (m=ntimes; m<max_ntimes; m++) {
00530     
00531     ADIOI_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list, 
00532                             len_list, send_size, recv_size, off, size, count, 
00533                             start_pos, partial_recv, 
00534                             sent_to_proc, nprocs, myrank, 
00535                 buftype_is_contig, contig_access_count,
00536                 min_st_offset, fd_size, fd_start, fd_end,
00537                 others_req, send_buf_idx, 
00538                             curr_to_proc, done_to_proc, &hole, m, 
00539                             buftype_extent, buf_idx, error_code); 
00540         if (*error_code != MPI_SUCCESS) return;
00541     }
00542 
00543     if (ntimes) ADIOI_Free(write_buf);
00544     ADIOI_Free(curr_offlen_ptr);
00545     ADIOI_Free(count);
00546     ADIOI_Free(partial_recv);
00547     ADIOI_Free(send_size);
00548     ADIOI_Free(recv_size);
00549     ADIOI_Free(sent_to_proc);
00550     ADIOI_Free(start_pos);
00551     ADIOI_Free(send_buf_idx);
00552     ADIOI_Free(curr_to_proc);
00553     ADIOI_Free(done_to_proc);
00554 }
00555 
00556 
00557 
00558 
00559 
00560 static void ADIOI_W_Exchange_data(ADIO_File fd, void *buf, char *write_buf,
00561                   ADIOI_Flatlist_node *flat_buf, ADIO_Offset 
00562                   *offset_list, ADIO_Offset *len_list, int *send_size, 
00563                   int *recv_size, ADIO_Offset off, int size,
00564                   int *count, int *start_pos,
00565                   int *partial_recv,
00566                   int *sent_to_proc, int nprocs, 
00567                   int myrank, int
00568                   buftype_is_contig, int contig_access_count,
00569                   ADIO_Offset min_st_offset,
00570                   ADIO_Offset fd_size,
00571                   ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
00572                   ADIOI_Access *others_req, 
00573                   int *send_buf_idx, int *curr_to_proc,
00574                   int *done_to_proc, int *hole, int iter, 
00575                   MPI_Aint buftype_extent, int *buf_idx,
00576                   int *error_code)
00577 {
00578     int i, j, k, *tmp_len, nprocs_recv, nprocs_send, err;
00579     char **send_buf = NULL; 
00580     MPI_Request *requests, *send_req;
00581     MPI_Datatype *recv_types;
00582     MPI_Status *statuses, status;
00583     int *srt_len, sum;
00584     ADIO_Offset *srt_off;
00585     static char myname[] = "ADIOI_W_EXCHANGE_DATA";
00586 
00587 
00588 
00589 
00590     MPI_Alltoall(recv_size, 1, MPI_INT, send_size, 1, MPI_INT, fd->comm);
00591 
00592     
00593 
00594     nprocs_recv = 0;
00595     for (i=0; i<nprocs; i++) if (recv_size[i]) nprocs_recv++;
00596 
00597     recv_types = (MPI_Datatype *)
00598     ADIOI_Malloc((nprocs_recv+1)*sizeof(MPI_Datatype)); 
00599 
00600 
00601     tmp_len = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00602     j = 0;
00603     for (i=0; i<nprocs; i++) {
00604     if (recv_size[i]) {
00605 
00606         if (partial_recv[i]) {
00607         k = start_pos[i] + count[i] - 1;
00608         tmp_len[i] = others_req[i].lens[k];
00609         others_req[i].lens[k] = partial_recv[i];
00610         }
00611         MPI_Type_hindexed(count[i], 
00612                  &(others_req[i].lens[start_pos[i]]),
00613                  &(others_req[i].mem_ptrs[start_pos[i]]), 
00614              MPI_BYTE, recv_types+j);
00615         
00616         MPI_Type_commit(recv_types+j);
00617         j++;
00618     }
00619     }
00620 
00621     
00622 
00623 
00624 
00625     sum = 0;
00626     for (i=0; i<nprocs; i++) sum += count[i];
00627     
00628 
00629     if (sum) {
00630         srt_off = (ADIO_Offset *) ADIOI_Malloc(sum*sizeof(ADIO_Offset));
00631         srt_len = (int *) ADIOI_Malloc(sum*sizeof(int));
00632 
00633         ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
00634                          nprocs, nprocs_recv, sum);
00635     }
00636 
00637 
00638     for (i=0; i<nprocs; i++) 
00639         if (partial_recv[i]) {
00640             k = start_pos[i] + count[i] - 1;
00641             others_req[i].lens[k] = tmp_len[i];
00642         }
00643     ADIOI_Free(tmp_len);
00644 
00645     
00646 
00647 
00648 
00649 
00650 
00651 
00652     *hole = 0;
00653     if (sum) {
00654         if (off != srt_off[0]) 
00655             *hole = 1;
00656         else { 
00657             for (i=1; i<sum; i++) {
00658                 if (srt_off[i] <= srt_off[0] + srt_len[0]) {
00659             int new_len = srt_off[i] + srt_len[i] - srt_off[0];
00660             if (new_len > srt_len[0]) srt_len[0] = new_len;
00661         }
00662         else
00663             break;
00664         }
00665             if (i < sum || size != srt_len[0]) 
00666                 *hole = 1;
00667     }
00668 
00669         ADIOI_Free(srt_off);
00670         ADIOI_Free(srt_len);
00671     }
00672 
00673     if (nprocs_recv) {
00674     if (*hole) {
00675         ADIO_ReadContig(fd, write_buf, size, MPI_BYTE, 
00676                 ADIO_EXPLICIT_OFFSET, off, &status, &err);
00677         
00678         if (err != MPI_SUCCESS) {
00679         *error_code = MPIO_Err_create_code(err,
00680                            MPIR_ERR_RECOVERABLE, myname,
00681                            __LINE__, MPI_ERR_IO,
00682                            "**ioRMWrdwr", 0);
00683         return;
00684         } 
00685         
00686     }
00687     }
00688 
00689     nprocs_send = 0;
00690     for (i=0; i < nprocs; i++) if (send_size[i]) nprocs_send++;
00691 
00692     if (fd->atomicity) {
00693         
00694         requests = (MPI_Request *)
00695         ADIOI_Malloc((nprocs_send+1)*sizeof(MPI_Request)); 
00696         send_req = requests;
00697     }
00698     else {
00699         requests = (MPI_Request *)  
00700             ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request)); 
00701         
00702 
00703         
00704         j = 0;
00705         for (i=0; i<nprocs; i++) {
00706             if (recv_size[i]) {
00707                 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
00708                           fd->comm, requests+j);
00709                 j++;
00710             }
00711         }
00712     send_req = requests + nprocs_recv;
00713     }
00714 
00715 
00716 
00717 
00718 #ifdef AGGREGATION_PROFILE
00719     MPE_Log_event (5032, 0, NULL);
00720 #endif
00721     if (buftype_is_contig) {
00722     j = 0;
00723     for (i=0; i < nprocs; i++) 
00724         if (send_size[i]) {
00725         MPI_Isend(((char *) buf) + buf_idx[i], send_size[i], 
00726                     MPI_BYTE, i,  myrank+i+100*iter, fd->comm, 
00727                                   send_req+j);
00728         j++;
00729                 buf_idx[i] += send_size[i];
00730         }
00731     }
00732     else if (nprocs_send) {
00733     
00734     send_buf = (char **) ADIOI_Malloc(nprocs*sizeof(char*));
00735     for (i=0; i < nprocs; i++) 
00736         if (send_size[i]) 
00737         send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
00738 
00739     ADIOI_Fill_send_buffer(fd, buf, flat_buf, send_buf,
00740                            offset_list, len_list, send_size, 
00741                send_req,
00742                            sent_to_proc, nprocs, myrank, 
00743                            contig_access_count,
00744                            min_st_offset, fd_size, fd_start, fd_end, 
00745                            send_buf_idx, curr_to_proc, done_to_proc, iter,
00746                            buftype_extent);
00747         
00748     }
00749 
00750     if (fd->atomicity) {
00751         
00752         j = 0;
00753         for (i=0; i<nprocs; i++) {
00754             MPI_Status wkl_status;
00755         if (recv_size[i]) {
00756             MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i, myrank+i+100*iter,
00757                   fd->comm, &wkl_status);
00758             j++;
00759         }
00760         }
00761     }
00762 
00763     for (i=0; i<nprocs_recv; i++) MPI_Type_free(recv_types+i);
00764     ADIOI_Free(recv_types);
00765     
00766     if (fd->atomicity) {
00767         
00768         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+1) * \
00769                                          sizeof(MPI_Status)); 
00770          
00771     }
00772     else {
00773         statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
00774                                      sizeof(MPI_Status)); 
00775         
00776     }
00777 
00778 #ifdef NEEDS_MPI_TEST
00779     i = 0;
00780     if (fd->atomicity) {
00781         
00782         while (!i) MPI_Testall(nprocs_send, send_req, &i, statuses);
00783     }
00784     else {
00785         while (!i) MPI_Testall(nprocs_send+nprocs_recv, requests, &i, statuses);
00786     }
00787 #else
00788     if (fd->atomicity)
00789         
00790         MPI_Waitall(nprocs_send, send_req, statuses);
00791     else
00792         MPI_Waitall(nprocs_send+nprocs_recv, requests, statuses);
00793 #endif
00794 
00795 #ifdef AGGREGATION_PROFILE
00796     MPE_Log_event (5033, 0, NULL);
00797 #endif
00798     ADIOI_Free(statuses);
00799     ADIOI_Free(requests);
00800     if (!buftype_is_contig && nprocs_send) {
00801     for (i=0; i < nprocs; i++) 
00802         if (send_size[i]) ADIOI_Free(send_buf[i]);
00803     ADIOI_Free(send_buf);
00804     }
00805 }
00806 
/*
 * Cursor helpers for ADIOI_Fill_send_buffer.
 *
 * They are statement macros rather than functions because they read and
 * update that function's locals directly:
 *   flat_buf, flat_buf_idx, flat_buf_sz, n_buftypes, buftype_extent,
 *   user_buf_idx, size_in_buf, buf_incr;
 *   ADIOI_BUF_COPY additionally uses size, buf, send_buf, send_buf_idx, p.
 * Each expands to a braced block and is used without a trailing ';'.
 */

/* Step the cursor to the next contiguous piece of the flattened memory
 * type. After the last piece, wrap to piece 0 of the next replica of the
 * type (n_buftypes counts replicas, each buftype_extent bytes apart). */
#define ADIOI_BUF_NEXT_BLOCK \
{ \
    if (flat_buf_idx >= (flat_buf->count - 1)) { \
        n_buftypes++; \
        flat_buf_idx = 0; \
    } \
    else { \
        flat_buf_idx++; \
    } \
    flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
    user_buf_idx = (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent + \
                   flat_buf->indices[flat_buf_idx]; \
}

/* Advance the user-buffer cursor by buf_incr bytes without copying,
 * crossing piece boundaries as needed; leaves buf_incr at 0. */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr != 0) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        flat_buf_sz -= size_in_buf; \
        user_buf_idx += size_in_buf; \
        if (flat_buf_sz == 0) ADIOI_BUF_NEXT_BLOCK \
        buf_incr -= size_in_buf; \
    } \
}

/* Copy `size` bytes from the user buffer at the cursor into
 * send_buf[p] + send_buf_idx[p], advancing both. The copied bytes are also
 * charged against buf_incr, and whatever remains of buf_incr is then
 * skipped with ADIOI_BUF_INCR. */
#define ADIOI_BUF_COPY \
{ \
    while (size != 0) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
        ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
        memcpy(&(send_buf[p][send_buf_idx[p]]), \
               ((char *) buf) + user_buf_idx, size_in_buf); \
        send_buf_idx[p] += size_in_buf; \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (flat_buf_sz == 0) ADIOI_BUF_NEXT_BLOCK \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    ADIOI_BUF_INCR \
}
00854 
00855 
00856 
00857 
00858 
00859 static void ADIOI_Fill_send_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
00860                            *flat_buf, char **send_buf, ADIO_Offset 
00861                            *offset_list, ADIO_Offset *len_list, int *send_size, 
00862                            MPI_Request *requests, int *sent_to_proc, 
00863                            int nprocs, int myrank, 
00864                            int contig_access_count, 
00865                            ADIO_Offset min_st_offset, ADIO_Offset fd_size,
00866                            ADIO_Offset *fd_start, ADIO_Offset *fd_end, 
00867                            int *send_buf_idx, int *curr_to_proc, 
00868                            int *done_to_proc, int iter,
00869                            MPI_Aint buftype_extent)
00870 {
00871 
00872 
00873     int i, p, flat_buf_idx;
00874     ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
00875     int jj, n_buftypes;
00876     ADIO_Offset off, len, rem_len, user_buf_idx;
00877 
00878 
00879 
00880 
00881 
00882 
00883 
00884 
00885     for (i=0; i < nprocs; i++) {
00886     send_buf_idx[i] = curr_to_proc[i] = 0;
00887     done_to_proc[i] = sent_to_proc[i];
00888     }
00889     jj = 0;
00890 
00891     user_buf_idx = flat_buf->indices[0];
00892     flat_buf_idx = 0;
00893     n_buftypes = 0;
00894     flat_buf_sz = flat_buf->blocklens[0];
00895 
00896     
00897 
00898 
00899 
00900     for (i=0; i<contig_access_count; i++) { 
00901     off     = offset_list[i];
00902     rem_len = len_list[i];
00903 
00904     
00905     while (rem_len != 0) {
00906         len = rem_len;
00907         
00908 
00909 
00910 
00911         p = ADIOI_Calc_aggregator(fd,
00912                       off,
00913                       min_st_offset,
00914                       &len,
00915                       fd_size,
00916                       fd_start,
00917                       fd_end);
00918 
00919         if (send_buf_idx[p] < send_size[p]) {
00920         if (curr_to_proc[p]+len > done_to_proc[p]) {
00921             if (done_to_proc[p] > curr_to_proc[p]) {
00922             size = ADIOI_MIN(curr_to_proc[p] + len - 
00923                                 done_to_proc[p], send_size[p]-send_buf_idx[p]);
00924             buf_incr = done_to_proc[p] - curr_to_proc[p];
00925             ADIOI_BUF_INCR
00926       ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
00927                 buf_incr = curr_to_proc[p] + len - done_to_proc[p];
00928       ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
00929             curr_to_proc[p] = done_to_proc[p] + size;
00930                 ADIOI_BUF_COPY
00931             }
00932             else {
00933             size = ADIOI_MIN(len,send_size[p]-send_buf_idx[p]);
00934             buf_incr = len;
00935       ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
00936             curr_to_proc[p] += size;
00937             ADIOI_BUF_COPY
00938             }
00939             if (send_buf_idx[p] == send_size[p]) {
00940             MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p, 
00941                 myrank+p+100*iter, fd->comm, requests+jj);
00942             jj++;
00943             }
00944         }
00945         else {
00946         ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
00947             curr_to_proc[p] += len;
00948             buf_incr = len;
00949             ADIOI_BUF_INCR
00950         }
00951         }
00952         else {
00953         buf_incr = len;
00954         ADIOI_BUF_INCR
00955             }
00956         off     += len;
00957         rem_len -= len;
00958     }
00959     }
00960     for (i=0; i < nprocs; i++) 
00961     if (send_size[i]) sent_to_proc[i] = curr_to_proc[i];
00962 }
00963 
00964 
00965 
00966 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
00967               ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00968               int nprocs, int nprocs_recv, int total_elements)
00969 {
00970     typedef struct {
00971     ADIO_Offset *off_list;
00972     int *len_list;
00973     int nelem;
00974     } heap_struct;
00975 
00976     heap_struct *a, tmp;
00977     int i, j, heapsize, l, r, k, smallest;
00978 
00979     a = (heap_struct *) ADIOI_Malloc((nprocs_recv+1)*sizeof(heap_struct));
00980 
00981     j = 0;
00982     for (i=0; i<nprocs; i++)
00983     if (count[i]) {
00984         a[j].off_list = &(others_req[i].offsets[start_pos[i]]);
00985         a[j].len_list = &(others_req[i].lens[start_pos[i]]);
00986         a[j].nelem = count[i];
00987         j++;
00988     }
00989 
00990     
00991 
00992 
00993     heapsize = nprocs_recv;
00994     for (i=heapsize/2 - 1; i>=0; i--) {
00995     
00996 
00997 
00998 
00999     k = i;
01000     for(;;) {
01001         l = 2*(k+1) - 1;
01002         r = 2*(k+1);
01003 
01004         if ((l < heapsize) && 
01005         (*(a[l].off_list) < *(a[k].off_list)))
01006         smallest = l;
01007         else smallest = k;
01008 
01009         if ((r < heapsize) && 
01010         (*(a[r].off_list) < *(a[smallest].off_list)))
01011         smallest = r;
01012 
01013         if (smallest != k) {
01014         tmp.off_list = a[k].off_list;
01015         tmp.len_list = a[k].len_list;
01016         tmp.nelem = a[k].nelem;
01017 
01018         a[k].off_list = a[smallest].off_list;
01019         a[k].len_list = a[smallest].len_list;
01020         a[k].nelem = a[smallest].nelem;
01021         
01022         a[smallest].off_list = tmp.off_list;
01023         a[smallest].len_list = tmp.len_list;
01024         a[smallest].nelem = tmp.nelem;
01025         
01026         k = smallest;
01027         }
01028         else break;
01029     }
01030     }
01031 
01032     for (i=0; i<total_elements; i++) {
01033         
01034     srt_off[i] = *(a[0].off_list);
01035     srt_len[i] = *(a[0].len_list);
01036     (a[0].nelem)--;
01037 
01038     if (!a[0].nelem) {
01039         a[0].off_list = a[heapsize-1].off_list;
01040         a[0].len_list = a[heapsize-1].len_list;
01041         a[0].nelem = a[heapsize-1].nelem;
01042         heapsize--;
01043     }
01044     else {
01045         (a[0].off_list)++;
01046         (a[0].len_list)++;
01047     }
01048 
01049     
01050     k = 0;
01051     for (;;) {
01052         l = 2*(k+1) - 1;
01053         r = 2*(k+1);
01054 
01055         if ((l < heapsize) && 
01056         (*(a[l].off_list) < *(a[k].off_list)))
01057         smallest = l;
01058         else smallest = k;
01059 
01060         if ((r < heapsize) && 
01061         (*(a[r].off_list) < *(a[smallest].off_list)))
01062         smallest = r;
01063 
01064         if (smallest != k) {
01065         tmp.off_list = a[k].off_list;
01066         tmp.len_list = a[k].len_list;
01067         tmp.nelem = a[k].nelem;
01068 
01069         a[k].off_list = a[smallest].off_list;
01070         a[k].len_list = a[smallest].len_list;
01071         a[k].nelem = a[smallest].nelem;
01072         
01073         a[smallest].off_list = tmp.off_list;
01074         a[smallest].len_list = tmp.len_list;
01075         a[smallest].nelem = tmp.nelem;
01076         
01077         k = smallest;
01078         }
01079         else break;
01080     }
01081     }
01082     ADIOI_Free(a);
01083 }