00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 #include "ad_lustre.h"
00012 #include "adio_extern.h"
00013 
00014 #undef AGG_DEBUG
00015 
00016 void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int **striping_info_ptr,
00017                     int mode)
00018 {
00019     int *striping_info = NULL;
00020     
00021 
00022 
00023 
00024 
00025     int stripe_size, stripe_count, CO = 1;
00026     int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
00027 
00028     
00029     
00030     stripe_size = fd->hints->striping_unit;
00031     
00032     
00033     stripe_count = fd->hints->striping_factor;
00034 
00035     
00036     if (!mode) {
00037         
00038 
00039 
00040 
00041 
00042 
00043     CO = 1;
00044     
00045     } else {
00046         
00047     CO = fd->hints->fs_hints.lustre.co_ratio;
00048     }
00049     
00050     
00051     
00052 
00053 
00054 
00055 
00056     if (nprocs_for_coll >= stripe_count)
00057     
00058 
00059 
00060 
00061 
00062 
00063         avail_cb_nodes = 
00064         stripe_count * ADIOI_MIN(nprocs_for_coll/stripe_count, CO);
00065     else {
00066         
00067         
00068         
00069 
00070         
00071 
00072 
00073 
00074         divisor = 2;
00075         avail_cb_nodes = 1;
00076         
00077         while (stripe_count >= divisor*divisor) {
00078             if ((stripe_count % divisor) == 0) {
00079                  if (stripe_count/divisor <= nprocs_for_coll) {
00080                      
00081                      avail_cb_nodes = stripe_count/divisor;
00082                      break;
00083         }
00084         
00085 
00086                 else if (divisor <= nprocs_for_coll) 
00087             avail_cb_nodes = divisor;
00088         }
00089         divisor++;
00090         }
00091     }
00092 
00093     *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
00094     striping_info = *striping_info_ptr;
00095     striping_info[0] = stripe_size;
00096     striping_info[1] = stripe_count;
00097     striping_info[2] = avail_cb_nodes;
00098 }
00099 
00100 int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
00101                                  ADIO_Offset *len, int *striping_info)
00102 {
00103     int rank_index, rank;
00104     ADIO_Offset avail_bytes;
00105     int stripe_size = striping_info[0];
00106     int avail_cb_nodes = striping_info[2];
00107 
00108     
00109     rank_index = (int)((off / stripe_size) % avail_cb_nodes);
00110 
00111     
00112 
00113 
00114 
00115     if (rank_index >= fd->hints->cb_nodes)
00116         MPI_Abort(MPI_COMM_WORLD, 1);
00117 
00118     avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
00119                   (ADIO_Offset)stripe_size - off;
00120     if (avail_bytes < *len) {
00121     
00122     *len = avail_bytes;
00123     }
00124     
00125     
00126     rank = fd->hints->ranklist[rank_index];
00127 
00128     return rank;
00129 }
00130 
00131 
00132 
00133 
00134 
00135 
00136 
00137 void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
00138                   ADIO_Offset *len_list, int contig_access_count,
00139                   int *striping_info, int nprocs,
00140                               int *count_my_req_procs_ptr,
00141                   int **count_my_req_per_proc_ptr,
00142                   ADIOI_Access **my_req_ptr,
00143                   int ***buf_idx_ptr)
00144 {
00145     
00146 
00147     int *count_my_req_per_proc, count_my_req_procs, **buf_idx;
00148     int i, l, proc;
00149     ADIO_Offset avail_len, rem_len, curr_idx, off;
00150     ADIOI_Access *my_req;
00151 
00152     *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
00153     count_my_req_per_proc = *count_my_req_per_proc_ptr;
00154     
00155 
00156 
00157 
00158 
00159 
00160     buf_idx = (int **) ADIOI_Malloc(nprocs * sizeof(int*));
00161 
00162     
00163 
00164 
00165     for (i = 0; i < contig_access_count; i++) {
00166     
00167 
00168 
00169     if (len_list[i] == 0)
00170         continue;
00171     off = offset_list[i];
00172     avail_len = len_list[i];
00173     
00174 
00175 
00176 
00177     proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
00178     count_my_req_per_proc[proc]++;
00179 
00180     
00181 
00182 
00183 
00184     rem_len = len_list[i] - avail_len;
00185 
00186     while (rem_len != 0) {
00187         off += avail_len;   
00188         avail_len = rem_len;    
00189         proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
00190         count_my_req_per_proc[proc]++;
00191         rem_len -= avail_len;   
00192     }
00193     }
00194 
00195     
00196 
00197 
00198 
00199 
00200 
00201     
00202     for (i = 0; i < nprocs; i++) {
00203     
00204     buf_idx[i] = (int *) ADIOI_Malloc((count_my_req_per_proc[i] + 1)
00205                                * sizeof(int)); 
00206     }
00207 
00208     
00209     *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
00210     my_req = *my_req_ptr;
00211 
00212     count_my_req_procs = 0;
00213     for (i = 0; i < nprocs; i++) {
00214     if (count_my_req_per_proc[i]) {
00215         my_req[i].offsets = (ADIO_Offset *)
00216                         ADIOI_Malloc(count_my_req_per_proc[i] *
00217                                              sizeof(ADIO_Offset));
00218         my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] *
00219                                   sizeof(int));
00220         count_my_req_procs++;
00221     }
00222     my_req[i].count = 0;    
00223     }
00224 
00225     
00226     curr_idx = 0;
00227     for (i = 0; i < contig_access_count; i++) {
00228     
00229 
00230     if (len_list[i] == 0)
00231         continue;
00232     off = offset_list[i];
00233     avail_len = len_list[i];
00234     proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
00235 
00236     l = my_req[proc].count;
00237 
00238     ADIOI_Assert(curr_idx == (int) curr_idx);
00239     ADIOI_Assert(l < count_my_req_per_proc[proc]);
00240     buf_idx[proc][l] = (int) curr_idx;
00241     curr_idx += avail_len;
00242 
00243     rem_len = len_list[i] - avail_len;
00244 
00245     
00246 
00247 
00248 
00249 
00250     my_req[proc].offsets[l] = off;
00251     ADIOI_Assert(avail_len == (int) avail_len);
00252     my_req[proc].lens[l] = (int) avail_len;
00253     my_req[proc].count++;
00254 
00255     while (rem_len != 0) {
00256         off += avail_len;
00257         avail_len = rem_len;
00258         proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
00259                                                 striping_info);
00260 
00261         l = my_req[proc].count;
00262         ADIOI_Assert(curr_idx == (int) curr_idx);
00263         ADIOI_Assert(l < count_my_req_per_proc[proc]);
00264         buf_idx[proc][l] = (int) curr_idx;
00265 
00266         curr_idx += avail_len;
00267         rem_len -= avail_len;
00268 
00269         my_req[proc].offsets[l] = off;
00270         ADIOI_Assert(avail_len == (int) avail_len);
00271         my_req[proc].lens[l] = (int) avail_len;
00272         my_req[proc].count++;
00273     }
00274     }
00275 
00276 #ifdef AGG_DEBUG
00277     for (i = 0; i < nprocs; i++) {
00278     if (count_my_req_per_proc[i] > 0) {
00279         FPRINTF(stdout, "data needed from %d (count = %d):\n",
00280                     i, my_req[i].count);
00281         for (l = 0; l < my_req[i].count; l++) {
00282         FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n",
00283                     l, my_req[i].offsets[l], l, my_req[i].lens[l]);
00284         }
00285     }
00286     }
00287 #endif
00288 
00289     *count_my_req_procs_ptr = count_my_req_procs;
00290     *buf_idx_ptr = buf_idx;
00291 }
00292 
00293 int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
00294                ADIO_Offset *len_list, int nprocs)
00295 {
00296     
00297 
00298 
00299 
00300 
00301 
00302     int i, docollect = 1, big_req_size = 0;
00303     ADIO_Offset req_size = 0, total_req_size;
00304     int avg_req_size, total_access_count;
00305 
00306     
00307     for (i = 0; i < contig_access_count; i++)
00308         req_size += len_list[i];
00309     MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
00310                fd->comm);
00311     MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
00312                fd->comm);
00313     
00314     avg_req_size = (int)(total_req_size / total_access_count);
00315     
00316     big_req_size = fd->hints->fs_hints.lustre.coll_threshold;
00317     
00318     if ((big_req_size > 0) && (avg_req_size > big_req_size))
00319         docollect = 0;
00320 
00321     return docollect;
00322 }