Sierra Toolkit Version of the Day
DistributedIndex.cpp
00001 /*------------------------------------------------------------------------*/
00002 /*                 Copyright 2010 Sandia Corporation.                     */
00003 /*  Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive   */
00004 /*  license for use of this work by or on behalf of the U.S. Government.  */
00005 /*  Export of this program may require a license from the                 */
00006 /*  United States Government.                                             */
00007 /*------------------------------------------------------------------------*/
00008 
00009 
00010 #include <stdexcept>
00011 #include <sstream>
00012 #include <algorithm>
00013 #include <limits>
00014 #include <stdint.h>
00015 #include <stk_util/parallel/ParallelComm.hpp>
00016 #include <stk_util/parallel/DistributedIndex.hpp>
00017 
00018 namespace stk {
00019 namespace parallel {
00020 
00021 //----------------------------------------------------------------------
00022 
00023 namespace {
00024 
00025 struct KeyProcLess {
00026 
00027   bool operator()( const DistributedIndex::KeyProc & lhs ,
00028                    const DistributedIndex::KeyProc & rhs ) const
00029   {
00030     return lhs.first != rhs.first ? lhs.first  < rhs.first
00031                                   : lhs.second < rhs.second ;
00032   }
00033 
00034   bool operator()( const DistributedIndex::KeyProc & lhs ,
00035                    const DistributedIndex::KeyType & rhs ) const
00036   { return lhs.first < rhs ; }
00037 
00038 };
00039 
00040 void sort_unique( std::vector<DistributedIndex::KeyProc> & key_usage )
00041 {
00042   std::vector<DistributedIndex::KeyProc>::iterator
00043     i = key_usage.begin() ,
00044     j = key_usage.end() ;
00045 
00046   std::sort( i , j , KeyProcLess() );
00047 
00048   i = std::unique( i , j );
00049 
00050   key_usage.erase( i , j );
00051 }
00052 
00053 void sort_unique( std::vector<DistributedIndex::KeyType> & keys )
00054 {
00055   std::vector<DistributedIndex::KeyType>::iterator
00056     i = keys.begin() ,
00057     j = keys.end() ;
00058 
00059   std::sort( i , j );
00060 
00061   i = std::unique( i , j );
00062 
00063   keys.erase( i , j );
00064 }
00065 
00066 }
00067 
00068 //----------------------------------------------------------------------
00069 
// Within each key span, keys are handed out to owning processes in
// contiguous chunks of 2^DISTRIBUTED_INDEX_CHUNK_BITS keys
// (see DistributedIndex::to_which_proc).
enum { DISTRIBUTED_INDEX_CHUNK_BITS = 12 }; 

// Number of consecutive keys in one chunk (2^12 = 4096).
enum { DISTRIBUTED_INDEX_CHUNK_SIZE =
         size_t(1) << DISTRIBUTED_INDEX_CHUNK_BITS };
00074 
00075 DistributedIndex::ProcType
00076 DistributedIndex::to_which_proc( const DistributedIndex::KeyType & key ) const
00077 {
00078   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00079     if ( m_key_span[i].first <= key && key <= m_key_span[i].second ) {
00080        const KeyType offset = key - m_key_span[i].first ;
00081        return ( offset >> DISTRIBUTED_INDEX_CHUNK_BITS ) % m_comm_size ;
00082     }
00083   }
00084   return m_comm_size ;
00085 }
00086 
00087 //----------------------------------------------------------------------
00088 
00089 DistributedIndex::~DistributedIndex() {}
00090 
00091 DistributedIndex::DistributedIndex (
00092   ParallelMachine comm ,
00093   const std::vector<KeySpan> & partition_bounds )
00094   : m_comm( comm ),
00095     m_comm_rank( parallel_machine_rank( comm ) ),
00096     m_comm_size( parallel_machine_size( comm ) ),
00097     m_span_count(0),
00098     m_key_span(),
00099     m_key_usage()
00100 {
00101   unsigned info[2] ;
00102   info[0] = partition_bounds.size();
00103   info[1] = 0 ;
00104 
00105   // Check each span for validity
00106 
00107   for ( std::vector<KeySpan>::const_iterator
00108         i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
00109     if ( i->second < i->first ||
00110          ( i != partition_bounds.begin() && i->first <= (i-1)->second ) ) {
00111       info[1] = 1 ;
00112     }
00113   }
00114 
00115 #if defined( STK_HAS_MPI )
00116   MPI_Bcast( info , 2 , MPI_UNSIGNED , 0 , comm );
00117 
00118   if ( 0 < info[0] ) {
00119     m_key_span.resize( info[0] );
00120     if ( 0 == parallel_machine_rank( comm ) ) {
00121       m_key_span = partition_bounds ;
00122     }
00123     MPI_Bcast( & m_key_span[0], info[0] * sizeof(KeySpan), MPI_BYTE, 0, comm );
00124   }
00125 #else
00126   m_key_span = partition_bounds ;
00127 #endif
00128 
00129   if ( info[1] ) {
00130     std::ostringstream msg ;
00131     msg << "sierra::parallel::DistributedIndex ctor( comm , " ;
00132 
00133     for ( std::vector<KeySpan>::const_iterator
00134           i = partition_bounds.begin() ; i != partition_bounds.end() ; ++i ) {
00135       msg << " ( min = " << i->first << " , max = " << i->second << " )" ;
00136     }
00137     msg << " ) contains invalid span of keys" ;
00138     throw std::runtime_error( msg.str() );
00139   }
00140 
00141   m_span_count = info[0] ;
00142 
00143   if ( 0 == m_span_count ) {
00144     m_key_span.push_back(
00145       KeySpan( std::numeric_limits<KeyType>::min(),
00146                std::numeric_limits<KeyType>::max() ) );
00147     m_span_count = 1 ;
00148   }
00149 }
00150 
00151 //----------------------------------------------------------------------
00152 //----------------------------------------------------------------------
00153 
00154 namespace {
00155 
00156 void query_pack_to_usage(
00157   const std::vector<DistributedIndex::KeyProc> & key_usage ,
00158   const std::vector<DistributedIndex::KeyType> & request ,
00159   CommAll & all )
00160 {
00161   std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
00162   std::vector<DistributedIndex::KeyType>::const_iterator k = request.begin();
00163 
00164   for ( ; k != request.end() && i != key_usage.end() ; ++k ) {
00165 
00166     for ( ; i != key_usage.end() && i->first < *k ; ++i );
00167 
00168     std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
00169     for ( ; j != key_usage.end() && j->first == *k ; ++j );
00170 
00171     for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00172           jsend = i ; jsend != j ; ++jsend ) {
00173 
00174       for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00175             jinfo = i ; jinfo != j ; ++jinfo ) {
00176 
00177         all.send_buffer( jsend->second )
00178            .pack<DistributedIndex::KeyProc>( *jinfo );
00179       }
00180     }
00181   }
00182 }
00183 
00184 void query_pack( const std::vector<DistributedIndex::KeyProc> & key_usage ,
00185                  const std::vector<DistributedIndex::KeyProc> & request ,
00186                  CommAll & all )
00187 {
00188   std::vector<DistributedIndex::KeyProc>::const_iterator i = key_usage.begin();
00189 
00190   for ( std::vector<DistributedIndex::KeyProc>::const_iterator
00191         k =  request.begin() ;
00192         k != request.end() &&
00193         i != key_usage.end() ; ++k ) {
00194 
00195     for ( ; i != key_usage.end() && i->first < k->first ; ++i );
00196 
00197     for ( std::vector<DistributedIndex::KeyProc>::const_iterator j = i ;
00198           j != key_usage.end() && j->first == k->first ; ++j ) {
00199       all.send_buffer( k->second ).pack<DistributedIndex::KeyProc>( *j );
00200     }
00201   }
00202 }
00203 
00204 }
00205 
00206 void DistributedIndex::query(
00207   const std::vector<DistributedIndex::KeyProc> & request ,
00208         std::vector<DistributedIndex::KeyProc> & sharing_of_keys ) const
00209 {
00210   sharing_of_keys.clear();
00211 
00212   CommAll all( m_comm );
00213 
00214   query_pack( m_key_usage , request , all ); // Sizing
00215 
00216   all.allocate_buffers( m_comm_size / 4 , false );
00217 
00218   query_pack( m_key_usage , request , all ); // Packing
00219 
00220   all.communicate();
00221 
00222   for ( ProcType p = 0 ; p < m_comm_size ; ++p ) {
00223     CommBuffer & buf = all.recv_buffer( p );
00224     while ( buf.remaining() ) {
00225       KeyProc kp ;
00226       buf.unpack( kp );
00227       sharing_of_keys.push_back( kp );
00228     }
00229   }
00230 
00231   std::sort( sharing_of_keys.begin() , sharing_of_keys.end() );
00232 }
00233 
00234 void DistributedIndex::query(
00235   std::vector<DistributedIndex::KeyProc> & sharing_of_local_keys ) const
00236 {
00237   query( m_key_usage , sharing_of_local_keys );
00238 }
00239 
00240 void DistributedIndex::query(
00241   const std::vector<DistributedIndex::KeyType> & keys ,
00242         std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
00243 {
00244   std::vector<KeyProc> request ;
00245 
00246   {
00247     bool bad_key = false ;
00248     CommAll all( m_comm );
00249 
00250     for ( std::vector<KeyType>::const_iterator
00251           k = keys.begin() ; k != keys.end() ; ++k ) {
00252       const ProcType p = to_which_proc( *k );
00253 
00254       if ( p < m_comm_size ) {
00255         all.send_buffer( p ).pack<KeyType>( *k );
00256       }
00257       else {
00258         bad_key = true ;
00259       }
00260     }
00261 
00262     // Error condition becomes global:
00263 
00264     bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );
00265 
00266     if ( bad_key ) {
00267       throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range");
00268     }
00269 
00270     for ( std::vector<KeyType>::const_iterator
00271           k = keys.begin() ; k != keys.end() ; ++k ) {
00272       all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
00273     }
00274 
00275     all.communicate();
00276 
00277     for ( ProcType p = 0 ; p < m_comm_size ; ++p ) {
00278       CommBuffer & buf = all.recv_buffer( p );
00279       KeyProc kp ;
00280       kp.second = p ;
00281       while ( buf.remaining() ) {
00282         buf.unpack<KeyType>( kp.first );
00283         request.push_back( kp );
00284       }
00285     }
00286   }
00287 
00288   sort_unique( request );
00289 
00290   query( request , sharing_keys );
00291 }
00292 
00293 void DistributedIndex::query_to_usage(
00294   const std::vector<DistributedIndex::KeyType> & keys ,
00295         std::vector<DistributedIndex::KeyProc> & sharing_keys ) const
00296 {
00297   std::vector<KeyType> request ;
00298 
00299   {
00300     bool bad_key = false ;
00301     CommAll all( m_comm );
00302 
00303     for ( std::vector<KeyType>::const_iterator
00304           k = keys.begin() ; k != keys.end() ; ++k ) {
00305       const ProcType p = to_which_proc( *k );
00306 
00307       if ( p < m_comm_size ) {
00308         all.send_buffer( p ).pack<KeyType>( *k );
00309       }
00310       else {
00311         bad_key = true ;
00312       }
00313     }
00314 
00315     // Error condition becomes global:
00316 
00317     bad_key = all.allocate_buffers( m_comm_size / 4 , false , bad_key );
00318 
00319     if ( bad_key ) {
00320       throw std::runtime_error("stk::parallel::DistributedIndex::query given a key which is out of range");
00321     }
00322 
00323     for ( std::vector<KeyType>::const_iterator
00324           k = keys.begin() ; k != keys.end() ; ++k ) {
00325       all.send_buffer( to_which_proc( *k ) ).pack<KeyType>( *k );
00326     }
00327 
00328     all.communicate();
00329 
00330     for ( ProcType p = 0 ; p < m_comm_size ; ++p ) {
00331       CommBuffer & buf = all.recv_buffer( p );
00332       KeyType key ;
00333       while ( buf.remaining() ) {
00334         buf.unpack<KeyType>( key );
00335         request.push_back( key );
00336       }
00337     }
00338   }
00339 
00340   sort_unique( request );
00341 
00342   {
00343     CommAll all( m_comm );
00344 
00345     query_pack_to_usage( m_key_usage , request , all ); // Sizing
00346 
00347     all.allocate_buffers( m_comm_size / 4 , false );
00348 
00349     query_pack_to_usage( m_key_usage , request , all ); // Packing
00350 
00351     all.communicate();
00352 
00353     for ( ProcType p = 0 ; p < m_comm_size ; ++p ) {
00354       CommBuffer & buf = all.recv_buffer( p );
00355       while ( buf.remaining() ) {
00356         KeyProc kp ;
00357         buf.unpack( kp );
00358         sharing_keys.push_back( kp );
00359       }
00360     }
00361 
00362     std::sort( sharing_keys.begin() , sharing_keys.end() );
00363   }
00364 }
00365 
00366 //----------------------------------------------------------------------
00367 //----------------------------------------------------------------------
00368 
00369 namespace {
00370 
00371 struct RemoveKeyProc {
00372 
00373   bool operator()( const DistributedIndex::KeyProc & kp ) const
00374     { return kp.second < 0 ; }
00375 
00376   static void mark( std::vector<DistributedIndex::KeyProc> & key_usage ,
00377                     const DistributedIndex::KeyProc & kp )
00378   {
00379     std::vector<DistributedIndex::KeyProc>::iterator
00380       i = std::lower_bound( key_usage.begin(),
00381                             key_usage.end(), kp.first , KeyProcLess() );
00382     while ( i != key_usage.end() && kp != *i ) { ++i ; }
00383     if ( i != key_usage.end() && kp == *i ) {
00384       i->second = -1 ;
00385     }
00386   }
00387 
00388   static void clean( std::vector<DistributedIndex::KeyProc> & key_usage )
00389   {
00390     std::vector<DistributedIndex::KeyProc>::iterator end =
00391       std::remove_if( key_usage.begin() , key_usage.end() , RemoveKeyProc() );
00392     key_usage.erase( end , key_usage.end() );
00393   }
00394 };
00395 
00396 }
00397 
00398 void DistributedIndex::update_keys(
00399   const std::vector<DistributedIndex::KeyType> & add_new_keys ,
00400   const std::vector<DistributedIndex::KeyType> & remove_existing_keys )
00401 {
00402   std::vector<unsigned long> count_remove( m_comm_size , (unsigned long)0 );
00403   std::vector<unsigned long> count_add(    m_comm_size , (unsigned long)0 );
00404 
00405   size_t local_bad_input = 0 ;
00406 
00407   for ( std::vector<KeyType>::const_iterator
00408         i = remove_existing_keys.begin();
00409         i != remove_existing_keys.end(); ++i ) {
00410     const ProcType p = to_which_proc( *i );
00411     if ( m_comm_size <= p ) {
00412       // Key is not within one of the span:
00413       ++local_bad_input ;
00414     }
00415     else if ( p != m_comm_rank ) {
00416       ++( count_remove[ p ] );
00417     }
00418   }
00419 
00420   for ( std::vector<KeyType>::const_iterator
00421         i = add_new_keys.begin();
00422         i != add_new_keys.end(); ++i ) {
00423 
00424     // Count
00425 
00426     const ProcType p = to_which_proc( *i );
00427     if ( p == m_comm_size ) {
00428       // Key is not within one of the span:
00429       ++local_bad_input ;
00430     }
00431     else if ( p != m_comm_rank ) {
00432       ++( count_add[ p ] );
00433     }
00434   }
00435 
00436   if ( 0 < local_bad_input ) {
00437     // If this process knows it will throw
00438     // then don't bother communicating the add and remove requests.
00439     count_remove.clear();
00440     count_add.clear();
00441   }
00442 
00443   CommAll all( m_comm );
00444 
00445   // Sizing and add_new_keys bounds checking:
00446 
00447   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00448     if ( count_remove[p] || count_add[p] ) {
00449       CommBuffer & buf = all.send_buffer( p );
00450       buf.skip<unsigned long>( 1 );
00451       buf.skip<KeyType>( count_remove[p] );
00452       buf.skip<KeyType>( count_add[p] );
00453     }
00454   }
00455 
00456   // Allocate buffers and perform a global
00457   const bool symmetry_flag = false ;
00458   const bool error_flag = 0 < local_bad_input ;
00459 
00460   bool global_bad_input =
00461     all.allocate_buffers( m_comm_size / 4, symmetry_flag , error_flag );
00462 
00463   if ( global_bad_input ) {
00464     std::ostringstream msg ;
00465 
00466     if ( 0 < local_bad_input ) {
00467       msg << "stk::parallel::DistributedIndex::update_keys ERROR Given "
00468           << local_bad_input << " of " << add_new_keys.size()
00469           << " add_new_keys outside of any span" ;
00470     }
00471 
00472     throw std::runtime_error( msg.str() );
00473   }
00474 
00475   // Packing:
00476 
00477   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00478     if ( count_remove[p] || count_add[p] ) {
00479       all.send_buffer( p ).pack<unsigned long>( count_remove[p] );
00480     }
00481   }
00482 
00483   for ( std::vector<KeyType>::const_iterator
00484         i = remove_existing_keys.begin();
00485         i != remove_existing_keys.end(); ++i ) {
00486     const ProcType p = to_which_proc( *i );
00487     if ( p != m_comm_rank ) {
00488       all.send_buffer( p ).pack<KeyType>( *i );
00489     }
00490   }
00491 
00492   for ( std::vector<KeyType>::const_iterator
00493         i = add_new_keys.begin();
00494         i != add_new_keys.end(); ++i ) {
00495     const ProcType p = to_which_proc( *i );
00496     if ( p != m_comm_rank ) {
00497       all.send_buffer( p ).pack<KeyType>( *i );
00498     }
00499   }
00500 
00501   all.communicate();
00502 
00503   //------------------------------
00504   // Remove for local keys
00505 
00506   for ( std::vector<KeyType>::const_iterator
00507         i = remove_existing_keys.begin();
00508         i != remove_existing_keys.end(); ++i ) {
00509     const ProcType p = to_which_proc( *i );
00510     if ( p == m_comm_rank ) {
00511       KeyProc kp( *i , p );
00512       RemoveKeyProc::mark( m_key_usage , kp );
00513     }
00514   }
00515 
00516   // Unpack the remove key and find it.
00517   // Set the process to a negative value for subsequent removal.
00518 
00519   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00520     CommBuffer & buf = all.recv_buffer( p );
00521     if ( buf.remaining() ) {
00522       unsigned long remove_count = 0 ;
00523 
00524       KeyProc kp ;
00525 
00526       kp.second = p ;
00527 
00528       buf.unpack<unsigned long>( remove_count );
00529 
00530       for ( ; 0 < remove_count ; --remove_count ) {
00531         buf.unpack<KeyType>( kp.first );
00532 
00533         RemoveKeyProc::mark( m_key_usage , kp );
00534       }
00535     }
00536   }
00537 
00538   RemoveKeyProc::clean( m_key_usage );
00539 
00540   //------------------------------
00541   // Append for local keys
00542 
00543   for ( std::vector<KeyType>::const_iterator
00544         i = add_new_keys.begin();
00545         i != add_new_keys.end(); ++i ) {
00546 
00547     const ProcType p = to_which_proc( *i );
00548     if ( p == m_comm_rank ) {
00549       KeyProc kp( *i , p );
00550       m_key_usage.push_back( kp );
00551     }
00552   }
00553 
00554   // Unpack and append for remote keys:
00555   for ( int p = 0 ; p < m_comm_size ; ++p ) {
00556     CommBuffer & buf = all.recv_buffer( p );
00557 
00558     KeyProc kp ;
00559 
00560     kp.second = p ;
00561 
00562     while ( buf.remaining() ) {
00563       buf.unpack<KeyType>( kp.first );
00564       m_key_usage.push_back( kp );
00565     }
00566   }
00567 
00568   sort_unique( m_key_usage );
00569   //------------------------------
00570 }
00571 
00572 //----------------------------------------------------------------------
00573 //----------------------------------------------------------------------
00574 
00575 void DistributedIndex::generate_new_global_key_upper_bound(
00576   const std::vector<size_t>  & requests ,
00577         std::vector<DistributedIndex::KeyType> & global_key_upper_bound ) const
00578 {
00579   bool bad_request = m_span_count != requests.size();
00580 
00581   std::ostringstream error_msg ;
00582 
00583   error_msg
00584   << "sierra::parallel::DistributedIndex::generate_new_keys_global_counts( " ;
00585 
00586   std::vector<unsigned long>
00587     local_counts( m_span_count + 1 , (unsigned long) 0 ),
00588     global_counts( m_span_count + 1 , (unsigned long) 0 );
00589 
00590   // Count unique keys in each span and add requested keys for
00591   // final total count of keys needed.
00592 
00593   // Append the error check to this communication to avoid
00594   // and extra reduction operation.
00595   local_counts[ m_span_count ] = m_span_count != requests.size();
00596 
00597   if ( m_span_count == requests.size() ) {
00598 
00599     for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00600       local_counts[i] = requests[i] ;
00601     }
00602 
00603     std::vector<KeyProc>::const_iterator j = m_key_usage.begin();
00604 
00605     for ( size_t i = 0 ; i < m_span_count && j != m_key_usage.end() ; ++i ) {
00606       const KeyType key_span_last = m_key_span[i].second ;
00607       size_t count = 0 ;
00608       while ( j != m_key_usage.end() && j->first <= key_span_last ) {
00609         const KeyType key = j->first ;
00610         while ( j != m_key_usage.end() && key == j->first ) { ++j ; }
00611         ++count ;
00612       }
00613       local_counts[i] += count ;
00614     }
00615   }
00616 
00617 #if defined( STK_HAS_MPI )
00618   MPI_Allreduce( & local_counts[0] , & global_counts[0] ,
00619                  m_span_count + 1 , MPI_UNSIGNED_LONG ,
00620                  MPI_SUM , m_comm );
00621 #else
00622   global_counts = local_counts ;
00623 #endif
00624 
00625   bad_request = global_counts[m_span_count] != 0 ;
00626 
00627   if ( bad_request ) {
00628     if ( m_span_count != requests.size() ) {
00629       error_msg << " requests.size() = " << requests.size()
00630                 << " != " << m_span_count << " )" ;
00631     }
00632   }
00633 
00634   if ( ! bad_request ) {
00635     for ( unsigned i = 0 ; i < m_span_count ; ++i ) {
00636       const size_t span_available =
00637         ( 1 + m_key_span[i].second - m_key_span[i].first );
00638 
00639       const size_t span_requested = global_counts[i];
00640 
00641       if ( span_available < span_requested ) {
00642         bad_request = true ;
00643         error_msg << " global_sum( (existing+request)[" << i << "] ) = "
00644                   << span_requested
00645                   << " > global_sum( span_available ) = "
00646                   << span_available ;
00647       }
00648     }
00649   }
00650 
00651   if ( bad_request ) {
00652     throw std::runtime_error( error_msg.str() );
00653   }
00654 
00655   // Determine the maximum generated key
00656 
00657   global_key_upper_bound.resize( m_span_count );
00658 
00659   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00660      global_key_upper_bound[i] = m_key_span[i].first + global_counts[i] - 1 ;
00661   }
00662 }
00663 
00664 //--------------------------------------------------------------------
00665 //--------------------------------------------------------------------
00666 
00667 void DistributedIndex::generate_new_keys_local_planning(
00668   const std::vector<DistributedIndex::KeyType>  & key_global_upper_bound ,
00669   const std::vector<size_t>   & requests_local ,
00670         std::vector<long>     & new_request ,
00671         std::vector<KeyType>  & requested_keys ,
00672         std::vector<KeyType>  & contrib_keys ) const
00673 {
00674   new_request.assign( m_span_count , long(0) );
00675 
00676   contrib_keys.clear();
00677 
00678   std::vector<KeyProc>::const_iterator j = m_key_usage.begin();
00679 
00680   for ( size_t i = 0 ; i < m_span_count ; ++i ) {
00681     // The maximum generated key from any process will
00682     // not exceed this value.
00683     const KeyType key_upper_bound = key_global_upper_bound[i] ;
00684 
00685     const size_t init_size = contrib_keys.size();
00686 
00687     const size_t chunk_inc = m_comm_size * DISTRIBUTED_INDEX_CHUNK_SIZE ;
00688 
00689     for ( KeyType key_begin = m_key_span[i].first +
00690                   m_comm_rank * DISTRIBUTED_INDEX_CHUNK_SIZE ;
00691           key_begin <= key_upper_bound ; key_begin += chunk_inc ) {
00692 
00693       // What is the first key of the chunk
00694       KeyType key_iter = key_begin ;
00695 
00696       // What is the last key belonging to this process' chunk
00697       const KeyType key_last =
00698         std::min( key_begin + DISTRIBUTED_INDEX_CHUNK_SIZE - 1 , key_upper_bound );
00699 
00700       // Jump into the sorted used key vector to
00701       // the key which may be contributed
00702 
00703       j = std::lower_bound( j, m_key_usage.end(), key_iter, KeyProcLess() );
00704       // now know:  j == m_key_usage.end() OR
00705       //            key_iter <= j->first
00706 
00707       for ( ; key_iter <= key_last ; ++key_iter ) {
00708         if ( j == m_key_usage.end() || key_iter < j->first ) {
00709           // The current attempt 'key_iter' is not used, contribute it.
00710           contrib_keys.push_back( key_iter );
00711         }
00712         else { // j != m_key_usage.end() && key_iter == j->first
00713           // The current attempt 'key_iter' is already used,
00714           // increment the used-iterator to its next key value.
00715           while ( j != m_key_usage.end() && key_iter == j->first ) {
00716             ++j ;
00717           }
00718         }
00719       }
00720     }
00721 
00722     // Determine which local keys will be contributed,
00723     // keeping what this process could use from the contribution.
00724     // This can reduce the subsequent communication load when
00725     // donating keys to another process.
00726 
00727     const size_t this_contrib = contrib_keys.size() - init_size ;
00728 
00729     // How many keys will this process keep:
00730     const size_t keep = std::min( requests_local[i] , this_contrib );
00731 
00732     // Take the kept keys from the contributed key vector.
00733     requested_keys.insert( requested_keys.end() ,
00734                            contrib_keys.end() - keep ,
00735                            contrib_keys.end() );
00736 
00737     contrib_keys.erase( contrib_keys.end() - keep ,
00738                         contrib_keys.end() );
00739 
00740     // New request is positive for needed keys or negative for donated keys
00741     new_request[i] = requests_local[i] - this_contrib ;
00742   }
00743 }
00744 
00745 //----------------------------------------------------------------------
00746 
// Global half of new-key planning: decide how many surplus keys this
// process donates to each other process, per span.
//
// new_request[i] (one entry per span) is positive when this process still
// needs that many keys in span i and negative when it has that many
// surplus keys.  Every process' plan is all-gathered.  Then, per span,
// donors taken in rank order are matched against receivers taken in rank
// order: this process supplies positions [previous_donate, end_donate) of
// the concatenated receive requests.
//
// Output: my_donations[ p * m_span_count + i ] = number of span-i keys this
// process sends to process p (zero elsewhere).  Collective over m_comm.
void DistributedIndex::generate_new_keys_global_planning(
  const std::vector<long>    & new_request ,
        std::vector<long>    & my_donations ) const
{
  my_donations.assign( m_comm_size * m_span_count , long(0) );

  // Gather the global request plan for receiving and donating keys
  // Positive values for receiving, negative values for donating.
  // Layout: new_request_global[ p * m_span_count + i ] is process p's
  // entry for span i.

  std::vector<long> new_request_global( m_comm_size * m_span_count );

#if defined( STK_HAS_MPI )

  { // Gather requests into per-process spans
    // MPI doesn't do 'const' in its interface, but the send buffer is const
    void * send_buf = const_cast<void*>( (void *)( & new_request[0] ));
    void * recv_buf = & new_request_global[0] ;
    MPI_Allgather( send_buf , m_span_count , MPI_LONG ,
                   recv_buf , m_span_count , MPI_LONG , m_comm );
  }
#else
  new_request_global = new_request ;
#endif

  // Now have the global receive & donate plan.
  //--------------------------------------------------------------------
  // Generate my donate plan from the global receive & donate plan.

  for ( unsigned i = 0 ; i < m_span_count ; ++i ) {

    if ( new_request[i] < 0 ) { // This process is donating on this span
      long my_total_donate = - new_request[i] ;

      long previous_donate = 0 ;

      // Count what previous processes have donated:
      for ( int p = 0 ; p < m_comm_rank ; ++p ) {
        const long new_request_p = new_request_global[ p * m_span_count + i ] ;
        if ( new_request_p < 0 ) {
          previous_donate -= new_request_p ;
        }
      }

      // What the donation count will be with my donation:
      long end_donate = previous_donate + my_total_donate ;

      // Running total of receive requests from processes 0..p.
      long previous_receive = 0 ;

      // Determine my donation to other processes (one to many).
      // If total surplus exceeds total requests, this loop may finish
      // with my_total_donate still positive: the leftover is not donated.

      for ( int p = 0 ; p < m_comm_size && 0 < my_total_donate ; ++p ) {

        const long new_request_p = new_request_global[ p * m_span_count + i ];

        if ( 0 < new_request_p ) { // Process 'p' receives keys

          // Accumulation of requests:

          previous_receive += new_request_p ;

          if ( previous_donate < previous_receive ) {
            // I am donating to process 'p': cover the overlap of my slice
            // [previous_donate, end_donate) with p's request range.
            const long n = std::min( previous_receive , end_donate )
                           - previous_donate ;

            my_donations[ p * m_span_count + i ] = n ;
            previous_donate += n ;
            my_total_donate -= n ;
          }
        }
      }
    }
  }
}
00821 
00822 //--------------------------------------------------------------------
00823 
// Generate globally unique, previously unused keys.
//
// requests[i] is the number of new keys this process wants from span i.
// On return requested_keys[i] holds those keys (sorted), and the index's
// m_key_usage records the new keys: this process as user of the keys it
// kept, and the receiving process as user of the keys it donated.
// Throws std::runtime_error on every process for inconsistent requests
// (via generate_new_global_key_upper_bound).  Collective over m_comm.
void DistributedIndex::generate_new_keys(
  const std::vector<size_t>                 & requests ,
        std::vector< std::vector<KeyType> > & requested_keys )
{
  //--------------------------------------------------------------------
  // Develop the plan:

  std::vector<KeyType> global_key_upper_bound ;
  std::vector<long>    new_request ;
  std::vector<long>    my_donations ;
  std::vector<KeyType> contrib_keys ;
  std::vector<KeyType> new_keys ;

  // Verify input and generate global sum of
  // current key usage and requested new keys.
  // Throw a parallel consistent exception if the input is bad.

  generate_new_global_key_upper_bound( requests , global_key_upper_bound );

  //  No exception thrown means all inputs are good and parallel consistent

  // Determine which local keys will be contributed,
  // keeping what this process could use from the contribution.
  // This can reduce the subsequent communication load when
  // donating keys to another process.

  generate_new_keys_local_planning( global_key_upper_bound ,
                                    requests ,
                                    new_request ,
                                    new_keys ,
                                    contrib_keys );

  // Determine where this process will be donating 'contrib_keys'
  generate_new_keys_global_planning( new_request, my_donations );

  // Due to using an upper bound as opposed to an exact maximum
  // the contrib_keys is likely to contain more keys than are needed.
  // Remove unneeded keys: per span, keep only as many as will be donated.
  // NOTE(review): lower_bound/upper_bound below assume contrib_keys is
  // sorted ascending (span by span) as produced by the local planning step.

  // Backwards to erase from the end
  for ( size_t i = m_span_count ; 0 < i ; ) {
    --i ;
    // Total span-i keys this process donates to all processes.
    size_t count = 0 ;
    for ( int p = 0 ; p < m_comm_size ; ++p ) {
      count += my_donations[ p * m_span_count + i ];
    }
    // [j_beg, j_end) = contributed keys that fall inside span i.
    std::vector<KeyType>::iterator j_beg = contrib_keys.begin();
    std::vector<KeyType>::iterator j_end = contrib_keys.end();
    j_beg = std::lower_bound( j_beg , j_end , m_key_span[i].first );
    j_end = std::upper_bound( j_beg , j_end , m_key_span[i].second );
    const size_t n = std::distance( j_beg , j_end );
    if ( count < n ) {
      contrib_keys.erase( j_beg + count , j_end );
    }
  }

  // Plan is done, communicate the new keys.
  //--------------------------------------------------------------------
  // Put keys this process is keeping into the index.

  for ( std::vector<KeyType>::iterator i = new_keys.begin();
        i != new_keys.end() ; ++i ) {
    m_key_usage.push_back( KeyProc( *i , m_comm_rank ) );
  }

  //--------------------------------------------------------------------

  CommAll all( m_comm );

  // Sizing

  for ( size_t i = 0 ; i < m_span_count ; ++i ) {
    for ( int p = 0 ; p < m_comm_size ; ++p ) {
      const size_t n_to_p = my_donations[ p * m_span_count + i ];
      if ( 0 < n_to_p ) {
        all.send_buffer(p).skip<KeyType>( n_to_p );
      }
    }
  }

  all.allocate_buffers( m_comm_size / 4 , false );

  // Packing: walk contrib_keys in the same span-major, process-minor order
  // used for sizing, and record each receiver as the user of its keys.

  {
    size_t n = 0 ;
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      for ( int p = 0 ; p < m_comm_size ; ++p ) {
        const size_t n_to_p = my_donations[ p * m_span_count + i ];
        if ( 0 < n_to_p ) {
          all.send_buffer(p).pack<KeyType>( & contrib_keys[n] , n_to_p );
          for ( size_t k = 0 ; k < n_to_p ; ++k , ++n ) {
            m_key_usage.push_back( KeyProc( contrib_keys[n] , p ) );
          }
        }
      }
    }
  }

  std::sort( m_key_usage.begin() , m_key_usage.end() );

  all.communicate();

  // Unpacking: donated keys join the keys this process kept.

  for ( int p = 0 ; p < m_comm_size ; ++p ) {
    CommBuffer & buf = all.recv_buffer( p );
    while ( buf.remaining() ) {
      KeyType key ;
      buf.unpack<KeyType>( key );
      new_keys.push_back( key );
    }
  }

  std::sort( new_keys.begin() , new_keys.end() );

  requested_keys.resize( m_span_count );

  // Spans are sorted and disjoint, so the sorted new keys split into
  // consecutive runs of requests[i] keys, one run per span.
  // NOTE(review): assumes new_keys.size() equals the sum of requests, as
  // the plan is designed to guarantee; no check is made here.
  {
    std::vector<KeyType>::iterator i_beg = new_keys.begin();
    for ( size_t i = 0 ; i < m_span_count ; ++i ) {
      std::vector<KeyType>::iterator i_end = i_beg + requests[i] ;
      requested_keys[i].assign( i_beg , i_end );
      i_beg = i_end ;
    }
  }

  return ;
}
00953 
00954 //----------------------------------------------------------------------
00955 
00956 } // namespace util
00957 } // namespace stk
00958 
00959 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends