00001 
00002 
00003 
00004 
00005 
00006 
00007 #include "adio.h"
00008 #include "adio_extern.h"
00009 
00010 #ifdef AGGREGATION_PROFILE
00011 #include "mpe.h"
00012 #endif
00013 
00014 #undef AGG_DEBUG
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 
00063 
00064 
00065 
00066 
00067 
00068 
00069 
00070 
00071 
00072 
00073 int ADIOI_Calc_aggregator(ADIO_File fd,
00074              ADIO_Offset off, 
00075              ADIO_Offset min_off, 
00076              ADIO_Offset *len, 
00077              ADIO_Offset fd_size,
00078              ADIO_Offset *fd_start,
00079              ADIO_Offset *fd_end)
00080 {
00081     int rank_index, rank;
00082     ADIO_Offset avail_bytes;
00083 
00084     ADIOI_UNREFERENCED_ARG(fd_start);
00085 
00086     
00087     rank_index = (int) ((off - min_off + fd_size)/ fd_size - 1);
00088 
00089     if (fd->hints->striping_unit > 0) {
00090         
00091 
00092 
00093 
00094         rank_index = 0;
00095         while (off > fd_end[rank_index]) rank_index++;
00096     }
00097 
00098     
00099 
00100 
00101     if (rank_index >= fd->hints->cb_nodes || rank_index < 0) {
00102         FPRINTF(stderr, "Error in ADIOI_Calc_aggregator(): rank_index(%d) >= fd->hints->cb_nodes (%d) fd_size=%lld off=%lld\n",
00103             rank_index,fd->hints->cb_nodes,fd_size,off);
00104         MPI_Abort(MPI_COMM_WORLD, 1);
00105     }
00106 
00107     
00108 
00109 
00110 
00111 
00112 
00113 
00114     avail_bytes = fd_end[rank_index] + 1 - off;
00115     if (avail_bytes < *len) {
00116     
00117     *len = avail_bytes;
00118     }
00119 
00120     
00121     
00122     rank = fd->hints->ranklist[rank_index];
00123 
00124     return rank;
00125 }
00126 
00127 void ADIOI_Calc_file_domains(ADIO_Offset *st_offsets, ADIO_Offset
00128                  *end_offsets, int nprocs, int nprocs_for_coll,
00129                  ADIO_Offset *min_st_offset_ptr,
00130                  ADIO_Offset **fd_start_ptr, ADIO_Offset 
00131                  **fd_end_ptr, int min_fd_size, 
00132                  ADIO_Offset *fd_size_ptr,
00133                  int striping_unit)
00134 {
00135 
00136 
00137 
00138 
00139     ADIO_Offset min_st_offset, max_end_offset, *fd_start, *fd_end, fd_size;
00140     int i;
00141 
00142 #ifdef AGGREGATION_PROFILE
00143     MPE_Log_event (5004, 0, NULL);
00144 #endif
00145 
00146 #ifdef AGG_DEBUG
00147     FPRINTF(stderr, "ADIOI_Calc_file_domains: %d aggregator(s)\n", 
00148         nprocs_for_coll);
00149 #endif
00150 
00151 
00152 
00153     min_st_offset = st_offsets[0];
00154     max_end_offset = end_offsets[0];
00155 
00156     for (i=1; i<nprocs; i++) {
00157     min_st_offset = ADIOI_MIN(min_st_offset, st_offsets[i]);
00158     max_end_offset = ADIOI_MAX(max_end_offset, end_offsets[i]);
00159     }
00160 
00161 
00162 
00163 
00164 
00165  
00166     fd_size = ((max_end_offset - min_st_offset + 1) + nprocs_for_coll -
00167            1)/nprocs_for_coll; 
00168     
00169 
00170     
00171 
00172 
00173 
00174 
00175     if (fd_size < min_fd_size)
00176     fd_size = min_fd_size;
00177 
00178     *fd_start_ptr = (ADIO_Offset *)
00179     ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset)); 
00180     *fd_end_ptr = (ADIO_Offset *)
00181     ADIOI_Malloc(nprocs_for_coll*sizeof(ADIO_Offset)); 
00182 
00183     fd_start = *fd_start_ptr;
00184     fd_end = *fd_end_ptr;
00185 
00186     
00187 
00188 
00189     if (striping_unit > 0) {
00190         ADIO_Offset end_off;
00191         int         rem_front, rem_back;
00192 
00193         
00194         fd_start[0] = min_st_offset;
00195         end_off     = fd_start[0] + fd_size;
00196         rem_front   = end_off % striping_unit;
00197         rem_back    = striping_unit - rem_front;
00198         if (rem_front < rem_back) 
00199         end_off -= rem_front;
00200         else                      
00201         end_off += rem_back;
00202         fd_end[0] = end_off - 1;
00203     
00204         
00205         for (i=1; i<nprocs_for_coll; i++) {
00206             fd_start[i] = fd_end[i-1] + 1;
00207             end_off     = min_st_offset + fd_size * (i+1);
00208             rem_front   = end_off % striping_unit;
00209             rem_back    = striping_unit - rem_front;
00210             if (rem_front < rem_back) 
00211             end_off -= rem_front;
00212             else                      
00213             end_off += rem_back;
00214             fd_end[i] = end_off - 1;
00215         }
00216         fd_end[nprocs_for_coll-1] = max_end_offset;
00217     }
00218     else { 
00219         fd_start[0] = min_st_offset;
00220         fd_end[0] = min_st_offset + fd_size - 1;
00221 
00222         for (i=1; i<nprocs_for_coll; i++) {
00223             fd_start[i] = fd_end[i-1] + 1;
00224             fd_end[i] = fd_start[i] + fd_size - 1;
00225         }
00226     }
00227 
00228 
00229 
00230 
00231 
00232 
00233 
00234     for (i=0; i<nprocs_for_coll; i++) {
00235     if (fd_start[i] > max_end_offset)
00236         fd_start[i] = fd_end[i] = -1;
00237     if (fd_end[i] > max_end_offset)
00238         fd_end[i] = max_end_offset;
00239     }
00240 
00241     *fd_size_ptr = fd_size;
00242     *min_st_offset_ptr = min_st_offset;
00243 
00244 #ifdef AGGREGATION_PROFILE
00245     MPE_Log_event (5005, 0, NULL);
00246 #endif
00247 }
00248 
00249 
00250 
00251 
00252 
00253 
00254 void ADIOI_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list, ADIO_Offset *len_list, 
00255                int contig_access_count, ADIO_Offset 
00256                min_st_offset, ADIO_Offset *fd_start,
00257                ADIO_Offset *fd_end, ADIO_Offset fd_size,
00258                        int nprocs,
00259                        int *count_my_req_procs_ptr,
00260                int **count_my_req_per_proc_ptr,
00261                ADIOI_Access **my_req_ptr,
00262                int **buf_idx_ptr)
00263 
00264 
00265 {
00266     int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
00267     int i, l, proc;
00268     ADIO_Offset fd_len, rem_len, curr_idx, off;
00269     ADIOI_Access *my_req;
00270 
00271 #ifdef AGGREGATION_PROFILE
00272     MPE_Log_event (5024, 0, NULL);
00273 #endif
00274 
00275     *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs,sizeof(int)); 
00276     count_my_req_per_proc = *count_my_req_per_proc_ptr;
00277 
00278 
00279 
00280 
00281 
00282     buf_idx = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00283 
00284 
00285 
00286 
00287    
00288     
00289     for (i=0; i < nprocs; i++) buf_idx[i] = -1;
00290 
00291     
00292 
00293 
00294     for (i=0; i < contig_access_count; i++) {
00295     
00296 
00297     if (len_list[i] == 0) 
00298         continue;
00299     off = offset_list[i];
00300     fd_len = len_list[i];
00301     
00302 
00303 
00304 
00305 
00306     proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, 
00307                      fd_start, fd_end);
00308     count_my_req_per_proc[proc]++;
00309 
00310     
00311 
00312 
00313 
00314     rem_len = len_list[i] - fd_len;
00315 
00316     while (rem_len != 0) {
00317         off += fd_len; 
00318         fd_len = rem_len; 
00319         proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, 
00320                      fd_size, fd_start, fd_end);
00321 
00322         count_my_req_per_proc[proc]++;
00323         rem_len -= fd_len; 
00324     }
00325     }
00326 
00327 
00328 
00329     *my_req_ptr = (ADIOI_Access *)
00330     ADIOI_Malloc(nprocs*sizeof(ADIOI_Access)); 
00331     my_req = *my_req_ptr;
00332 
00333     count_my_req_procs = 0;
00334     for (i=0; i < nprocs; i++) {
00335     if (count_my_req_per_proc[i]) {
00336         my_req[i].offsets = (ADIO_Offset *)
00337         ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(ADIO_Offset));
00338         my_req[i].lens = (int *)
00339         ADIOI_Malloc(count_my_req_per_proc[i] * sizeof(int));
00340         count_my_req_procs++;
00341     }       
00342     my_req[i].count = 0;  
00343 
00344     }
00345 
00346 
00347     curr_idx = 0;
00348     for (i=0; i<contig_access_count; i++) { 
00349     
00350 
00351     if (len_list[i] == 0)
00352         continue;
00353     off = offset_list[i];
00354     fd_len = len_list[i];
00355     proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, fd_size, 
00356                      fd_start, fd_end);
00357 
00358     
00359     if (buf_idx[proc] == -1) 
00360   {
00361     ADIOI_Assert(curr_idx == (int) curr_idx);
00362     buf_idx[proc] = (int) curr_idx;
00363   }
00364 
00365     l = my_req[proc].count;
00366     curr_idx += fd_len; 
00367 
00368     rem_len = len_list[i] - fd_len;
00369 
00370     
00371 
00372 
00373 
00374 
00375     my_req[proc].offsets[l] = off;
00376   ADIOI_Assert(fd_len == (int) fd_len);
00377     my_req[proc].lens[l] = (int) fd_len;
00378     my_req[proc].count++;
00379 
00380     while (rem_len != 0) {
00381         off += fd_len;
00382         fd_len = rem_len;
00383         proc = ADIOI_Calc_aggregator(fd, off, min_st_offset, &fd_len, 
00384                      fd_size, fd_start, fd_end);
00385 
00386         if (buf_idx[proc] == -1) 
00387       {
00388         ADIOI_Assert(curr_idx == (int) curr_idx);
00389         buf_idx[proc] = (int) curr_idx;
00390       }
00391 
00392         l = my_req[proc].count;
00393         curr_idx += fd_len;
00394         rem_len -= fd_len;
00395 
00396         my_req[proc].offsets[l] = off;
00397       ADIOI_Assert(fd_len == (int) fd_len);
00398         my_req[proc].lens[l] = (int) fd_len;
00399         my_req[proc].count++;
00400     }
00401     }
00402 
00403 #ifdef AGG_DEBUG
00404     for (i=0; i<nprocs; i++) {
00405     if (count_my_req_per_proc[i] > 0) {
00406         FPRINTF(stdout, "data needed from %d (count = %d):\n", i, 
00407             my_req[i].count);
00408         for (l=0; l < my_req[i].count; l++) {
00409         FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n", l,
00410             my_req[i].offsets[l], l, my_req[i].lens[l]);
00411         }
00412     FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
00413     }
00414     }
00415 #endif
00416 
00417     *count_my_req_procs_ptr = count_my_req_procs;
00418     *buf_idx_ptr = buf_idx;
00419 #ifdef AGGREGATION_PROFILE
00420     MPE_Log_event (5025, 0, NULL);
00421 #endif
00422 }
00423 
00424 
00425 
00426 void ADIOI_Calc_others_req(ADIO_File fd, int count_my_req_procs, 
00427                 int *count_my_req_per_proc,
00428                 ADIOI_Access *my_req, 
00429                 int nprocs, int myrank,
00430                 int *count_others_req_procs_ptr,
00431                 ADIOI_Access **others_req_ptr)  
00432 {
00433 
00434 
00435 
00436 
00437 
00438 
00439 
00440 
00441     int *count_others_req_per_proc, count_others_req_procs;
00442     int i, j;
00443     MPI_Request *requests;
00444     MPI_Status *statuses;
00445     ADIOI_Access *others_req;
00446 
00447 
00448 #ifdef AGGREGATION_PROFILE
00449     MPE_Log_event (5026, 0, NULL);
00450 #endif
00451     count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs*sizeof(int));
00452 
00453     MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
00454          count_others_req_per_proc, 1, MPI_INT, fd->comm);
00455 
00456     *others_req_ptr = (ADIOI_Access *)
00457     ADIOI_Malloc(nprocs*sizeof(ADIOI_Access)); 
00458     others_req = *others_req_ptr;
00459 
00460     count_others_req_procs = 0;
00461     for (i=0; i<nprocs; i++) {
00462     if (count_others_req_per_proc[i]) {
00463         others_req[i].count = count_others_req_per_proc[i];
00464         others_req[i].offsets = (ADIO_Offset *)
00465         ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(ADIO_Offset));
00466         others_req[i].lens = (int *)
00467         ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(int)); 
00468         others_req[i].mem_ptrs = (MPI_Aint *)
00469         ADIOI_Malloc(count_others_req_per_proc[i]*sizeof(MPI_Aint)); 
00470         count_others_req_procs++;
00471     }
00472     else others_req[i].count = 0;
00473     }
00474     
00475 
00476 
00477     requests = (MPI_Request *)
00478     ADIOI_Malloc(1+2*(count_my_req_procs+count_others_req_procs)*sizeof(MPI_Request)); 
00479 
00480 
00481     j = 0;
00482     for (i=0; i<nprocs; i++) {
00483     if (others_req[i].count) {
00484         MPI_Irecv(others_req[i].offsets, others_req[i].count, 
00485                       ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
00486         j++;
00487         MPI_Irecv(others_req[i].lens, others_req[i].count, 
00488                       MPI_INT, i, i+myrank+1, fd->comm, &requests[j]);
00489         j++;
00490     }
00491     }
00492 
00493     for (i=0; i < nprocs; i++) {
00494     if (my_req[i].count) {
00495         MPI_Isend(my_req[i].offsets, my_req[i].count, 
00496                       ADIO_OFFSET, i, i+myrank, fd->comm, &requests[j]);
00497         j++;
00498         MPI_Isend(my_req[i].lens, my_req[i].count, 
00499                       MPI_INT, i, i+myrank+1, fd->comm, &requests[j]);
00500         j++;
00501     }
00502     }
00503 
00504     if (j) {
00505     statuses = (MPI_Status *) ADIOI_Malloc(j * sizeof(MPI_Status));
00506     MPI_Waitall(j, requests, statuses);
00507     ADIOI_Free(statuses);
00508     }
00509 
00510     ADIOI_Free(requests);
00511     ADIOI_Free(count_others_req_per_proc);
00512 
00513     *count_others_req_procs_ptr = count_others_req_procs;
00514 #ifdef AGGREGATION_PROFILE
00515     MPE_Log_event (5027, 0, NULL);
00516 #endif
00517 }