Skylark (Sketching Library)
0.1
|
00001 #ifndef SKYLARK_HASH_TRANSFORM_COMBBLAS_HPP 00002 #define SKYLARK_HASH_TRANSFORM_COMBBLAS_HPP 00003 00004 #include <map> 00005 #include <boost/serialization/map.hpp> 00006 00007 #include "../utility/external/FullyDistMultiVec.hpp" 00008 00009 namespace skylark { namespace sketch { 00010 00011 /* Specialization: FullyDistMultiVec for input, output */ 00012 template <typename IndexType, 00013 typename ValueType, 00014 template <typename> class IdxDistributionType, 00015 template <typename> class ValueDistribution> 00016 struct hash_transform_t <FullyDistMultiVec<IndexType, ValueType>, 00017 FullyDistMultiVec<IndexType, ValueType>, 00018 IdxDistributionType, 00019 ValueDistribution > : 00020 public hash_transform_data_t<IdxDistributionType, 00021 ValueDistribution> { 00022 typedef IndexType index_type; 00023 typedef ValueType value_type; 00024 typedef FullyDistMultiVec<IndexType, ValueType> matrix_type; 00025 typedef FullyDistMultiVec<IndexType, ValueType> output_matrix_type; 00026 typedef FullyDistVec<IndexType, ValueType> mpi_vector_t; 00027 typedef FullyDistMultiVec<IndexType, ValueType> mpi_multi_vector_t; 00028 typedef hash_transform_data_t<IdxDistributionType, 00029 ValueDistribution> data_type; 00030 00034 hash_transform_t (int N, int S, base::context_t& context) : 00035 data_type (N, S, context) {} 00036 00040 template <typename InputMatrixType, 00041 typename OutputMatrixType> 00042 hash_transform_t (hash_transform_t<InputMatrixType, 00043 OutputMatrixType, 00044 IdxDistributionType, 00045 ValueDistribution>& other) : 00046 data_type(other) {} 00047 00051 hash_transform_t (hash_transform_data_t<IdxDistributionType, 00052 ValueDistribution>& other_data) : 00053 data_type(other_data) {} 00054 00055 template <typename Dimension> 00056 void apply (const mpi_multi_vector_t &A, 00057 mpi_multi_vector_t &sketch_of_A, 00058 Dimension dimension) const { 00059 try { 00060 apply_impl (A, sketch_of_A, dimension); 00061 } catch(boost::mpi::exception e) { 00062 SKYLARK_THROW_EXCEPTION ( 00063 base::mpi_exception() 00064 << base::error_msg(e.what()) ); 00065 } catch (std::string e) { 00066 SKYLARK_THROW_EXCEPTION ( 00067 base::combblas_exception() 00068 << base::error_msg(e) ); 00069 } catch (std::logic_error e) { 00070 SKYLARK_THROW_EXCEPTION ( 00071 base::combblas_exception() 00072 << base::error_msg(e.what()) ); 00073 } 00074 } 00075 00076 int get_N() const { return this->_N; } 00077 int get_S() const { return this->_S; } 00079 const sketch_transform_data_t* get_data() const { return this; } 00080 00081 00082 private: 00083 void apply_impl_single (const mpi_vector_t& a_, 00084 mpi_vector_t& sketch_of_a, 00085 columnwise_tag) const { 00086 std::vector<value_type> sketch_term(data_type::_S,0); 00087 00088 // We are essentially doing a 'const' access to a, but the neccessary, 00089 // 'const' option is missing from the interface. 00090 mpi_vector_t &a = const_cast<mpi_vector_t&>(a_); 00091 00094 DenseVectorLocalIterator<index_type, value_type> local_iter(a); 00095 while(local_iter.HasNext()) { 00096 index_type idx = local_iter.GetLocIndex(); 00097 index_type global_idx = local_iter.LocalToGlobal(idx); 00098 index_type global_sketch_idx = data_type::row_idx[global_idx]; 00099 sketch_term[global_sketch_idx] += 00100 (local_iter.GetValue()*data_type::row_value[global_idx]); 00101 local_iter.Next(); 00102 } 00103 00106 MPI_Allreduce(MPI_IN_PLACE, 00107 &(sketch_term[0]), 00108 data_type::_S, 00109 MPIType<value_type>(), 00110 MPI_SUM, 00111 a.commGrid->GetWorld()); 00112 00114 for (index_type i=0; i<data_type::_S; ++i) { 00115 sketch_of_a.SetElement(i,sketch_term[i]); 00116 } 00117 } 00118 00119 00120 void apply_impl (const mpi_multi_vector_t& A, 00121 mpi_multi_vector_t& sketch_of_A, 00122 columnwise_tag) const { 00123 const index_type num_rhs = A.size; 00124 if (sketch_of_A.size != num_rhs) { ; return; } 00125 if (A.dim != data_type::_N) { ; return; } 00126 if (sketch_of_A.dim != data_type::_S) { ; return; } 00127 00129 for (index_type i=0; i<num_rhs; ++i) { 00130 apply_impl_single (A[i], sketch_of_A[i], columnwise_tag()); 00131 } 00132 } 00133 }; 00134 00135 00136 /* Specialization: SpParMat for input, output */ 00137 template <typename IndexType, 00138 typename ValueType, 00139 template <typename> class IdxDistributionType, 00140 template <typename> class ValueDistribution> 00141 struct hash_transform_t < 00142 SpParMat<IndexType, ValueType, SpDCCols<IndexType, ValueType> >, 00143 SpParMat<IndexType, ValueType, SpDCCols<IndexType, ValueType> >, 00144 IdxDistributionType, 00145 ValueDistribution > : 00146 public hash_transform_data_t<IdxDistributionType, 00147 ValueDistribution> { 00148 typedef IndexType index_type; 00149 typedef ValueType value_type; 00150 typedef SpDCCols< index_type, value_type > col_t; 00151 typedef FullyDistVec< index_type, value_type> mpi_vector_t; 00152 typedef SpParMat< index_type, value_type, col_t > matrix_type; 00153 typedef SpParMat< index_type, value_type, col_t > output_matrix_type; 00154 typedef hash_transform_data_t<IdxDistributionType, 00155 ValueDistribution> data_type; 00156 00157 00161 hash_transform_t (int N, int S, base::context_t& context) : 00162 data_type(N, S, context) { 00163 00164 } 00165 00169 template <typename InputMatrixType, 00170 typename OutputMatrixType> 00171 hash_transform_t (hash_transform_t<InputMatrixType, 00172 OutputMatrixType, 00173 IdxDistributionType, 00174 ValueDistribution>& other) : 00175 data_type(other) {} 00176 00180 hash_transform_t (hash_transform_data_t<IdxDistributionType, 00181 ValueDistribution>& other_data) : 00182 data_type(other_data) {} 00183 00184 template <typename Dimension> 00185 void apply (const matrix_type &A, 00186 output_matrix_type &sketch_of_A, 00187 Dimension dimension) const { 00188 try { 00189 apply_impl (A, sketch_of_A, dimension); 00190 } catch(boost::mpi::exception e) { 00191 SKYLARK_THROW_EXCEPTION ( 00192 base::mpi_exception() 00193 << base::error_msg(e.what()) ); 00194 } catch (std::string e) { 00195 SKYLARK_THROW_EXCEPTION ( 00196 base::combblas_exception() 00197 << base::error_msg(e) ); 00198 } catch (std::logic_error e) { 00199 SKYLARK_THROW_EXCEPTION ( 00200 base::combblas_exception() 00201 << base::error_msg(e.what()) ); 00202 } catch (std::bad_alloc e) { 00203 SKYLARK_THROW_EXCEPTION ( 00204 base::skylark_exception() 00205 << base::error_msg("bad_alloc: out of memory") ); 00206 } 00207 } 00208 00209 int get_N() const { return this->_N; } 00210 int get_S() const { return this->_S; } 00212 const sketch_transform_data_t* get_data() const { return this; } 00213 00214 private: 00218 template <typename Dimension> 00219 void apply_impl (const matrix_type &A_, 00220 output_matrix_type &sketch_of_A, 00221 Dimension dist) const { 00222 00223 // We are essentially doing a 'const' access to A, but the necessary, 00224 // 'const' option is missing from the interface 00225 matrix_type &A = const_cast<matrix_type&>(A_); 00226 00227 const size_t rank = A.getcommgrid()->GetRank(); 00228 00229 // extract columns of matrix 00230 col_t &data = A.seq(); 00231 00232 const size_t ncols = sketch_of_A.getncol(); 00233 00234 const size_t my_row_offset = utility::cb_my_row_offset(A); 00235 const size_t my_col_offset = utility::cb_my_col_offset(A); 00236 00237 size_t comm_size = A.getcommgrid()->GetSize(); 00238 std::vector< std::set<size_t> > proc_set(comm_size); 00239 00240 // pre-compute processor targets of local sketch application 00241 for(typename col_t::SpColIter col = data.begcol(); 00242 col != data.endcol(); col++) { 00243 for(typename col_t::SpColIter::NzIter nz = data.begnz(col); 00244 nz != data.endnz(col); nz++) { 00245 00246 // compute global row and column id, and compress in one 00247 // target position index 00248 const index_type rowid = nz.rowid() + my_row_offset; 00249 const index_type colid = col.colid() + my_col_offset; 00250 const size_t pos = getPos(rowid, colid, ncols, dist); 00251 00252 // compute target processor for this target index 00253 const size_t target = 00254 utility::owner(sketch_of_A, pos / ncols, pos % ncols); 00255 00256 if(proc_set[target].count(pos) == 0) { 00257 assert(target < comm_size); 00258 proc_set[target].insert(pos); 00259 } 00260 } 00261 } 00262 00263 // constructing arrays for one-sided access 00264 std::vector<index_type> proc_start_idx(comm_size + 1, 0); 00265 for(size_t i = 1; i < comm_size + 1; ++i) 00266 proc_start_idx[i] = proc_start_idx[i-1] + proc_set[i-1].size(); 00267 00268 std::vector<index_type> indicies(proc_start_idx[comm_size], 0); 00269 std::vector<value_type> values(proc_start_idx[comm_size], 0); 00270 00271 // Apply sketch for all local values. Note that some of the resulting 00272 // values might end up on a different processor. The data structure 00273 // fills values (sorted by processor id) in one continuous array. 00274 // Subsequently, one-sided operations can be used to access values for 00275 // each processor. 00276 for(typename col_t::SpColIter col = data.begcol(); 00277 col != data.endcol(); col++) { 00278 for(typename col_t::SpColIter::NzIter nz = data.begnz(col); 00279 nz != data.endnz(col); nz++) { 00280 00281 // compute global row and column id, and compress in one 00282 // target position index 00283 const index_type rowid = nz.rowid() + my_row_offset; 00284 const index_type colid = col.colid() + my_col_offset; 00285 const size_t pos = getPos(rowid, colid, ncols, dist); 00286 00287 // compute target processor for this target index 00288 const size_t proc = 00289 utility::owner(sketch_of_A, pos / ncols, pos % ncols); 00290 00291 // get offset in array for current element 00292 const size_t ar_idx = proc_start_idx[proc] + 00293 std::distance(proc_set[proc].begin(), proc_set[proc].find(pos)); 00294 00295 indicies[ar_idx] = pos; 00296 values[ar_idx] += nz.value() * 00297 data_type::getValue(rowid, colid, dist); 00298 } 00299 } 00300 00301 // Creating windows for all relevant arrays 00302 boost::mpi::communicator comm = utility::get_communicator(A); 00303 00304 // tell MPI that we will not use locks 00305 MPI_Info info; 00306 MPI_Info_create(&info); 00307 MPI_Info_set(info, "no_locks", "true"); 00308 00309 MPI_Win start_offset_win, idx_win, val_win; 00310 00311 MPI_Win_create(&proc_start_idx[0], sizeof(size_t) * (comm_size + 1), 00312 sizeof(size_t), info, comm, &start_offset_win); 00313 00314 MPI_Win_create(&indicies[0], sizeof(index_type) * indicies.size(), 00315 sizeof(index_type), info, comm, &idx_win); 00316 00317 MPI_Win_create(&values[0], sizeof(value_type) * values.size(), 00318 sizeof(value_type), info, comm, &val_win); 00319 00320 MPI_Info_free(&info); 00321 00322 // Synchronize epoch, no subsequent put operations (read only) and no 00323 // preceding fence calls. 00324 MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOPRECEDE, start_offset_win); 00325 MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOPRECEDE, idx_win); 00326 MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOPRECEDE, val_win); 00327 00328 00329 // creating a local structure to hold sparse data triplets 00330 std::vector< tuple< index_type, index_type, value_type > > sketch_data; 00331 00332 std::map<index_type, size_t> tuple_idx; 00333 typedef typename std::map<index_type, size_t>::iterator itr_t; 00334 00335 // in order to compute local indices in sketch_of_A we need to know 00336 // row and column offset on each rank. 00337 const size_t res_row_offset = utility::cb_my_row_offset(sketch_of_A); 00338 const size_t res_col_offset = utility::cb_my_col_offset(sketch_of_A); 00339 00340 for(size_t p = 0; p < comm_size; ++p) { 00341 00342 // get the start/end offset 00343 std::vector<size_t> offset(2); 00344 MPI_Get(&(offset[0]), 2, boost::mpi::get_mpi_datatype<size_t>(), 00345 p, rank, 2, boost::mpi::get_mpi_datatype<size_t>(), 00346 start_offset_win); 00347 00348 MPI_Win_fence(MPI_MODE_NOPUT, start_offset_win); 00349 size_t num_values = offset[1] - offset[0]; 00350 00351 // and fill indices/values. 00352 std::vector<index_type> add_idx(num_values); 00353 std::vector<value_type> add_val(num_values); 00354 MPI_Get(&(add_idx[0]), num_values, 00355 boost::mpi::get_mpi_datatype<index_type>(), p, offset[0], 00356 num_values, boost::mpi::get_mpi_datatype<index_type>(), 00357 idx_win); 00358 00359 MPI_Get(&(add_val[0]), num_values, 00360 boost::mpi::get_mpi_datatype<value_type>(), p, offset[0], 00361 num_values, boost::mpi::get_mpi_datatype<value_type>(), 00362 val_win); 00363 00364 MPI_Win_fence(MPI_MODE_NOPUT, idx_win); 00365 MPI_Win_fence(MPI_MODE_NOPUT, val_win); 00366 00367 // finally, add data to local CombBLAS buffer (if we have any). 00368 //XXX: We cannot have duplicated (row/col) pairs in sketch_data. 00369 // Check again with CombBLAS if there is a more suitable way 00370 // to create sparse dist matrices. 00371 for(size_t i = 0; i < num_values; ++i) { 00372 00373 index_type lrow = add_idx[i] / ncols - res_row_offset; 00374 index_type lcol = add_idx[i] % ncols - res_col_offset; 00375 00376 itr_t itr = tuple_idx.find(add_idx[i]); 00377 if(itr == tuple_idx.end()) { 00378 tuple_idx.insert( 00379 std::make_pair(add_idx[i], sketch_data.size())); 00380 sketch_data.push_back(make_tuple(lrow, lcol, add_val[i])); 00381 } else { 00382 get<2>(sketch_data[itr->second]) += add_val[i]; 00383 } 00384 } 00385 } 00386 00387 // this pointer will be freed in the destructor of col_t (see below). 00388 //FIXME: verify with Valgrind 00389 SpTuples<index_type, value_type> *tmp_tpl = 00390 new SpTuples<index_type, value_type> ( 00391 sketch_data.size(), sketch_of_A.getlocalrows(), 00392 sketch_of_A.getlocalcols(), &sketch_data[0]); 00393 00394 // create temporary matrix (no further communication should be 00395 // required because all the data are local) and assign (deep copy). 00396 col_t *sp_data = new col_t(*tmp_tpl, false); 00397 00398 //FIXME: is there a direct way to set the "data buffer" for the output 00399 // matrix? Currently this method creates a temporary matrix and 00400 // then assigns to the output matrix (deep copy). 00401 sketch_of_A = output_matrix_type(sp_data, sketch_of_A.getcommgrid()); 00402 00403 00404 MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOSUCCEED, start_offset_win); 00405 MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOSUCCEED, idx_win); 00406 MPI_Win_fence(MPI_MODE_NOPUT | MPI_MODE_NOSUCCEED, val_win); 00407 00408 MPI_Win_free(&start_offset_win); 00409 MPI_Win_free(&idx_win); 00410 MPI_Win_free(&val_win); 00411 } 00412 00413 00414 inline index_type getPos(index_type rowid, index_type colid, size_t ncols, 00415 columnwise_tag) const { 00416 return colid + ncols * data_type::row_idx[rowid]; 00417 } 00418 00419 inline index_type getPos(index_type rowid, index_type colid, size_t ncols, 00420 rowwise_tag) const { 00421 return rowid * ncols + data_type::row_idx[colid]; 00422 } 00423 }; 00424 00425 00426 /* Specialization: SpParMat for input, Local SpMat output */ 00427 template <typename IndexType, 00428 typename ValueType, 00429 template <typename> class IdxDistributionType, 00430 template <typename> class ValueDistribution> 00431 struct hash_transform_t < 00432 SpParMat<IndexType, ValueType, SpDCCols<IndexType, ValueType> >, 00433 base::sparse_matrix_t<ValueType>, 00434 IdxDistributionType, 00435 ValueDistribution > : 00436 public hash_transform_data_t<IdxDistributionType, 00437 ValueDistribution> { 00438 typedef IndexType index_type; 00439 typedef ValueType value_type; 00440 typedef SpDCCols< index_type, value_type > col_t; 00441 typedef FullyDistVec< index_type, value_type> mpi_vector_t; 00442 typedef SpParMat< index_type, value_type, col_t > matrix_type; 00443 typedef base::sparse_matrix_t< value_type > output_matrix_type; 00444 typedef hash_transform_data_t<IdxDistributionType, 00445 ValueDistribution> data_type; 00446 00447 00451 hash_transform_t (int N, int S, base::context_t& context) : 00452 data_type(N, S, context) { 00453 00454 } 00455 00459 template <typename InputMatrixType, 00460 typename OutputMatrixType> 00461 hash_transform_t (hash_transform_t<InputMatrixType, 00462 OutputMatrixType, 00463 IdxDistributionType, 00464 ValueDistribution>& other) : 00465 data_type(other) {} 00466 00470 hash_transform_t (hash_transform_data_t<IdxDistributionType, 00471 ValueDistribution>& other_data) : 00472 data_type(other_data) {} 00473 00474 template <typename Dimension> 00475 void apply (const matrix_type &A, 00476 output_matrix_type &sketch_of_A, 00477 Dimension dimension) const { 00478 try { 00479 apply_impl (A, sketch_of_A, dimension); 00480 } catch(boost::mpi::exception e) { 00481 SKYLARK_THROW_EXCEPTION ( 00482 base::mpi_exception() 00483 << base::error_msg(e.what()) ); 00484 } catch (std::string e) { 00485 SKYLARK_THROW_EXCEPTION ( 00486 base::combblas_exception() 00487 << base::error_msg(e) ); 00488 } catch (std::logic_error e) { 00489 SKYLARK_THROW_EXCEPTION ( 00490 base::combblas_exception() 00491 << base::error_msg(e.what()) ); 00492 } catch (std::bad_alloc e) { 00493 SKYLARK_THROW_EXCEPTION ( 00494 base::skylark_exception() 00495 << base::error_msg("bad_alloc: out of memory") ); 00496 } 00497 } 00498 00499 int get_N() const { return this->_N; } 00500 int get_S() const { return this->_S; } 00502 const sketch_transform_data_t* get_data() const { return this; } 00503 00504 private: 00508 template <typename Dimension> 00509 void apply_impl (const matrix_type &A_, 00510 output_matrix_type &sketch_of_A, 00511 Dimension dist) const { 00512 00513 // We are essentially doing a 'const' access to A, but the necessary, 00514 // 'const' option is missing from the interface 00515 matrix_type &A = const_cast<matrix_type&>(A_); 00516 00517 const size_t rank = A.getcommgrid()->GetRank(); 00518 00519 // extract columns of matrix 00520 col_t &data = A.seq(); 00521 00522 const size_t my_row_offset = utility::cb_my_row_offset(A); 00523 const size_t my_col_offset = utility::cb_my_col_offset(A); 00524 00525 int n_res_cols = A.getncol(); 00526 int n_res_rows = A.getnrow(); 00527 data_type::get_res_size(n_res_rows, n_res_cols, dist); 00528 00529 // Apply sketch for all local values. Subsequently, all values are 00530 // gathered on processor 0 and the local matrix is populated. 00531 // XXX: For now sort on each processor to ease the creation of the 00532 // gathered local matrix. Most likely possible can be avoided by 00533 // traversing the matrix in the correct order. 00534 typedef std::map<index_type, value_type> col_values_t; 00535 col_values_t col_values; 00536 for(typename col_t::SpColIter col = data.begcol(); 00537 col != data.endcol(); col++) { 00538 for(typename col_t::SpColIter::NzIter nz = data.begnz(col); 00539 nz != data.endnz(col); nz++) { 00540 00541 index_type rowid = nz.rowid() + my_row_offset; 00542 index_type colid = col.colid() + my_col_offset; 00543 00544 const value_type value = 00545 nz.value() * data_type::getValue(rowid, colid, dist); 00546 data_type::finalPos(rowid, colid, dist); 00547 col_values[colid * n_res_rows + rowid] += value; 00548 } 00549 } 00550 00551 boost::mpi::communicator world(A.getcommgrid()->GetWorld(), 00552 boost::mpi::comm_duplicate); 00553 00554 std::vector< std::map<index_type, value_type > > result; 00555 boost::mpi::gather(world, col_values, result, 0); 00556 00557 // unpack into temp structure 00558 //FIXME: 00559 int size_estimate = A.getnnz(); 00560 if(rank == 0) { 00561 create_local_sp_mat(result, n_res_rows, n_res_cols, size_estimate, 00562 sketch_of_A); 00563 } 00564 } 00565 00566 void create_local_sp_mat( 00567 std::vector< std::map<index_type, value_type > > &result, 00568 int n_res_rows, int n_res_cols, int size_estimate, 00569 output_matrix_type &sketch_of_A) const { 00570 00571 int nnz = 0; 00572 int *indptr_new = new int[n_res_cols + 1]; 00573 std::vector<int> final_rows(size_estimate); 00574 std::vector<value_type> final_vals(size_estimate); 00575 00576 indptr_new[0] = 0; 00577 00578 typedef typename std::map<index_type, value_type>::iterator itr_t; 00579 std::vector<itr_t> proc_iters(result.size()); 00580 for(size_t i = 0; i < result.size(); ++i) 00581 proc_iters[i] = result[i].begin(); 00582 00583 std::vector<index_type> idx_map(n_res_rows, -1); 00584 00585 for(int col = 0; col < n_res_cols; ++col) { 00586 00587 // collect all values for column 'col' of all procs 00588 for(size_t i = 0; i < result.size(); ++i) { 00589 00590 itr_t cur_itr = proc_iters[i]; 00591 00592 for(; cur_itr != result[i].end() && 00593 static_cast<int>(cur_itr->first / n_res_rows) == col; 00594 cur_itr++) { 00595 00596 int row = cur_itr->first % n_res_rows; 00597 double val = cur_itr->second; 00598 00599 if(idx_map[row] == -1) { 00600 idx_map[row] = nnz; 00601 final_rows[nnz] = row; 00602 final_vals[nnz] = val; 00603 nnz++; 00604 } else { 00605 final_vals[idx_map[row]] += val; 00606 } 00607 } 00608 00609 proc_iters[i] = cur_itr; 00610 } 00611 00612 indptr_new[col + 1] = nnz; 00613 00614 // reset idx_map 00615 for(int i = indptr_new[col]; i < nnz; ++i) 00616 idx_map[final_rows[i]] = -1; 00617 } 00618 00619 int *indices_new = new int[nnz]; 00620 std::copy(final_rows.begin(), final_rows.begin() + nnz, indices_new); 00621 00622 double *values_new = new double[nnz]; 00623 std::copy(final_vals.begin(), final_vals.begin() + nnz, values_new); 00624 00625 sketch_of_A.attach(indptr_new, indices_new, values_new, 00626 nnz, n_res_rows, n_res_cols, true); 00627 } 00628 }; 00629 00630 } } 00632 #endif // SKYLARK_HASH_TRANSFORM_COMBBLAS_HPP