Skylark (Sketching Library)  0.1
/var/lib/jenkins/jobs/Skylark/workspace/ml/io.hpp
Go to the documentation of this file.
00001 /*
00002  * io.hpp
00003  *
00004  *  Created on: Feb 7, 2014
00005  *      Author: vikas
00006  */
00007 
00008 #ifndef IO_HPP_
00009 #define IO_HPP_
00010 
00011 #include <boost/mpi.hpp>
00012 #include <sstream>
00013 #include <cstdlib>
00014 #include <string>
00015 #include <elemental.hpp>
00016 #include "../base/sparse_matrix.hpp"
00017 #include "options.hpp"
00018 
00019 namespace bmpi =  boost::mpi;
00020 
00021 typedef elem::DistMatrix<double, elem::CIRC, elem::CIRC> DistCircMatrixType;
00022 typedef skylark::base::sparse_matrix_t<double> sparse_matrix_t;
00023 typedef elem::Matrix<double> LocalMatrixType;
00024 
00025 #ifdef SKYLARK_HAVE_HDF5
00026 #include <H5Cpp.h>
00027 
00028 int write_hdf5(const boost::mpi::communicator &comm, 
00029     std::string fName, elem::Matrix<double>& X,
00030     elem::Matrix<double>& Y) {
00031 
00032     int n, d;
00033     boost::mpi::reduce(comm, X.Width(), n, std::plus<int>(), 0);
00034     d = X.Height();
00035 
00036 
00037     if (comm.rank() == 0) {
00038         try {
00039             std::cout << "Writing to file " << fName << " " << n << "x" << d << std::endl;
00040 
00041             H5::Exception::dontPrint();
00042 
00043             H5::H5File file( fName, H5F_ACC_TRUNC );
00044             H5::FloatType datatype( H5::PredType::NATIVE_DOUBLE );
00045             datatype.setOrder( H5T_ORDER_LE );
00046 
00047             hsize_t dimsf[2]; // dataset dimensions
00048             dimsf[0] = n;
00049             dimsf[1] = d;
00050             H5::DataSpace dataspaceX(2, dimsf);
00051             H5::DataSet datasetX = file.createDataSet("X", datatype, dataspaceX);
00052 
00053             dimsf[0] = n;
00054             H5::DataSpace dataspaceY(1, dimsf);
00055             H5::DataSet datasetY = file.createDataSet("Y", datatype, dataspaceY);
00056 
00057             hsize_t stride[2]; // Stride of hyperslab
00058             hsize_t block[2];  // Block sizes
00059             hsize_t mstart[2];
00060             stride[0] = 1; stride[1] = 1;
00061             block[0] = 1; block[1] = 1;
00062             mstart[0] = 0; mstart[0] = 0;
00063 
00064             int sumn = 0;
00065             for(int p = 0; p < comm.size(); p++) {
00066                 elem::Matrix<double> XX, YY;
00067                 if (p == 0) {
00068                     XX = X;
00069                     YY = Y;
00070                 } else {
00071                     int nn;
00072                     comm.recv(p, 1, nn);
00073                     XX.Resize(d, nn);
00074                     YY.Resize(nn, 1);
00075                     comm.recv(p, 2, XX.Buffer(), nn * d);
00076                     comm.recv(p, 3, YY.Buffer(), nn);
00077 
00078                 }
00079 
00080                 hsize_t start[2]; // Start of hyperslab
00081                 hsize_t count[2];  // Block count
00082 
00083                 start[0] = sumn; start[1] = 0;
00084                 count[0] = XX.Width(); count[1] = d;
00085                 dataspaceX.selectHyperslab(H5S_SELECT_SET, count, start, stride, block);
00086                 H5::DataSpace memspaceX(2, count);
00087                 memspaceX.selectHyperslab(H5S_SELECT_SET, count, mstart, stride, block);
00088                 datasetX.write(XX.Buffer(), H5::PredType::NATIVE_DOUBLE,
00089                     memspaceX, dataspaceX);
00090                 dataspaceX.selectNone();
00091                 dataspaceY.selectHyperslab(H5S_SELECT_SET, count, start, stride, block);
00092                 H5::DataSpace memspaceY(1, count);
00093                 memspaceX.selectHyperslab(H5S_SELECT_SET, count, mstart, stride, block);
00094                 datasetY.write(YY.Buffer(), H5::PredType::NATIVE_DOUBLE,
00095                     memspaceY, dataspaceY);
00096                 dataspaceY.selectNone();
00097                 file.flush(H5F_SCOPE_GLOBAL);
00098                 sumn += XX.Width();
00099             }
00100 
00101             file.close();
00102         }
00103         // catch failure caused by the H5File operations
00104         catch( H5::FileIException error ) {
00105             error.printError();
00106             return -1;
00107         }
00108         // catch failure caused by the DataSet operations
00109         catch( H5::DataSetIException error ) {
00110             error.printError();
00111             return -1;
00112         }
00113         // catch failure caused by the DataSpace operations
00114         catch( H5::DataSpaceIException error ) {
00115             error.printError();
00116             return -1;
00117         }
00118         // catch failure caused by the DataSpace operations
00119         catch( H5::DataTypeIException error ) {
00120             error.printError();
00121             return -1;
00122         }
00123     } else {
00124         comm.send(0, 1, X.Width());
00125         comm.send(0, 2, X.Buffer(), X.Width() * X.Height());
00126         comm.send(0, 3, Y.Buffer(), Y.Height());
00127     }
00128 
00129     comm.barrier();
00130 
00131     return 0; // successfully terminated
00132 }
00133 
00134 int write_hdf5(std::string fName, sparse_matrix_t& X,
00135         elem::Matrix<double>& Y) {
00136 
00137     try {
00138 
00139         std::cout << "Writing to file " << fName << std::endl;
00140 
00141         int* dimensions = new int[3];
00142         dimensions[0] = X.height();
00143         dimensions[1] = X.width();
00144         dimensions[2] = X.nonzeros();
00145 
00146         H5::Exception::dontPrint();
00147 
00148         H5::H5File file( fName, H5F_ACC_TRUNC );
00149 
00150         // write dimensions
00151         {
00152                 hsize_t dim[1]; // dataset dimensions
00153                 dim[0] = 3;
00154                 H5::DataSpace dataspace( 1, dim );
00155                 H5::FloatType datatype( H5::PredType::NATIVE_INT );
00156                 H5::DataSet dataset = file.createDataSet( "dimensions", datatype, dataspace );
00157                 dataset.write(dimensions, H5::PredType::NATIVE_INT);
00158         }
00159 
00160         // write col_ptr
00161         {
00162                 std::cout << "Writing col_ptr" << std::endl;
00163                 hsize_t dim[1]; // dataset dimensions
00164                 dim[0] = dimensions[1]+1;
00165                 H5::DataSpace dataspace( 1, dim );
00166                 H5::FloatType datatype( H5::PredType::NATIVE_INT );
00167                 H5::DataSet dataset = file.createDataSet( "indptr", datatype, dataspace );
00168                 dataset.write(X.indptr(), H5::PredType::NATIVE_INT);
00169         }
00170 
00171         // write row_ind
00172         {
00173                 std::cout << "Writing row indices" << std::endl;
00174                 hsize_t dim[1]; // dataset dimensions
00175                 dim[0] = dimensions[2];
00176                 H5::DataSpace dataspace( 1, dim );
00177                 H5::FloatType datatype( H5::PredType::NATIVE_INT );
00178                 H5::DataSet dataset = file.createDataSet( "indices", datatype, dataspace );
00179                 dataset.write(X.indices(), H5::PredType::NATIVE_INT);
00180         }
00181         // write row_ind
00182         {
00183                 std::cout << "Writing values" << std::endl;
00184                 hsize_t dim[1]; // dataset dimensions
00185                 dim[0] = dimensions[2];
00186                 H5::DataSpace dataspace( 1, dim );
00187                 H5::FloatType datatype( H5::PredType::NATIVE_DOUBLE );
00188                 H5::DataSet dataset = file.createDataSet( "values", datatype, dataspace );
00189                 dataset.write(X.values(), H5::PredType::NATIVE_DOUBLE);
00190         }
00191 
00192 
00193         // write values
00194         {
00195                 std::cout << "Writing targets" << std::endl;
00196                 hsize_t dim[1]; // dataset dimensions
00197                 dim[0] = dimensions[1];
00198                 H5::DataSpace dataspace( 1, dim );
00199                 H5::FloatType datatype( H5::PredType::NATIVE_DOUBLE );
00200                 H5::DataSet dataset = file.createDataSet( "Y", datatype, dataspace );
00201                 dataset.write(Y.Buffer(), H5::PredType::NATIVE_DOUBLE);
00202         }
00203 
00204       delete[] dimensions;
00205       file.close();
00206 
00207     }
00208    // catch failure caused by the H5File operations
00209       catch( H5::FileIException error )
00210       {
00211       error.printError();
00212       return -1;
00213       }
00214       // catch failure caused by the DataSet operations
00215       catch( H5::DataSetIException error )
00216       {
00217       error.printError();
00218       return -1;
00219       }
00220       // catch failure caused by the DataSpace operations
00221       catch( H5::DataSpaceIException error )
00222       {
00223       error.printError();
00224       return -1;
00225       }
00226       // catch failure caused by the DataSpace operations
00227       catch( H5::DataTypeIException error )
00228       {
00229       error.printError();
00230       return -1;
00231       }
00232   return 0; // successfully terminated
00233 }
00234 
00235 void read_hdf5_dataset(H5::H5File& file, std::string name, int* buf, int offset, int count) {
00236                         std::cout << "reading HDF5 dataset " << name << std::endl;
00237                         H5::DataSet dataset = file.openDataSet(name);
00238                         H5::DataSpace filespace = dataset.getSpace();
00239                         hsize_t dims[1];
00240                         dims[0] = count;
00241                         H5::DataSpace mspace(1,dims);
00242                         hsize_t count1[1];
00243                         count1[0] = count;
00244                         hsize_t offset1[1];
00245                         offset1[0] = offset;
00246                         filespace.selectHyperslab( H5S_SELECT_SET, count1, offset1 );
00247                         dataset.read(buf, H5::PredType::NATIVE_INT, mspace, filespace);
00248 }
00249 
00250 void read_hdf5_dataset(H5::H5File& file, std::string name, double* buf, int offset, int count) {
00251                         std::cout << "reading HDF5 dataset " << name << std::endl;
00252                         H5::DataSet dataset = file.openDataSet(name);
00253                         H5::DataSpace filespace = dataset.getSpace();
00254                         hsize_t dims[1];
00255                         dims[0] = count;
00256                         H5::DataSpace mspace(1,dims);
00257                         hsize_t count1[1];
00258                         count1[0] = count;
00259                         hsize_t offset1[1];
00260                         offset1[0] = offset;
00261                         filespace.selectHyperslab( H5S_SELECT_SET, count1, offset1 );
00262                         dataset.read(buf, H5::PredType::NATIVE_DOUBLE, mspace, filespace);
00263 }
00264 
00265 
00266 void read_hdf5(const boost::mpi::communicator &comm, std::string fName,
00267         sparse_matrix_t& X,
00268         elem::Matrix<double>& Y, int min_d = 0) {
00269 
00270         try {
00271         int rank = comm.rank();
00272         int size = comm.size();
00273 
00274         bmpi::timer timer;
00275         if (rank==0)
00276                             std::cout << "Reading sparse matrix from HDF5 file " << fName << std::endl;
00277 
00278         H5::H5File file( fName, H5F_ACC_RDONLY );
00279 
00280         int* dimensions = new int[3];
00281 
00282         read_hdf5_dataset(file, "dimensions", dimensions, 0, 3);
00283 
00284         int d = dimensions[0];
00285         int n = dimensions[1];
00286         int nnz = dimensions[3];
00287 
00288         if (min_d > 0)
00289                 d = std::max(d, min_d);
00290 
00291         int* indptr = new int[n+1];
00292         read_hdf5_dataset(file, "indptr", indptr, 0, n+1);
00293 
00294         boost::mpi::broadcast(comm, n, 0);
00295         boost::mpi::broadcast(comm, d, 0);
00296 
00297          // Number of examples per process
00298         int* examples_allocation = new int[size];
00299         int chunksize = (int) n / size;
00300         int leftover = n % size;
00301         for(int i=0;i< size;i++)
00302                 examples_allocation[i] = chunksize;
00303         for(int i=0;i<leftover;i++)
00304                 examples_allocation[i]++;
00305 
00306         comm.barrier();
00307 
00308          // read chunks on rank = 0 and send
00309          if(rank==0) {
00310                  int examples_local, nnz_start_index = 0, nnz_end_index = 0, nnz_local;
00311                  int examples_total = 0;
00312                          // loop and splice out chunk from the hdf5 file and then send to other processes
00313                  for(int i=0;i<size;i++) {
00314                                  examples_local = examples_allocation[i];
00315 
00316                                  nnz_start_index = indptr[examples_total]; // start index of {examples_total+1}^th example
00317                                  nnz_end_index = indptr[examples_total + examples_local];
00318                                  nnz_local = nnz_end_index - nnz_start_index;
00319 
00320                                  double *_values = new double[nnz_local];
00321                                  int *_rowind = new int[nnz_local];
00322                                  int *_col_ptr = new int[examples_local+1];
00323                                  double *y = new double[examples_local];
00324 
00325                                  read_hdf5_dataset(file, "values", _values, nnz_start_index, nnz_local);
00326                                  read_hdf5_dataset(file, "indices", _rowind, nnz_start_index, nnz_local);
00327                                  read_hdf5_dataset(file, "Y", y, examples_total, examples_local);
00328 
00329                                  for(int k=0;k<examples_local; k++)
00330                                          _col_ptr[k] = indptr[examples_total+k] - nnz_start_index;
00331                                  _col_ptr[examples_local] = nnz_local;
00332 
00333                                  //std::copy(indptr + examples_local_cumulative, indptr + examples_local_cumulative + examples_local, _col_ptr);
00334                                  examples_total += examples_local;
00335 
00336                                  if (i==0) {
00337 
00338                                          X.attach(_col_ptr, _rowind, _values, nnz_local, d, examples_local, true);
00339                                          LocalMatrixType Y2(examples_local, 1, y, 0);
00340                                          Y = Y2;
00341                                          delete[] y;
00342                                          std::cout << "rank=0: Read " << examples_local << " x " << d << " with " << nnz_local << " nonzeros" << std::endl;
00343 
00344                                  } else {
00345 
00346                                          int process = i;
00347                                          std::cout << "Sending to " << process << std::endl;
00348                                          comm.send(process, 1, examples_local);
00349                                          comm.send(process, 2, d);
00350                                          comm.send(process, 3, nnz_local);
00351                                          comm.send(process, 4, _col_ptr, examples_local+1);
00352                                          comm.send(process, 5, _rowind, nnz_local);
00353                                          comm.send(process, 6, _values, nnz_local);
00354                                          comm.send(process, 7, y, examples_local);
00355                                          delete[] _values;
00356                                          delete[] _rowind;
00357                                          delete[] _col_ptr;
00358                                          delete[] y;
00359                                  }
00360                          }
00361             } else {
00362 
00363                         for(int r=1; r<size; r++) {
00364                         if ((rank==r) && examples_allocation[r]>0) {
00365                             int nnz_local, t, d;
00366                             comm.recv(0, 1, t);
00367                             comm.recv(0, 2, d);
00368                             comm.recv(0, 3, nnz_local);
00369 
00370                             double* values = new double[nnz_local];
00371                             int* rowind = new int[nnz_local];
00372                             int* col_ptr = new int[t+1];
00373 
00374                             comm.recv(0, 4, col_ptr, t+1);
00375                             comm.recv(0, 5, rowind, nnz_local);
00376                             comm.recv(0, 6, values, nnz_local);
00377 
00378                             //attach currently creates copies so we can delete the rest
00379                             X.attach(col_ptr, rowind, values, nnz_local, d, t, true);
00380                            // delete[] col_ptr;
00381                            // delete[] rowind;
00382                            // delete[] values;
00383 
00384                             double* y = new double[t];
00385                             comm.recv(0, 7, y, t);
00386                             LocalMatrixType Y2(t, 1, y, 0);
00387                             Y = Y2; // copy
00388                             delete[] y;
00389                             //Y.Resize(t,1);
00390                             //Y.Attach(t,1,y,0);
00391 
00392                             std::cout << "rank=" << r << ": Received and read " << t << " x " << d << " with " << nnz_local << " nonzeros" << std::endl;
00393                         }
00394                     }
00395                 }
00396 
00397          double readtime = timer.elapsed();
00398          if (rank==0)
00399                         std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl;
00400 
00401          comm.barrier();
00402 
00403         delete[] dimensions;
00404         file.close();
00405         }
00406         // catch failure caused by the H5File operations
00407               catch( H5::FileIException error )
00408               {
00409               error.printError();
00410               //return -1;
00411               }
00412               // catch failure caused by the DataSet operations
00413               catch( H5::DataSetIException error )
00414               {
00415               error.printError();
00416               //return -1;
00417               }
00418               // catch failure caused by the DataSpace operations
00419               catch( H5::DataSpaceIException error )
00420               {
00421               error.printError();
00422               //return -1;
00423               }
00424               // catch failure caused by the DataSpace operations
00425               catch( H5::DataTypeIException error )
00426               {
00427               error.printError();
00428               //return -1;
00429               }
00430           //return 0; // successfully terminated
00431 
00432 }
00433 
00434 void read_hdf5(const boost::mpi::communicator &comm, std::string fName,
00435     LocalMatrixType&  Xlocal, LocalMatrixType& Ylocal, int blocksize = 10000) {
00436 
00437         int rank = comm.rank();
00438         bmpi::timer timer;
00439         if (rank==0)
00440                     std::cout << "Reading Dense matrix from HDF5 file " << fName << std::endl;
00441 
00442 
00443         elem::DistMatrix<double, elem::STAR, elem::VC> X;
00444         elem::DistMatrix<double, elem::VC, elem::STAR> Y;
00445 
00446        H5::H5File file( fName, H5F_ACC_RDONLY );
00447        H5::DataSet datasetX = file.openDataSet( "X" );
00448        H5::DataSpace filespaceX = datasetX.getSpace();
00449        int ndims = filespaceX.getSimpleExtentNdims();
00450        hsize_t dimsX[2]; // dataset dimensions
00451        ndims = filespaceX.getSimpleExtentDims( dimsX );
00452        hsize_t n = dimsX[0];
00453        hsize_t d = dimsX[1];
00454 
00455        H5::DataSet datasetY = file.openDataSet( "Y" );
00456        H5::DataSpace filespaceY = datasetY.getSpace();
00457        hsize_t dimsY[1]; // dataset dimensions
00458 
00459        hsize_t countX[2];
00460        hsize_t countY[1];
00461 
00462         int numblocks = ((int) n/ (int) blocksize); // of size blocksize
00463         int leftover = n % blocksize;
00464         int block = blocksize;
00465 
00466         hsize_t offsetX[2], offsetY[1];
00467 
00468 //        if (min_d > 0)
00469  //                   d = std::max(d, min_d);
00470 
00471         X.Resize(d, n);
00472         Y.Resize(n,1);
00473 
00474         elem::Zeros(Xlocal, X.LocalHeight(), X.LocalWidth());
00475         elem::Zeros(Ylocal, Y.LocalHeight(), 1);
00476 
00477         X.Attach(d,n,0,0,Xlocal,elem::DefaultGrid());
00478         Y.Attach(n,1,0,0,Ylocal,elem::DefaultGrid());
00479 
00480         for(int i=0; i<numblocks+1; i++) {
00481 
00482             if (i==numblocks)
00483                 block = leftover;
00484             if (block==0)
00485                 break;
00486 
00487             DistCircMatrixType x(d, block), y(block, 1);
00488             x.SetRoot(0);
00489             y.SetRoot(0);
00490             elem::MakeZeros(x);
00491 
00492 
00493             offsetX[0] = i*blocksize;
00494             offsetX[1] = 0;
00495             countX[0] = block;
00496             countX[1] = d;
00497             offsetY[0] = i*blocksize;
00498             countY[0] = block;
00499 
00500 
00501             filespaceX.selectHyperslab( H5S_SELECT_SET, countX, offsetX );
00502             filespaceY.selectHyperslab( H5S_SELECT_SET, countY, offsetY );
00503 
00504             if(rank==0) {
00505                 std::cout << "Reading and distributing chunk " << i*blocksize << " to " << i*blocksize + block - 1 << " ("<< block << " elements )" << std::endl;
00506 
00507                 double *Xdata = x.Matrix().Buffer();
00508                 double *Ydata = y.Matrix().Buffer();
00509 
00510                 dimsX[0] = block;
00511                 dimsX[1] = d;
00512                 H5::DataSpace mspace1(2, dimsX);
00513                 datasetX.read( Xdata, H5::PredType::NATIVE_DOUBLE, mspace1, filespaceX );
00514 
00515                 dimsY[0] = block;
00516                 H5::DataSpace mspace2(1,dimsY);
00517                 datasetY.read( Ydata, H5::PredType::NATIVE_DOUBLE, mspace2, filespaceY );
00518 
00519             }
00520 
00521             elem::DistMatrix<double, elem::STAR, elem::VC> viewX;
00522             elem::DistMatrix<double, elem::VC, elem::STAR> viewY;
00523 
00524             elem::View(viewX, X, 0, i*blocksize, x.Height(), x.Width());
00525             elem::View(viewY, Y, i*blocksize, 0, x.Width(), 1);
00526 
00527             viewX = x;
00528             viewY = y;
00529 
00530         }
00531 
00532         double readtime = timer.elapsed();
00533         if (rank==0)
00534                 std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl;
00535 }
00536 #endif
00537 
00538 template<typename T>
00539 void read_libsvm(const boost::mpi::communicator &comm, std::string fName,
00540                 elem::Matrix<T>& Xlocal, elem::Matrix<T>& Ylocal,
00541                 int min_d = 0, int blocksize = 10000) {
00542 
00543     elem::Grid grid(comm);
00544     elem::DistMatrix<T, elem::STAR, elem::VC> X(grid);
00545     elem::DistMatrix<T, elem::VC, elem::STAR> Y(grid);
00546 
00547         int rank = comm.rank();
00548         if (rank==0)
00549                         std::cout << "Reading from file " << fName << std::endl;
00550 
00551         std::ifstream file(fName.c_str());
00552         std::string line;
00553         std::string token, val, ind;
00554         float label;
00555         unsigned int start = 0;
00556         unsigned int delim, t;
00557         int n = 0;
00558         int d = 0;
00559         int i, j, last;
00560         char c;
00561 
00562         bmpi::timer timer;
00563 
00564 
00565         // make one pass over the data to figure out dimensions - will pay in terms of preallocated storage.
00566         if (rank==0) {
00567             while(!file.eof()) {
00568                 getline(file, line);
00569                 if(line.length()==0)
00570                     break;
00571                 delim = line.find_last_of(":");
00572                 if(delim > line.length())
00573                     continue;
00574                 n++;
00575                 t = delim;
00576                 while(line[t]!=' ') {
00577                     t--;
00578                 }
00579                 val = line.substr(t+1, delim - t);
00580                 last = atoi(val.c_str());
00581                 if (last>d)
00582                     d = last;
00583             }
00584             if (min_d > 0)
00585                 d = std::max(d, min_d);
00586 
00587             // prepare for second pass
00588             file.clear();
00589             file.seekg(0, std::ios::beg);
00590         }
00591 
00592         boost::mpi::broadcast(comm, n, 0);
00593         boost::mpi::broadcast(comm, d, 0);
00594 
00595         int numblocks = ((int) n/ (int) blocksize); // of size blocksize
00596         int leftover = n % blocksize;
00597         int block = blocksize;
00598 
00599         X.Resize(d, n);
00600         Y.Resize(n,1);
00601 
00602         elem::Zeros(Xlocal, X.LocalHeight(), X.LocalWidth());
00603         elem::Zeros(Ylocal, Y.LocalHeight(), 1);
00604 
00605         X.Attach(d,n,0,0,Xlocal,elem::DefaultGrid());
00606         Y.Attach(n,1,0,0,Ylocal,elem::DefaultGrid());
00607 
00608 
00609         for(int i=0; i<numblocks+1; i++) {
00610 
00611                     if (i==numblocks)
00612                         block = leftover;
00613                     if (block==0)
00614                         break;
00615 
00616                     DistCircMatrixType x(d, block), y(block, 1);
00617                     x.SetRoot(0);
00618                     y.SetRoot(0);
00619                     elem::MakeZeros(x);
00620 
00621                 if(rank==0) {
00622 
00623                     std::cout << "Reading and distributing chunk " << i*blocksize << " to " << i*blocksize + block - 1 << " ("<< block << " elements )" << std::endl;
00624                     T *Xdata = x.Matrix().Buffer();
00625                     T *Ydata = y.Matrix().Buffer();
00626 
00627                     t = 0;
00628                     while(!file.eof() && t<block) {
00629                         getline(file, line);
00630                         if( line.length()==0) {
00631                             break;
00632                         }
00633 
00634                         std::istringstream tokenstream (line);
00635                         tokenstream >> label;
00636                         Ydata[t] = label;
00637 
00638                         while (tokenstream >> token)
00639                          {
00640                             delim  = token.find(':');
00641                             ind = token.substr(0, delim);
00642                             val = token.substr(delim+1); //.substr(delim+1);
00643                             j = atoi(ind.c_str()) - 1;
00644                             Xdata[t * d + j] = atof(val.c_str());
00645                          }
00646 
00647                         t++;
00648                     }
00649                  }
00650 
00651                 // The calls below should distribute the data to all the nodes.
00652                // if (rank==0)
00653                 //    std::cout << "Distributing Data.." << std::endl;
00654 
00655                 elem::DistMatrix<T, elem::STAR, elem::VC> viewX;
00656                 elem::DistMatrix<T, elem::VC, elem::STAR> viewY;
00657 
00658                 elem::View(viewX, X, 0, i*blocksize, x.Height(), x.Width());
00659                 elem::View(viewY, Y, i*blocksize, 0, x.Width(), 1);
00660 
00661                 viewX = x;
00662                 viewY = y;
00663 
00664 //      X = x;
00665 //      Y = y;
00666         }
00667 
00668         double readtime = timer.elapsed();
00669         if (rank==0) {
00670                 std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl;
00671                 // elem::Print(X,"X",std::cout);
00672         }
00673 }
00674 
00675 template<typename T>
00676 void read_libsvm(const boost::mpi::communicator &comm, std::string fName, 
00677     skylark::base::sparse_matrix_t<T>& X, elem::Matrix<T>& Y, int min_d = 0) {
00678 
00679     int rank = comm.rank();
00680     int size = comm.size();
00681 
00682     if (rank==0)
00683             std::cout << "Reading sparse matrix from file " << fName << std::endl;
00684 
00685     std::ifstream file(fName.c_str());
00686     std::string line;
00687     std::string token, val, ind;
00688     float label;
00689     unsigned int start = 0;
00690     unsigned int delim, t;
00691     int n = 0;
00692     int d = 0;
00693     int i, j, last;
00694     char c;
00695     int nnz=0;
00696     int nz;
00697 
00698     bmpi::timer timer;
00699 
00700     // make one pass over the data to figure out dimensions - will pay in terms of preallocated storage.
00701         if (rank==0) {
00702             while(!file.eof()) {
00703                 getline(file, line);
00704                 if(line.length()==0)
00705                     break;
00706                 delim = line.find_last_of(":");
00707                 if(delim > line.length())
00708                     continue;
00709                 n++;
00710                 t = delim;
00711                 while(line[t]!=' ') {
00712                     t--;
00713                 }
00714                 val = line.substr(t+1, delim - t);
00715                 last = atoi(val.c_str());
00716                 if (last>d)
00717                     d = last;
00718             }
00719             if (min_d > 0)
00720                 d = std::max(d, min_d);
00721 
00722             // prepare for second pass
00723             file.clear();
00724             file.seekg(0, std::ios::beg);
00725         }
00726 
00727 
00728    boost::mpi::broadcast(comm, n, 0);
00729    boost::mpi::broadcast(comm, d, 0);
00730 
00731     // Number of examples per process
00732     int* examples_allocation = new int[size];
00733     int chunksize = (int) n / size;
00734     int leftover = n % size;
00735     for(int i=0;i<size;i++)
00736         examples_allocation[i] = chunksize;
00737     for(int i=0;i<leftover;i++)
00738         examples_allocation[i]++;
00739 
00740     comm.barrier();
00741 
00742     // read chunks on rank = 0 and send
00743     if(rank==0) {
00744 
00745         std::vector<int> col_ptr;
00746         std::vector<int> rowind;
00747         std::vector<double> values;
00748         std::vector<double> y;
00749 
00750         int process = 0;
00751         int nnz_local = 0;
00752         int examples_local = 0;
00753 
00754         while(!file.eof()) {
00755             getline(file, line);
00756             examples_local++;
00757             if( line.length()==0) {
00758                 break;
00759             }
00760 
00761             std::istringstream tokenstream (line);
00762             tokenstream >> label;
00763             y.push_back(label);
00764             col_ptr.push_back(nnz_local);
00765 
00766             while (tokenstream >> token)
00767             {
00768                 delim  = token.find(':');
00769                 ind = token.substr(0, delim);
00770                 val = token.substr(delim+1); //.substr(delim+1);
00771                 j = atoi(ind.c_str()) - 1;
00772                 rowind.push_back(j);
00773                 values.push_back(atof(val.c_str()));
00774                 nnz_local++;
00775 
00776             }
00777 
00778             if (examples_local == examples_allocation[process]) {
00779                 if (process>0) { //send data from rank 0
00780                     std::cout << "Sending to " << process << std::endl;
00781                     col_ptr.push_back(nnz_local);
00782                     comm.send(process, 1, examples_local);
00783                     comm.send(process, 2, d);
00784                     comm.send(process, 3, nnz_local);
00785                     comm.send(process, 4, &col_ptr[0], col_ptr.size());
00786                     comm.send(process, 5, &rowind[0], rowind.size());
00787                     comm.send(process, 6, &values[0], values.size());
00788                     comm.send(process, 7, &y[0], y.size());
00789                 }
00790                 else { // rank == 0 - just create the sparse matrix
00791 
00792                     col_ptr.push_back(nnz_local); //last entry of col_ptr should be total number of nonzeros
00793 
00794                     // this is making a copy
00795                     double *_values = new double[values.size()];
00796                     int *_rowind = new int[rowind.size()];
00797                     int *_col_ptr = new int[col_ptr.size()];
00798 
00799 
00800                     // DO THE ACTUAL COPY!
00801                     std::copy(values.begin(), values.end(), _values);
00802                     std::copy(rowind.begin(), rowind.end(), _rowind);
00803                     std::copy(col_ptr.begin(), col_ptr.end(), _col_ptr);
00804 
00805                     X.attach(_col_ptr, _rowind, _values, nnz_local, d, examples_local, true);
00806 
00807                     elem::Matrix<T> Y2(examples_local, 1, &y[0], 0);
00808 
00809                     //  Y.Resize(examples_local,1);
00810                     // but this is not!
00811                     //Y.Attach(examples_local,1,&y[0],0);
00812                     Y = Y2; // copy
00813 
00814 
00815                     std::cout << "rank=0: Read " << examples_local << " x " << d << " with " << nnz_local << " nonzeros" << std::endl;
00816                 }
00817                 process++;
00818                 nnz_local = 0;
00819                 examples_local = 0;
00820                 col_ptr.clear();
00821                 rowind.clear();
00822                 values.clear();
00823                 y.clear();
00824             }
00825         }
00826 
00827 
00828     } else {
00829 
00830         for(int r=1; r<size; r++) {
00831             if ((rank==r) && examples_allocation[r]>0) {
00832                 int nnz_local, t, d;
00833                 comm.recv(0, 1, t);
00834                 comm.recv(0, 2, d);
00835                 comm.recv(0, 3, nnz_local);
00836 
00837                 double* values = new double[nnz_local];
00838                 int* rowind = new int[nnz_local];
00839                 int* col_ptr = new int[t+1];
00840 
00841                 comm.recv(0, 4, col_ptr, t+1);
00842                 comm.recv(0, 5, rowind, nnz_local);
00843                 comm.recv(0, 6, values, nnz_local);
00844 
00845                 //attach currently creates copies so we can delete the rest
00846                 X.attach(col_ptr, rowind, values, nnz_local, d, t, true);
00847                // delete[] col_ptr;
00848                // delete[] rowind;
00849                // delete[] values;
00850 
00851                 // TODO why not resize Y, and then just recive into buffer?
00852                 double* y = new double[t];
00853                 comm.recv(0, 7, y, t);
00854                 elem::Matrix<T> Y2(t, 1, y, 0);
00855                 Y = Y2; // copy
00856                 delete[] y;
00857 
00858                 //Y.Resize(t,1);
00859                 //Y.Attach(t,1,y,0);
00860 
00861                 std::cout << "rank=" << r << ": Received and read " << t << " x " << d << " with " << nnz_local << " nonzeros" << std::endl;
00862             }
00863         }
00864     }
00865     //}
00866 
00867     delete[] examples_allocation;
00868 
00869     double readtime = timer.elapsed();
00870     if (rank==0)
00871         std::cout << "Read Matrix with dimensions: " << n << " by " << d << " (" << readtime << "secs)" << std::endl;
00872 
00873     comm.barrier();
00874 }
00875 
00876 
00877 template <class InputType, class LabelType>
00878 void read(const boost::mpi::communicator &comm,
00879     int fileformat, std::string filename, InputType& X, LabelType& Y, int d=0) {
00880 
00881     switch(fileformat) {
00882             case LIBSVM_DENSE: case LIBSVM_SPARSE:
00883             {
00884                 read_libsvm(comm, filename, X, Y, d);
00885                 break;
00886             }
00887             case HDF5_DENSE: case HDF5_SPARSE:
00888             {
00889                 #ifdef SKYLARK_HAVE_HDF5
00890                 read_hdf5(comm, filename, X, Y);
00891                 #else
00892                     // TODO
00893                 #endif
00894                 break;
00895             }
00896         }
00897 
00898 }
00899 
00900 void read_model_file(std::string fName, elem::Matrix<double>& W) {
00901     std::ifstream file(fName.c_str());
00902         std::string line, token;
00903         std::string prefix = "# Dimensions";
00904         int i=0;
00905         int j;
00906         int m, n;
00907         while(!file.eof()) {
00908                         getline(file, line);
00909                         if(line.compare(0, prefix.size(), prefix) == 0) {
00910                             std::istringstream tokenstream (line.substr(prefix.size(), line.size()));
00911                                 tokenstream >> token;
00912 
00913                                 m = atoi(token.c_str());
00914                                 tokenstream >> token;
00915 
00916                                 n = atoi(token.c_str());
00917                                 std::cout << "Read coefficients of size " << m << " x " << n << std::endl;
00918                                 W.Resize(m,n);
00919                                 continue;
00920                         }
00921                         else {
00922                                 if(line[0] == '#' || line.length()==0)
00923                                         continue;
00924                         }
00925 
00926                         std::istringstream tokenstream (line);
00927                         j = 0;
00928                         while (tokenstream >> token){
00929                                 W.Set(i,j, atof(token.c_str()));
00930                                 j++;
00931                         }
00932                         i++;
00933         }
00934 }
00935 
00936 
00937 std::string read_header(const boost::mpi::communicator &comm, std::string fName) {
00938                 std::string line;
00939                 if (comm.rank()==0) {
00940                     std::ifstream file(fName.c_str());
00941                         getline(file, line);
00942                 }
00943 
00944                 boost::mpi::broadcast(comm, line, 0);
00945                 return line;
00946         }
00947 
00948 
00949 #endif /* IO_HPP_ */