Skylark (Sketching Library)  0.1
/var/lib/jenkins/jobs/Skylark/workspace/utility/external/view.hpp
Go to the documentation of this file.
00001 #ifndef SKYLARK_COMBBLAS_SLAB_VIEW_HPP
00002 #define SKYLARK_COMBBLAS_SLAB_VIEW_HPP
00003 
00004 #if SKYLARK_HAVE_COMBBLAS
00005 #include <CombBLAS.h>
00006 #include <CommGrid.h>
00007 #endif
00008 
00009 #include <map>
00010 #include <boost/mpi.hpp>
00011 #include <boost/serialization/map.hpp>
00012 #include <boost/serialization/vector.hpp>
00013 
00014 #include "combblas_comm_grid.hpp"
00015 
00016 namespace skylark {
00017 namespace utility {
00018 
00019 #if SKYLARK_HAVE_COMBBLAS
00020 
00042 template <typename index_type, typename value_type>
00043 struct combblas_slab_view_t {
00044 
00045     typedef SpDCCols<index_type, value_type> cb_col_t;
00046     typedef SpParMat<index_type, value_type, cb_col_t> sp_par_mat_t;
00047     typedef typename cb_col_t::SpColIter cb_col_itr_t;
00048     typedef typename cb_col_t::SpColIter::NzIter cb_nz_itr_t;
00049 
00053     combblas_slab_view_t(const sp_par_mat_t &B, bool transp_b = false)
00054         : _world(B.getcommgrid()->GetWorld(), boost::mpi::comm_duplicate)
00055         , _data((const_cast<sp_par_mat_t &>(B)).seq())
00056         , _col_itr(_data.begcol())
00057         , _row_itr(_data.begnz(_col_itr))
00058         , _transp_b(transp_b)
00059         , _offset_row(cb_my_row_offset(B))
00060         , _offset_col(cb_my_col_offset(B))
00061         , _global_row(0)
00062         , _global_col(0) {
00063 
00064         _ncol = B.getncol();
00065         _nrow = B.getnrow();
00066         if(_transp_b) {
00067             _ncol = B.getnrow();
00068             _nrow = B.getncol();
00069         }
00070 
00071         for(cb_col_itr_t col = _data.begcol(); col != _data.endcol(); col++)
00072             _row_itrs.push_back(_data.begnz(col));
00073     }
00074 
00080     value_type operator()(index_type g_row, index_type g_col) {
00081 
00082         index_type map_idx = idx(g_row, g_col);
00083         if(_values.count(map_idx) > 0)
00084             return _values[map_idx];
00085         else
00086             return static_cast<value_type>(0);
00087     }
00088 
00089 #ifdef SKYLARK_HAVE_ELEMENTAL
00090 
00096     template<typename dist_elem_matrix_t>
00097     void extract_elemental_column_slab_view(
00098             const dist_elem_matrix_t &A,
00099             std::set<index_type> &column_idxs,
00100             size_t width = 1) {
00101 
00102         _values.clear();
00103 
00104         // structure per proc data
00105         std::vector< std::map<index_type, value_type> >
00106             per_proc_data(_world.size());
00107 
00108         // accumulate locale values
00109         for(size_t i = 0; i < width && _col_itr != _data.endcol();
00110             _col_itr++, i++) {
00111 
00112             // first accumulate column values, then distribute
00113             for(cb_nz_itr_t nz = _data.begnz(_col_itr);
00114                 nz != _data.endnz(_col_itr); nz++) {
00115 
00116                 index_type g_cb_col_idx = _col_itr.colid() + _offset_col;
00117                 index_type g_cb_row_idx = nz.rowid()  + _offset_row;
00118                 if(_transp_b) std::swap(g_cb_row_idx, g_cb_col_idx);
00119 
00120                 size_t target_proc = A.Owner(g_cb_row_idx, g_cb_col_idx);
00121                 index_type coords = idx(g_cb_row_idx, g_cb_col_idx);
00122                 per_proc_data[target_proc].insert(
00123                     std::make_pair(coords, nz.value()));
00124             }
00125         }
00126 
00127         std::set<index_type> indices;
00128         _set_view(per_proc_data, indices);
00129         typename std::set<index_type>::iterator itr;
00130         for(itr = indices.begin(); itr != indices.end(); itr++)
00131             column_idxs.insert(col(*itr));
00132     }
00133 
00139     template<typename dist_elem_matrix_t>
00140     void extract_elemental_column_slab_view(
00141             const dist_elem_matrix_t &A,
00142             size_t width = 1) {
00143 
00144         _values.clear();
00145 
00146         // structure per proc data
00147         std::vector< std::map<index_type, value_type> >
00148             per_proc_data(_world.size());
00149 
00150         for(size_t i = 0; i < width && _global_col < _ncol;
00151             _global_col++, i++) {
00152 
00153             // if I don't own anything in this column, continue
00154             if(_col_itr.colid() + _offset_col != _global_col)
00155                     continue;
00156 
00157             // first accumulate column values, then distribute
00158             for(cb_nz_itr_t nz = _data.begnz(_col_itr);
00159                 nz != _data.endnz(_col_itr); nz++) {
00160 
00161                 index_type g_cb_col_idx = _col_itr.colid() + _offset_col;
00162                 index_type g_cb_row_idx = nz.rowid()  + _offset_row;
00163                 if(_transp_b) std::swap(g_cb_row_idx, g_cb_col_idx);
00164 
00165                 size_t target_proc = A.Owner(g_cb_row_idx, g_cb_col_idx);
00166                 index_type coords = idx(g_cb_row_idx, g_cb_col_idx);
00167                 per_proc_data[target_proc].insert(
00168                     std::make_pair(coords, nz.value()));
00169             }
00170 
00171             // this proc can advance to next column
00172             _col_itr++;
00173         }
00174 
00175         std::set<index_type> indices;
00176         _set_view(per_proc_data, indices);
00177     }
00178 
00185     template<typename dist_elem_matrix_t>
00186     void extract_elemental_row_slab_view(
00187             const dist_elem_matrix_t &A,
00188             std::set<index_type> &row_idxs,
00189             size_t height = 1) {
00190 
00191         _values.clear();
00192 
00193         // structure per proc data
00194         std::vector< std::map<index_type, value_type> >
00195             per_proc_data(_world.size());
00196 
00197         // accumulate locale values
00198         cb_col_itr_t col_itr;
00199         for(col_itr = _data.begcol(); col_itr != _data.endcol(); col_itr++) {
00200 
00201             // first accumulate column values, then distribute
00202             size_t r = 0;
00203             for(; r < height && _row_itr != _data.endnz(col_itr); _row_itr++, r++) {
00204 
00205                 index_type g_cb_col_idx = col_itr.colid()  + _offset_col;
00206                 index_type g_cb_row_idx = _row_itr.rowid() + _offset_row;
00207                 if(_transp_b) std::swap(g_cb_row_idx, g_cb_col_idx);
00208 
00209                 size_t target_proc = A.Owner(g_cb_row_idx, g_cb_col_idx);
00210                 index_type coords = idx(g_cb_row_idx, g_cb_col_idx);
00211                 per_proc_data[target_proc].insert(
00212                     std::make_pair(coords, _row_itr.value()));
00213             }
00214         }
00215 
00216         std::set<index_type> indices;
00217         _set_view(per_proc_data, indices);
00218         typename std::set<index_type>::iterator itr;
00219         for(itr = indices.begin(); itr != indices.end(); itr++)
00220             row_idxs.insert(row(*itr));
00221     }
00222 
00229     template<typename dist_elem_matrix_t>
00230     void extract_elemental_row_slab_view(
00231             const dist_elem_matrix_t &A,
00232             size_t height = 1) {
00233 
00234         _values.clear();
00235 
00236         // structure per proc data
00237         std::vector< std::map<index_type, value_type> >
00238             per_proc_data(_world.size());
00239 
00240         for(size_t col = 0; col < _row_itrs.size(); col++) {
00241 
00242             cb_nz_itr_t tmp_row_itr = _row_itrs[col];
00243 
00244             for(size_t i = 0; i < height && _global_row + i < _nrow; i++) {
00245 
00246                 if(tmp_row_itr.rowid() + _offset_row != _global_row + i)
00247                     continue;
00248 
00249                 index_type g_cb_col_idx = col + _offset_col;
00250                 index_type g_cb_row_idx = tmp_row_itr.rowid() + _offset_row;
00251                 if(_transp_b) std::swap(g_cb_row_idx, g_cb_col_idx);
00252 
00253                 size_t target_proc = A.Owner(g_cb_row_idx, g_cb_col_idx);
00254                 index_type coords = idx(g_cb_row_idx, g_cb_col_idx);
00255                 per_proc_data[target_proc].insert(
00256                     std::make_pair(coords, tmp_row_itr.value()));
00257 
00258                 tmp_row_itr++;
00259                 _row_itrs[col]++;
00260             }
00261         }
00262 
00263         _global_row += height;
00264 
00265         std::set<index_type> indices;
00266         _set_view(per_proc_data, indices);
00267     }
00268 #endif
00269 
00275     void extract_full_slab_view(std::set<index_type> &column_idxs,
00276                                 const size_t slab_size = 1) {
00277 
00278         std::map<index_type, value_type> column_values;
00279         _values.clear();
00280 
00281         for(size_t i = 0; i < slab_size && _col_itr != _data.endcol();
00282             _col_itr++, i++) {
00283 
00284             // first accumulate column values, then distribute
00285             for(cb_nz_itr_t nz = _data.begnz(_col_itr);
00286                 nz != _data.endnz(_col_itr); nz++) {
00287 
00288                 index_type g_cb_col_idx = _col_itr.colid() + _offset_col;
00289                 index_type g_cb_row_idx = nz.rowid()  + _offset_row;
00290                 if(_transp_b) std::swap(g_cb_row_idx, g_cb_col_idx);
00291 
00292                 index_type coords = idx(g_cb_row_idx, g_cb_col_idx);
00293                 column_values.insert(std::make_pair(coords, nz.value()));
00294             }
00295         }
00296 
00297         std::vector< std::map< index_type, value_type> > vector_of_maps;
00298         boost::mpi::all_gather< std::map<index_type, value_type> > (
00299             _world, column_values, vector_of_maps);
00300 
00301         // gather vectors in one map
00302         typename std::map<index_type, value_type>::iterator itr;
00303         for(size_t i = 0; i < vector_of_maps.size(); ++i) {
00304             for(itr = vector_of_maps[i].begin();
00305                 itr != vector_of_maps[i].end(); itr++) {
00306 
00307                 column_idxs.insert(col(itr->first));
00308 
00309                 if(_values.count(itr->first) != 0)
00310                     _values[itr->first] += itr->second;
00311                 else
00312                     _values.insert(std::make_pair(itr->first, itr->second));
00313             }
00314         }
00315     }
00316 
00317     void extract_full_slab_view(const size_t width = 1) {
00318 
00319         _values.clear();
00320 
00321         std::map<index_type, value_type> column_values;
00322 
00323         for(size_t i = 0; i < width && _global_col < _ncol;
00324             _global_col++, i++) {
00325 
00326             // if I don't own anything in this column, continue
00327             if(_col_itr.colid() + _offset_col != _global_col)
00328                     continue;
00329 
00330             // first accumulate column values, then distribute
00331             for(cb_nz_itr_t nz = _data.begnz(_col_itr);
00332                 nz != _data.endnz(_col_itr); nz++) {
00333 
00334                 index_type g_cb_col_idx = _col_itr.colid() + _offset_col;
00335                 index_type g_cb_row_idx = nz.rowid()  + _offset_row;
00336                 if(_transp_b) std::swap(g_cb_row_idx, g_cb_col_idx);
00337 
00338                 index_type coords = idx(g_cb_row_idx, g_cb_col_idx);
00339                 column_values.insert(std::make_pair(coords, nz.value()));
00340             }
00341 
00342             // this proc can advance to next column
00343             _col_itr++;
00344         }
00345 
00346         std::vector< std::map< index_type, value_type> > vector_of_maps;
00347         boost::mpi::all_gather< std::map<index_type, value_type> > (
00348             _world, column_values, vector_of_maps);
00349 
00350         // gather vectors in one map
00351         typename std::map<index_type, value_type>::iterator itr;
00352         for(size_t i = 0; i < vector_of_maps.size(); ++i) {
00353             for(itr = vector_of_maps[i].begin();
00354                 itr != vector_of_maps[i].end(); itr++) {
00355 
00356                 if(_values.count(itr->first) != 0)
00357                     _values[itr->first] += itr->second;
00358                 else
00359                     _values.insert(std::make_pair(itr->first, itr->second));
00360             }
00361         }
00362     }
00363 
00365     index_type ncols() { return _ncol; }
00366     index_type nrows() { return _nrow; }
00367 
00368 
00369 private:
00370 
00372     boost::mpi::communicator _world;
00373 
00375     cb_col_t &_data;
00376 
00379     cb_col_itr_t _col_itr;
00380 
00383     cb_nz_itr_t _row_itr;
00384     std::vector<cb_nz_itr_t> _row_itrs;
00385 
00387     const bool _transp_b;
00388 
00390     const size_t _offset_row;
00391 
00393     const size_t _offset_col;
00394 
00395     size_t _global_row;
00396     size_t _global_col;
00397 
00399     size_t _ncol;
00401     size_t _nrow;
00402 
00404     std::map<index_type, value_type> _values;
00405 
00406 
00408     index_type row(index_type value) { return  value / _ncol; }
00409 
00411     index_type col(index_type value) { return  value % _ncol; }
00412 
00414     index_type idx(index_type row, index_type col) {
00415         return row * _ncol + col;
00416     }
00417 
00418     //FIXME: here we use collectives, add existing code for other comm schemes
00421     void _set_view(
00422             std::vector< std::map<index_type, value_type> > &per_proc_data,
00423             std::set<index_type> &indices) {
00424 
00427         //std::vector<size_t> n_values(_world.size());
00428         //for(size_t idx = 0; idx < _world.size(); ++idx)
00429             //n_values[idx] = per_proc_data[idx].size();
00430 
00432         //std::vector< std::vector<size_t> > vector_of_sizes;
00433         //boost::mpi::all_gather< std::vector<size_t> > (
00434             //_world, n_values, vector_of_sizes);
00435 
00436         // pre-post receives for all values then
00437         //FIXME: this is not working because Boost sends serialized data in
00438         // two steps: first the size and then the data, so waiting for one
00439         // irecv is not correct.
00440         //std::vector<boost::mpi::request> reqs;
00441         //std::vector< std::map<index_type, value_type> > vector_of_maps(_world.size());
00442         //for(size_t from = 0; from < _world.size(); ++from) {
00443             //size_t recv_size = vector_of_sizes[from][_world.rank()];
00444             //std::cout << "from " << from << " receive " << recv_size << std::endl;
00445             //if(recv_size > 0)
00446                 //reqs.push_back(
00447                     //_world.irecv(from, 0, &vector_of_maps[from], recv_size));
00448         //}
00449 
00451         //for(size_t to = 0; to < _world.size(); ++to) {
00452             //if(per_proc_data[to].size() > 0)
00453                 //_world.send(to, 0, per_proc_data[to]);
00454         //}
00455 
00457         //boost::mpi::wait_all(reqs.begin(), reqs.end());
00458 
00459         std::vector<std::map<index_type, value_type> > vector_of_maps;
00460         for(int i = 0; i < _world.size(); ++i)
00461             boost::mpi::gather(_world, per_proc_data[i], vector_of_maps, i);
00462 
00463         typename std::map<index_type, value_type>::iterator itr;
00464         for(size_t i = 0; i < vector_of_maps.size(); ++i) {
00465             for(itr = vector_of_maps[i].begin();
00466                 itr != vector_of_maps[i].end(); itr++) {
00467 
00468                 indices.insert(itr->first);
00469 
00470                 if(_values.count(itr->first) != 0)
00471                     _values[itr->first] += itr->second;
00472                 else
00473                     _values.insert(std::make_pair(itr->first, itr->second));
00474             }
00475         }
00476     }
00477 };
00478 
00479 #endif
00480 
00481 } } // namespace skylark::utility
00482 
00483 #endif //SKYLARK_COMBBLAS_SLAB_VIEW