|
Sierra Toolkit Version of the Day
|
00001 /*------------------------------------------------------------------------*/ 00002 /* Copyright 2010 Sandia Corporation. */ 00003 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */ 00004 /* license for use of this work by or on behalf of the U.S. Government. */ 00005 /* Export of this program may require a license from the */ 00006 /* United States Government. */ 00007 /*------------------------------------------------------------------------*/ 00008 00009 00010 #include <stdexcept> 00011 #include <sstream> 00012 #include <algorithm> 00013 #include <limits> 00014 #include <stdint.h> 00015 #include <stk_util/parallel/ParallelComm.hpp> 00016 #include <stk_util/parallel/DistributedIndex.hpp> 00017 00018 namespace stk { 00019 namespace parallel { 00020 00021 //---------------------------------------------------------------------- 00022 00023 namespace { 00024 00025 struct KeyProcLess { 00026 00027 bool operator()( const DistributedIndex::KeyProc & lhs , 00028 const DistributedIndex::KeyProc & rhs ) const 00029 { 00030 return lhs.first != rhs.first ? 
lhs.first < rhs.first 00031 : lhs.second < rhs.second ; 00032 } 00033 00034 bool operator()( const DistributedIndex::KeyProc & lhs , 00035 const DistributedIndex::KeyType & rhs ) const 00036 { return lhs.first < rhs ; } 00037 00038 }; 00039 00040 void sort_unique( std::vector<DistributedIndex::KeyProc> & key_usage ) 00041 { 00042 std::vector<DistributedIndex::KeyProc>::iterator 00043 i = key_usage.begin() , 00044 j = key_usage.end() ; 00045 00046 std::sort( i , j , KeyProcLess() ); 00047 00048 i = std::unique( i , j ); 00049 00050 key_usage.erase( i , j ); 00051 } 00052 00053 void sort_unique( std::vector<DistributedIndex::KeyType> & keys ) 00054 { 00055 std::vector<DistributedIndex::KeyType>::iterator 00056 i = keys.begin() , 00057 j = keys.end() ; 00058 00059 std::sort( i , j ); 00060 00061 i = std::unique( i , j ); 00062 00063 keys.erase( i , j ); 00064 } 00065 00066 } 00067 00068 //---------------------------------------------------------------------- 00069 00070 enum { DISTRIBUTED_INDEX_CHUNK_BITS = 12 }; 00071 00072 enum { DISTRIBUTED_INDEX_CHUNK_SIZE = 00073 size_t(1) << DISTRIBUTED_INDEX_CHUNK_BITS }; 00074 00075 DistributedIndex::ProcType 00076 DistributedIndex::to_which_proc( const DistributedIndex::KeyType & key ) const 00077 { 00078 for ( size_t i = 0 ; i < m_span_count ; ++i ) { 00079 if ( m_key_span[i].first <= key && key <= m_key_span[i].second ) { 00080 const KeyType offset = key - m_key_span[i].first ; 00081 return ( offset >> DISTRIBUTED_INDEX_CHUNK_BITS ) % m_comm_size ; 00082 } 00083 } 00084 return m_comm_size ; 00085 } 00086 00087 //---------------------------------------------------------------------- 00088 00089 DistributedIndex::~DistributedIndex() {} 00090 00091 DistributedIndex::DistributedIndex ( 00092 ParallelMachine comm , 00093 const std::vector<KeySpan> & partition_bounds ) 00094 : m_comm( comm ), 00095 m_comm_rank( parallel_machine_rank( comm ) ), 00096 m_comm_size( parallel_machine_size( comm ) ), 00097 m_span_count(0), 00098 
m_key_span(), 00099 m_key_usage() 00100 { 00101 unsigned info[2] ; 00102 info[0] = partition_bounds.size(); 00103 info[1] = 0 ; 00104 00105 // Check each span for validity 00106 00107 for ( std::vector<KeySpan>::const_iterator 00108 i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) { 00109 if ( i->second < i->first || 00110 ( i != partition_bounds.begin() && i->first <= (i-1)->second ) ) { 00111 info[1] = 1 ; 00112 } 00113 } 00114 00115 #if defined( STK_HAS_MPI ) 00116 MPI_Bcast( info , 2 , MPI_UNSIGNED , 0 , comm ); 00117 00118 if ( 0 < info[0] ) { 00119 m_key_span.resize( info[0] ); 00120 if ( 0 == parallel_machine_rank( comm ) ) { 00121 m_key_span = partition_bounds ; 00122 } 00123 MPI_Bcast( & m_key_span[0], info[0] * sizeof(KeySpan), MPI_BYTE, 0, comm ); 00124 } 00125 #else 00126 m_key_span = partition_bounds ; 00127 #endif 00128 00129 if ( info[1] ) { 00130 std::ostringstream msg ; 00131 msg << "sierra::parallel::DistributedIndex ctor( comm , " ; 00132 00133 for ( std::vector<KeySpan>::const_iterator 00134 i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) { 00135 msg << " ( min = " << i->first << " , max = " << i->second << " )" ; 00136 } 00137 msg << " ) contains invalid span of keys" ; 00138 throw std::runtime_error( msg.str() ); 00139 } 00140 00141 m_span_count = info[0] ; 00142 00143 if ( 0 == m_span_count ) { 00144 m_key_span.push_back( 00145 KeySpan( std::numeric_limits<KeyType>::min(), 00146 std::numeric_limits<KeyType>::max() ) ); 00147 m_span_count = 1 ; 00148 } 00149 } 00150 00151 //---------------------------------------------------------------------- 00152 //---------------------------------------------------------------------- 00153 00154 namespace { 00155 00156 void query_pack_to_usage( 00157 const std::vector<DistributedIndex::KeyProc> & key_usage , 00158 const std::vector<DistributedIndex::KeyType> & request , 00159 CommAll & all ) 00160 { 00161 std::vector<DistributedIndex::KeyProc>::const_iterator i = 
key_usage.begin(); 00162 std::vector<DistributedIndex::KeyType>::const_iterator k = request.begin(); 00163 00164 for ( ; k != request.end() && i != key_usage.end() ; ++k ) { 00165 00166 for ( ; i != key_usage.end() && i->first < *k ; ++i ); 00167 00168 std::vector<DistributedIndex::KeyProc>::const_iterator j = i ; 00169 for ( ; j != key_usage.end() && j->first == *k ; ++j ); 00170 00171 for ( std::vector<DistributedIndex::KeyProc>::const_iterator 00172 jsend = i ; jsend != j ; ++jsend ) { 00173 00174 for ( std::vector<DistributedIndex::KeyProc>::const_iterator 00175 jinfo = i ; jinfo != j ; ++jinfo ) { 00176 00177 all.send_buffer( jsend->second ) 00178 .pack<DistributedIndex::KeyProc>( *jinfo ); 00179 } 00180 } 00181 } 00182 } 00183 00184 void query_pack( const std::vector<DistributedIndex::KeyProc> & key_usage , 00185 const std::vector<DistributedIndex::KeyProc> & request , 00186 CommAll & all ) 00187 { 00188 std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin(); 00189 00190 for ( std::vector<DistributedIndex::KeyProc>::const_iterator 00191 k = request.begin() ; 00192 k != request.end() && 00193 i != key_usage.end() ; ++k ) { 00194 00195 for ( ; i != key_usage.end() && i->first < k->first ; ++i ); 00196 00197 for ( std::vector<DistributedIndex::KeyProc>::const_iterator j = i ; 00198 j != key_usage.end() && j->first == k->first ; ++j ) { 00199 all.send_buffer( k->second ).pack<DistributedIndex::KeyProc>( *j ); 00200 } 00201 } 00202 } 00203 00204 } 00205 00206 void DistributedIndex::query( 00207 const std::vector<DistributedIndex::KeyProc> & request , 00208 std::vector<DistributedIndex::KeyProc> & sharing_of_keys ) const 00209 { 00210 sharing_of_keys.clear(); 00211 00212 CommAll all( m_comm ); 00213 00214 query_pack( m_key_usage , request , all ); // Sizing 00215 00216 all.allocate_buffers( m_comm_size / 4 , false ); 00217 00218 query_pack( m_key_usage , request , all ); // Packing 00219 00220 all.communicate(); 00221 00222 for ( ProcType p = 
0 ; p < m_comm_size ; ++p ) { 00223 CommBuffer & buf = all.recv_buffer( p ); 00224 while ( buf.remaining() ) { 00225 KeyProc kp ; 00226 buf.unpack( kp ); 00227 sharing_of_keys.push_back( kp ); 00228 } 00229 } 00230 00231 std::sort( sharing_of_keys.begin() , sharing_of_keys.end() ); 00232 } 00233 00234 void DistributedIndex::query( 00235 std::vector<DistributedIndex::KeyProc> & sharing_of_local_keys ) const 00236 { 00237 query( m_key_usage , sharing_of_local_keys ); 00238 } 00239 00240 void DistributedIndex::query( 00241 const std::vector<DistributedIndex::KeyType> & keys , 00242 std::vector<DistributedIndex::KeyProc> & sharing_keys ) const 00243 { 00244 std::vector<KeyProc> request ; 00245 00246 { 00247 bool bad_key = false ; 00248 CommAll all( m_comm ); 00249 00250 for ( std::vector<KeyType>::const_iterator 00251 k = keys.begin() ; k != keys.end() ; ++k ) { 00252 const ProcType p = to_which_proc( *k ); 00253 00254 if ( p < m_comm_size ) { 00255 all.send_buffer( p ).pack<KeyType>( *k ); 00256 } 00257 else { 00258 bad_key = true ; 00259 } 00260 } 00261 00262 // Error condition becomes global: 00263 00264 bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key ); 00265 00266 if ( bad_key ) { 00267 throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range"); 00268 } 00269 00270 for ( std::vector<KeyType>::const_iterator 00271 k = keys.begin() ; k != keys.end() ; ++k ) { 00272 all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k ); 00273 } 00274 00275 all.communicate(); 00276 00277 for ( ProcType p = 0 ; p < m_comm_size ; ++p ) { 00278 CommBuffer & buf = all.recv_buffer( p ); 00279 KeyProc kp ; 00280 kp.second = p ; 00281 while ( buf.remaining() ) { 00282 buf.unpack<KeyType>( kp.first ); 00283 request.push_back( kp ); 00284 } 00285 } 00286 } 00287 00288 sort_unique( request ); 00289 00290 query( request , sharing_keys ); 00291 } 00292 00293 void DistributedIndex::query_to_usage( 00294 const 
std::vector<DistributedIndex::KeyType> & keys , 00295 std::vector<DistributedIndex::KeyProc> & sharing_keys ) const 00296 { 00297 std::vector<KeyType> request ; 00298 00299 { 00300 bool bad_key = false ; 00301 CommAll all( m_comm ); 00302 00303 for ( std::vector<KeyType>::const_iterator 00304 k = keys.begin() ; k != keys.end() ; ++k ) { 00305 const ProcType p = to_which_proc( *k ); 00306 00307 if ( p < m_comm_size ) { 00308 all.send_buffer( p ).pack<KeyType>( *k ); 00309 } 00310 else { 00311 bad_key = true ; 00312 } 00313 } 00314 00315 // Error condition becomes global: 00316 00317 bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key ); 00318 00319 if ( bad_key ) { 00320 throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range"); 00321 } 00322 00323 for ( std::vector<KeyType>::const_iterator 00324 k = keys.begin() ; k != keys.end() ; ++k ) { 00325 all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k ); 00326 } 00327 00328 all.communicate(); 00329 00330 for ( ProcType p = 0 ; p < m_comm_size ; ++p ) { 00331 CommBuffer & buf = all.recv_buffer( p ); 00332 KeyType key ; 00333 while ( buf.remaining() ) { 00334 buf.unpack<KeyType>( key ); 00335 request.push_back( key ); 00336 } 00337 } 00338 } 00339 00340 sort_unique( request ); 00341 00342 { 00343 CommAll all( m_comm ); 00344 00345 query_pack_to_usage( m_key_usage , request , all ); // Sizing 00346 00347 all.allocate_buffers( m_comm_size / 4 , false ); 00348 00349 query_pack_to_usage( m_key_usage , request , all ); // Packing 00350 00351 all.communicate(); 00352 00353 for ( ProcType p = 0 ; p < m_comm_size ; ++p ) { 00354 CommBuffer & buf = all.recv_buffer( p ); 00355 while ( buf.remaining() ) { 00356 KeyProc kp ; 00357 buf.unpack( kp ); 00358 sharing_keys.push_back( kp ); 00359 } 00360 } 00361 00362 std::sort( sharing_keys.begin() , sharing_keys.end() ); 00363 } 00364 } 00365 00366 //---------------------------------------------------------------------- 00367 
//---------------------------------------------------------------------- 00368 00369 namespace { 00370 00371 struct RemoveKeyProc { 00372 00373 bool operator()( const DistributedIndex::KeyProc & kp ) const 00374 { return kp.second < 0 ; } 00375 00376 static void mark( std::vector<DistributedIndex::KeyProc> & key_usage , 00377 const DistributedIndex::KeyProc & kp ) 00378 { 00379 std::vector<DistributedIndex::KeyProc>::iterator 00380 i = std::lower_bound( key_usage.begin(), 00381 key_usage.end(), kp.first , KeyProcLess() ); 00382 while ( i != key_usage.end() && kp != *i ) { ++i ; } 00383 if ( i != key_usage.end() && kp == *i ) { 00384 i->second = -1 ; 00385 } 00386 } 00387 00388 static void clean( std::vector<DistributedIndex::KeyProc> & key_usage ) 00389 { 00390 std::vector<DistributedIndex::KeyProc>::iterator end = 00391 std::remove_if( key_usage.begin() , key_usage.end() , RemoveKeyProc() ); 00392 key_usage.erase( end , key_usage.end() ); 00393 } 00394 }; 00395 00396 } 00397 00398 void DistributedIndex::update_keys( 00399 const std::vector<DistributedIndex::KeyType> & add_new_keys , 00400 const std::vector<DistributedIndex::KeyType> & remove_existing_keys ) 00401 { 00402 std::vector<unsigned long> count_remove( m_comm_size , (unsigned long)0 ); 00403 std::vector<unsigned long> count_add( m_comm_size , (unsigned long)0 ); 00404 00405 size_t local_bad_input = 0 ; 00406 00407 for ( std::vector<KeyType>::const_iterator 00408 i = remove_existing_keys.begin(); 00409 i != remove_existing_keys.end(); ++i ) { 00410 const ProcType p = to_which_proc( *i ); 00411 if ( m_comm_size <= p ) { 00412 // Key is not within one of the span: 00413 ++local_bad_input ; 00414 } 00415 else if ( p != m_comm_rank ) { 00416 ++( count_remove[ p ] ); 00417 } 00418 } 00419 00420 for ( std::vector<KeyType>::const_iterator 00421 i = add_new_keys.begin(); 00422 i != add_new_keys.end(); ++i ) { 00423 00424 // Count 00425 00426 const ProcType p = to_which_proc( *i ); 00427 if ( p == m_comm_size ) { 
00428 // Key is not within one of the span: 00429 ++local_bad_input ; 00430 } 00431 else if ( p != m_comm_rank ) { 00432 ++( count_add[ p ] ); 00433 } 00434 } 00435 00436 if ( 0 < local_bad_input ) { 00437 // If this process knows it will throw 00438 // then don't bother communicating the add and remove requests. 00439 count_remove.clear(); 00440 count_add.clear(); 00441 } 00442 00443 CommAll all( m_comm ); 00444 00445 // Sizing and add_new_keys bounds checking: 00446 00447 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00448 if ( count_remove[p] || count_add[p] ) { 00449 CommBuffer & buf = all.send_buffer( p ); 00450 buf.skip<unsigned long>( 1 ); 00451 buf.skip<KeyType>( count_remove[p] ); 00452 buf.skip<KeyType>( count_add[p] ); 00453 } 00454 } 00455 00456 // Allocate buffers and perform a global 00457 const bool symmetry_flag = false ; 00458 const bool error_flag = 0 < local_bad_input ; 00459 00460 bool global_bad_input = 00461 all.allocate_buffers( m_comm_size / 4, symmetry_flag , error_flag ); 00462 00463 if ( global_bad_input ) { 00464 std::ostringstream msg ; 00465 00466 if ( 0 < local_bad_input ) { 00467 msg << "stk::parallel::DistributedIndex::update_keys ERROR Given " 00468 << local_bad_input << " of " << add_new_keys.size() 00469 << " add_new_keys outside of any span" ; 00470 } 00471 00472 throw std::runtime_error( msg.str() ); 00473 } 00474 00475 // Packing: 00476 00477 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00478 if ( count_remove[p] || count_add[p] ) { 00479 all.send_buffer( p ).pack<unsigned long>( count_remove[p] ); 00480 } 00481 } 00482 00483 for ( std::vector<KeyType>::const_iterator 00484 i = remove_existing_keys.begin(); 00485 i != remove_existing_keys.end(); ++i ) { 00486 const ProcType p = to_which_proc( *i ); 00487 if ( p != m_comm_rank ) { 00488 all.send_buffer( p ).pack<KeyType>( *i ); 00489 } 00490 } 00491 00492 for ( std::vector<KeyType>::const_iterator 00493 i = add_new_keys.begin(); 00494 i != add_new_keys.end(); ++i ) { 00495 const 
ProcType p = to_which_proc( *i ); 00496 if ( p != m_comm_rank ) { 00497 all.send_buffer( p ).pack<KeyType>( *i ); 00498 } 00499 } 00500 00501 all.communicate(); 00502 00503 //------------------------------ 00504 // Remove for local keys 00505 00506 for ( std::vector<KeyType>::const_iterator 00507 i = remove_existing_keys.begin(); 00508 i != remove_existing_keys.end(); ++i ) { 00509 const ProcType p = to_which_proc( *i ); 00510 if ( p == m_comm_rank ) { 00511 KeyProc kp( *i , p ); 00512 RemoveKeyProc::mark( m_key_usage , kp ); 00513 } 00514 } 00515 00516 // Unpack the remove key and find it. 00517 // Set the process to a negative value for subsequent removal. 00518 00519 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00520 CommBuffer & buf = all.recv_buffer( p ); 00521 if ( buf.remaining() ) { 00522 unsigned long remove_count = 0 ; 00523 00524 KeyProc kp ; 00525 00526 kp.second = p ; 00527 00528 buf.unpack<unsigned long>( remove_count ); 00529 00530 for ( ; 0 < remove_count ; --remove_count ) { 00531 buf.unpack<KeyType>( kp.first ); 00532 00533 RemoveKeyProc::mark( m_key_usage , kp ); 00534 } 00535 } 00536 } 00537 00538 RemoveKeyProc::clean( m_key_usage ); 00539 00540 //------------------------------ 00541 // Append for local keys 00542 00543 for ( std::vector<KeyType>::const_iterator 00544 i = add_new_keys.begin(); 00545 i != add_new_keys.end(); ++i ) { 00546 00547 const ProcType p = to_which_proc( *i ); 00548 if ( p == m_comm_rank ) { 00549 KeyProc kp( *i , p ); 00550 m_key_usage.push_back( kp ); 00551 } 00552 } 00553 00554 // Unpack and append for remote keys: 00555 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00556 CommBuffer & buf = all.recv_buffer( p ); 00557 00558 KeyProc kp ; 00559 00560 kp.second = p ; 00561 00562 while ( buf.remaining() ) { 00563 buf.unpack<KeyType>( kp.first ); 00564 m_key_usage.push_back( kp ); 00565 } 00566 } 00567 00568 sort_unique( m_key_usage ); 00569 //------------------------------ 00570 } 00571 00572 
//---------------------------------------------------------------------- 00573 //---------------------------------------------------------------------- 00574 00575 void DistributedIndex::generate_new_global_key_upper_bound( 00576 const std::vector<size_t> & requests , 00577 std::vector<DistributedIndex::KeyType> & global_key_upper_bound ) const 00578 { 00579 bool bad_request = m_span_count != requests.size(); 00580 00581 std::ostringstream error_msg ; 00582 00583 error_msg 00584 << "sierra::parallel::DistributedIndex::generate_new_keys_global_counts( " ; 00585 00586 std::vector<unsigned long> 00587 local_counts( m_span_count + 1 , (unsigned long) 0 ), 00588 global_counts( m_span_count + 1 , (unsigned long) 0 ); 00589 00590 // Count unique keys in each span and add requested keys for 00591 // final total count of keys needed. 00592 00593 // Append the error check to this communication to avoid 00594 // and extra reduction operation. 00595 local_counts[ m_span_count ] = m_span_count != requests.size(); 00596 00597 if ( m_span_count == requests.size() ) { 00598 00599 for ( size_t i = 0 ; i < m_span_count ; ++i ) { 00600 local_counts[i] = requests[i] ; 00601 } 00602 00603 std::vector<KeyProc>::const_iterator j = m_key_usage.begin(); 00604 00605 for ( size_t i = 0 ; i < m_span_count && j != m_key_usage.end() ; ++i ) { 00606 const KeyType key_span_last = m_key_span[i].second ; 00607 size_t count = 0 ; 00608 while ( j != m_key_usage.end() && j->first <= key_span_last ) { 00609 const KeyType key = j->first ; 00610 while ( j != m_key_usage.end() && key == j->first ) { ++j ; } 00611 ++count ; 00612 } 00613 local_counts[i] += count ; 00614 } 00615 } 00616 00617 #if defined( STK_HAS_MPI ) 00618 MPI_Allreduce( & local_counts[0] , & global_counts[0] , 00619 m_span_count + 1 , MPI_UNSIGNED_LONG , 00620 MPI_SUM , m_comm ); 00621 #else 00622 global_counts = local_counts ; 00623 #endif 00624 00625 bad_request = global_counts[m_span_count] != 0 ; 00626 00627 if ( bad_request ) { 
00628 if ( m_span_count != requests.size() ) { 00629 error_msg << " requests.size() = " << requests.size() 00630 << " != " << m_span_count << " )" ; 00631 } 00632 } 00633 00634 if ( ! bad_request ) { 00635 for ( unsigned i = 0 ; i < m_span_count ; ++i ) { 00636 const size_t span_available = 00637 ( 1 + m_key_span[i].second - m_key_span[i].first ); 00638 00639 const size_t span_requested = global_counts[i]; 00640 00641 if ( span_available < span_requested ) { 00642 bad_request = true ; 00643 error_msg << " global_sum( (existing+request)[" << i << "] ) = " 00644 << span_requested 00645 << " > global_sum( span_available ) = " 00646 << span_available ; 00647 } 00648 } 00649 } 00650 00651 if ( bad_request ) { 00652 throw std::runtime_error( error_msg.str() ); 00653 } 00654 00655 // Determine the maximum generated key 00656 00657 global_key_upper_bound.resize( m_span_count ); 00658 00659 for ( size_t i = 0 ; i < m_span_count ; ++i ) { 00660 global_key_upper_bound[i] = m_key_span[i].first + global_counts[i] - 1 ; 00661 } 00662 } 00663 00664 //-------------------------------------------------------------------- 00665 //-------------------------------------------------------------------- 00666 00667 void DistributedIndex::generate_new_keys_local_planning( 00668 const std::vector<DistributedIndex::KeyType> & key_global_upper_bound , 00669 const std::vector<size_t> & requests_local , 00670 std::vector<long> & new_request , 00671 std::vector<KeyType> & requested_keys , 00672 std::vector<KeyType> & contrib_keys ) const 00673 { 00674 new_request.assign( m_span_count , long(0) ); 00675 00676 contrib_keys.clear(); 00677 00678 std::vector<KeyProc>::const_iterator j = m_key_usage.begin(); 00679 00680 for ( size_t i = 0 ; i < m_span_count ; ++i ) { 00681 // The maximum generated key from any process will 00682 // not exceed this value. 
00683 const KeyType key_upper_bound = key_global_upper_bound[i] ; 00684 00685 const size_t init_size = contrib_keys.size(); 00686 00687 const size_t chunk_inc = m_comm_size * DISTRIBUTED_INDEX_CHUNK_SIZE ; 00688 00689 for ( KeyType key_begin = m_key_span[i].first + 00690 m_comm_rank * DISTRIBUTED_INDEX_CHUNK_SIZE ; 00691 key_begin <= key_upper_bound ; key_begin += chunk_inc ) { 00692 00693 // What is the first key of the chunk 00694 KeyType key_iter = key_begin ; 00695 00696 // What is the last key belonging to this process' chunk 00697 const KeyType key_last = 00698 std::min( key_begin + DISTRIBUTED_INDEX_CHUNK_SIZE - 1 , key_upper_bound ); 00699 00700 // Jump into the sorted used key vector to 00701 // the key which may be contributed 00702 00703 j = std::lower_bound( j, m_key_usage.end(), key_iter, KeyProcLess() ); 00704 // now know: j == m_key_usage.end() OR 00705 // key_iter <= j->first 00706 00707 for ( ; key_iter <= key_last ; ++key_iter ) { 00708 if ( j == m_key_usage.end() || key_iter < j->first ) { 00709 // The current attempt 'key_iter' is not used, contribute it. 00710 contrib_keys.push_back( key_iter ); 00711 } 00712 else { // j != m_key_usage.end() && key_iter == j->first 00713 // The current attempt 'key_iter' is already used, 00714 // increment the used-iterator to its next key value. 00715 while ( j != m_key_usage.end() && key_iter == j->first ) { 00716 ++j ; 00717 } 00718 } 00719 } 00720 } 00721 00722 // Determine which local keys will be contributed, 00723 // keeping what this process could use from the contribution. 00724 // This can reduce the subsequent communication load when 00725 // donating keys to another process. 00726 00727 const size_t this_contrib = contrib_keys.size() - init_size ; 00728 00729 // How many keys will this process keep: 00730 const size_t keep = std::min( requests_local[i] , this_contrib ); 00731 00732 // Take the kept keys from the contributed key vector. 
00733 requested_keys.insert( requested_keys.end() , 00734 contrib_keys.end() - keep , 00735 contrib_keys.end() ); 00736 00737 contrib_keys.erase( contrib_keys.end() - keep , 00738 contrib_keys.end() ); 00739 00740 // New request is positive for needed keys or negative for donated keys 00741 new_request[i] = requests_local[i] - this_contrib ; 00742 } 00743 } 00744 00745 //---------------------------------------------------------------------- 00746 00747 void DistributedIndex::generate_new_keys_global_planning( 00748 const std::vector<long> & new_request , 00749 std::vector<long> & my_donations ) const 00750 { 00751 my_donations.assign( m_comm_size * m_span_count , long(0) ); 00752 00753 // Gather the global request plan for receiving and donating keys 00754 // Positive values for receiving, negative values for donating. 00755 00756 std::vector<long> new_request_global( m_comm_size * m_span_count ); 00757 00758 #if defined( STK_HAS_MPI ) 00759 00760 { // Gather requests into per-process spans 00761 // MPI doesn't do 'const' in its interface, but the send buffer is const 00762 void * send_buf = const_cast<void*>( (void *)( & new_request[0] )); 00763 void * recv_buf = & new_request_global[0] ; 00764 MPI_Allgather( send_buf , m_span_count , MPI_LONG , 00765 recv_buf , m_span_count , MPI_LONG , m_comm ); 00766 } 00767 #else 00768 new_request_global = new_request ; 00769 #endif 00770 00771 // Now have the global receive & donate plan. 00772 //-------------------------------------------------------------------- 00773 // Generate my donate plan from the global receive & donate plan. 
00774 00775 for ( unsigned i = 0 ; i < m_span_count ; ++i ) { 00776 00777 if ( new_request[i] < 0 ) { // This process is donating on this span 00778 long my_total_donate = - new_request[i] ; 00779 00780 long previous_donate = 0 ; 00781 00782 // Count what previous processes have donated: 00783 for ( int p = 0 ; p < m_comm_rank ; ++p ) { 00784 const long new_request_p = new_request_global[ p * m_span_count + i ] ; 00785 if ( new_request_p < 0 ) { 00786 previous_donate -= new_request_p ; 00787 } 00788 } 00789 00790 // What the donation count will be with my donation: 00791 long end_donate = previous_donate + my_total_donate ; 00792 00793 long previous_receive = 0 ; 00794 00795 // Determine my donation to other processes (one to many). 00796 00797 for ( int p = 0 ; p < m_comm_size && 0 < my_total_donate ; ++p ) { 00798 00799 const long new_request_p = new_request_global[ p * m_span_count + i ]; 00800 00801 if ( 0 < new_request_p ) { // Process 'p' receives keys 00802 00803 // Accumulation of requests: 00804 00805 previous_receive += new_request_p ; 00806 00807 if ( previous_donate < previous_receive ) { 00808 // I am donating to process 'p' 00809 const long n = std::min( previous_receive , end_donate ) 00810 - previous_donate ; 00811 00812 my_donations[ p * m_span_count + i ] = n ; 00813 previous_donate += n ; 00814 my_total_donate -= n ; 00815 } 00816 } 00817 } 00818 } 00819 } 00820 } 00821 00822 //-------------------------------------------------------------------- 00823 00824 void DistributedIndex::generate_new_keys( 00825 const std::vector<size_t> & requests , 00826 std::vector< std::vector<KeyType> > & requested_keys ) 00827 { 00828 //-------------------------------------------------------------------- 00829 // Develop the plan: 00830 00831 std::vector<KeyType> global_key_upper_bound ; 00832 std::vector<long> new_request ; 00833 std::vector<long> my_donations ; 00834 std::vector<KeyType> contrib_keys ; 00835 std::vector<KeyType> new_keys ; 00836 00837 // Verify 
input and generate global sum of 00838 // current key usage and requested new keys. 00839 // Throw a parallel consistent exception if the input is bad. 00840 00841 generate_new_global_key_upper_bound( requests , global_key_upper_bound ); 00842 00843 // No exception thrown means all inputs are good and parallel consistent 00844 00845 // Determine which local keys will be contributed, 00846 // keeping what this process could use from the contribution. 00847 // This can reduce the subsequent communication load when 00848 // donating keys to another process. 00849 00850 generate_new_keys_local_planning( global_key_upper_bound , 00851 requests , 00852 new_request , 00853 new_keys , 00854 contrib_keys ); 00855 00856 // Determine where this process will be donating 'contrib_keys' 00857 generate_new_keys_global_planning( new_request, my_donations ); 00858 00859 // Due to using an upper bound as opposed to an exact maximum 00860 // the contrib_keys is likely to contain more keys that are needed. 00861 // Remove unneeded keys. 00862 00863 // Backwards to erase from the end 00864 for ( size_t i = m_span_count ; 0 < i ; ) { 00865 --i ; 00866 size_t count = 0 ; 00867 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00868 count += my_donations[ p * m_span_count + i ]; 00869 } 00870 std::vector<KeyType>::iterator j_beg = contrib_keys.begin(); 00871 std::vector<KeyType>::iterator j_end = contrib_keys.end(); 00872 j_beg = std::lower_bound( j_beg , j_end , m_key_span[i].first ); 00873 j_end = std::upper_bound( j_beg , j_end , m_key_span[i].second ); 00874 const size_t n = std::distance( j_beg , j_end ); 00875 if ( count < n ) { 00876 contrib_keys.erase( j_beg + count , j_end ); 00877 } 00878 } 00879 00880 // Plan is done, communicate the new keys. 00881 //-------------------------------------------------------------------- 00882 // Put key this process is keeping into the index. 
00883 00884 for ( std::vector<KeyType>::iterator i = new_keys.begin(); 00885 i != new_keys.end() ; ++i ) { 00886 m_key_usage.push_back( KeyProc( *i , m_comm_rank ) ); 00887 } 00888 00889 //-------------------------------------------------------------------- 00890 00891 CommAll all( m_comm ); 00892 00893 // Sizing 00894 00895 for ( size_t i = 0 ; i < m_span_count ; ++i ) { 00896 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00897 const size_t n_to_p = my_donations[ p * m_span_count + i ]; 00898 if ( 0 < n_to_p ) { 00899 all.send_buffer(p).skip<KeyType>( n_to_p ); 00900 } 00901 } 00902 } 00903 00904 all.allocate_buffers( m_comm_size / 4 , false ); 00905 00906 // Packing 00907 00908 { 00909 size_t n = 0 ; 00910 for ( size_t i = 0 ; i < m_span_count ; ++i ) { 00911 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00912 const size_t n_to_p = my_donations[ p * m_span_count + i ]; 00913 if ( 0 < n_to_p ) { 00914 all.send_buffer(p).pack<KeyType>( & contrib_keys[n] , n_to_p ); 00915 for ( size_t k = 0 ; k < n_to_p ; ++k , ++n ) { 00916 m_key_usage.push_back( KeyProc( contrib_keys[n] , p ) ); 00917 } 00918 } 00919 } 00920 } 00921 } 00922 00923 std::sort( m_key_usage.begin() , m_key_usage.end() ); 00924 00925 all.communicate(); 00926 00927 // Unpacking 00928 00929 for ( int p = 0 ; p < m_comm_size ; ++p ) { 00930 CommBuffer & buf = all.recv_buffer( p ); 00931 while ( buf.remaining() ) { 00932 KeyType key ; 00933 buf.unpack<KeyType>( key ); 00934 new_keys.push_back( key ); 00935 } 00936 } 00937 00938 std::sort( new_keys.begin() , new_keys.end() ); 00939 00940 requested_keys.resize( m_span_count ); 00941 00942 { 00943 std::vector<KeyType>::iterator i_beg = new_keys.begin(); 00944 for ( size_t i = 0 ; i < m_span_count ; ++i ) { 00945 std::vector<KeyType>::iterator i_end = i_beg + requests[i] ; 00946 requested_keys[i].assign( i_beg , i_end ); 00947 i_beg = i_end ; 00948 } 00949 } 00950 00951 return ; 00952 } 00953 00954 
//----------------------------------------------------------------------

} // namespace parallel
} // namespace stk
