Skylark (Sketching Library)
0.1
|
00001 /* 00002 * io.hpp 00003 * 00004 * Created on: Feb 7, 2014 00005 * Author: vikas 00006 */ 00007 00008 #ifndef IO_HPP_ 00009 #define IO_HPP_ 00010 00011 #include <boost/mpi.hpp> 00012 #include <sstream> 00013 #include <cstdlib> 00014 #include <string> 00015 #include <elemental.hpp> 00016 #include "../base/sparse_matrix.hpp" 00017 #include "options.hpp" 00018 00019 namespace bmpi = boost::mpi; 00020 00021 typedef elem::DistMatrix<double, elem::CIRC, elem::CIRC> DistCircMatrixType; 00022 typedef skylark::base::sparse_matrix_t<double> sparse_matrix_t; 00023 typedef elem::Matrix<double> LocalMatrixType; 00024 00025 #ifdef SKYLARK_HAVE_HDF5 00026 #include <H5Cpp.h> 00027 00028 int write_hdf5(const boost::mpi::communicator &comm, 00029 std::string fName, elem::Matrix<double>& X, 00030 elem::Matrix<double>& Y) { 00031 00032 int n, d; 00033 boost::mpi::reduce(comm, X.Width(), n, std::plus<int>(), 0); 00034 d = X.Height(); 00035 00036 00037 if (comm.rank() == 0) { 00038 try { 00039 std::cout << "Writing to file " << fName << " " << n << "x" << d << std::endl; 00040 00041 H5::Exception::dontPrint(); 00042 00043 H5::H5File file( fName, H5F_ACC_TRUNC ); 00044 H5::FloatType datatype( H5::PredType::NATIVE_DOUBLE ); 00045 datatype.setOrder( H5T_ORDER_LE ); 00046 00047 hsize_t dimsf[2]; // dataset dimensions 00048 dimsf[0] = n; 00049 dimsf[1] = d; 00050 H5::DataSpace dataspaceX(2, dimsf); 00051 H5::DataSet datasetX = file.createDataSet("X", datatype, dataspaceX); 00052 00053 dimsf[0] = n; 00054 H5::DataSpace dataspaceY(1, dimsf); 00055 H5::DataSet datasetY = file.createDataSet("Y", datatype, dataspaceY); 00056 00057 hsize_t stride[2]; // Stride of hyperslab 00058 hsize_t block[2]; // Block sizes 00059 hsize_t mstart[2]; 00060 stride[0] = 1; stride[1] = 1; 00061 block[0] = 1; block[1] = 1; 00062 mstart[0] = 0; mstart[0] = 0; 00063 00064 int sumn = 0; 00065 for(int p = 0; p < comm.size(); p++) { 00066 elem::Matrix<double> XX, YY; 00067 if (p == 0) { 00068 XX = X; 00069 YY = Y; 00070 } else { 00071 int nn; 00072 comm.recv(p, 1, nn); 00073 XX.Resize(d, nn); 00074 YY.Resize(nn, 1); 00075 comm.recv(p, 2, XX.Buffer(), nn * d); 00076 comm.recv(p, 3, YY.Buffer(), nn); 00077 00078 } 00079 00080 hsize_t start[2]; // Start of hyperslab 00081 hsize_t count[2]; // Block count 00082 00083 start[0] = sumn; start[1] = 0; 00084 count[0] = XX.Width(); count[1] = d; 00085 dataspaceX.selectHyperslab(H5S_SELECT_SET, count, start, stride, block); 00086 H5::DataSpace memspaceX(2, count); 00087 memspaceX.selectHyperslab(H5S_SELECT_SET, count, mstart, stride, block); 00088 datasetX.write(XX.Buffer(), H5::PredType::NATIVE_DOUBLE, 00089 memspaceX, dataspaceX); 00090 dataspaceX.selectNone(); 00091 dataspaceY.selectHyperslab(H5S_SELECT_SET, count, start, stride, block); 00092 H5::DataSpace memspaceY(1, count); 00093 memspaceX.selectHyperslab(H5S_SELECT_SET, count, mstart, stride, block); 00094 datasetY.write(YY.Buffer(), H5::PredType::NATIVE_DOUBLE, 00095 memspaceY, dataspaceY); 00096 dataspaceY.selectNone(); 00097 file.flush(H5F_SCOPE_GLOBAL); 00098 sumn += XX.Width(); 00099 } 00100 00101 file.close(); 00102 } 00103 // catch failure caused by the H5File operations 00104 catch( H5::FileIException error ) { 00105 error.printError(); 00106 return -1; 00107 } 00108 // catch failure caused by the DataSet operations 00109 catch( H5::DataSetIException error ) { 00110 error.printError(); 00111 return -1; 00112 } 00113 // catch failure caused by the DataSpace operations 00114 catch( H5::DataSpaceIException error ) { 00115 error.printError(); 00116 return -1; 00117 } 00118 // catch failure caused by the DataSpace operations 00119 catch( H5::DataTypeIException error ) { 00120 error.printError(); 00121 return -1; 00122 } 00123 } else { 00124 comm.send(0, 1, X.Width()); 00125 comm.send(0, 2, X.Buffer(), X.Width() * X.Height()); 00126 comm.send(0, 3, Y.Buffer(), Y.Height()); 00127 } 00128 00129 comm.barrier(); 00130 00131 return 0; // successfully terminated 00132 } 00133 00134 int write_hdf5(std::string fName, sparse_matrix_t& X, 00135 elem::Matrix<double>& Y) { 00136 00137 try { 00138 00139 std::cout << "Writing to file " << fName << std::endl; 00140 00141 int* dimensions = new int[3]; 00142 dimensions[0] = X.height(); 00143 dimensions[1] = X.width(); 00144 dimensions[2] = X.nonzeros(); 00145 00146 H5::Exception::dontPrint(); 00147 00148 H5::H5File file( fName, H5F_ACC_TRUNC ); 00149 00150 // write dimensions 00151 { 00152 hsize_t dim[1]; // dataset dimensions 00153 dim[0] = 3; 00154 H5::DataSpace dataspace( 1, dim ); 00155 H5::FloatType datatype( H5::PredType::NATIVE_INT ); 00156 H5::DataSet dataset = file.createDataSet( "dimensions", datatype, dataspace ); 00157 dataset.write(dimensions, H5::PredType::NATIVE_INT); 00158 } 00159 00160 // write col_ptr 00161 { 00162 std::cout << "Writing col_ptr" << std::endl; 00163 hsize_t dim[1]; // dataset dimensions 00164 dim[0] = dimensions[1]+1; 00165 H5::DataSpace dataspace( 1, dim ); 00166 H5::FloatType datatype( H5::PredType::NATIVE_INT ); 00167 H5::DataSet dataset = file.createDataSet( "indptr", datatype, dataspace ); 00168 dataset.write(X.indptr(), H5::PredType::NATIVE_INT); 00169 } 00170 00171 // write row_ind 00172 { 00173 std::cout << "Writing row indices" << std::endl; 00174 hsize_t dim[1]; // dataset dimensions 00175 dim[0] = dimensions[2]; 00176 H5::DataSpace dataspace( 1, dim ); 00177 H5::FloatType datatype( H5::PredType::NATIVE_INT ); 00178 H5::DataSet dataset = file.createDataSet( "indices", datatype, dataspace ); 00179 dataset.write(X.indices(), H5::PredType::NATIVE_INT); 00180 } 00181 // write row_ind 00182 { 00183 std::cout << "Writing values" << std::endl; 00184 hsize_t dim[1]; // dataset dimensions 00185 dim[0] = dimensions[2]; 00186 H5::DataSpace dataspace( 1, dim ); 00187 H5::FloatType datatype( H5::PredType::NATIVE_DOUBLE ); 00188 H5::DataSet dataset = file.createDataSet( "values", datatype, dataspace ); 00189 dataset.write(X.values(), H5::PredType::NATIVE_DOUBLE); 00190 } 00191 00192 00193 // write values 00194 { 00195 std::cout << "Writing targets" << std::endl; 00196 hsize_t dim[1]; // dataset dimensions 00197 dim[0] = dimensions[1]; 00198 H5::DataSpace dataspace( 1, dim ); 00199 H5::FloatType datatype( H5::PredType::NATIVE_DOUBLE ); 00200 H5::DataSet dataset = file.createDataSet( "Y", datatype, dataspace ); 00201 dataset.write(Y.Buffer(), H5::PredType::NATIVE_DOUBLE); 00202 } 00203 00204 delete[] dimensions; 00205 file.close(); 00206 00207 } 00208 // catch failure caused by the H5File operations 00209 catch( H5::FileIException error ) 00210 { 00211 error.printError(); 00212 return -1; 00213 } 00214 // catch failure caused by the DataSet operations 00215 catch( H5::DataSetIException error ) 00216 { 00217 error.printError(); 00218 return -1; 00219 } 00220 // catch failure caused by the DataSpace operations 00221 catch( H5::DataSpaceIException error ) 00222 { 00223 error.printError(); 00224 return -1; 00225 } 00226 // catch failure caused by the DataSpace operations 00227 catch( H5::DataTypeIException error ) 00228 { 00229 error.printError(); 00230 return -1; 00231 } 00232 return 0; // successfully terminated 00233 } 00234 00235 void read_hdf5_dataset(H5::H5File& file, std::string name, int* buf, int offset, int count) { 00236 std::cout << "reading HDF5 dataset " << name << std::endl; 00237 H5::DataSet dataset = file.openDataSet(name); 00238 H5::DataSpace filespace = dataset.getSpace(); 00239 hsize_t dims[1]; 00240 dims[0] = count; 00241 H5::DataSpace mspace(1,dims); 00242 hsize_t count1[1]; 00243 count1[0] = count; 00244 hsize_t offset1[1]; 00245 offset1[0] = offset; 00246 filespace.selectHyperslab( H5S_SELECT_SET, count1, offset1 ); 00247 dataset.read(buf, H5::PredType::NATIVE_INT, mspace, filespace); 00248 } 00249 00250 void read_hdf5_dataset(H5::H5File& file, std::string name, double* buf, int offset, int count) { 00251 std::cout << "reading HDF5 dataset " << name << std::endl; 00252 H5::DataSet dataset = file.openDataSet(name); 00253 H5::DataSpace filespace = dataset.getSpace(); 00254 hsize_t dims[1]; 00255 dims[0] = count; 00256 H5::DataSpace mspace(1,dims); 00257 hsize_t count1[1]; 00258 count1[0] = count; 00259 hsize_t offset1[1]; 00260 offset1[0] = offset; 00261 filespace.selectHyperslab( H5S_SELECT_SET, count1, offset1 ); 00262 dataset.read(buf, H5::PredType::NATIVE_DOUBLE, mspace, filespace); 00263 } 00264 00265 00266 void read_hdf5(const boost::mpi::communicator &comm, std::string fName, 00267 sparse_matrix_t& X, 00268 elem::Matrix<double>& Y, int min_d = 0) { 00269 00270 try { 00271 int rank = comm.rank(); 00272 int size = comm.size(); 00273 00274 bmpi::timer timer; 00275 if (rank==0) 00276 std::cout << "Reading sparse matrix from HDF5 file " << fName << std::endl; 00277 00278 H5::H5File file( fName, H5F_ACC_RDONLY ); 00279 00280 int* dimensions = new int[3]; 00281 00282 read_hdf5_dataset(file, "dimensions", dimensions, 0, 3); 00283 00284 int d = dimensions[0]; 00285 int n = dimensions[1]; 00286 int nnz = dimensions[3]; 00287 00288 if (min_d > 0) 00289 d = std::max(d, min_d); 00290 00291 int* indptr = new int[n+1]; 00292 read_hdf5_dataset(file, "indptr", indptr, 0, n+1); 00293 00294 boost::mpi::broadcast(comm, n, 0); 00295 boost::mpi::broadcast(comm, d, 0); 00296 00297 // Number of examples per process 00298 int* examples_allocation = new int[size]; 00299 int chunksize = (int) n / size; 00300 int leftover = n % size; 00301 for(int i=0;i< size;i++) 00302 examples_allocation[i] = chunksize; 00303 for(int i=0;i<leftover;i++) 00304 examples_allocation[i]++; 00305 00306 comm.barrier(); 00307 00308 // read chunks on rank = 0 and send 00309 if(rank==0) { 00310 int examples_local, nnz_start_index = 0, nnz_end_index = 0, nnz_local; 00311 int examples_total = 0; 00312 // loop and splice out chunk from the hdf5 file and then send to other processes 00313 for(int i=0;i<size;i++) { 00314 examples_local = examples_allocation[i]; 00315 00316 nnz_start_index = indptr[examples_total]; // start index of {examples_total+1}^th example 00317 nnz_end_index = indptr[examples_total + examples_local]; 00318 nnz_local = nnz_end_index - nnz_start_index; 00319 00320 double *_values = new double[nnz_local]; 00321 int *_rowind = new int[nnz_local]; 00322 int *_col_ptr = new int[examples_local+1]; 00323 double *y = new double[examples_local]; 00324 00325 read_hdf5_dataset(file, "values", _values, nnz_start_index, nnz_local); 00326 read_hdf5_dataset(file, "indices", _rowind, nnz_start_index, nnz_local); 00327 read_hdf5_dataset(file, "Y", y, examples_total, examples_local); 00328 00329 for(int k=0;k<examples_local; k++) 00330 _col_ptr[k] = indptr[examples_total+k] - nnz_start_index; 00331 _col_ptr[examples_local] = nnz_local; 00332 00333 //std::copy(indptr + examples_local_cumulative, indptr + examples_local_cumulative + examples_local, _col_ptr); 00334 examples_total += examples_local; 00335 00336 if (i==0) { 00337 00338 X.attach(_col_ptr, _rowind, _values, nnz_local, d, examples_local, true); 00339 LocalMatrixType Y2(examples_local, 1, y, 0); 00340 Y = Y2; 00341 delete[] y; 00342 std::cout << "rank=0: Read " << examples_local << " x " << d << " with " << nnz_local << " nonzeros" << std::endl; 00343 00344 } else { 00345 00346 int process = i; 00347 std::cout << "Sending to " << process << std::endl; 00348 comm.send(process, 1, examples_local); 00349 comm.send(process, 2, d); 00350 comm.send(process, 3, nnz_local); 00351 comm.send(process, 4, _col_ptr, examples_local+1); 00352 comm.send(process, 5, _rowind, nnz_local); 00353 comm.send(process, 6, _values, nnz_local); 00354 comm.send(process, 7, y, examples_local); 00355 delete[] _values; 00356 delete[] _rowind; 00357 delete[] _col_ptr; 00358 delete[] y; 00359 } 00360 } 00361 } else { 00362 00363 for(int r=1; r<size; r++) { 00364 if ((rank==r) && examples_allocation[r]>0) { 00365 int nnz_local, t, d; 00366 comm.recv(0, 1, t); 00367 comm.recv(0, 2, d); 00368 comm.recv(0, 3, nnz_local); 00369 00370 double* values = new double[nnz_local]; 00371 int* rowind = new int[nnz_local]; 00372 int* col_ptr = new int[t+1]; 00373 00374 comm.recv(0, 4, col_ptr, t+1); 00375 comm.recv(0, 5, rowind, nnz_local); 00376 comm.recv(0, 6, values, nnz_local); 00377 00378 //attach currently creates copies so we can delete the rest 00379 X.attach(col_ptr, rowind, values, nnz_local, d, t, true); 00380 // delete[] col_ptr; 00381 // delete[] rowind; 00382 // delete[] values; 00383 00384 double* y = new double[t]; 00385 comm.recv(0, 7, y, t); 00386 LocalMatrixType Y2(t, 1, y, 0); 00387 Y = Y2; // copy 00388 delete[] y; 00389 //Y.Resize(t,1); 00390 //Y.Attach(t,1,y,0); 00391 00392 std::cout << "rank=" << r << ": Received and read " << t << " x " << d << " with " << nnz_local << " nonzeros" << std::endl; 00393 } 00394 } 00395 } 00396 00397 double readtime = timer.elapsed(); 00398 if (rank==0) 00399 std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl; 00400 00401 comm.barrier(); 00402 00403 delete[] dimensions; 00404 file.close(); 00405 } 00406 // catch failure caused by the H5File operations 00407 catch( H5::FileIException error ) 00408 { 00409 error.printError(); 00410 //return -1; 00411 } 00412 // catch failure caused by the DataSet operations 00413 catch( H5::DataSetIException error ) 00414 { 00415 error.printError(); 00416 //return -1; 00417 } 00418 // catch failure caused by the DataSpace operations 00419 catch( H5::DataSpaceIException error ) 00420 { 00421 error.printError(); 00422 //return -1; 00423 } 00424 // catch failure caused by the DataSpace operations 00425 catch( H5::DataTypeIException error ) 00426 { 00427 error.printError(); 00428 //return -1; 00429 } 00430 //return 0; // successfully terminated 00431 00432 } 00433 00434 void read_hdf5(const boost::mpi::communicator &comm, std::string fName, 00435 LocalMatrixType& Xlocal, LocalMatrixType& Ylocal, int blocksize = 10000) { 00436 00437 int rank = comm.rank(); 00438 bmpi::timer timer; 00439 if (rank==0) 00440 std::cout << "Reading Dense matrix from HDF5 file " << fName << std::endl; 00441 00442 00443 elem::DistMatrix<double, elem::STAR, elem::VC> X; 00444 elem::DistMatrix<double, elem::VC, elem::STAR> Y; 00445 00446 H5::H5File file( fName, H5F_ACC_RDONLY ); 00447 H5::DataSet datasetX = file.openDataSet( "X" ); 00448 H5::DataSpace filespaceX = datasetX.getSpace(); 00449 int ndims = filespaceX.getSimpleExtentNdims(); 00450 hsize_t dimsX[2]; // dataset dimensions 00451 ndims = filespaceX.getSimpleExtentDims( dimsX ); 00452 hsize_t n = dimsX[0]; 00453 hsize_t d = dimsX[1]; 00454 00455 H5::DataSet datasetY = file.openDataSet( "Y" ); 00456 H5::DataSpace filespaceY = datasetY.getSpace(); 00457 hsize_t dimsY[1]; // dataset dimensions 00458 00459 hsize_t countX[2]; 00460 hsize_t countY[1]; 00461 00462 int numblocks = ((int) n/ (int) blocksize); // of size blocksize 00463 int leftover = n % blocksize; 00464 int block = blocksize; 00465 00466 hsize_t offsetX[2], offsetY[1]; 00467 00468 // if (min_d > 0) 00469 // d = std::max(d, min_d); 00470 00471 X.Resize(d, n); 00472 Y.Resize(n,1); 00473 00474 elem::Zeros(Xlocal, X.LocalHeight(), X.LocalWidth()); 00475 elem::Zeros(Ylocal, Y.LocalHeight(), 1); 00476 00477 X.Attach(d,n,0,0,Xlocal,elem::DefaultGrid()); 00478 Y.Attach(n,1,0,0,Ylocal,elem::DefaultGrid()); 00479 00480 for(int i=0; i<numblocks+1; i++) { 00481 00482 if (i==numblocks) 00483 block = leftover; 00484 if (block==0) 00485 break; 00486 00487 DistCircMatrixType x(d, block), y(block, 1); 00488 x.SetRoot(0); 00489 y.SetRoot(0); 00490 elem::MakeZeros(x); 00491 00492 00493 offsetX[0] = i*blocksize; 00494 offsetX[1] = 0; 00495 countX[0] = block; 00496 countX[1] = d; 00497 offsetY[0] = i*blocksize; 00498 countY[0] = block; 00499 00500 00501 filespaceX.selectHyperslab( H5S_SELECT_SET, countX, offsetX ); 00502 filespaceY.selectHyperslab( H5S_SELECT_SET, countY, offsetY ); 00503 00504 if(rank==0) { 00505 std::cout << "Reading and distributing chunk " << i*blocksize << " to " << i*blocksize + block - 1 << " ("<< block << " elements )" << std::endl; 00506 00507 double *Xdata = x.Matrix().Buffer(); 00508 double *Ydata = y.Matrix().Buffer(); 00509 00510 dimsX[0] = block; 00511 dimsX[1] = d; 00512 H5::DataSpace mspace1(2, dimsX); 00513 datasetX.read( Xdata, H5::PredType::NATIVE_DOUBLE, mspace1, filespaceX ); 00514 00515 dimsY[0] = block; 00516 H5::DataSpace mspace2(1,dimsY); 00517 datasetY.read( Ydata, H5::PredType::NATIVE_DOUBLE, mspace2, filespaceY ); 00518 00519 } 00520 00521 elem::DistMatrix<double, elem::STAR, elem::VC> viewX; 00522 elem::DistMatrix<double, elem::VC, elem::STAR> viewY; 00523 00524 elem::View(viewX, X, 0, i*blocksize, x.Height(), x.Width()); 00525 elem::View(viewY, Y, i*blocksize, 0, x.Width(), 1); 00526 00527 viewX = x; 00528 viewY = y; 00529 00530 } 00531 00532 double readtime = timer.elapsed(); 00533 if (rank==0) 00534 std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl; 00535 } 00536 #endif 00537 00538 template<typename T> 00539 void read_libsvm(const boost::mpi::communicator &comm, std::string fName, 00540 elem::Matrix<T>& Xlocal, elem::Matrix<T>& Ylocal, 00541 int min_d = 0, int blocksize = 10000) { 00542 00543 elem::Grid grid(comm); 00544 elem::DistMatrix<T, elem::STAR, elem::VC> X(grid); 00545 elem::DistMatrix<T, elem::VC, elem::STAR> Y(grid); 00546 00547 int rank = comm.rank(); 00548 if (rank==0) 00549 std::cout << "Reading from file " << fName << std::endl; 00550 00551 std::ifstream file(fName.c_str()); 00552 std::string line; 00553 std::string token, val, ind; 00554 float label; 00555 unsigned int start = 0; 00556 unsigned int delim, t; 00557 int n = 0; 00558 int d = 0; 00559 int i, j, last; 00560 char c; 00561 00562 bmpi::timer timer; 00563 00564 00565 // make one pass over the data to figure out dimensions - will pay in terms of preallocated storage. 00566 if (rank==0) { 00567 while(!file.eof()) { 00568 getline(file, line); 00569 if(line.length()==0) 00570 break; 00571 delim = line.find_last_of(":"); 00572 if(delim > line.length()) 00573 continue; 00574 n++; 00575 t = delim; 00576 while(line[t]!=' ') { 00577 t--; 00578 } 00579 val = line.substr(t+1, delim - t); 00580 last = atoi(val.c_str()); 00581 if (last>d) 00582 d = last; 00583 } 00584 if (min_d > 0) 00585 d = std::max(d, min_d); 00586 00587 // prepare for second pass 00588 file.clear(); 00589 file.seekg(0, std::ios::beg); 00590 } 00591 00592 boost::mpi::broadcast(comm, n, 0); 00593 boost::mpi::broadcast(comm, d, 0); 00594 00595 int numblocks = ((int) n/ (int) blocksize); // of size blocksize 00596 int leftover = n % blocksize; 00597 int block = blocksize; 00598 00599 X.Resize(d, n); 00600 Y.Resize(n,1); 00601 00602 elem::Zeros(Xlocal, X.LocalHeight(), X.LocalWidth()); 00603 elem::Zeros(Ylocal, Y.LocalHeight(), 1); 00604 00605 X.Attach(d,n,0,0,Xlocal,elem::DefaultGrid()); 00606 Y.Attach(n,1,0,0,Ylocal,elem::DefaultGrid()); 00607 00608 00609 for(int i=0; i<numblocks+1; i++) { 00610 00611 if (i==numblocks) 00612 block = leftover; 00613 if (block==0) 00614 break; 00615 00616 DistCircMatrixType x(d, block), y(block, 1); 00617 x.SetRoot(0); 00618 y.SetRoot(0); 00619 elem::MakeZeros(x); 00620 00621 if(rank==0) { 00622 00623 std::cout << "Reading and distributing chunk " << i*blocksize << " to " << i*blocksize + block - 1 << " ("<< block << " elements )" << std::endl; 00624 T *Xdata = x.Matrix().Buffer(); 00625 T *Ydata = y.Matrix().Buffer(); 00626 00627 t = 0; 00628 while(!file.eof() && t<block) { 00629 getline(file, line); 00630 if( line.length()==0) { 00631 break; 00632 } 00633 00634 std::istringstream tokenstream (line); 00635 tokenstream >> label; 00636 Ydata[t] = label; 00637 00638 while (tokenstream >> token) 00639 { 00640 delim = token.find(':'); 00641 ind = token.substr(0, delim); 00642 val = token.substr(delim+1); //.substr(delim+1); 00643 j = atoi(ind.c_str()) - 1; 00644 Xdata[t * d + j] = atof(val.c_str()); 00645 } 00646 00647 t++; 00648 } 00649 } 00650 00651 // The calls below should distribute the data to all the nodes. 00652 // if (rank==0) 00653 // std::cout << "Distributing Data.." << std::endl; 00654 00655 elem::DistMatrix<T, elem::STAR, elem::VC> viewX; 00656 elem::DistMatrix<T, elem::VC, elem::STAR> viewY; 00657 00658 elem::View(viewX, X, 0, i*blocksize, x.Height(), x.Width()); 00659 elem::View(viewY, Y, i*blocksize, 0, x.Width(), 1); 00660 00661 viewX = x; 00662 viewY = y; 00663 00664 // X = x; 00665 // Y = y; 00666 } 00667 00668 double readtime = timer.elapsed(); 00669 if (rank==0) { 00670 std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl; 00671 // elem::Print(X,"X",std::cout); 00672 } 00673 } 00674 00675 template<typename T> 00676 void read_libsvm(const boost::mpi::communicator &comm, std::string fName, 00677 skylark::base::sparse_matrix_t<T>& X, elem::Matrix<T>& Y, int min_d = 0) { 00678 00679 int rank = comm.rank(); 00680 int size = comm.size(); 00681 00682 if (rank==0) 00683 std::cout << "Reading sparse matrix from file " << fName << std::endl; 00684 00685 std::ifstream file(fName.c_str()); 00686 std::string line; 00687 std::string token, val, ind; 00688 float label; 00689 unsigned int start = 0; 00690 unsigned int delim, t; 00691 int n = 0; 00692 int d = 0; 00693 int i, j, last; 00694 char c; 00695 int nnz=0; 00696 int nz; 00697 00698 bmpi::timer timer; 00699 00700 // make one pass over the data to figure out dimensions - will pay in terms of preallocated storage. 00701 if (rank==0) { 00702 while(!file.eof()) { 00703 getline(file, line); 00704 if(line.length()==0) 00705 break; 00706 delim = line.find_last_of(":"); 00707 if(delim > line.length()) 00708 continue; 00709 n++; 00710 t = delim; 00711 while(line[t]!=' ') { 00712 t--; 00713 } 00714 val = line.substr(t+1, delim - t); 00715 last = atoi(val.c_str()); 00716 if (last>d) 00717 d = last; 00718 } 00719 if (min_d > 0) 00720 d = std::max(d, min_d); 00721 00722 // prepare for second pass 00723 file.clear(); 00724 file.seekg(0, std::ios::beg); 00725 } 00726 00727 00728 boost::mpi::broadcast(comm, n, 0); 00729 boost::mpi::broadcast(comm, d, 0); 00730 00731 // Number of examples per process 00732 int* examples_allocation = new int[size]; 00733 int chunksize = (int) n / size; 00734 int leftover = n % size; 00735 for(int i=0;i<size;i++) 00736 examples_allocation[i] = chunksize; 00737 for(int i=0;i<leftover;i++) 00738 examples_allocation[i]++; 00739 00740 comm.barrier(); 00741 00742 // read chunks on rank = 0 and send 00743 if(rank==0) { 00744 00745 std::vector<int> col_ptr; 00746 std::vector<int> rowind; 00747 std::vector<double> values; 00748 std::vector<double> y; 00749 00750 int process = 0; 00751 int nnz_local = 0; 00752 int examples_local = 0; 00753 00754 while(!file.eof()) { 00755 getline(file, line); 00756 examples_local++; 00757 if( line.length()==0) { 00758 break; 00759 } 00760 00761 std::istringstream tokenstream (line); 00762 tokenstream >> label; 00763 y.push_back(label); 00764 col_ptr.push_back(nnz_local); 00765 00766 while (tokenstream >> token) 00767 { 00768 delim = token.find(':'); 00769 ind = token.substr(0, delim); 00770 val = token.substr(delim+1); //.substr(delim+1); 00771 j = atoi(ind.c_str()) - 1; 00772 rowind.push_back(j); 00773 values.push_back(atof(val.c_str())); 00774 nnz_local++; 00775 00776 } 00777 00778 if (examples_local == examples_allocation[process]) { 00779 if (process>0) { //send data from rank 0 00780 std::cout << "Sending to " << process << std::endl; 00781 col_ptr.push_back(nnz_local); 00782 comm.send(process, 1, examples_local); 00783 comm.send(process, 2, d); 00784 comm.send(process, 3, nnz_local); 00785 comm.send(process, 4, &col_ptr[0], col_ptr.size()); 00786 comm.send(process, 5, &rowind[0], rowind.size()); 00787 comm.send(process, 6, &values[0], values.size()); 00788 comm.send(process, 7, &y[0], y.size()); 00789 } 00790 else { // rank == 0 - just create the sparse matrix 00791 00792 col_ptr.push_back(nnz_local); //last entry of col_ptr should be total number of nonzeros 00793 00794 // this is making a copy 00795 double *_values = new double[values.size()]; 00796 int *_rowind = new int[rowind.size()]; 00797 int *_col_ptr = new int[col_ptr.size()]; 00798 00799 00800 // DO THE ACTUAL COPY! 00801 std::copy(values.begin(), values.end(), _values); 00802 std::copy(rowind.begin(), rowind.end(), _rowind); 00803 std::copy(col_ptr.begin(), col_ptr.end(), _col_ptr); 00804 00805 X.attach(_col_ptr, _rowind, _values, nnz_local, d, examples_local, true); 00806 00807 elem::Matrix<T> Y2(examples_local, 1, &y[0], 0); 00808 00809 // Y.Resize(examples_local,1); 00810 // but this is not! 00811 //Y.Attach(examples_local,1,&y[0],0); 00812 Y = Y2; // copy 00813 00814 00815 std::cout << "rank=0: Read " << examples_local << " x " << d << " with " << nnz_local << " nonzeros" << std::endl; 00816 } 00817 process++; 00818 nnz_local = 0; 00819 examples_local = 0; 00820 col_ptr.clear(); 00821 rowind.clear(); 00822 values.clear(); 00823 y.clear(); 00824 } 00825 } 00826 00827 00828 } else { 00829 00830 for(int r=1; r<size; r++) { 00831 if ((rank==r) && examples_allocation[r]>0) { 00832 int nnz_local, t, d; 00833 comm.recv(0, 1, t); 00834 comm.recv(0, 2, d); 00835 comm.recv(0, 3, nnz_local); 00836 00837 double* values = new double[nnz_local]; 00838 int* rowind = new int[nnz_local]; 00839 int* col_ptr = new int[t+1]; 00840 00841 comm.recv(0, 4, col_ptr, t+1); 00842 comm.recv(0, 5, rowind, nnz_local); 00843 comm.recv(0, 6, values, nnz_local); 00844 00845 //attach currently creates copies so we can delete the rest 00846 X.attach(col_ptr, rowind, values, nnz_local, d, t, true); 00847 // delete[] col_ptr; 00848 // delete[] rowind; 00849 // delete[] values; 00850 00851 // TODO why not resize Y, and then just recive into buffer? 00852 double* y = new double[t]; 00853 comm.recv(0, 7, y, t); 00854 elem::Matrix<T> Y2(t, 1, y, 0); 00855 Y = Y2; // copy 00856 delete[] y; 00857 00858 //Y.Resize(t,1); 00859 //Y.Attach(t,1,y,0); 00860 00861 std::cout << "rank=" << r << ": Received and read " << t << " x " << d << " with " << nnz_local << " nonzeros" << std::endl; 00862 } 00863 } 00864 } 00865 //} 00866 00867 delete[] examples_allocation; 00868 00869 double readtime = timer.elapsed(); 00870 if (rank==0) 00871 std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl; 00872 00873 comm.barrier(); 00874 } 00875 00876 00877 template <class InputType, class LabelType> 00878 void read(const boost::mpi::communicator &comm, 00879 int fileformat, std::string filename, InputType& X, LabelType& Y, int d=0) { 00880 00881 switch(fileformat) { 00882 case LIBSVM_DENSE: case LIBSVM_SPARSE: 00883 { 00884 read_libsvm(comm, filename, X, Y, d); 00885 break; 00886 } 00887 case HDF5_DENSE: case HDF5_SPARSE: 00888 { 00889 #ifdef SKYLARK_HAVE_HDF5 00890 read_hdf5(comm, filename, X, Y); 00891 #else 00892 // TODO 00893 #endif 00894 break; 00895 } 00896 } 00897 00898 } 00899 00900 void read_model_file(std::string fName, elem::Matrix<double>& W) { 00901 std::ifstream file(fName.c_str()); 00902 std::string line, token; 00903 std::string prefix = "# Dimensions"; 00904 int i=0; 00905 int j; 00906 int m, n; 00907 while(!file.eof()) { 00908 getline(file, line); 00909 if(line.compare(0, prefix.size(), prefix) == 0) { 00910 std::istringstream tokenstream (line.substr(prefix.size(), line.size())); 00911 tokenstream >> token; 00912 00913 m = atoi(token.c_str()); 00914 tokenstream >> token; 00915 00916 n = atoi(token.c_str()); 00917 std::cout << "Read coefficients of size " << m << " x " << n << std::endl; 00918 W.Resize(m,n); 00919 continue; 00920 } 00921 else { 00922 if(line[0] == '#' || line.length()==0) 00923 continue; 00924 } 00925 00926 std::istringstream tokenstream (line); 00927 j = 0; 00928 while (tokenstream >> token){ 00929 W.Set(i,j, atof(token.c_str())); 00930 j++; 00931 } 00932 i++; 00933 } 00934 } 00935 00936 00937 std::string read_header(const boost::mpi::communicator &comm, std::string fName) { 00938 std::string line; 00939 if (comm.rank()==0) { 00940 std::ifstream file(fName.c_str()); 00941 getline(file, line); 00942 } 00943 00944 boost::mpi::broadcast(comm, line, 0); 00945 return line; 00946 } 00947 00948 00949 #endif /* IO_HPP_ */