![]() |
CASToR
1.0
Tomographic Reconstruction (PET/SPECT)
|
00001 00002 /* 00003 Implementation of class vDataFile 00004 00005 - separators: X 00006 - doxygen: X 00007 - default initialization: X 00008 - CASTOR_DEBUG: X 00009 - CASTOR_VERBOSE: X 00010 */ 00011 00019 #include "vDataFile.hh" 00020 00021 // ===================================================================== 00022 // --------------------------------------------------------------------- 00023 // --------------------------------------------------------------------- 00024 // ===================================================================== 00025 00026 vDataFile::vDataFile() 00027 { 00028 mp_ID = NULL; 00029 m_verbose = -1; 00030 00031 // Variables related to the acquisition 00032 m2p_dataFile = NULL; 00033 m_headerFileName = ""; 00034 m_dataFileName = ""; 00035 m_scannerName = ""; 00036 m_dataMode = MODE_UNKNOWN; 00037 m_dataType = TYPE_UNKNOWN; 00038 m_totalNbEvents = -1; 00039 m_dataType = -1; 00040 m_bedIndex = -1; 00041 m_startTimeInSec = 0.; 00042 m_durationInSec = -1.; 00043 m_calibrationFactor = 1.; 00044 m_sizeEvent = -1; 00045 00046 // Default POI (meaning we do not have any) 00047 mp_POIResolution[0] = -1.; 00048 mp_POIResolution[1] = -1.; 00049 mp_POIResolution[2] = -1.; 00050 mp_POIDirectionFlag[0] = false; 00051 mp_POIDirectionFlag[1] = false; 00052 mp_POIDirectionFlag[2] = false; 00053 m_POIInfoFlag = false; 00054 m_ignorePOIFlag = false; 00055 00056 // Variable related to Buffer/Container arrays 00057 m2p_BufferEvent = NULL; 00058 mp_arrayEvents = NULL; 00059 m2p_bufferEventFromFile = NULL; 00060 m_mpi1stEvent = -1; 00061 m_mpiLastEvent = -1; 00062 m_mpiNbEvents = -1; 00063 m_1stIdxArrayEvents = -1; 00064 m_lastIdxArrayEvents = -1; 00065 m_sizeArrayEvents = 0; 00066 m_percentageLoad = -1; 00067 m_requestBufferFilling = false; 00068 m_currentlyFillingBuffer = false; 00069 mp_currentlyReadingBuffer = NULL; 00070 mp_overBufferRange = NULL; 00071 mp_nbEventsReadFromFile = NULL; 00072 } 00073 00074 // ===================================================================== 00075 // --------------------------------------------------------------------- 00076 // --------------------------------------------------------------------- 00077 // ===================================================================== 00078 00079 vDataFile::~vDataFile() 00080 { 00081 // Free array event buffer 00082 if (m_sizeArrayEvents > 0) if (mp_arrayEvents != NULL) delete mp_arrayEvents; 00083 00084 // Free the (char**) m2p_bufferEventFromFile 00085 if (m2p_bufferEventFromFile) 00086 { 00087 for(int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00088 if (m2p_bufferEventFromFile[th]) delete m2p_bufferEventFromFile[th]; 00089 delete[] m2p_bufferEventFromFile; 00090 } 00091 // Free the (vEvent) buffer event 00092 if (m2p_BufferEvent) 00093 { 00094 for(int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00095 if (m2p_BufferEvent[th]) delete m2p_BufferEvent[th]; 00096 delete[] m2p_BufferEvent; 00097 } 00098 // Close datafiles and delete them 00099 if (m2p_dataFile) 00100 { 00101 for(int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00102 if (m2p_dataFile[th]) m2p_dataFile[th]->close(); 00103 delete m2p_dataFile; 00104 } 00105 // Destroy the boolean tabs that are used to organize and synchronize buffer reading 00106 if (mp_currentlyReadingBuffer) free(mp_currentlyReadingBuffer); 00107 if (mp_overBufferRange) free(mp_overBufferRange); 00108 // Destroy events reading counters, but print out before that 00109 if (mp_nbEventsReadFromFile) 00110 { 00111 // Verbose 00112 if (m_verbose>=4) 00113 { 00114 Cout("vDataFile::~vDataFile() -> Number of events directly read from file for each thread" << endl); 00115 int64_t total = 0; 00116 for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00117 { 00118 Cout(" --> Thread " << th+1 << " | " << mp_nbEventsReadFromFile[th] << " events read from file" << endl); 00119 total += mp_nbEventsReadFromFile[th]; 00120 } 00121 Cout(" --> Total number of events read from file: " << total << endl); 00122 Cout(" --> Number of events in the datafile: " << m_totalNbEvents << endl); 00123 } 00124 // Free memory 00125 free(mp_nbEventsReadFromFile); 00126 } 00127 } 00128 00129 // ===================================================================== 00130 // --------------------------------------------------------------------- 00131 // --------------------------------------------------------------------- 00132 // ===================================================================== 00133 00134 int vDataFile::ReadInfoInHeader() 00135 { 00136 if (m_verbose>=3) Cout("vDataFile::ReadInfoInHeader() -> Read datafile header from '" << m_headerFileName << " ...'" << endl); 00137 00138 // Read mandatory general fields in the header, check if errors (either mandatory tag not found or issue during data reading/conversion (==1) ) 00139 if (ReadDataASCIIFile(m_headerFileName, "Number of events", &m_totalNbEvents, 1, KEYWORD_MANDATORY) ) 00140 { 00141 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read number of events in the header data file !" << endl); 00142 return 1; 00143 } 00144 if (ReadDataASCIIFile(m_headerFileName, "Scanner name", &m_scannerName, 1, KEYWORD_MANDATORY) ) 00145 { 00146 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read scanner name in the header data file !" << endl); 00147 return 1; 00148 } 00149 00150 // Get data file name 00151 if (ReadDataASCIIFile(m_headerFileName, "Data filename", &m_dataFileName, 1, KEYWORD_MANDATORY) ) 00152 { 00153 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read data filename in the header data file !" << endl); 00154 return 1; 00155 } 00156 // If it is an absolute path (start with a OS_SEP) then let it as is, otherwise past the path of the header file name (because we suppose the two 00157 // files are side by side). 00158 // This is currently only unix compatible... 00159 if (m_dataFileName.substr(0,1)!=OS_SEP && m_headerFileName.find(OS_SEP)!=string::npos) 00160 { 00161 // Extract the path from the header file name 00162 size_t last_slash = m_headerFileName.find_last_of(OS_SEP); 00163 // Paste the path to the data file name 00164 m_dataFileName = m_headerFileName.substr(0,last_slash+1) + m_dataFileName; 00165 } 00166 00167 // For data mode, multiple declarations are allowed, so we get the mode as a string and do some checks 00168 string data_mode = ""; 00169 if (ReadDataASCIIFile(m_headerFileName, "Data mode", &data_mode, 1, KEYWORD_MANDATORY) ) 00170 { 00171 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read data mode in the header data file !" << endl); 00172 return 1; 00173 } 00174 if ( data_mode=="LIST" || data_mode=="LISTMODE" || data_mode=="LIST-MODE" || 00175 data_mode=="list" || data_mode=="listmode" || data_mode=="list-mode" || data_mode=="0" ) m_dataMode = MODE_LIST; 00176 else if ( data_mode=="HISTOGRAM" || data_mode=="histogram" || data_mode=="Histogram" || 00177 data_mode=="HISTO" || data_mode=="histo" || data_mode=="Histo" || data_mode=="1" ) m_dataMode = MODE_HISTOGRAM; 00178 else if ( data_mode=="NORMALIZATION" || data_mode=="normalization" || data_mode=="Normalization" || 00179 data_mode=="NORM" || data_mode=="norm" || data_mode=="Norm" || data_mode=="2" ) m_dataMode = MODE_NORMALIZATION; 00180 else 00181 { 00182 Cerr("***** vDataFile::ReadInfoInHeader() -> Unknown data mode '" << data_mode << "' found in header file !" << endl); 00183 return 1; 00184 } 00185 00186 // For data type, multiple declarations are allowed, so we get the mode as a string and do some checks 00187 string data_type = ""; 00188 if (ReadDataASCIIFile(m_headerFileName, "Data type", &data_type, 1, KEYWORD_MANDATORY) ) 00189 { 00190 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read data type in the header data file !" << endl); 00191 return 1; 00192 } 00193 if ( data_type=="PET" || data_type=="pet" || data_type=="0" ) m_dataType = TYPE_PET; 00194 else if ( data_type=="SPECT" || data_type=="spect" || data_type=="1" ) m_dataType = TYPE_SPECT; 00195 else if ( data_type=="TRANSMISSION" || data_type=="transmission" || data_type=="Transmission" || 00196 data_type=="trans" || data_type=="TRANS" || data_type=="2" ) m_dataType = TYPE_TRANSMISSION; 00197 else 00198 { 00199 Cerr("***** vDataFile::ReadInfoInHeader() -> Unknown data type '" << data_type << "' found in header file !" << endl); 00200 return 1; 00201 } 00202 00203 // Get start time and duration, and then set the acquisition timing (do not do this for MODE_NORMALIZATION) 00204 if (m_dataMode!=MODE_NORMALIZATION) 00205 { 00206 if (ReadDataASCIIFile(m_headerFileName, "Start time (s)", &m_startTimeInSec, 1, KEYWORD_MANDATORY) ) 00207 { 00208 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read acquisition start time in the header data file !" << endl); 00209 return 1; 00210 } 00211 if (ReadDataASCIIFile(m_headerFileName, "Duration (s)", &m_durationInSec, 1, KEYWORD_MANDATORY) ) 00212 { 00213 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read acquisition stop time in the header data file !" << endl); 00214 return 1; 00215 } 00216 if (mp_ID->SetAcquisitionTime(m_bedIndex, m_startTimeInSec, m_durationInSec)) 00217 { 00218 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while passing acquisition start and stop time to the ImageDimensionsAndQuantification !" << endl); 00219 return 1; 00220 } 00221 } 00222 00223 // Read optional fields in the header, check if errors (issue during data reading/conversion (==1) ) 00224 if (ReadDataASCIIFile(m_headerFileName, "Calibration factor", &m_calibrationFactor, 1, KEYWORD_OPTIONAL)==1 || 00225 ReadDataASCIIFile(m_headerFileName, "POI capability", mp_POIResolution, 3, KEYWORD_OPTIONAL)==1 || 00226 ReadDataASCIIFile(m_headerFileName, "POI correction flag", &m_POIInfoFlag, 3, KEYWORD_OPTIONAL)==1 ) 00227 { 00228 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while reading optional field in the header data file !" << endl); 00229 return 1; 00230 } 00231 00232 // Read fields specific to the modality (call to the related function implemented in child classes) 00233 if (ReadSpecificInfoInHeader() ) 00234 { 00235 Cerr("***** vDataFile::ReadInfoInHeader() -> Error while trying to read modality-specific informations from the header data file !" << endl); 00236 return 1; 00237 } 00238 00239 // Give the calibration factor to the oImageDimensionsAndQuantification that manages the quantification factors 00240 if (m_dataMode!=MODE_NORMALIZATION && mp_ID->SetCalibrationFactor(m_bedIndex, m_calibrationFactor)) 00241 { 00242 Cerr("***** vDataFile::ReadSpecificInfoInHeader() -> A problem occured while setting the calibration factor to oImageDimensionsAndQuantification !" << endl); 00243 return 1; 00244 } 00245 00246 return 0; 00247 } 00248 00249 // ===================================================================== 00250 // --------------------------------------------------------------------- 00251 // --------------------------------------------------------------------- 00252 // ===================================================================== 00253 00254 int vDataFile::CheckParameters() 00255 { 00256 if(m_verbose >=3) Cout("vDataFile::CheckParameters() ..." << endl); 00257 00258 if (mp_ID == NULL) 00259 { 00260 Cerr("***** vDataFile::CheckParameters() -> Error : ImageDimensionsAndQuantification object not initialized !" << endl); 00261 return 1; 00262 } 00263 if (m_headerFileName == "") 00264 { 00265 Cerr("***** vDataFile::CheckParameters() -> Error : string containing path to header file not initialized !" << endl); 00266 return 1; 00267 } 00268 if (m_dataFileName == "") 00269 { 00270 Cerr("***** vDataFile::CheckParameters() -> Error : string containing path to raw data file not initialized !" << endl); 00271 return 1; 00272 } 00273 if (m_percentageLoad<0 || m_percentageLoad>100) 00274 { 00275 Cerr("***** vDataFile::CheckParameters() -> Percentage load of the data file incorrectly set !" << endl); 00276 return 1; 00277 } 00278 if (m_totalNbEvents<0) 00279 { 00280 Cerr("***** vDataFile::CheckParameters() -> Number of events incorrectly initialized !" << endl); 00281 return 1; 00282 } 00283 if (m_dataMode!=MODE_LIST && m_dataMode!=MODE_HISTOGRAM && m_dataMode!=MODE_NORMALIZATION) 00284 { 00285 Cerr("***** vDataFile::CheckParameters() -> Data mode incorrectly initialized !" << endl); 00286 return 1; 00287 } 00288 if (m_dataMode!= MODE_NORMALIZATION && m_durationInSec<0) 00289 { 00290 Cerr("***** vDataFile::CheckParameters() -> Acquisition duration (s) incorrectly initialized !" << endl); 00291 return 1; 00292 } 00293 if (m_dataType!=TYPE_PET && m_dataType!=TYPE_SPECT && m_dataType!=TYPE_TRANSMISSION) 00294 { 00295 Cerr("***** vDataFile::CheckParameters() -> Data type incorrectly initialized !" << endl); 00296 return 1; 00297 } 00298 if (m_bedIndex<0) 00299 { 00300 Cerr("***** vDataFile::CheckParameters() -> Bed position index incorrectly initialized !" << endl); 00301 return 1; 00302 } 00303 if (m_verbose<0) 00304 { 00305 Cerr("***** vDataFile::CheckParameters() -> Verbosity incorrectly initialized !" << endl); 00306 return 1; 00307 } 00308 00309 // Call to the related function implemented in child classes 00310 if (CheckSpecificParameters()) 00311 { 00312 Cerr("***** vDataFile::CheckParameters() -> Error while checking specific parameters !" << endl); 00313 return 1; 00314 } 00315 00316 // End 00317 return 0; 00318 } 00319 00320 // ===================================================================== 00321 // --------------------------------------------------------------------- 00322 // --------------------------------------------------------------------- 00323 // ===================================================================== 00324 00325 int vDataFile::InitializeFile() 00326 { 00327 if (m_verbose>=3) Cout("vDataFile::InitializeFile() ..." << endl); 00328 00329 // Instantiate as many file streams as threads 00330 m2p_dataFile = new fstream*[mp_ID->GetNbThreadsForProjection()]; 00331 for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00332 { 00333 m2p_dataFile[th] = new fstream( m_dataFileName.c_str(), ios::binary| ios::in ); 00334 // If the datafile does not exist, notify to the user and exit program. 00335 if (!m2p_dataFile[th]->is_open()) 00336 { 00337 Cerr("***** vDataFile::InitializeFile() -> Error reading the input data file at the path: '" << m_dataFileName.c_str() << "'" << endl); 00338 Cerr(" (provided in the data file header: " << m_headerFileName << ")" << endl); 00339 return 1; 00340 } 00341 } 00342 // Check file size consistency here (this is a pure virtual function implemented by children) 00343 if (CheckFileSizeConsistency()) 00344 { 00345 Cerr("***** vDataFile::InitializeFile() -> A problem occured while checking file size consistency !" << endl); 00346 return 1; 00347 } 00348 // Initialization of the buffer related to direct data reading from file (during reconstruction) 00349 m2p_bufferEventFromFile = new char*[mp_ID->GetNbThreadsForProjection()]; 00350 for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00351 { 00352 m2p_bufferEventFromFile[th] = new char[m_sizeEvent]; 00353 } 00354 // Initialization of the boolean tab used to know if any thread is currently reading into the file buffer 00355 mp_currentlyReadingBuffer = (bool*)malloc(mp_ID->GetNbThreadsForProjection()*sizeof(bool)); 00356 for (int th=0; th<mp_ID->GetNbThreadsForProjection(); th++) mp_currentlyReadingBuffer[th] = false; 00357 // Initialization of the boolean tab used to know if some threads are under the file buffer range 00358 mp_overBufferRange = (bool*)malloc(mp_ID->GetNbThreadsForProjection()*sizeof(bool)); 00359 for (int th=0; th<mp_ID->GetNbThreadsForProjection(); th++) mp_overBufferRange[th] = false; 00360 // Initialization of the counters for how many events are read directly from file 00361 mp_nbEventsReadFromFile = (int64_t*)malloc(mp_ID->GetNbThreadsForProjection()*sizeof(int64_t)); 00362 for (int th=0; th<mp_ID->GetNbThreadsForProjection(); th++) mp_nbEventsReadFromFile[th] = 0; 00363 00364 // ------------------ Compute here the piece of file that each MPI instance manages -------------------- 00365 00366 // 1. Compute the number of events that each instance has to manage 00367 int64_t instance_size = m_totalNbEvents / mp_ID->GetMPISize(); 00368 // All instances manage an equal part of the file but the last one which also manages the few last events 00369 if (mp_ID->GetMPIRank()!=mp_ID->GetMPISize()-1) m_mpiNbEvents = instance_size; 00370 else m_mpiNbEvents = instance_size + (m_totalNbEvents - instance_size*mp_ID->GetMPISize()); 00371 // 2. Compute the first event managed by the instance 00372 m_mpi1stEvent = mp_ID->GetMPIRank() * instance_size; 00373 // 3. Compute the last event managed by the instance 00374 m_mpiLastEvent = m_mpi1stEvent + m_mpiNbEvents - 1; 00375 00376 // Instanciation of the array of preloaded events (if enabled) 00377 if (m_percentageLoad>0) 00378 { 00379 if (m_verbose>=3) 00380 { 00381 Cout(" --> Allocating memory for the datafile" << endl); 00382 Cout(" --> Number of events in memory: " << m_mpiNbEvents << endl); 00383 } 00384 // Computation of the exact size of the buffer of preloaded events from the provided percentage load 00385 m_sizeArrayEvents = (((int64_t)m_percentageLoad) * m_mpiNbEvents) / 100; 00386 // Compute the number of bytes 00387 int64_t nb_bytes_to_load = m_sizeArrayEvents*m_sizeEvent; 00388 // Verbose to say that we proceed to the first reading of the datafile 00389 if (m_verbose>=3) Cout(" --> Load of " << m_percentageLoad << "% of the datafile (" << nb_bytes_to_load << " bytes)" << endl); 00390 // Instantiate the array of events in which the file content is directly copied 00391 mp_arrayEvents = new char [nb_bytes_to_load]; 00392 } 00393 else m_sizeArrayEvents = 0; 00394 00395 // End 00396 return 0; 00397 } 00398 00399 // ===================================================================== 00400 // --------------------------------------------------------------------- 00401 // --------------------------------------------------------------------- 00402 // ===================================================================== 00403 00404 void vDataFile::ResetBufferRange() 00405 { 00406 // Reset the range indices only if the percentage is strictly between 0 and 100. 00407 // For the case 0, it is simply a security. 00408 // For the case 100, it would force the buffer to be re-read again... 00409 // The purpose of this function is to be called before a loop over the datafile, so that all 00410 // threads will have to deal with event indices always inside or above the range (never under). 00411 // This is important to avoid that the thread 0 filling the buffer is ahead of other threads, leading to the other threads 00412 // not benefiting from the file buffer. There is a mechanism in the GetEvent() function so that the thread 0 waits for the 00413 // other threads to be over the buffer range before loading a new portion of the data file into the buffer. 00414 if (m_percentageLoad>0 && m_percentageLoad<100) 00415 { 00416 m_1stIdxArrayEvents = -1; 00417 m_lastIdxArrayEvents = -1; 00418 } 00419 } 00420 00421 // ===================================================================== 00422 // --------------------------------------------------------------------- 00423 // --------------------------------------------------------------------- 00424 // ===================================================================== 00425 00426 int vDataFile::FillBuffer(int64_t a_eventIndex) 00427 { 00428 #ifdef CASTOR_VERBOSE 00429 if (m_verbose >=4) Cout("vDataFile::FillBuffer() ..." << endl); 00430 #endif 00431 00432 // Important note: m_mpiLastEvent/m_lastIdxArrayEvent are the last event indices INCLUDED in the instance/buffer 00433 00434 // Note: This function is called from the GetEvent() function inside the iterations, but only by thread #0 00435 int thread_0 = 0; 00436 00437 // Seek to the appropriate position in the input data file 00438 m2p_dataFile[thread_0]->seekg(m_sizeEvent*a_eventIndex); 00439 00440 // Compute the last event index that can be loaded into the buffer (included; not excluded) 00441 int64_t last_included_event_that_can_be_loaded = a_eventIndex + m_sizeArrayEvents - 1; 00442 00443 // This is the number of events that will be loaded 00444 int64_t nb_events_to_load = 0; 00445 // If the number of remaining events to be loaded is less than the buffer capacity, we only load until m_mpiLastEvent is reached 00446 if (last_included_event_that_can_be_loaded > m_mpiLastEvent) nb_events_to_load = m_mpiLastEvent + 1 - a_eventIndex; 00447 // Otherwise we fully load the buffer 00448 else nb_events_to_load = m_sizeArrayEvents; 00449 00450 // Read the data 00451 m2p_dataFile[thread_0]->read(mp_arrayEvents, nb_events_to_load*m_sizeEvent); 00452 00453 // Todo: histogram/SPECT only!!!!!!!!!!!!!! 00454 //Shuffle( nb_events_to_load ); 00455 00456 // Check if the reading was successfull 00457 if (!(*m2p_dataFile[thread_0])) 00458 { 00459 Cerr("***** vDataFile::FillBuffer() -> Failed to read " << nb_events_to_load << " events in datafile (only read " << m2p_dataFile[thread_0]->gcount() << " events) !" << endl); 00460 return 1; 00461 } 00462 00463 // Update the buffer first and last indices 00464 m_1stIdxArrayEvents = a_eventIndex; 00465 m_lastIdxArrayEvents = m_1stIdxArrayEvents + nb_events_to_load - 1; 00466 00467 // Verbose 00468 #ifdef CASTOR_VERBOSE 00469 if (m_verbose >=4) Cout("vDataFile::FillBuffer() completed" << endl); 00470 #endif 00471 00472 // End 00473 return 0; 00474 } 00475 00476 // ===================================================================== 00477 // --------------------------------------------------------------------- 00478 // --------------------------------------------------------------------- 00479 // ===================================================================== 00480 00481 int vDataFile::Shuffle( int64_t nb_events_to_load ) 00482 { 00483 // Buffer storing ordered indices from ( 0 ) to ( nb_events_to_load - 1 ) 00484 int64_t *rndmIdx = new int64_t[ nb_events_to_load ]; 00485 std::iota( rndmIdx, rndmIdx + nb_events_to_load, 0 ); 00486 00487 // Initializing random 00488 std::random_device rd; 00489 std::mt19937 rndm( rd() ); 00490 rndm.seed( 1100001001 ); 00491 00492 // Shuffling the buffer of indices 00493 std::shuffle( rndmIdx, rndmIdx + nb_events_to_load, rndm ); 00494 00495 // Creating a tmp buffer 00496 char *mp_arrayEvents_tmp = new char[ nb_events_to_load*m_sizeEvent ]; 00497 00498 // Copy sorted buffer to shuffled buffer 00499 for( int64_t i = 0; i < nb_events_to_load; ++i ) 00500 { 00501 for( int64_t j = 0; j < m_sizeEvent; ++j ) 00502 { 00503 mp_arrayEvents_tmp[ i * m_sizeEvent + j ] = mp_arrayEvents[ rndmIdx[ i ] * m_sizeEvent + j ]; 00504 } 00505 } 00506 00507 // Freeing memory 00508 delete[] rndmIdx; 00509 delete[] mp_arrayEvents; 00510 00511 mp_arrayEvents = mp_arrayEvents_tmp; 00512 00513 return 0; 00514 } 00515 00516 // ===================================================================== 00517 // --------------------------------------------------------------------- 00518 // --------------------------------------------------------------------- 00519 // ===================================================================== 00520 00521 vEvent* vDataFile::GetEventWithAscendingOrderAssumption(int64_t a_eventIndex, int64_t a_eventIndexLimit, int a_th) 00522 { 00523 // With the assumption that this function is called from a loop over event indices in an ascending order, the implementation 00524 // is as follows: 00525 // - The ResetBufferRange() function MUST be called before launching the loop over event indices 00526 // - Thread 0 is a particular case: it is the only one able to fill the buffer, and to do that it waits for the other threads to be above the current range. 00527 // - The other threads read directly from the file if the first thread is currently filling the buffer, or if below or above the current range. In the latter 00528 // case, they set a boolean flag saying they are above so that the thread 0 can know it. Otherwise they read from the buffer. 00529 // See the comments at the beginning of the GetEventWithoutOrderAssumption() function for details about motivations, etc. 00530 00531 #ifdef CASTOR_VERBOSE 00532 if (m_verbose >=4) Cout("vDataFile::GetEventWithAscendingOrderAssumption() ..." << endl); 00533 #endif 00534 00535 // Particular case for an empty buffer (-load 0): read the event directly from the file, whatever the thread number 00536 if (m_sizeArrayEvents==0) return GetEventFromFile(a_eventIndex, a_th); 00537 00538 // Special treatment for the first thread (it is the only one allowed to refill the buffer) 00539 if (a_th==0) 00540 { 00541 // If the event index is below the current indices range loaded in memory, it is not normal as only this thread refills the buffer 00542 if (a_eventIndex<m_1stIdxArrayEvents) 00543 { 00544 Cerr("***** vDataFile::GetEventWithAscendingOrderAssumption() -> The requested event index is below the current range of indices loaded in memory !" << endl); 00545 Cerr(" --> Note that this situation should never happen if this function is being called within an ascending ordered loop on event indices, AND if" << endl); 00546 Cerr(" the vDataFile::ResetBufferRange() function has been called before performing the loop. So if this is really the case, then please report" << endl); 00547 Cerr(" the bug." << endl); 00548 return NULL; 00549 } 00550 // Else if the event is above the current indices range, then this first thread has to refill the buffer 00551 else if (a_eventIndex>m_lastIdxArrayEvents) 00552 { 00553 // Here, the thread has to wait for all others to be over the current buffer range 00554 bool all_threads_over_range = true; 00555 for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++) 00556 all_threads_over_range = all_threads_over_range && mp_overBufferRange[th]; 00557 while (!all_threads_over_range) 00558 { 00559 all_threads_over_range = true; 00560 for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++) 00561 all_threads_over_range = all_threads_over_range && mp_overBufferRange[th]; 00562 } 00563 // Now we are good, so we notify that the first thread is currently filling the buffer 00564 m_currentlyFillingBuffer = true; 00565 // Now the first thread can safely fill the buffer from the current event index, because all threads are above the range and they know the first 00566 // thread will refill the buffer 00567 if (FillBuffer(a_eventIndex)) 00568 { 00569 Cerr("***** vDataFile::GetEventWithAscendingOrderAssumption() -> An error occured while filling the buffer from datafile !" << endl); 00570 return NULL; 00571 } 00572 // Say that the first thread is done filling the buffer 00573 m_currentlyFillingBuffer = false; 00574 } 00575 // Get address of the event to recover from the array buffer 00576 char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent; 00577 // And read from the buffer 00578 return GetEventFromBuffer(event_address_in_array, a_th); 00579 } 00580 // For the other threads 00581 else 00582 { 00583 // The event pointer that will be returned 00584 vEvent* p_event; 00585 // We first check if the thread 0 is currently filling the buffer; in that case then read from file 00586 if (m_currentlyFillingBuffer) 00587 { 00588 // In this case, we still say that the thread is over the buffer range. In case this thread finishes its part of the loop 00589 // while the first thread is filling the buffer, the first thread won't be blocked into waiting for all threads to be over 00590 // the range. 00591 mp_nbEventsReadFromFile[a_th]++; 00592 p_event = GetEventFromFile(a_eventIndex, a_th); 00593 } 00594 // Else if the event index is below the current indices range 00595 else if (a_eventIndex<m_1stIdxArrayEvents) 00596 { 00597 // This situation may still occur in some cases: right after a buffer filling and if the number of subsets is high. This 00598 // is because the new portion of the data file loaded by the thread 0 is not strictly contiguous to the previous portion 00599 // due to subsets and multi-threading (in case of a block thread sequence higher than 1 or a high number of threads). In 00600 // this case we read directly from the file. 00601 mp_overBufferRange[a_th] = false; 00602 mp_nbEventsReadFromFile[a_th]++; 00603 p_event = GetEventFromFile(a_eventIndex, a_th); 00604 } 00605 // Else if the event index is above the current indices range 00606 else if (a_eventIndex>m_lastIdxArrayEvents) 00607 { 00608 // We say that this thread is above the current range, so that the thread 0 knows that 00609 mp_overBufferRange[a_th] = true; 00610 // And we read directly from the file 00611 mp_nbEventsReadFromFile[a_th]++; 00612 p_event = GetEventFromFile(a_eventIndex, a_th); 00613 } 00614 // Else, we can safely read from the buffer 00615 else 00616 { 00617 // We check again here that the first thread is not reading into the buffer in case it started between now and the first check. 00618 // This allows to catch some potential bad behaviors in extrem conditions (a lot of threads with a very small load percentage). 00619 if (m_currentlyFillingBuffer) 00620 { 00621 mp_nbEventsReadFromFile[a_th]++; 00622 p_event = GetEventFromFile(a_eventIndex, a_th); 00623 } 00624 else 00625 { 00626 // Say that we are not above range 00627 mp_overBufferRange[a_th] = false; 00628 // Compute the adress into the buffer 00629 char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent; 00630 // Get the event 00631 p_event = GetEventFromBuffer(event_address_in_array, a_th); 00632 } 00633 } 00634 // If the current index is over the provided limit index, we say that this thread is over the buffer range to avoid thread 0 to be stuck 00635 if (a_eventIndex >= a_eventIndexLimit) mp_overBufferRange[a_th] = true; 00636 // Return the event 00637 return p_event; 00638 } 00639 } 00640 00641 // ===================================================================== 00642 // --------------------------------------------------------------------- 00643 // --------------------------------------------------------------------- 00644 // ===================================================================== 00645 00646 vEvent* vDataFile::GetEventWithoutOrderAssumption(int64_t a_eventIndex, int a_th) 00647 { 00648 // Without any assumption about the order of event indices, the implementation of this function is quite 00649 // tricky when we want genericity and efficiency! We absolutely want that only one thread deals with the 00650 // buffer filling (the first thread) while others are not blocked during this process and can still read 00651 // events directly from the file. Under these restrictions, multiple strategies were tested, but the current 00652 // one is the most efficient and stable one. The most stressful situation for this function is when using 00653 // many threads with a small load percentage. The use of OpenMP locks, in many different ways has always 00654 // lead to rare occurences (1-2%) of segmentation fault (load percentage typically between 1 to 10 and 16 00655 // threads), which could not be explained but also not tolerated... The current implementation uses home-made 00656 // locks at multiple levels but which does not hamper the efficiency. It has been validated with intense testing 00657 // (1500 reconstructions of the PET benchmark with a number of threads randomly chosen between 50 to 120 and a load 00658 // percentage of 2%; note that if somebody has such a powerfull computer with such a small amount of RAM, it would 00659 // be quite a paradoxal situation, but nonetheless, we wanted to stress this function at most). The results are: 00660 // 1 segmentation fault and 2 runs with a few pixels variating up to 0.2% (basically, it is one event at one moment 00661 // that succeeds in passing between the test of m_requestBufferFilling and the affectation of mp_currentlyReadingBuffer, 00662 // so that it runs through the GetEventFromBuffer function while the first thread also succeed to pass between the 00663 // checks and is filling the buffer at the same time; plus in order to give a corrupted event, the buffer has to be 00664 // changed right before the thread read at this memory location; with such rare stuff in such rare conditions, we feel 00665 // this function is sufficiently robust with respect to the restrictions we assume at the beginning; still, if 00666 // someone has something better, we take it!). 00667 // Anyway, if you want to get events inside an ascending ordered loop over events, then use the GetEventWithAscendingOrderAssumption() 00668 // function instead. It is more efficient and has the same failure rate in the same chaotic conditions (a lot of threads with a very 00669 // small load percentage. But do not forget to call the ResetBufferRange() function before the loop. However, note that this function 00670 // is not yet compatible with the use of MPI. 00671 00672 #ifdef CASTOR_VERBOSE 00673 if (m_verbose >=4) Cout("vDataFile::GetEventWithoutOrderAssumption() ..." << endl); 00674 #endif 00675 00676 // Particular case for an empty buffer (-load 0): read the event directly from the file, whatever the thread number 00677 if (m_sizeArrayEvents==0) return GetEventFromFile(a_eventIndex, a_th); 00678 00679 // Special treatment for the first thread (it is the only one allowed to refill the buffer) 00680 if (a_th==0) 00681 { 00682 // If the event is outside the current buffer, then the thread refills the buffer 00683 if (a_eventIndex<m_1stIdxArrayEvents || a_eventIndex>m_lastIdxArrayEvents) 00684 { 00685 // Say that the first thread is requesting to fill the buffer 00686 m_requestBufferFilling = true; 00687 // Now this thread will wait for all other threads to declare that they are not currently reading into the buffer (otherwise 00688 // the reading of the event by the other threads may be corrupted). The other thread are now aware that the first thread is 00689 // waiting so they will not read from the buffer once finished their current reading. 00690 bool all_threads_not_reading_buffer = true; 00691 for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++) 00692 all_threads_not_reading_buffer = all_threads_not_reading_buffer && !mp_currentlyReadingBuffer[th]; 00693 while (!all_threads_not_reading_buffer) 00694 { 00695 all_threads_not_reading_buffer = true; 00696 for (int th=1; th<mp_ID->GetNbThreadsForProjection(); th++) 00697 all_threads_not_reading_buffer = all_threads_not_reading_buffer && !mp_currentlyReadingBuffer[th]; 00698 } 00699 // We are good, so say that the first thread is currently filling the buffer (this is needed below, see explanations) 00700 m_currentlyFillingBuffer = true; 00701 // Now the first thread can safely fill the buffer from the current event index, because as all threads know it was waiting for 00702 // filling the buffer, they know read directly from the file 00703 if (FillBuffer(a_eventIndex)) 00704 { 00705 Cerr("***** vDataFile::GetEventWithoutOrderAssumption() -> An error occured while filling the buffer from datafile !" << endl); 00706 return NULL; 00707 } 00708 // Say that the first thread is done filling the buffer 00709 m_currentlyFillingBuffer = false; 00710 // No more request to fill the buffer 00711 m_requestBufferFilling = false; 00712 } 00713 // Get address of the event to recover from the array buffer 00714 char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent; 00715 // And read from the buffer 00716 return GetEventFromBuffer(event_address_in_array, a_th); 00717 } 00718 // For the other threads 00719 else 00720 { 00721 // If out of buffer range, then read from file (we must check at first if the first thread is currently filling the buffer, otherwise 00722 // the range 1st and last indices may not reflect the actual range; then we cannot use here the m_requestBufferFilling flag instead, 00723 // because some compiler optimization might merge this if section with the next one, and we absolutely need the next one to be alone, 00724 // and right befre the first instruction of the else section setting the mp_currentlyReadingBuffer[a_th] flag to true. 00725 if (m_currentlyFillingBuffer || a_eventIndex<m_1stIdxArrayEvents || a_eventIndex>m_lastIdxArrayEvents) 00726 { 00727 mp_nbEventsReadFromFile[a_th]++; 00728 return GetEventFromFile(a_eventIndex, a_th); 00729 } 00730 // Else if request filling, then read from file (this check has to be separated from the one above, otherwise if there is no buffer 00731 // filling request, between the time this thread do the checks and set the currentlyReadingBuffer to true, the thread0 may have 00732 // enaugh time to start the filling of the buffer, creating erroneous lines, also may be due to some compiler optimizations reversing 00733 // some checks; at least, this version is stable) 00734 else if (m_requestBufferFilling) 00735 { 00736 mp_nbEventsReadFromFile[a_th]++; 00737 return GetEventFromFile(a_eventIndex, a_th); 00738 } 00739 // Else, we can safely read from the buffer 00740 else 00741 { 00742 // Say that this thread is reading into the buffer 00743 mp_currentlyReadingBuffer[a_th] = true; 00744 // Compute the adress into the buffer 00745 char* event_address_in_array = mp_arrayEvents + (a_eventIndex-m_1stIdxArrayEvents)*m_sizeEvent; 00746 // Get the event 00747 vEvent* p_event = GetEventFromBuffer(event_address_in_array, a_th); 00748 // Say that the thread has done reading into the buffer 00749 mp_currentlyReadingBuffer[a_th] = false; 00750 // Return the event 00751 return p_event; 00752 } 00753 } 00754 } 00755 00756 // ===================================================================== 00757 // --------------------------------------------------------------------- 00758 // --------------------------------------------------------------------- 00759 // ===================================================================== 00760 00761 vEvent* vDataFile::GetEventFromFile(int64_t a_eventIndex, int a_th) 00762 { 00763 #ifdef CASTOR_VERBOSE 00764 if (m_verbose >=4) Cout("vDataFile::GetEventFromFile() ..." << endl); 00765 #endif 00766 00767 // Seek to the position and read the event 00768 m2p_dataFile[a_th]->seekg((int64_t)m_sizeEvent*a_eventIndex); 00769 m2p_dataFile[a_th]->read(m2p_bufferEventFromFile[a_th], m_sizeEvent); 00770 // Check the reading 00771 if (!(*m2p_dataFile[a_th])) 00772 { 00773 Cerr("***** vDataFile::GetEventFromFile() -> Failed to read one event in datafile !" << endl); 00774 return NULL; 00775 } 00776 // Pass the pointer to the event to the interpretor function (implemented by children) 00777 return GetEventFromBuffer(m2p_bufferEventFromFile[a_th], a_th); 00778 } 00779 00780 // ===================================================================== 00781 // --------------------------------------------------------------------- 00782 // --------------------------------------------------------------------- 00783 // ===================================================================== 00784 00785 void vDataFile::GetEventIndexStartAndStop( int64_t* ap_indexStart, int64_t* ap_indexStop, int a_subsetNum, int a_nbSubsets ) 00786 { 00787 #ifdef CASTOR_VERBOSE 00788 if (m_verbose >=4) Cout("vDataFile::GetEventIndexStartAndStop() ..." << endl); 00789 #endif 00790 // Basically here, the index start for a single MPI instance will be the current subset number. 00791 // If multiple instances are used, the whole datafile is split into equal-size concatenated pieces. 00792 // So for each instance, we just have to found the first index falling in its range (assuming that 00793 // the event index step is always equal to the number of subsets). 00794 00795 // For the first machine, index start is the current subset number 00796 if (mp_ID->GetMPIRank()==0) *ap_indexStart = a_subsetNum; 00797 // For the other machines, we must search for the first index falling in their range belonging to this 00798 // subset (a subset being starting at a_subsetNum with a step equal to the number of subsets, a_nbSubsets) 00799 else 00800 { 00801 // Compute the modulo of the first index of this machine minus the subset number with respect to the number of subsets 00802 int64_t modulo = (m_mpi1stEvent-a_subsetNum) % a_nbSubsets; 00803 // If this modulo is null, then the index start is the first index 00804 if (modulo==0) *ap_indexStart = m_mpi1stEvent; 00805 // Otherwise, the index start is equal to the first index plus the number of subsets minus the modulo 00806 else *ap_indexStart = m_mpi1stEvent + (a_nbSubsets - modulo); 00807 } 00808 00809 // For index stop, we simply get the last event of the MPI instance (+1 because the for loop is exclusive) 00810 *ap_indexStop = m_mpiLastEvent + 1; 00811 } 00812 00813 // ===================================================================== 00814 // --------------------------------------------------------------------- 00815 // --------------------------------------------------------------------- 00816 // ===================================================================== 00817 00818 int vDataFile::GetMaxRingDiff() 00819 { 00820 Cerr("*****vDataFile::GetMaxRingDiff() ->This function is not implemented for the used system" << endl); 00821 Cerr(" (this error may be prompted if the present function is erroneously called for a SPECT system)" << endl); 00822 return -1; 00823 } 00824 00825 /************************************************ 00826 * PROJECTION SCRIPT FUNCTIONS 00827 ************************************************/ 00828 00829 // ===================================================================== 00830 // --------------------------------------------------------------------- 00831 // --------------------------------------------------------------------- 00832 // ===================================================================== 00833 /* 00834 \fn PROJ_WriteData() 00835 \brief Write/Merge chunk of data in a general data file. 00836 \todo check if some file manipulations are correctly done 00837 \todo adapt to the data loading/writing in RAM 00838 \return 0 if success, and positive value otherwise. 00839 */ 00840 int vDataFile::PROJ_WriteData() 00841 { 00842 if (m_verbose >=2) Cout("vDataFile::PROJ_WriteData() ..." << endl); 00843 00844 // Close all multithreaded datafiles before merging 00845 for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00846 if (m2p_dataFile[th]) m2p_dataFile[th]->close(); 00847 00848 00849 // If the output file doesn't exist yet, simply rename the first temporary file name using the ouput file name 00850 if (!ifstream(m_dataFileName)) 00851 { 00852 // If only one projection is required (i.e one threads && no projection of frame or gates)... 00853 if(mp_ID->GetNbThreadsForProjection()==1 // No multithreading so no multiple tmp datafiles 00854 && mp_ID->GetNbTimeFrames()* 00855 mp_ID->GetSensNbRespGates()* 00856 mp_ID->GetSensNbCardGates() == 1) 00857 { 00858 // rename the first temporary file name to the global name 00859 string tmp_file_name = m_dataFileName + "_0"; 00860 00861 // ... then just rename the first temporary file name using the ouput file name 00862 rename(tmp_file_name.c_str(),m_dataFileName.c_str()); 00863 // no need to concatenate, so we leave here. 00864 return 0 ; 00865 } 00866 } 00867 00868 // Create the final output file which will concatenate the events inside the thread-specific data files 00869 ofstream merged_file(m_dataFileName.c_str(), ios::out | ios::binary | ios::app); 00870 00871 // Concatenation : generate input file ("data_file") to read the buffer of the thread-specific data files and store the information in the final output file 00872 for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00873 { 00874 // Build thread file name 00875 stringstream ss; ss << th; 00876 string file_name = m_dataFileName; 00877 file_name.append("_").append(ss.str()); 00878 // Open it 00879 // Note SS: Maybe we can use the m2p_dataFile[th] here by just rewarding to the beginning of the file ? 00880 // Note TM: There were some issues involving the use of rdbuf and concatenation of the file in this case (ifstream were needed instead of fstream for some reasons) 00881 // But we should have another look on how the projection data writing works with the implementation of the new analytical simulator. 00882 ifstream data_file(file_name.c_str(), ios::binary | ios::in); 00883 00884 if (!data_file) 00885 { 00886 Cerr(endl); 00887 Cerr("***** vDataFile::PROJ_ConcatenateMthDatafile() -> Input temporary thread file '" << file_name << "' is missing or corrupted !" << endl); 00888 return 1; 00889 } 00890 00891 // Concatenate it to the merged file 00892 merged_file << data_file.rdbuf(); 00893 // Close file 00894 data_file.close(); 00895 00896 // Re-open datafiles (needed if projecting frame/gates, as the contents of the temporary datafile are copied to the main datafile after each frame/gate) 00897 m2p_dataFile[th]->open( file_name.c_str(), ios::binary | ios::out | ios::trunc); 00898 } 00899 00900 // Close merged file 00901 merged_file.close(); 00902 00903 return 0; 00904 } 00905 00906 00907 00908 00909 00910 00911 // ===================================================================== 00912 // --------------------------------------------------------------------- 00913 // --------------------------------------------------------------------- 00914 // ===================================================================== 00915 /* 00916 \fn PROJ_DeleteTmpDatafile() 00917 \brief Delete temporary datafile used for multithreaded output writing if needed 00918 \return 0 if success, and positive value otherwise. 00919 */ 00920 int vDataFile::PROJ_DeleteTmpDatafile() 00921 { 00922 if (m_verbose >=3) Cout("vDataFile::PROJ_DeleteTmpDatafile() ..." << endl); 00923 00924 // Generate input file ("data_file") to read the buffer of the thread-specific data files and store the information in the final output file 00925 for (int th=0 ; th<mp_ID->GetNbThreadsForProjection() ; th++) 00926 { 00927 // Build thread file name 00928 stringstream ss; ss << th; 00929 00930 if (m2p_dataFile[th]) m2p_dataFile[th]->close(); 00931 00932 string file_name = m_dataFileName; 00933 file_name.append("_").append(ss.str()); 00934 00935 // Remove temporary file for data output writing (Projection script only) 00936 ifstream fcheck(file_name.c_str()); 00937 if(fcheck.good()) 00938 { 00939 fcheck.close(); 00940 #ifdef _WIN32 00941 string dos_instruction = "del " + file_name; 00942 system(dos_instruction.c_str()); 00943 #else 00944 remove(file_name.c_str()); 00945 #endif 00946 } 00947 } 00948 00949 return 0; 00950 } 00951 00952 00953 00954 00955 // ===================================================================== 00956 // --------------------------------------------------------------------- 00957 // --------------------------------------------------------------------- 00958 // ===================================================================== 00959 /* 00960 \fn PROJ_GenerateEvent(int idx_elt1, int idx_elt2, int a_th) 00961 \param idx_elt1 : first ID of the event 00962 \param idx_elt2 : second ID of the event 00963 \param a_th : index of the thread from which the function was called 00964 \brief Generate a standard event and set up its ID 00965 Used by the projection, list-mode sensitivity generation, and datafile converter scripts 00966 \return the thread specific m2p_BufferEvent array containing the event 00967 */ 00968 vEvent* vDataFile::PROJ_GenerateEvent(int a_idxElt1, int a_idxElt2, int a_th) 00969 { 00970 #ifdef CASTOR_VERBOSE 00971 if (m_verbose >=4) Cout("vDataFile::PROJ_GenerateEvent() ..." << endl); 00972 #endif 00973 00974 // Only 1 line required for projection/sensitivity generation 00975 m2p_BufferEvent[a_th]->SetNbLines(1); 00976 m2p_BufferEvent[a_th]->SetID1(0, a_idxElt1); 00977 m2p_BufferEvent[a_th]->SetID2(0, a_idxElt2); 00978 00979 return m2p_BufferEvent[a_th]; 00980 }