00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 #include "ad_lustre.h"
00012 #include "adio_extern.h"
00013 
00014 
00015 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
00016                     MPI_Datatype datatype, int nprocs,
00017                     int myrank,
00018                     ADIOI_Access *others_req,
00019                     ADIOI_Access *my_req,
00020                     ADIO_Offset *offset_list,
00021                     ADIO_Offset *len_list,
00022                     int contig_access_count,
00023                     int *striping_info,
00024                                         int **buf_idx, int *error_code);
00025 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
00026                       ADIOI_Flatlist_node *flat_buf,
00027                       char **send_buf,
00028                       ADIO_Offset *offset_list,
00029                       ADIO_Offset *len_list, int *send_size,
00030                       MPI_Request *requests,
00031                       int *sent_to_proc, int nprocs,
00032                       int myrank, int contig_access_count,
00033                       int *striping_info,
00034                       int *send_buf_idx,
00035                                           int *curr_to_proc,
00036                       int *done_to_proc, int iter,
00037                       MPI_Aint buftype_extent);
00038 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
00039                      char *write_buf,
00040                      ADIOI_Flatlist_node *flat_buf,
00041                      ADIO_Offset *offset_list,
00042                      ADIO_Offset *len_list, int *send_size,
00043                      int *recv_size, ADIO_Offset off,
00044                      int size, int *count,
00045                      int *start_pos, int *partial_recv,
00046                      int *sent_to_proc, int nprocs,
00047                      int myrank, int buftype_is_contig,
00048                      int contig_access_count,
00049                      int *striping_info,
00050                      ADIOI_Access *others_req,
00051                      int *send_buf_idx,
00052                      int *curr_to_proc,
00053                      int *done_to_proc, int *hole,
00054                      int iter, MPI_Aint buftype_extent,
00055                      int *buf_idx, int *error_code);
00056 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
00057                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
00058                       int nprocs, int nprocs_recv, int total_elements);
00059 
00060 void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
00061                    MPI_Datatype datatype,
00062                    int file_ptr_type, ADIO_Offset offset,
00063                    ADIO_Status *status, int *error_code)
00064 {
00065     
00066 
00067 
00068 
00069 
00070 
00071 
00072     ADIOI_Access *my_req;
00073     
00074 
00075 
00076     ADIOI_Access *others_req;
00077     
00078 
00079 
00080     int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
00081     int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
00082     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
00083     ADIO_Offset orig_fp, start_offset, end_offset, off;
00084     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
00085     ADIO_Offset *len_list = NULL;
00086     int **buf_idx = NULL, *striping_info = NULL;
00087     int old_error, tmp_error;
00088 
00089     MPI_Comm_size(fd->comm, &nprocs);
00090     MPI_Comm_rank(fd->comm, &myrank);
00091 
00092     orig_fp = fd->fp_ind;
00093 
00094     
00095     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00096     
00097 
00098 
00099     
00100 
00101 
00102 
00103     ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
00104                           &offset_list, &len_list, &start_offset,
00105                           &end_offset, &contig_access_count);
00106 
00107     
00108 
00109 
00110 
00111     st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
00112     end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
00113     MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
00114               ADIO_OFFSET, fd->comm);
00115     MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
00116               ADIO_OFFSET, fd->comm);
00117     
00118     for (i = 1; i < nprocs; i++)
00119         if ((st_offsets[i] < end_offsets[i-1]) &&
00120                 (st_offsets[i] <= end_offsets[i]))
00121                 interleave_count++;
00122     
00123 
00124 
00125     
00126 
00127 
00128 
00129         if (interleave_count > 0) {
00130         do_collect = 1;
00131         } else {
00132             do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
00133                                     len_list, nprocs);
00134         }
00135     }
00136     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00137 
00138     
00139     if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
00140         fd->hints->cb_write == ADIOI_HINT_DISABLE) {
00141 
00142     
00143     if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
00144         ADIOI_Free(offset_list);
00145         ADIOI_Free(len_list);
00146             ADIOI_Free(st_offsets);
00147             ADIOI_Free(end_offsets);
00148     }
00149 
00150     fd->fp_ind = orig_fp;
00151     ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
00152     if (buftype_is_contig && filetype_is_contig) {
00153         if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
00154                 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
00155         ADIO_WriteContig(fd, buf, count, datatype,
00156                  ADIO_EXPLICIT_OFFSET,
00157                  off, status, error_code);
00158         } else
00159         ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
00160                  0, status, error_code);
00161     } else {
00162         ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
00163                   offset, status, error_code);
00164     }
00165     return;
00166     }
00167 
00168     
00169     ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
00170 
00171     
00172 
00173 
00174     ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
00175                              striping_info, nprocs, &count_my_req_procs,
00176                              &count_my_req_per_proc, &my_req,
00177                              &buf_idx);
00178 
00179     
00180 
00181 
00182 
00183 
00184 
00185 
00186 
00187     ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
00188                           my_req, nprocs, myrank, &count_others_req_procs,
00189                           &others_req);
00190     ADIOI_Free(count_my_req_per_proc);
00191 
00192     
00193     ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
00194                                 others_req, my_req, offset_list, len_list,
00195                                 contig_access_count, striping_info,
00196                                 buf_idx, error_code);
00197 
00198     
00199 
00200 
00201 
00202 
00203 
00204 
00205 
00206 
00207 
00208 
00209     old_error = *error_code;
00210     if (*error_code != MPI_SUCCESS)
00211     *error_code = MPI_ERR_IO;
00212 
00213     
00214 
00215 #ifdef ADIOI_MPE_LOGGING
00216     MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
00217 #endif
00218     if (fd->hints->cb_nodes == 1)
00219     MPI_Bcast(error_code, 1, MPI_INT,
00220           fd->hints->ranklist[0], fd->comm);
00221     else {
00222     tmp_error = *error_code;
00223     MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
00224               MPI_MAX, fd->comm);
00225     }
00226 #ifdef ADIOI_MPE_LOGGING
00227     MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
00228 #endif
00229 
00230     if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
00231     *error_code = old_error;
00232 
00233 
00234     if (!buftype_is_contig)
00235     ADIOI_Delete_flattened(datatype);
00236 
00237     
00238     
00239     for (i = 0; i < nprocs; i++) {
00240     if (others_req[i].count) {
00241         ADIOI_Free(others_req[i].offsets);
00242         ADIOI_Free(others_req[i].lens);
00243         ADIOI_Free(others_req[i].mem_ptrs);
00244     }
00245     }
00246     ADIOI_Free(others_req);
00247     
00248     for (i = 0; i < nprocs; i++) {
00249     if (my_req[i].count) {
00250         ADIOI_Free(my_req[i].offsets);
00251         ADIOI_Free(my_req[i].lens);
00252     }
00253     }
00254     ADIOI_Free(my_req);
00255     for (i = 0; i < nprocs; i++) {
00256         ADIOI_Free(buf_idx[i]);
00257     }
00258     ADIOI_Free(buf_idx);
00259     ADIOI_Free(offset_list);
00260     ADIOI_Free(len_list);
00261     ADIOI_Free(st_offsets);
00262     ADIOI_Free(end_offsets);
00263     ADIOI_Free(striping_info);
00264 
00265 #ifdef HAVE_STATUS_SET_BYTES
00266     if (status) {
00267     int bufsize, size;
00268     
00269     MPI_Type_size(datatype, &size);
00270     bufsize = size * count;
00271     MPIR_Status_set_bytes(status, datatype, bufsize);
00272     }
00273     
00274 
00275 
00276 #endif
00277 
00278     fd->fp_sys_posn = -1;   
00279 }
00280 
00281 
00282 
00283 
00284 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
00285                     MPI_Datatype datatype, int nprocs,
00286                     int myrank, ADIOI_Access *others_req,
00287                                         ADIOI_Access *my_req,
00288                     ADIO_Offset *offset_list,
00289                                         ADIO_Offset *len_list, 
00290                     int contig_access_count,
00291                                         int *striping_info, int **buf_idx,
00292                                         int *error_code)
00293 {
00294     
00295 
00296 
00297 
00298 
00299 
00300 
00301 
00302 
00303 
00304     int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
00305     ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
00306     ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
00307     ADIO_Offset max_size, step_size = 0;
00308     int real_size, req_len, send_len;
00309     int *recv_curr_offlen_ptr, *recv_count, *recv_size;
00310     int *send_curr_offlen_ptr, *send_size;
00311     int *partial_recv, *sent_to_proc, *recv_start_pos;
00312     int *send_buf_idx, *curr_to_proc, *done_to_proc;
00313     int *this_buf_idx;
00314     char *write_buf = NULL;
00315     MPI_Status status;
00316     ADIOI_Flatlist_node *flat_buf = NULL;
00317     MPI_Aint buftype_extent;
00318     int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
00319     int data_sieving = 0;
00320 
00321     *error_code = MPI_SUCCESS;  
00322     
00323 
00324     
00325 
00326 
00327 
00328 
00329 
00330 
00331     for (i = 0; i < nprocs; i++) {
00332     if (others_req[i].count) {
00333         st_loc = others_req[i].offsets[0];
00334         end_loc = others_req[i].offsets[0];
00335         break;
00336     }
00337     }
00338     for (i = 0; i < nprocs; i++) {
00339     for (j = 0; j < others_req[i].count; j++) {
00340         st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
00341         end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
00342                                           others_req[i].lens[j] - 1));
00343     }
00344     }
00345     
00346     if ((st_loc == -1) && (end_loc == -1))
00347     ntimes = 0;
00348     MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
00349     
00350     if (st_loc == -1)
00351         st_loc = max_end_loc;
00352     MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
00353     
00354     min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
00355 
00356     
00357 
00358 
00359 
00360     step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
00361     max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
00362         + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
00363 
00364     if (ntimes)
00365     write_buf = (char *) ADIOI_Malloc(stripe_size);
00366 
00367     
00368     off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
00369     for (m = 0; m < max_ntimes; m ++)
00370         off_list[m] = max_end_loc;
00371     for (i = 0; i < nprocs; i++) {
00372         for (j = 0; j < others_req[i].count; j ++) {
00373             req_off = others_req[i].offsets[j];
00374             m = (int)((req_off - min_st_loc) / step_size);
00375             off_list[m] = ADIOI_MIN(off_list[m], req_off);
00376         }
00377     }
00378 
00379     recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00380     send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00381     
00382 
00383     recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00384     
00385 
00386 
00387     send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00388     
00389 
00390 
00391     recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00392     
00393 
00394     sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00395     
00396 
00397 
00398     send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00399     curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00400     done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00401     
00402 
00403     this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00404 
00405     recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
00406     
00407 
00408 
00409     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
00410     if (!buftype_is_contig) {
00411     ADIOI_Flatten_datatype(datatype);
00412     flat_buf = ADIOI_Flatlist;
00413     while (flat_buf->type != datatype)
00414         flat_buf = flat_buf->next;
00415     }
00416     MPI_Type_extent(datatype, &buftype_extent);
00417     
00418 
00419 
00420 
00421 
00422 
00423     
00424 
00425 
00426 
00427 
00428     iter_st_off = min_st_loc;
00429 
00430     
00431 
00432 
00433 
00434 
00435 
00436 
00437 
00438 
00439     
00440     data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
00441 
00442     for (m = 0; m < max_ntimes; m++) {
00443     
00444 
00445 
00446 
00447     
00448 
00449 
00450 
00451 
00452     
00453 
00454 
00455 
00456 
00457 
00458 
00459 
00460 
00461 
00462 
00463 
00464 
00465     
00466     for (i = 0; i < nprocs; i++)
00467         recv_count[i] = recv_size[i] = send_size[i] = 0;
00468 
00469         off = off_list[m];
00470         max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
00471         real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
00472                                     off,
00473                                     end_loc - off + 1);
00474 
00475     for (i = 0; i < nprocs; i++) {
00476             if (my_req[i].count) {
00477                 this_buf_idx[i] = buf_idx[i][send_curr_offlen_ptr[i]];
00478                 for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
00479                     send_off = my_req[i].offsets[j];
00480                     send_len = my_req[i].lens[j];
00481                     if (send_off < iter_st_off + max_size) {
00482                         send_size[i] += send_len;
00483                     } else {
00484                         break;
00485                     }
00486                 }
00487                 send_curr_offlen_ptr[i] = j;
00488             }
00489         if (others_req[i].count) {
00490         recv_start_pos[i] = recv_curr_offlen_ptr[i];
00491         for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
00492                     req_off = others_req[i].offsets[j];
00493                     req_len = others_req[i].lens[j];
00494             if (req_off < iter_st_off + max_size) {
00495             recv_count[i]++;
00496                         ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIR_Upint)(write_buf+req_off-off));
00497             MPI_Address(write_buf + req_off - off,
00498                     &(others_req[i].mem_ptrs[j]));
00499                         recv_size[i] += req_len;
00500             } else {
00501             break;
00502                     }
00503         }
00504         recv_curr_offlen_ptr[i] = j;
00505         }
00506     }
00507         
00508         hole = data_sieving;
00509     ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
00510                                      len_list, send_size, recv_size, off, real_size,
00511                                      recv_count, recv_start_pos, partial_recv,
00512                                      sent_to_proc, nprocs, myrank,
00513                                      buftype_is_contig, contig_access_count,
00514                                      striping_info, others_req, send_buf_idx,
00515                                      curr_to_proc, done_to_proc, &hole, m,
00516                                      buftype_extent, this_buf_idx, error_code);
00517     if (*error_code != MPI_SUCCESS)
00518             goto over;
00519 
00520     flag = 0;
00521     for (i = 0; i < nprocs; i++)
00522         if (recv_count[i]) {
00523         flag = 1;
00524         break;
00525         }
00526     if (flag) {
00527             
00528             if(data_sieving == ADIOI_HINT_ENABLE) {
00529             ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
00530                      ADIO_EXPLICIT_OFFSET, off, &status,
00531                      error_code);
00532             } else {
00533                 
00534 
00535                 if (!hole) {
00536                     ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
00537                                      ADIO_EXPLICIT_OFFSET, off, &status,
00538                                      error_code);
00539                 } else {
00540                     for (i = 0; i < nprocs; i++) {
00541                         if (others_req[i].count) {
00542                             for (j = 0; j < others_req[i].count; j++) {
00543                                 if (others_req[i].offsets[j] < off + real_size &&
00544                                     others_req[i].offsets[j] >= off) {
00545                                     ADIO_WriteContig(fd,
00546                                                      write_buf + others_req[i].offsets[j] - off,
00547                                                      others_req[i].lens[j],
00548                                                      MPI_BYTE, ADIO_EXPLICIT_OFFSET,
00549                                                      others_req[i].offsets[j], &status,
00550                                                      error_code);
00551                                 if (*error_code != MPI_SUCCESS)
00552                                 goto over;
00553                                 }
00554                             }
00555                         }
00556                     }
00557                 }
00558             }
00559         if (*error_code != MPI_SUCCESS)
00560         goto over;
00561     }
00562         iter_st_off += max_size;
00563     }
00564 over:
00565     if (ntimes)
00566     ADIOI_Free(write_buf);
00567     ADIOI_Free(recv_curr_offlen_ptr);
00568     ADIOI_Free(send_curr_offlen_ptr);
00569     ADIOI_Free(recv_count);
00570     ADIOI_Free(send_size);
00571     ADIOI_Free(recv_size);
00572     ADIOI_Free(sent_to_proc);
00573     ADIOI_Free(recv_start_pos);
00574     ADIOI_Free(send_buf_idx);
00575     ADIOI_Free(curr_to_proc);
00576     ADIOI_Free(done_to_proc);
00577     ADIOI_Free(this_buf_idx);
00578     ADIOI_Free(off_list);
00579 }
00580 
00581 
00582 
00583 
00584 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
00585                      char *write_buf,
00586                      ADIOI_Flatlist_node *flat_buf,
00587                      ADIO_Offset *offset_list,
00588                      ADIO_Offset *len_list, int *send_size,
00589                      int *recv_size, ADIO_Offset off,
00590                      int size, int *count,
00591                      int *start_pos, int *partial_recv,
00592                      int *sent_to_proc, int nprocs,
00593                      int myrank, int buftype_is_contig,
00594                      int contig_access_count,
00595                      int *striping_info,
00596                      ADIOI_Access *others_req,
00597                      int *send_buf_idx,
00598                      int *curr_to_proc, int *done_to_proc,
00599                                          int *hole, int iter,
00600                                          MPI_Aint buftype_extent,
00601                      int *buf_idx, int *error_code)
00602 {
00603     int i, j, nprocs_recv, nprocs_send, err;
00604     char **send_buf = NULL;
00605     MPI_Request *requests, *send_req;
00606     MPI_Datatype *recv_types;
00607     MPI_Status *statuses, status;
00608     int *srt_len, sum, sum_recv;
00609     ADIO_Offset *srt_off;
00610     int data_sieving = *hole;
00611     static char myname[] = "ADIOI_W_EXCHANGE_DATA";
00612 
00613     
00614     nprocs_recv = 0;
00615     for (i = 0; i < nprocs; i++)
00616     if (recv_size[i])
00617         nprocs_recv++;
00618 
00619     recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
00620                            sizeof(MPI_Datatype));
00621     
00622 
00623     j = 0;
00624     for (i = 0; i < nprocs; i++) {
00625     if (recv_size[i]) {
00626         MPI_Type_hindexed(count[i],
00627                   &(others_req[i].lens[start_pos[i]]),
00628                   &(others_req[i].mem_ptrs[start_pos[i]]),
00629                   MPI_BYTE, recv_types + j);
00630         
00631         MPI_Type_commit(recv_types + j);
00632         j++;
00633     }
00634     }
00635 
00636     
00637 
00638 
00639 
00640 
00641     sum = 0;
00642     for (i = 0; i < nprocs; i++)
00643     sum += count[i];
00644     srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset));
00645     srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int));
00646     
00647 
00648     ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
00649              nprocs, nprocs_recv, sum);
00650 
00651     
00652     *hole = 0;
00653     for (i = 0; i < sum - 1; i++) {
00654         if (srt_off[i] + srt_len[i] < srt_off[i + 1]) {
00655             *hole = 1;
00656         break;
00657     }
00658     }
00659     
00660 
00661 
00662 
00663 
00664 
00665     if (*hole == 0) {
00666         sum_recv = 0;
00667         for (i = 0; i < nprocs; i++)
00668             sum_recv += recv_size[i];
00669     if (size > sum_recv)
00670         *hole = 1;
00671     }
00672     
00673     if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
00674         ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
00675                         ADIO_EXPLICIT_OFFSET, off, &status, &err);
00676         
00677         if (err != MPI_SUCCESS) {
00678             *error_code = MPIO_Err_create_code(err,
00679                                                MPIR_ERR_RECOVERABLE,
00680                                                myname, __LINE__,
00681                                                MPI_ERR_IO,
00682                                                "**ioRMWrdwr", 0);
00683             ADIOI_Free(recv_types);
00684             ADIOI_Free(srt_off);
00685             ADIOI_Free(srt_len);
00686             return;
00687         }
00688         
00689     }
00690     ADIOI_Free(srt_off);
00691     ADIOI_Free(srt_len);
00692 
00693     nprocs_send = 0;
00694     for (i = 0; i < nprocs; i++)
00695     if (send_size[i])
00696         nprocs_send++;
00697 
00698     if (fd->atomicity) {
00699     
00700     requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
00701                                                 sizeof(MPI_Request));
00702     send_req = requests;
00703     } else {
00704     requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
00705                                                 sizeof(MPI_Request));
00706     
00707 
00708     
00709     j = 0;
00710     for (i = 0; i < nprocs; i++) {
00711         if (recv_size[i]) {
00712         MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
00713               myrank + i + 100 * iter, fd->comm, requests + j);
00714         j++;
00715         }
00716     }
00717     send_req = requests + nprocs_recv;
00718     }
00719 
00720     
00721 
00722 
00723 
00724     if (buftype_is_contig) {
00725     j = 0;
00726     for (i = 0; i < nprocs; i++)
00727         if (send_size[i]) {
00728                 ADIOI_Assert(buf_idx[i] != -1);
00729         MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
00730               MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
00731               send_req + j);
00732         j++;
00733         }
00734     } else
00735         if (nprocs_send) {
00736     
00737     send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
00738     for (i = 0; i < nprocs; i++)
00739         if (send_size[i])
00740         send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
00741 
00742     ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
00743                                       len_list, send_size, send_req,
00744                                       sent_to_proc, nprocs, myrank,
00745                                       contig_access_count, striping_info,
00746                                       send_buf_idx, curr_to_proc, done_to_proc,
00747                                       iter, buftype_extent);
00748     
00749     }
00750 
00751     
00752     if (fd->atomicity) {
00753     j = 0;
00754     for (i = 0; i < nprocs; i++) {
00755         MPI_Status wkl_status;
00756         if (recv_size[i]) {
00757         MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
00758              myrank + i + 100 * iter, fd->comm, &wkl_status);
00759         j++;
00760         }
00761     }
00762     }
00763 
00764     for (i = 0; i < nprocs_recv; i++)
00765     MPI_Type_free(recv_types + i);
00766     ADIOI_Free(recv_types);
00767 
00768     
00769     
00770     if (fd->atomicity) {
00771     statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
00772                            sizeof(MPI_Status));
00773     } else {
00774     statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
00775                            sizeof(MPI_Status));
00776     }
00777 
00778 #ifdef NEEDS_MPI_TEST
00779     i = 0;
00780     if (fd->atomicity) {
00781     
00782     while (!i)
00783         MPI_Testall(nprocs_send, send_req, &i, statuses);
00784     } else {
00785     while (!i)
00786         MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
00787     }
00788 #else
00789     
00790     if (fd->atomicity)
00791     MPI_Waitall(nprocs_send, send_req, statuses);
00792     else
00793     MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
00794 #endif
00795     ADIOI_Free(statuses);
00796     ADIOI_Free(requests);
00797     if (!buftype_is_contig && nprocs_send) {
00798     for (i = 0; i < nprocs; i++)
00799         if (send_size[i])
00800         ADIOI_Free(send_buf[i]);
00801     ADIOI_Free(send_buf);
00802     }
00803 }
00804 
00805 #define ADIOI_BUF_INCR \
00806 { \
00807     while (buf_incr) { \
00808         size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
00809         user_buf_idx += size_in_buf; \
00810         flat_buf_sz -= size_in_buf; \
00811         if (!flat_buf_sz) { \
00812             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00813             else { \
00814                 flat_buf_idx = 0; \
00815                 n_buftypes++; \
00816             } \
00817             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00818                 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;  \
00819             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00820         } \
00821         buf_incr -= size_in_buf; \
00822     } \
00823 }
00824 
00825 
00826 #define ADIOI_BUF_COPY \
00827 { \
00828     while (size) { \
00829         size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
00830         ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
00831         ADIOI_Assert(size_in_buf == (size_t)size_in_buf);               \
00832         memcpy(&(send_buf[p][send_buf_idx[p]]), \
00833                ((char *) buf) + user_buf_idx, size_in_buf); \
00834         send_buf_idx[p] += size_in_buf; \
00835         user_buf_idx += size_in_buf; \
00836         flat_buf_sz -= size_in_buf; \
00837         if (!flat_buf_sz) { \
00838             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
00839             else { \
00840                 flat_buf_idx = 0; \
00841                 n_buftypes++; \
00842             } \
00843             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
00844                 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;    \
00845             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
00846         } \
00847         size -= size_in_buf; \
00848         buf_incr -= size_in_buf; \
00849     } \
00850     ADIOI_BUF_INCR \
00851 }
00852 
00853 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
00854                       ADIOI_Flatlist_node *flat_buf,
00855                       char **send_buf,
00856                       ADIO_Offset *offset_list,
00857                       ADIO_Offset *len_list, int *send_size,
00858                       MPI_Request *requests,
00859                       int *sent_to_proc, int nprocs,
00860                       int myrank,
00861                       int contig_access_count,
00862                       int *striping_info,
00863                       int *send_buf_idx,
00864                       int *curr_to_proc,
00865                       int *done_to_proc, int iter,
00866                       MPI_Aint buftype_extent)
00867 {
00868     
00869     int i, p, flat_buf_idx, size;
00870     int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
00871     ADIO_Offset off, len, rem_len, user_buf_idx;
00872 
00873     
00874 
00875 
00876 
00877 
00878 
00879 
00880 
00881     for (i = 0; i < nprocs; i++) {
00882     send_buf_idx[i] = curr_to_proc[i] = 0;
00883     done_to_proc[i] = sent_to_proc[i];
00884     }
00885     jj = 0;
00886 
00887     user_buf_idx = flat_buf->indices[0];
00888     flat_buf_idx = 0;
00889     n_buftypes = 0;
00890     flat_buf_sz = flat_buf->blocklens[0];
00891 
00892     
00893 
00894 
00895     for (i = 0; i < contig_access_count; i++) {
00896     off = offset_list[i];
00897     rem_len = (ADIO_Offset) len_list[i];
00898 
00899     
00900     while (rem_len != 0) {
00901         len = rem_len;
00902         
00903 
00904 
00905 
00906         p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
00907 
00908         if (send_buf_idx[p] < send_size[p]) {
00909         if (curr_to_proc[p] + len > done_to_proc[p]) {
00910             if (done_to_proc[p] > curr_to_proc[p]) {
00911             size = (int) ADIOI_MIN(curr_to_proc[p] + len -
00912                            done_to_proc[p],
00913                            send_size[p] -
00914                            send_buf_idx[p]);
00915             buf_incr = done_to_proc[p] - curr_to_proc[p];
00916             ADIOI_BUF_INCR
00917                         ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
00918                 buf_incr = (int) (curr_to_proc[p] + len -
00919                           done_to_proc[p]);
00920                         ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
00921             curr_to_proc[p] = done_to_proc[p] + size;
00922                 ADIOI_BUF_COPY
00923                     } else {
00924             size = (int) ADIOI_MIN(len, send_size[p] -
00925                            send_buf_idx[p]);
00926             buf_incr = (int) len;
00927                         ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
00928             curr_to_proc[p] += size;
00929                 ADIOI_BUF_COPY
00930                     }
00931             if (send_buf_idx[p] == send_size[p]) {
00932             MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
00933                   myrank + p + 100 * iter, fd->comm,
00934                   requests + jj);
00935             jj++;
00936             }
00937         } else {
00938                     ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
00939             curr_to_proc[p] += (int) len;
00940             buf_incr = (int) len;
00941             ADIOI_BUF_INCR
00942                 }
00943         } else {
00944         buf_incr = (int) len;
00945             ADIOI_BUF_INCR
00946             }
00947         off += len;
00948         rem_len -= len;
00949     }
00950     }
00951     for (i = 0; i < nprocs; i++)
00952     if (send_size[i])
00953         sent_to_proc[i] = curr_to_proc[i];
00954 }