CASToR  1.0
Tomographic Reconstruction (PET/SPECT)
vDataFile.cc
Go to the documentation of this file.
00001 
00002 /*
00003   Implementation of class vDataFile
00004 
00005   - separators: X
00006   - doxygen: X
00007   - default initialization: X
00008   - CASTOR_DEBUG: X
00009   - CASTOR_VERBOSE: X
00010 */
00011 
00019 #include "vDataFile.hh"
00020 
00021 // =====================================================================
00022 // ---------------------------------------------------------------------
00023 // ---------------------------------------------------------------------
00024 // =====================================================================
00025 
00026 vDataFile::vDataFile() 
00027 {
00028   mp_ID = NULL;
00029   m_verbose = -1;
00030   
00031   // Variables related to the acquisition
00032   m2p_dataFile = NULL;
00033   m_headerFileName = "";
00034   m_dataFileName = "";
00035   m_scannerName = "";
00036   m_dataMode = MODE_UNKNOWN;
00037   m_dataType = TYPE_UNKNOWN;
00038   m_totalNbEvents = -1;
00039   m_dataType = -1;
00040   m_bedIndex = -1;
00041   m_startTimeInSec = 0.; 
00042   m_durationInSec = -1.;
00043   m_calibrationFactor = 1.;
00044   m_sizeEvent = -1;
00045 
00046   // Default POI (meaning we do not have any)
00047   mp_POIResolution[0] = -1.;
00048   mp_POIResolution[1] = -1.;
00049   mp_POIResolution[2] = -1.;
00050   mp_POIDirectionFlag[0] = false;
00051   mp_POIDirectionFlag[1] = false;
00052   mp_POIDirectionFlag[2] = false;
00053   m_POIInfoFlag = false;
00054   m_ignorePOIFlag = false;
00055   
00056   // Variable related to Buffer/Container arrays
00057   m2p_BufferEvent = NULL;
00058   mp_arrayEvents = NULL;
00059   m2p_bufferEventFromFile = NULL;
00060   m_mpi1stEvent = -1;
00061   m_mpiLastEvent = -1;
00062   m_mpiNbEvents = -1; 
00063   m_1stIdxArrayEvents = -1;
00064   m_lastIdxArrayEvents = -1;
00065   m_sizeArrayEvents = 0;
00066   m_percentageLoad = -1;
00067   m_requestBufferFilling = false;
00068   m_currentlyFillingBuffer = false;
00069   mp_currentlyReadingBuffer = NULL;
00070   mp_overBufferRange = NULL;
00071   mp_nbEventsReadFromFile = NULL;
00072 }
00073 
00074 // =====================================================================
00075 // ---------------------------------------------------------------------
00076 // ---------------------------------------------------------------------
00077 // =====================================================================
00078 
00079 vDataFile::~vDataFile() 
00080 {
00081   // Free array event buffer
00082   if (m_sizeArrayEvents > 0) if (mp_arrayEvents != NULL) delete mp_arrayEvents;
00083     
00084   // Free the (char**) m2p_bufferEventFromFile
00085   if (m2p_bufferEventFromFile)
00086   {
00087     for(int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00088       if (m2p_bufferEventFromFile[th]) delete m2p_bufferEventFromFile[th];
00089     delete[] m2p_bufferEventFromFile;
00090   }
00091   // Free the (vEvent) buffer event
00092   if (m2p_BufferEvent)
00093   {
00094     for(int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00095       if (m2p_BufferEvent[th]) delete m2p_BufferEvent[th];
00096     delete[] m2p_BufferEvent;
00097   }
00098   // Close datafiles and delete them
00099   if (m2p_dataFile)
00100   {
00101     for(int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00102       if (m2p_dataFile[th]) m2p_dataFile[th]->close();
00103     delete m2p_dataFile;
00104   }
00105   // Destroy the boolean tabs that are used to organize and synchronize buffer reading
00106   if (mp_currentlyReadingBuffer) free(mp_currentlyReadingBuffer);
00107   if (mp_overBufferRange) free(mp_overBufferRange);
00108   // Destroy events reading counters, but print out before that
00109   if (mp_nbEventsReadFromFile)
00110   {
00111     // Verbose
00112     if (m_verbose>=4)
00113     {
00114       Cout("vDataFile::~vDataFile() -> Number of events directly read from file for each thread" << endl);
00115       int64_t total = 0;
00116       for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00117       {
00118         Cout("  --> Thread " << th+1 << "  |  " << mp_nbEventsReadFromFile[th] << " events read from file" << endl);
00119         total += mp_nbEventsReadFromFile[th];
00120       }
00121       Cout("  --> Total number of events read from file: " << total << endl);
00122       Cout("  --> Number of events in the datafile: " << m_totalNbEvents << endl);
00123     }
00124     // Free memory
00125     free(mp_nbEventsReadFromFile);
00126   }
00127 }
00128 
00129 // =====================================================================
00130 // ---------------------------------------------------------------------
00131 // ---------------------------------------------------------------------
00132 // =====================================================================
00133 
00134 int vDataFile::ReadInfoInHeader()
00135 {
00136   if (m_verbose>=3) Cout("vDataFile::ReadInfoInHeader() -> Read datafile header from '" << m_headerFileName << " ...'" << endl);
00137   
00138   // Read mandatory general fields in the header, check if errors (either mandatory tag not found or issue during data reading/conversion (==1) )
00139   if (ReadDataASCIIFile(m_headerFileName, "Number of events", &m_totalNbEvents, 1, KEYWORD_MANDATORY) )
00140   {
00141     Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read number of events in the header data file !" << endl);
00142     return 1;
00143   }
00144   if (ReadDataASCIIFile(m_headerFileName, "Scanner name", &m_scannerName, 1, KEYWORD_MANDATORY) ) 
00145   {
00146     Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read scanner name in the header data file !" << endl);
00147     return 1;
00148   }
00149 
00150   // Get data file name
00151   if (ReadDataASCIIFile(m_headerFileName, "Data filename", &m_dataFileName, 1, KEYWORD_MANDATORY) ) 
00152   {
00153     Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read data filename in the header data file !" << endl);
00154     return 1;
00155   }
00156   // If it is an absolute path (start with a OS_SEP) then let it as is, otherwise past the path of the header file name (because we suppose the two
00157   // files are side by side).
00158   // This is currently only unix compatible...
00159   if (m_dataFileName.substr(0,1)!=OS_SEP && m_headerFileName.find(OS_SEP)!=string::npos)
00160   {
00161     // Extract the path from the header file name
00162     size_t last_slash = m_headerFileName.find_last_of(OS_SEP);
00163     // Paste the path to the data file name
00164     m_dataFileName = m_headerFileName.substr(0,last_slash+1) + m_dataFileName;
00165   }
00166 
00167   // For data mode, multiple declarations are allowed, so we get the mode as a string and do some checks
00168   string data_mode = "";
00169   if (ReadDataASCIIFile(m_headerFileName, "Data mode", &data_mode, 1, KEYWORD_MANDATORY) ) 
00170   {
00171     Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read data mode in the header data file !" << endl);
00172     return 1;
00173   }
00174   if ( data_mode=="LIST" || data_mode=="LISTMODE" || data_mode=="LIST-MODE" ||
00175        data_mode=="list" || data_mode=="listmode" || data_mode=="list-mode" ||  data_mode=="0" ) m_dataMode = MODE_LIST;
00176   else if ( data_mode=="HISTOGRAM" || data_mode=="histogram" || data_mode=="Histogram" ||
00177             data_mode=="HISTO" || data_mode=="histo" || data_mode=="Histo" || data_mode=="1" ) m_dataMode = MODE_HISTOGRAM;
00178   else if ( data_mode=="NORMALIZATION" || data_mode=="normalization" || data_mode=="Normalization" ||
00179             data_mode=="NORM" || data_mode=="norm" || data_mode=="Norm" || data_mode=="2" ) m_dataMode = MODE_NORMALIZATION;
00180   else
00181   {
00182     Cerr("***** vDataFile::ReadInfoInHeader() -> Unknown data mode '" << data_mode << "' found in header file !" << endl);
00183     return 1;
00184   }
00185 
00186   // For data type, multiple declarations are allowed, so we get the mode as a string and do some checks
00187   string data_type = "";
00188   if (ReadDataASCIIFile(m_headerFileName, "Data type", &data_type, 1, KEYWORD_MANDATORY) ) 
00189   {
00190     Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read data type in the header data file !" << endl);
00191     return 1;
00192   }
00193   if ( data_type=="PET" || data_type=="pet" || data_type=="0" ) m_dataType = TYPE_PET;
00194   else if ( data_type=="SPECT" || data_type=="spect" || data_type=="1" ) m_dataType = TYPE_SPECT;
00195   else if ( data_type=="TRANSMISSION" || data_type=="transmission" || data_type=="Transmission" ||
00196             data_type=="trans" || data_type=="TRANS" || data_type=="2" ) m_dataType = TYPE_TRANSMISSION;
00197   else
00198   {
00199     Cerr("***** vDataFile::ReadInfoInHeader() -> Unknown data type '" << data_type << "' found in header file !" << endl);
00200     return 1;
00201   }
00202 
00203   // Get start time and duration, and then set the acquisition timing (do not do this for MODE_NORMALIZATION)
00204   if (m_dataMode!=MODE_NORMALIZATION)
00205   {
00206     if (ReadDataASCIIFile(m_headerFileName, "Start time (s)", &m_startTimeInSec, 1, KEYWORD_MANDATORY) ) 
00207     {
00208       Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read acquisition start time in the header data file !" << endl);
00209       return 1;
00210     }
00211     if (ReadDataASCIIFile(m_headerFileName, "Duration (s)", &m_durationInSec, 1, KEYWORD_MANDATORY) ) 
00212     {
00213       Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read acquisition stop time in the header data file !" << endl);
00214       return 1;
00215     }
00216     if (mp_ID->SetAcquisitionTime(m_bedIndex, m_startTimeInSec, m_durationInSec))
00217     {
00218       Cerr("***** vDataFile::ReadInfoInHeader() -> Error while passing acquisition start and stop time to the ImageDimensionsAndQuantification !" << endl);
00219       return 1;
00220     }
00221   }
00222 
00223   // Read optional fields in the header, check if errors (issue during data reading/conversion (==1) )
00224   if (ReadDataASCIIFile(m_headerFileName, "Calibration factor", &m_calibrationFactor, 1, KEYWORD_OPTIONAL)==1 ||
00225       ReadDataASCIIFile(m_headerFileName, "POI capability", mp_POIResolution, 3, KEYWORD_OPTIONAL)==1 ||
00226       ReadDataASCIIFile(m_headerFileName, "POI correction flag", &m_POIInfoFlag, 3, KEYWORD_OPTIONAL)==1 )
00227   {
00228     Cerr("***** vDataFile::ReadInfoInHeader() -> Error while reading optional field in the header data file !" << endl);
00229     return 1;
00230   }
00231   
00232   // Read fields specific to the modality (call to the related function implemented in child classes)
00233   if (ReadSpecificInfoInHeader() )
00234   {
00235     Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read modality-specific informations from the header data file !" << endl);
00236     return 1;
00237   }
00238 
00239   // Give the calibration factor to the oImageDimensionsAndQuantification that manages the quantification factors
00240   if (m_dataMode!=MODE_NORMALIZATION && mp_ID->SetCalibrationFactor(m_bedIndex, m_calibrationFactor))
00241   {
00242     Cerr("***** vDataFile::ReadSpecificInfoInHeader() -> A problem occured while setting the calibration factor to oImageDimensionsAndQuantification !" << endl);
00243     return 1;
00244   }
00245 
00246   return 0;
00247 }
00248 
00249 // =====================================================================
00250 // ---------------------------------------------------------------------
00251 // ---------------------------------------------------------------------
00252 // =====================================================================
00253 
00254 int vDataFile::CheckParameters()
00255 {
00256   if(m_verbose >=3) Cout("vDataFile::CheckParameters() ..." << endl);
00257 
00258   if (mp_ID == NULL)
00259   {
00260     Cerr("***** vDataFile::CheckParameters() -> Error : ImageDimensionsAndQuantification object not initialized !" << endl);
00261     return 1;
00262   }
00263   if (m_headerFileName == "")
00264   {
00265     Cerr("***** vDataFile::CheckParameters() -> Error : string containing path to header file not initialized !" << endl);
00266     return 1;
00267   }  
00268   if (m_dataFileName == "")
00269   {
00270     Cerr("***** vDataFile::CheckParameters() -> Error : string containing path to raw data file not initialized !" << endl);
00271     return 1;
00272   }
00273   if (m_percentageLoad<0 || m_percentageLoad>100)
00274   {
00275     Cerr("***** vDataFile::CheckParameters() -> Percentage load of the data file incorrectly set !" << endl);
00276     return 1;
00277   }
00278   if (m_totalNbEvents<0)
00279   {
00280     Cerr("***** vDataFile::CheckParameters() -> Number of events incorrectly initialized !" << endl);
00281     return 1;
00282   }
00283   if (m_dataMode!=MODE_LIST && m_dataMode!=MODE_HISTOGRAM && m_dataMode!=MODE_NORMALIZATION)
00284   {
00285     Cerr("***** vDataFile::CheckParameters() -> Data mode incorrectly initialized !" << endl);
00286     return 1;
00287   }
00288   if (m_dataMode!= MODE_NORMALIZATION && m_durationInSec<0)
00289   {
00290     Cerr("***** vDataFile::CheckParameters() -> Acquisition duration (s) incorrectly initialized !" << endl);
00291     return 1;
00292   }
00293   if (m_dataType!=TYPE_PET && m_dataType!=TYPE_SPECT && m_dataType!=TYPE_TRANSMISSION)
00294   {
00295     Cerr("***** vDataFile::CheckParameters() -> Data type incorrectly initialized !" << endl);
00296     return 1;
00297   }
00298   if (m_bedIndex<0)
00299   {
00300     Cerr("***** vDataFile::CheckParameters() -> Bed position index incorrectly initialized !" << endl);
00301     return 1;
00302   }
00303   if (m_verbose<0)
00304   {
00305     Cerr("***** vDataFile::CheckParameters() -> Verbosity incorrectly initialized !" << endl);
00306     return 1;
00307   }
00308   
00309   // Call to the related function implemented in child classes
00310   if (CheckSpecificParameters())
00311   {
00312     Cerr("***** vDataFile::CheckParameters() -> Error while checking specific parameters !" << endl);
00313     return 1;
00314   }
00315 
00316   // End
00317   return 0;
00318 }
00319 
00320 // =====================================================================
00321 // ---------------------------------------------------------------------
00322 // ---------------------------------------------------------------------
00323 // =====================================================================
00324 
00325 int vDataFile::InitializeFile()
00326 {
00327   if (m_verbose>=3) Cout("vDataFile::InitializeFile() ..." << endl);
00328   
00329   // Instantiate as many file streams as threads
00330   m2p_dataFile = new fstream*[mp_ID->GetNbThreadsForProjection()];
00331   for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00332   {
00333     m2p_dataFile[th] = new fstream( m_dataFileName.c_str(), ios::binary| ios::in );
00334     // If the datafile does not exist, notify to the user and exit program.
00335     if (!m2p_dataFile[th]->is_open()) 
00336     {
00337       Cerr("***** vDataFile::InitializeFile() -> Error reading the input data file at the path: '" << m_dataFileName.c_str() << "'" << endl);
00338       Cerr("                                     (provided in the data file header: " << m_headerFileName << ")" << endl);
00339       return 1;
00340     }
00341   }
00342   // Check file size consistency here (this is a pure virtual function implemented by children)
00343   if (CheckFileSizeConsistency())
00344   {
00345     Cerr("***** vDataFile::InitializeFile() -> A problem occured while checking file size consistency !" << endl);
00346     return 1;
00347   }
00348   // Initialization of the buffer related to direct data reading from file (during reconstruction)
00349   m2p_bufferEventFromFile = new char*[mp_ID->GetNbThreadsForProjection()];
00350   for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00351   {
00352     m2p_bufferEventFromFile[th] = new char[m_sizeEvent];
00353   }
00354   // Initialization of the boolean tab used to know if any thread is currently reading into the file buffer
00355   mp_currentlyReadingBuffer = (bool*)malloc(mp_ID->GetNbThreadsForProjection()*sizeof(bool));
00356   for (int th=0; th<mp_ID->GetNbThreadsForProjection(); th++) mp_currentlyReadingBuffer[th] = false;
00357   // Initialization of the boolean tab used to know if some threads are under the file buffer range
00358   mp_overBufferRange = (bool*)malloc(mp_ID->GetNbThreadsForProjection()*sizeof(bool));
00359   for (int th=0; th<mp_ID->GetNbThreadsForProjection(); th++) mp_overBufferRange[th] = false;
00360   // Initialization of the counters for how many events are read directly from file
00361   mp_nbEventsReadFromFile = (int64_t*)malloc(mp_ID->GetNbThreadsForProjection()*sizeof(int64_t));
00362   for (int th=0; th<mp_ID->GetNbThreadsForProjection(); th++) mp_nbEventsReadFromFile[th] = 0;
00363 
00364   // ------------------ Compute here the piece of file that each MPI instance manages --------------------
00365 
00366   // 1. Compute the number of events that each instance has to manage
00367   int64_t instance_size = m_totalNbEvents / mp_ID->GetMPISize();
00368   // All instances manage an equal part of the file but the last one which also manages the few last events
00369   if (mp_ID->GetMPIRank()!=mp_ID->GetMPISize()-1) m_mpiNbEvents = instance_size;
00370   else m_mpiNbEvents = instance_size + (m_totalNbEvents - instance_size*mp_ID->GetMPISize());
00371   // 2. Compute the first event managed by the instance
00372   m_mpi1stEvent = mp_ID->GetMPIRank() * instance_size;
00373   // 3. Compute the last event managed by the instance
00374   m_mpiLastEvent = m_mpi1stEvent + m_mpiNbEvents - 1;
00375     
00376   // Instanciation of the array of preloaded events (if enabled)
00377   if (m_percentageLoad>0)
00378   {
00379     if (m_verbose>=3)
00380     {
00381       Cout("  --> Allocating memory for the datafile" << endl);
00382       Cout("  --> Number of events in memory: " << m_mpiNbEvents << endl);
00383     }
00384     // Computation of the exact size of the buffer of preloaded events from the provided percentage load
00385     m_sizeArrayEvents = (((int64_t)m_percentageLoad) * m_mpiNbEvents) / 100;
00386     // Compute the number of bytes
00387     int64_t nb_bytes_to_load = m_sizeArrayEvents*m_sizeEvent;
00388     // Verbose to say that we proceed to the first reading of the datafile
00389     if (m_verbose>=3) Cout("  --> Load of " << m_percentageLoad << "% of the datafile (" << nb_bytes_to_load << " bytes)" << endl);
00390     // Instantiate the array of events in which the file content is directly copied
00391     mp_arrayEvents = new char [nb_bytes_to_load];
00392   }
00393   else m_sizeArrayEvents = 0;
00394 
00395   // End
00396   return 0;
00397 }
00398 
00399 // =====================================================================
00400 // ---------------------------------------------------------------------
00401 // ---------------------------------------------------------------------
00402 // =====================================================================
00403 
00404 void vDataFile::ResetBufferRange()
00405 {
00406   // Reset the range indices only if the percentage is strictly between 0 and 100.
00407   // For the case 0, it is simply a security.
00408   // For the case 100, it would force the buffer to be re-read again...
00409   // The purpose of this function is to be called before a loop over the datafile, so that all
00410   // threads will have to deal with event indices always inside or above the range (never under).
00411   // This is important to avoid that the thread 0 filling the buffer is ahead of other threads, leading to the other threads
00412   // not benefiting from the file buffer. There is a mechanism in the GetEvent() function so that the thread 0 waits for the
00413   // other threads to be over the buffer range before loading a new portion of the data file into the buffer.
00414   if (m_percentageLoad>0 && m_percentageLoad<100)
00415   {
00416     m_1stIdxArrayEvents = -1;
00417     m_lastIdxArrayEvents = -1;
00418   }
00419 }
00420 
00421 // =====================================================================
00422 // ---------------------------------------------------------------------
00423 // ---------------------------------------------------------------------
00424 // =====================================================================
00425 
00426 int vDataFile::FillBuffer(int64_t a_eventIndex)
00427 {
00428   #ifdef CASTOR_VERBOSE
00429   if (m_verbose >=4) Cout("vDataFile::FillBuffer() ..." << endl);
00430   #endif
00431 
00432   // Important note: m_mpiLastEvent/m_lastIdxArrayEvent are the last event indices INCLUDED in the instance/buffer
00433 
00434   // Note: This function is called from the GetEvent() function inside the iterations, but only by thread #0
00435   int thread_0 = 0;
00436 
00437   // Seek to the appropriate position in the input data file
00438   m2p_dataFile[thread_0]->seekg(m_sizeEvent*a_eventIndex);
00439 
00440   // Compute the last event index that can be loaded into the buffer (included; not excluded)
00441   int64_t last_included_event_that_can_be_loaded = a_eventIndex + m_sizeArrayEvents - 1;
00442 
00443   // This is the number of events that will be loaded
00444   int64_t nb_events_to_load = 0;
00445   // If the number of remaining events to be loaded is less than the buffer capacity, we only load until m_mpiLastEvent is reached
00446   if (last_included_event_that_can_be_loaded > m_mpiLastEvent) nb_events_to_load = m_mpiLastEvent + 1 - a_eventIndex;
00447   // Otherwise we fully load the buffer
00448   else nb_events_to_load = m_sizeArrayEvents;
00449 
00450   // Read the data
00451   m2p_dataFile[thread_0]->read(mp_arrayEvents, nb_events_to_load*m_sizeEvent);
00452 
00453   // Todo: histogram/SPECT only!!!!!!!!!!!!!!
00454   //Shuffle( nb_events_to_load );
00455 
00456   // Check if the reading was successfull
00457   if (!(*m2p_dataFile[thread_0]))
00458   {
00459     Cerr("***** vDataFile::FillBuffer() -> Failed to read " << nb_events_to_load << " events in datafile (only read " << m2p_dataFile[thread_0]->gcount() << " events) !" << endl);
00460     return 1;
00461   }
00462 
00463   // Update the buffer first and last indices
00464   m_1stIdxArrayEvents  = a_eventIndex;
00465   m_lastIdxArrayEvents = m_1stIdxArrayEvents + nb_events_to_load - 1;
00466 
00467   // Verbose
00468   #ifdef CASTOR_VERBOSE
00469   if (m_verbose >=4) Cout("vDataFile::FillBuffer() completed" << endl);
00470   #endif
00471 
00472   // End
00473   return 0;
00474 }
00475 
00476 // =====================================================================
00477 // ---------------------------------------------------------------------
00478 // ---------------------------------------------------------------------
00479 // =====================================================================
00480 
00481 int vDataFile::Shuffle( int64_t nb_events_to_load )
00482 {
00483   // Buffer storing ordered indices from ( 0 ) to ( nb_events_to_load - 1 )
00484   int64_t *rndmIdx = new int64_t[ nb_events_to_load ];
00485   std::iota( rndmIdx, rndmIdx + nb_events_to_load, 0 );
00486 
00487   // Initializing random
00488   std::random_device rd;
00489   std::mt19937 rndm( rd() );
00490   rndm.seed( 1100001001 );
00491 
00492   // Shuffling the buffer of indices
00493   std::shuffle( rndmIdx, rndmIdx + nb_events_to_load, rndm );
00494 
00495   // Creating a tmp buffer
00496   char *mp_arrayEvents_tmp = new char[ nb_events_to_load*m_sizeEvent ];
00497 
00498   // Copy sorted buffer to shuffled buffer
00499   for( int64_t i = 0; i < nb_events_to_load; ++i )
00500   {
00501     for( int64_t j = 0; j < m_sizeEvent; ++j )
00502     {
00503       mp_arrayEvents_tmp[ i * m_sizeEvent + j ] = mp_arrayEvents[ rndmIdx[ i ] * m_sizeEvent + j ];
00504     }
00505   }
00506 
00507   // Freeing memory
00508   delete[] rndmIdx;
00509   delete[] mp_arrayEvents;
00510 
00511   mp_arrayEvents = mp_arrayEvents_tmp;
00512 
00513   return 0;
00514 }
00515 
00516 // =====================================================================
00517 // ---------------------------------------------------------------------
00518 // ---------------------------------------------------------------------
00519 // =====================================================================
00520 
00521 vEvent* vDataFile::GetEventWithAscendingOrderAssumption(int64_t a_eventIndex, int64_t a_eventIndexLimit, int a_th)
00522 {
00523   // With the assumption that this function is called from a loop over event indices in an ascending order, the implementation
00524   // is as follows:
00525   //  - The ResetBufferRange() function MUST be called before launching the loop over event indices
00526   //  - Thread 0 is a particular case: it is the only one able to fill the buffer, and to do that it waits for the other threads to be above the current range.
00527   //  - The other threads read directly from the file if the first thread is currently filling the buffer, or if below or above the current range. In the latter
00528   //    case, they set a boolean flag saying they are above so that the thread 0 can know it. Otherwise they read from the buffer.
00529   // See the comments at the beginning of the GetEventWithoutOrderAssumption() function for details about motivations, etc.
00530 
00531   #ifdef CASTOR_VERBOSE
00532   if (m_verbose >=4) Cout("vDataFile::GetEventWithAscendingOrderAssumption() ..." << endl);
00533   #endif
00534 
00535   // Particular case for an empty buffer (-load 0): read the event directly from the file, whatever the thread number
00536   if (m_sizeArrayEvents==0) return GetEventFromFile(a_eventIndex, a_th);
00537 
00538   // Special treatment for the first thread (it is the only one allowed to refill the buffer)
00539   if (a_th==0)
00540   {
00541     // If the event index is below the current indices range loaded in memory, it is not normal as only this thread refills the buffer
00542     if (a_eventIndex<m_1stIdxArrayEvents)
00543     {
00544       Cerr("***** vDataFile::GetEventWithAscendingOrderAssumption() -> The requested event index is below the current range of indices loaded in memory !" << endl);
00545       Cerr("  --> Note that this situation should never happen if this function is being called within an ascending ordered loop on event indices, AND if" << endl);
00546       Cerr("      the vDataFile::ResetBufferRange() function has been called before performing the loop. So if this is really the case, then please report" << endl);
00547       Cerr("      the bug." << endl);
00548       return NULL;
00549     }
00550     // Else if the event is above the current indices range, then this first thread has to refill the buffer
00551     else if (a_eventIndex>m_lastIdxArrayEvents)
00552     {
00553       // Here, the thread has to wait for all others to be over the current buffer range
00554       bool all_threads_over_range = true;
00555       for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++)
00556         all_threads_over_range = all_threads_over_range && mp_overBufferRange[th];
00557       while (!all_threads_over_range)
00558       {
00559         all_threads_over_range = true;
00560         for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++)
00561           all_threads_over_range = all_threads_over_range && mp_overBufferRange[th];
00562       }
00563       // Now we are good, so we notify that the first thread is currently filling the buffer
00564       m_currentlyFillingBuffer = true;
00565       // Now the first thread can safely fill the buffer from the current event index, because all threads are above the range and they know the first
00566       // thread will refill the buffer
00567       if (FillBuffer(a_eventIndex))
00568       {
00569         Cerr("***** vDataFile::GetEventWithAscendingOrderAssumption() -> An error occured while filling the buffer from datafile !" << endl);
00570         return NULL;
00571       }
00572       // Say that the first thread is done filling the buffer
00573       m_currentlyFillingBuffer = false;
00574     }
00575     // Get address of the event to recover from the array buffer
00576     char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent;
00577     // And read from the buffer
00578     return GetEventFromBuffer(event_address_in_array, a_th);
00579   }
00580   // For the other threads
00581   else
00582   {
00583     // The event pointer that will be returned
00584     vEvent* p_event;
00585     // We first check if the thread 0 is currently filling the buffer; in that case then read from file
00586     if (m_currentlyFillingBuffer)
00587     {
00588       // In this case, we still say that the thread is over the buffer range. In case this thread finishes its part of the loop
00589       // while the first thread is filling the buffer, the first thread won't be blocked into waiting for all threads to be over
00590       // the range.
00591       mp_nbEventsReadFromFile[a_th]++;
00592       p_event = GetEventFromFile(a_eventIndex, a_th);
00593     }
00594     // Else if the event index is below the current indices range
00595     else if (a_eventIndex<m_1stIdxArrayEvents)
00596     {
00597       // This situation may still occur in some cases: right after a buffer filling and if the number of subsets is high. This
00598       // is because the new portion of the data file loaded by the thread 0 is not strictly contiguous to the previous portion
00599       // due to subsets and multi-threading (in case of a block thread sequence higher than 1 or a high number of threads). In
00600       // this case we read directly from the file.
00601       mp_overBufferRange[a_th] = false;
00602       mp_nbEventsReadFromFile[a_th]++;
00603       p_event = GetEventFromFile(a_eventIndex, a_th);
00604     }
00605     // Else if the event index is above the current indices range
00606     else if (a_eventIndex>m_lastIdxArrayEvents)
00607     {
00608       // We say that this thread is above the current range, so that the thread 0 knows that
00609       mp_overBufferRange[a_th] = true;
00610       // And we read directly from the file
00611       mp_nbEventsReadFromFile[a_th]++;
00612       p_event = GetEventFromFile(a_eventIndex, a_th);
00613     }
00614     // Else, we can safely read from the buffer
00615     else
00616     {
00617       // We check again here that the first thread is not reading into the buffer in case it started between now and the first check.
00618       // This allows to catch some potential bad behaviors in extrem conditions (a lot of threads with a very small load percentage).
00619       if (m_currentlyFillingBuffer)
00620       {
00621         mp_nbEventsReadFromFile[a_th]++;
00622         p_event = GetEventFromFile(a_eventIndex, a_th);
00623       }
00624       else
00625       {
00626         // Say that we are not above range
00627         mp_overBufferRange[a_th] = false;
00628         // Compute the adress into the buffer
00629         char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent;
00630         // Get the event
00631         p_event = GetEventFromBuffer(event_address_in_array, a_th);
00632       }
00633     }
00634     // If the current index is over the provided limit index, we say that this thread is over the buffer range to avoid thread 0 to be stuck
00635     if (a_eventIndex >= a_eventIndexLimit) mp_overBufferRange[a_th] = true;
00636     // Return the event
00637     return p_event;
00638   }
00639 }
00640 
00641 // =====================================================================
00642 // ---------------------------------------------------------------------
00643 // ---------------------------------------------------------------------
00644 // =====================================================================
00645 
00646 vEvent* vDataFile::GetEventWithoutOrderAssumption(int64_t a_eventIndex, int a_th)
00647 {
00648   // Without any assumption about the order of event indices, the implementation of this function is quite
00649   // tricky when we want genericity and efficiency! We absolutely want that only one thread deals with the
00650   // buffer filling (the first thread) while others are not blocked during this process and can still read
00651   // events directly from the file. Under these restrictions, multiple strategies were tested, but the current
00652   // one is the most efficient and stable one. The most stressful situation for this function is when using
00653   // many threads with a small load percentage. The use of OpenMP locks, in many different ways has always
00654   // lead to rare occurences (1-2%) of segmentation fault (load percentage typically between 1 to 10 and 16
00655   // threads), which could not be explained but also not tolerated... The current implementation uses home-made
00656   // locks at multiple levels but which does not hamper the efficiency. It has been validated with intense testing
00657   // (1500 reconstructions of the PET benchmark with a number of threads randomly chosen between 50 to 120 and a load
00658   // percentage of 2%; note that if somebody has such a powerfull computer with such a small amount of RAM, it would
00659   // be quite a paradoxal situation, but nonetheless, we wanted to stress this function at most). The results are:
00660   // 1 segmentation fault and 2 runs with a few pixels variating up to 0.2% (basically, it is one event at one moment
00661   // that succeeds in passing between the test of m_requestBufferFilling and the affectation of mp_currentlyReadingBuffer,
00662   // so that it runs through the GetEventFromBuffer function while the first thread also succeed to pass between the
00663   // checks and is filling the buffer at the same time; plus in order to give a corrupted event, the buffer has to be
00664   // changed right before the thread read at this memory location; with such rare stuff in such rare conditions, we feel
00665   // this function is sufficiently robust with respect to the restrictions we assume at the beginning; still, if
00666   // someone has something better, we take it!).
00667   // Anyway, if you want to get events inside an ascending ordered loop over events, then use the GetEventWithAscendingOrderAssumption()
00668   // function instead. It is more efficient and has the same failure rate in the same chaotic conditions (a lot of threads with a very
00669   // small load percentage. But do not forget to call the ResetBufferRange() function before the loop. However, note that this function
00670   // is not yet compatible with the use of MPI.
00671 
00672   #ifdef CASTOR_VERBOSE
00673   if (m_verbose >=4) Cout("vDataFile::GetEventWithoutOrderAssumption() ..." << endl);
00674   #endif
00675 
00676   // Particular case for an empty buffer (-load 0): read the event directly from the file, whatever the thread number
00677   if (m_sizeArrayEvents==0) return GetEventFromFile(a_eventIndex, a_th);
00678 
00679   // Special treatment for the first thread (it is the only one allowed to refill the buffer)
00680   if (a_th==0)
00681   {
00682     // If the event is outside the current buffer, then the thread refills the buffer
00683     if (a_eventIndex<m_1stIdxArrayEvents || a_eventIndex>m_lastIdxArrayEvents)
00684     {
00685       // Say that the first thread is requesting to fill the buffer
00686       m_requestBufferFilling = true;
00687       // Now this thread will wait for all other threads to declare that they are not currently reading into the buffer (otherwise
00688       // the reading of the event by the other threads may be corrupted). The other thread are now aware that the first thread is
00689       // waiting so they will not read from the buffer once finished their current reading.
00690       bool all_threads_not_reading_buffer = true;
00691       for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++)
00692         all_threads_not_reading_buffer = all_threads_not_reading_buffer && !mp_currentlyReadingBuffer[th];
00693       while (!all_threads_not_reading_buffer)
00694       {
00695         all_threads_not_reading_buffer = true;
00696         for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++)
00697           all_threads_not_reading_buffer = all_threads_not_reading_buffer && !mp_currentlyReadingBuffer[th];
00698       }
00699       // We are good, so say that the first thread is currently filling the buffer (this is needed below, see explanations)
00700       m_currentlyFillingBuffer = true;
00701       // Now the first thread can safely fill the buffer from the current event index, because as all threads know it was waiting for
00702       // filling the buffer, they know read directly from the file
00703       if (FillBuffer(a_eventIndex))
00704       {
00705         Cerr("***** vDataFile::GetEventWithoutOrderAssumption() -> An error occured while filling the buffer from datafile !" << endl);
00706         return NULL;
00707       }
00708       // Say that the first thread is done filling the buffer
00709       m_currentlyFillingBuffer = false;
00710       // No more request to fill the buffer
00711       m_requestBufferFilling = false;
00712     }
00713     // Get address of the event to recover from the array buffer
00714     char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent;
00715     // And read from the buffer
00716     return GetEventFromBuffer(event_address_in_array, a_th);
00717   }
00718   // For the other threads
00719   else
00720   {
00721     // If out of buffer range, then read from file (we must check at first if the first thread is currently filling the buffer, otherwise
00722     // the range 1st and last indices may not reflect the actual range; then we cannot use here the m_requestBufferFilling flag instead,
00723     // because some compiler optimization might merge this if section with the next one, and we absolutely need the next one to be alone,
00724     // and right befre the first instruction of the else section setting the mp_currentlyReadingBuffer[a_th] flag to true.
00725     if (m_currentlyFillingBuffer || a_eventIndex<m_1stIdxArrayEvents || a_eventIndex>m_lastIdxArrayEvents)
00726     {
00727       mp_nbEventsReadFromFile[a_th]++;
00728       return GetEventFromFile(a_eventIndex, a_th);
00729     }
00730     // Else if request filling, then read from file (this check has to be separated from the one above, otherwise if there is no buffer
00731     // filling request, between the time this thread do the checks and set the currentlyReadingBuffer to true, the thread0 may have
00732     // enaugh time to start the filling of the buffer, creating erroneous lines, also may be due to some compiler optimizations reversing
00733     // some checks; at least, this version is stable)
00734     else if (m_requestBufferFilling)
00735     {
00736       mp_nbEventsReadFromFile[a_th]++;
00737       return GetEventFromFile(a_eventIndex, a_th);
00738     }
00739     // Else, we can safely read from the buffer
00740     else
00741     {
00742       // Say that this thread is reading into the buffer
00743       mp_currentlyReadingBuffer[a_th] = true;
00744       // Compute the adress into the buffer
00745       char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent;
00746       // Get the event
00747       vEvent* p_event = GetEventFromBuffer(event_address_in_array, a_th);
00748       // Say that the thread has done reading into the buffer
00749       mp_currentlyReadingBuffer[a_th] = false;
00750       // Return the event
00751       return p_event;
00752     }
00753   }
00754 }
00755 
00756 // =====================================================================
00757 // ---------------------------------------------------------------------
00758 // ---------------------------------------------------------------------
00759 // =====================================================================
00760 
00761 vEvent* vDataFile::GetEventFromFile(int64_t a_eventIndex, int a_th)
00762 {
00763   #ifdef CASTOR_VERBOSE
00764   if (m_verbose >=4) Cout("vDataFile::GetEventFromFile() ..." << endl);
00765   #endif
00766   
00767   // Seek to the position and read the event
00768   m2p_dataFile[a_th]->seekg((int64_t)m_sizeEvent*a_eventIndex);
00769   m2p_dataFile[a_th]->read(m2p_bufferEventFromFile[a_th], m_sizeEvent);
00770   // Check the reading
00771   if (!(*m2p_dataFile[a_th]))
00772   {
00773     Cerr("***** vDataFile::GetEventFromFile() -> Failed to read one event in datafile !" << endl);
00774     return NULL;
00775   }
00776   // Pass the pointer to the event to the interpretor function (implemented by children)
00777   return GetEventFromBuffer(m2p_bufferEventFromFile[a_th], a_th);
00778 }
00779 
00780 // =====================================================================
00781 // ---------------------------------------------------------------------
00782 // ---------------------------------------------------------------------
00783 // =====================================================================
00784 
00785 void vDataFile::GetEventIndexStartAndStop( int64_t* ap_indexStart, int64_t* ap_indexStop, int a_subsetNum, int a_nbSubsets )
00786 {
00787   #ifdef CASTOR_VERBOSE
00788   if (m_verbose >=4) Cout("vDataFile::GetEventIndexStartAndStop() ..." << endl);
00789   #endif
00790   // Basically here, the index start for a single MPI instance will be the current subset number.
00791   // If multiple instances are used, the whole datafile is split into equal-size concatenated pieces.
00792   // So for each instance, we just have to found the first index falling in its range (assuming that
00793   // the event index step is always equal to the number of subsets).
00794 
00795   // For the first machine, index start is the current subset number
00796   if (mp_ID->GetMPIRank()==0) *ap_indexStart = a_subsetNum;
00797   // For the other machines, we must search for the first index falling in their range belonging to this
00798   // subset (a subset being starting at a_subsetNum with a step equal to the number of subsets, a_nbSubsets)
00799   else
00800   {
00801     // Compute the modulo of the first index of this machine minus the subset number with respect to the number of subsets
00802     int64_t modulo = (m_mpi1stEvent-a_subsetNum) % a_nbSubsets;
00803     // If this modulo is null, then the index start is the first index
00804     if (modulo==0) *ap_indexStart = m_mpi1stEvent;
00805     // Otherwise, the index start is equal to the first index plus the number of subsets minus the modulo
00806     else *ap_indexStart = m_mpi1stEvent + (a_nbSubsets - modulo);
00807   }
00808 
00809   // For index stop, we simply get the last event of the MPI instance (+1 because the for loop is exclusive)
00810   *ap_indexStop = m_mpiLastEvent + 1;
00811 }
00812 
00813 // =====================================================================
00814 // ---------------------------------------------------------------------
00815 // ---------------------------------------------------------------------
00816 // =====================================================================
00817 
00818 int vDataFile::GetMaxRingDiff()
00819 {
00820   Cerr("*****vDataFile::GetMaxRingDiff() ->This function is not implemented for the used system" << endl);
00821   Cerr("                                  (this error may be prompted if the present function is erroneously called for a SPECT system)" << endl);
00822   return -1;
00823 }
00824 
00825 /************************************************
00826 * PROJECTION SCRIPT FUNCTIONS
00827 ************************************************/
00828 
00829 // =====================================================================
00830 // ---------------------------------------------------------------------
00831 // ---------------------------------------------------------------------
00832 // =====================================================================
00833 /*
00834   \fn PROJ_WriteData()
00835   \brief Write/Merge chunk of data in a general data file. 
00836   \todo check if some file manipulations are correctly done
00837   \todo adapt to the data loading/writing in RAM
00838   \return 0 if success, and positive value otherwise.
00839 */
00840 int vDataFile::PROJ_WriteData()
00841 {
00842   if (m_verbose >=2) Cout("vDataFile::PROJ_WriteData() ..." << endl);
00843   
00844   // Close all multithreaded datafiles before merging
00845   for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00846     if (m2p_dataFile[th]) m2p_dataFile[th]->close();
00847 
00848 
00849   // If the output file doesn't exist yet, simply rename the first temporary file name using the ouput file name
00850   if (!ifstream(m_dataFileName))
00851   {
00852     // If only one projection is required (i.e one threads && no projection of frame or gates)...
00853     if(mp_ID->GetNbThreadsForProjection()==1  // No multithreading so no multiple tmp datafiles 
00854        && mp_ID->GetNbTimeFrames()*
00855        mp_ID->GetSensNbRespGates()*
00856        mp_ID->GetSensNbCardGates()  == 1)
00857        {
00858          // rename the first temporary file name to the global name
00859          string tmp_file_name = m_dataFileName + "_0";
00860     
00861          // ... then just rename the first temporary file name using the ouput file name
00862          rename(tmp_file_name.c_str(),m_dataFileName.c_str());
00863          // no need to concatenate, so we leave here.
00864          return 0 ;
00865        }
00866   }
00867 
00868   // Create the final output file which will concatenate the events inside the thread-specific data files
00869   ofstream merged_file(m_dataFileName.c_str(), ios::out | ios::binary | ios::app);
00870 
00871   // Concatenation : generate input file ("data_file") to read the buffer of the thread-specific data files and store the information in the final output file
00872   for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00873   {
00874     // Build thread file name
00875     stringstream ss; ss << th;
00876     string file_name = m_dataFileName;
00877     file_name.append("_").append(ss.str());
00878     // Open it
00879     // Note SS: Maybe we can use the m2p_dataFile[th] here by just rewarding to the beginning of the file ?
00880     // Note TM: There were some issues involving the use of rdbuf and concatenation of the file in this case (ifstream were needed instead of fstream for some reasons)
00881     //          But we should have another look on how the projection data writing works with the implementation of the new analytical simulator.
00882     ifstream data_file(file_name.c_str(), ios::binary | ios::in);
00883 
00884     if (!data_file)
00885     {
00886       Cerr(endl);
00887       Cerr("***** vDataFile::PROJ_ConcatenateMthDatafile() -> Input temporary thread file '" << file_name << "' is missing or corrupted !" << endl);
00888       return 1;
00889     }
00890 
00891     // Concatenate it to the merged file
00892     merged_file << data_file.rdbuf();
00893     // Close file
00894     data_file.close();
00895     
00896     // Re-open datafiles (needed if projecting frame/gates, as the contents of the temporary datafile are copied to the main datafile after each frame/gate)
00897     m2p_dataFile[th]->open( file_name.c_str(), ios::binary | ios::out | ios::trunc);
00898   }
00899 
00900   // Close merged file
00901   merged_file.close();
00902 
00903   return 0;
00904 }
00905 
00906 
00907 
00908 
00909 
00910 
00911 // =====================================================================
00912 // ---------------------------------------------------------------------
00913 // ---------------------------------------------------------------------
00914 // =====================================================================
00915 /*
00916   \fn PROJ_DeleteTmpDatafile()
00917   \brief  Delete temporary datafile used for multithreaded output writing if needed
00918   \return 0 if success, and positive value otherwise.
00919 */
00920 int vDataFile::PROJ_DeleteTmpDatafile()
00921 {
00922   if (m_verbose >=3) Cout("vDataFile::PROJ_DeleteTmpDatafile() ..." << endl);
00923   
00924   // Generate input file ("data_file") to read the buffer of the thread-specific data files and store the information in the final output file
00925   for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++)
00926   {
00927     // Build thread file name
00928     stringstream ss; ss << th;
00929 
00930     if (m2p_dataFile[th]) m2p_dataFile[th]->close();
00931 
00932     string file_name = m_dataFileName;
00933     file_name.append("_").append(ss.str());
00934 
00935     // Remove temporary file for data output writing (Projection script only)
00936     ifstream fcheck(file_name.c_str());
00937     if(fcheck.good())
00938     {
00939       fcheck.close();
00940       #ifdef _WIN32
00941       string dos_instruction = "del " + file_name;
00942       system(dos_instruction.c_str());
00943       #else
00944       remove(file_name.c_str());
00945       #endif
00946     }
00947   }
00948   
00949   return 0;
00950 }
00951 
00952 
00953 
00954 
00955 // =====================================================================
00956 // ---------------------------------------------------------------------
00957 // ---------------------------------------------------------------------
00958 // =====================================================================
00959 /*
00960   \fn PROJ_GenerateEvent(int idx_elt1, int idx_elt2, int a_th)
00961   \param idx_elt1 : first ID of the event
00962   \param idx_elt2 : second ID of the event
00963   \param a_th : index of the thread from which the function was called
00964   \brief  Generate a standard event and set up its ID
00965           Used by the projection, list-mode sensitivity generation, and datafile converter scripts
00966   \return the thread specific m2p_BufferEvent array containing the event
00967 */
00968 vEvent* vDataFile::PROJ_GenerateEvent(int a_idxElt1, int a_idxElt2, int a_th)
00969 {
00970   #ifdef CASTOR_VERBOSE
00971   if (m_verbose >=4) Cout("vDataFile::PROJ_GenerateEvent() ..." << endl);
00972   #endif
00973   
00974   // Only 1 line required for projection/sensitivity generation
00975   m2p_BufferEvent[a_th]->SetNbLines(1);  
00976   m2p_BufferEvent[a_th]->SetID1(0, a_idxElt1);
00977   m2p_BufferEvent[a_th]->SetID2(0, a_idxElt2);
00978   
00979   return m2p_BufferEvent[a_th];
00980 }
 All Classes Files Functions Variables Typedefs Defines