|
Sierra Toolkit Version of the Day
|
00001 /*------------------------------------------------------------------------*/ 00002 /* Copyright 2010 Sandia Corporation. */ 00003 /* Under terms of Contract DE-AC04-94AL85000, there is a non-exclusive */ 00004 /* license for use of this work by or on behalf of the U.S. Government. */ 00005 /* Export of this program may require a license from the */ 00006 /* United States Government. */ 00007 /*------------------------------------------------------------------------*/ 00008 00009 #include <stdlib.h> 00010 #include <stdexcept> 00011 #include <sstream> 00012 #include <vector> 00013 00014 #include <stk_util/parallel/ParallelComm.hpp> 00015 #include <stk_util/parallel/ParallelReduce.hpp> 00016 00017 namespace stk { 00018 00019 //----------------------------------------------------------------------- 00020 00021 #if defined( STK_HAS_MPI ) 00022 00023 enum { STK_MPI_TAG_SIZING = 0 , STK_MPI_TAG_DATA = 1 }; 00024 00025 // Communicate in sparse or dense mode, as directed during allocation 00026 00027 namespace { 00028 00029 bool all_to_all_dense( ParallelMachine p_comm , 00030 const CommBuffer * const send , 00031 const CommBuffer * const recv , 00032 std::ostream & msg ) 00033 { 00034 typedef unsigned char * ucharp ; 00035 00036 static const char method[] = "stk::CommAll::communicate" ; 00037 00038 int result ; 00039 00040 { 00041 const unsigned p_size = parallel_machine_size( p_comm ); 00042 00043 std::vector<int> tmp( p_size * 4 ); 00044 00045 int * const send_counts = & tmp[0] ; 00046 int * const send_displs = send_counts + p_size ; 00047 int * const recv_counts = send_displs + p_size ; 00048 int * const recv_displs = recv_counts + p_size ; 00049 00050 unsigned char * const ps = static_cast<ucharp>(send[0].buffer()); 00051 unsigned char * const pr = static_cast<ucharp>(recv[0].buffer()); 00052 00053 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00054 const CommBuffer & send_buf = send[i] ; 00055 const CommBuffer & recv_buf = recv[i] ; 00056 00057 send_counts[i] = send_buf.capacity(); 00058 recv_counts[i] = recv_buf.capacity(); 00059 00060 send_displs[i] = static_cast<ucharp>(send_buf.buffer()) - ps ; 00061 recv_displs[i] = static_cast<ucharp>(recv_buf.buffer()) - pr ; 00062 } 00063 00064 result = MPI_Alltoallv( ps , send_counts , send_displs , MPI_BYTE , 00065 pr , recv_counts , recv_displs , MPI_BYTE , 00066 p_comm ); 00067 00068 if ( MPI_SUCCESS != result ) { 00069 msg << method << " GLOBAL ERROR: " << result << " == MPI_Alltoallv" ; 00070 } 00071 } 00072 00073 return MPI_SUCCESS == result ; 00074 } 00075 00076 bool all_to_all_sparse( ParallelMachine p_comm , 00077 const CommBuffer * const send , 00078 const CommBuffer * const recv , 00079 std::ostream & msg ) 00080 { 00081 static const char method[] = "stk::CommAll::communicate" ; 00082 static const int mpi_tag = STK_MPI_TAG_DATA ; 00083 00084 int result = MPI_SUCCESS ; 00085 00086 { 00087 const unsigned p_size = parallel_machine_size( p_comm ); 00088 const unsigned p_rank = parallel_machine_rank( p_comm ); 00089 00090 //------------------------------ 00091 // Receive count 00092 00093 unsigned num_recv = 0 ; 00094 00095 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00096 if ( recv[i].capacity() ) { ++num_recv ; } 00097 } 00098 00099 //------------------------------ 00100 // Post receives for specific processors with specific sizes 00101 00102 MPI_Request request_null = MPI_REQUEST_NULL ; 00103 std::vector<MPI_Request> request( num_recv , request_null ); 00104 std::vector<MPI_Status> status( num_recv ); 00105 00106 unsigned count = 0 ; 00107 00108 for ( unsigned i = 0 ; result == MPI_SUCCESS && i < p_size ; ++i ) { 00109 const unsigned recv_size = recv[i].capacity(); 00110 void * const recv_buf = recv[i].buffer(); 00111 if ( recv_size ) { 00112 result = MPI_Irecv( recv_buf , recv_size , MPI_BYTE , 00113 i , mpi_tag , p_comm , & request[count] ); 00114 ++count ; 00115 } 00116 } 00117 00118 if ( MPI_SUCCESS != result ) { 00119 msg << method << " LOCAL[" << p_rank << "] ERROR: " 00120 << result << " == MPI_Irecv , " ; 00121 } 00122 00123 //------------------------------ 00124 // Sync to allow ready sends and for a potential error 00125 00126 int local_error = MPI_SUCCESS == result ? 0 : 1 ; 00127 int global_error = 0 ; 00128 00129 result = MPI_Allreduce( & local_error , & global_error , 00130 1 , MPI_INT , MPI_SUM , p_comm ); 00131 00132 if ( MPI_SUCCESS != result ) { 00133 msg << method << " GLOBAL ERROR: " << result << " == MPI_Allreduce" ; 00134 } 00135 else if ( global_error ) { 00136 result = MPI_ERR_UNKNOWN ; 00137 } 00138 else { 00139 // Everything is local from here on out, no more syncs 00140 00141 //------------------------------ 00142 // Ready-send the buffers, rotate the send processor 00143 // in a simple attempt to smooth out the communication traffic. 00144 00145 for ( unsigned i = 0 ; MPI_SUCCESS == result && i < p_size ; ++i ) { 00146 const int dst = ( i + p_rank ) % p_size ; 00147 const unsigned send_size = send[dst].capacity(); 00148 void * const send_buf = send[dst].buffer(); 00149 if ( send_size ) { 00150 result = MPI_Rsend( send_buf , send_size , MPI_BYTE , 00151 dst , mpi_tag , p_comm ); 00152 } 00153 } 00154 00155 if ( MPI_SUCCESS != result ) { 00156 msg << method << " LOCAL ERROR: " << result << " == MPI_Rsend , " ; 00157 } 00158 else { 00159 MPI_Request * const p_request = & request[0] ; 00160 MPI_Status * const p_status = & status[0] ; 00161 00162 result = MPI_Waitall( num_recv , p_request , p_status ); 00163 } 00164 00165 if ( MPI_SUCCESS != result ) { 00166 msg << method << " LOCAL[" << p_rank << "] ERROR: " 00167 << result << " == MPI_Waitall , " ; 00168 } 00169 else { 00170 00171 for ( unsigned i = 0 ; i < num_recv ; ++i ) { 00172 MPI_Status * const recv_status = & status[i] ; 00173 const int recv_proc = recv_status->MPI_SOURCE ; 00174 const int recv_tag = recv_status->MPI_TAG ; 00175 const int recv_plan = recv[recv_proc].capacity(); 00176 int recv_count = 0 ; 00177 00178 MPI_Get_count( recv_status , MPI_BYTE , & recv_count ); 00179 00180 if ( recv_tag != mpi_tag || recv_count != recv_plan ) { 00181 msg << method << " LOCAL[" << p_rank << "] ERROR: Recv[" 00182 << recv_proc << "] Size( " 00183 << recv_count << " != " << recv_plan << " ) , " ; 00184 result = MPI_ERR_UNKNOWN ; 00185 } 00186 } 00187 } 00188 } 00189 } 00190 00191 return MPI_SUCCESS == result ; 00192 } 00193 00194 } 00195 00196 #else 00197 00198 // Not parallel 00199 00200 namespace { 00201 00202 bool all_to_all_dense( ParallelMachine , 00203 const CommBuffer * const send , 00204 const CommBuffer * const recv , 00205 std::ostream & ) 00206 { return send == recv ; } 00207 00208 bool all_to_all_sparse( ParallelMachine , 00209 const CommBuffer * const send , 00210 const CommBuffer * const recv , 00211 std::ostream & ) 00212 { return send == recv ; } 00213 00214 } 00215 00216 #endif 00217 00218 //---------------------------------------------------------------------- 00219 00220 namespace { 00221 00222 inline 00223 size_t align_quad( size_t n ) 00224 { 00225 enum { Size = 4 * sizeof(int) }; 00226 return n + CommBufferAlign<Size>::align(n); 00227 } 00228 00229 } 00230 00231 //---------------------------------------------------------------------- 00232 00233 void CommBuffer::pack_overflow() const 00234 { 00235 std::ostringstream os ; 00236 os << "stk::CommBuffer::pack<T>(...){ overflow by " ; 00237 os << remaining() ; 00238 os << " bytes. }" ; 00239 throw std::overflow_error( os.str() ); 00240 } 00241 00242 void CommBuffer::unpack_overflow() const 00243 { 00244 std::ostringstream os ; 00245 os << "stk::CommBuffer::unpack<T>(...){ overflow by " ; 00246 os << remaining(); 00247 os << " bytes. }" ; 00248 throw std::overflow_error( os.str() ); 00249 } 00250 00251 void CommAll::rank_error( const char * method , unsigned p ) const 00252 { 00253 std::ostringstream os ; 00254 os << "stk::CommAll::" << method 00255 << "(" << p << ") ERROR: Not in [0:" << m_size << ")" ; 00256 throw std::range_error( os.str() ); 00257 } 00258 00259 //---------------------------------------------------------------------- 00260 00261 CommBuffer::CommBuffer() 00262 : m_beg(NULL), m_ptr(NULL), m_end(NULL) 00263 { } 00264 00265 CommBuffer::~CommBuffer() 00266 { } 00267 00268 void CommBuffer::deallocate( const unsigned number , CommBuffer * buffers ) 00269 { 00270 if ( NULL != buffers ) { 00271 for ( unsigned i = 0 ; i < number ; ++i ) { 00272 ( buffers + i )->~CommBuffer(); 00273 } 00274 free( buffers ); 00275 } 00276 } 00277 00278 CommBuffer * CommBuffer::allocate( 00279 const unsigned number , const unsigned * const size ) 00280 { 00281 const size_t n_base = align_quad( number * sizeof(CommBuffer) ); 00282 size_t n_size = n_base ; 00283 00284 if ( NULL != size ) { 00285 for ( unsigned i = 0 ; i < number ; ++i ) { 00286 n_size += align_quad( size[i] ); 00287 } 00288 } 00289 00290 // Allocate space for buffers 00291 00292 void * const p_malloc = malloc( n_size ); 00293 00294 CommBuffer * const b_base = 00295 p_malloc != NULL ? reinterpret_cast<CommBuffer*>(p_malloc) 00296 : reinterpret_cast<CommBuffer*>( NULL ); 00297 00298 if ( p_malloc != NULL ) { 00299 00300 for ( unsigned i = 0 ; i < number ; ++i ) { 00301 new( b_base + i ) CommBuffer(); 00302 } 00303 00304 if ( NULL != size ) { 00305 00306 ucharp ptr = reinterpret_cast<ucharp>( p_malloc ); 00307 00308 ptr += n_base ; 00309 00310 for ( unsigned i = 0 ; i < number ; ++i ) { 00311 CommBuffer & b = b_base[i] ; 00312 b.m_beg = ptr ; 00313 b.m_ptr = ptr ; 00314 b.m_end = ptr + size[i] ; 00315 ptr += align_quad( size[i] ); 00316 } 00317 } 00318 } 00319 00320 return b_base ; 00321 } 00322 00323 //---------------------------------------------------------------------- 00324 //---------------------------------------------------------------------- 00325 00326 CommAll::~CommAll() 00327 { 00328 try { 00329 CommBuffer::deallocate( m_size , m_send ); 00330 if ( 1 < m_size ) { CommBuffer::deallocate( m_size , m_recv ); } 00331 } catch(...){} 00332 m_comm = parallel_machine_null(); 00333 m_size = 0 ; 00334 m_rank = 0 ; 00335 m_send = NULL ; 00336 m_recv = NULL ; 00337 } 00338 00339 CommAll::CommAll() 00340 : m_comm( parallel_machine_null() ), 00341 m_size( 0 ), m_rank( 0 ), 00342 m_bound( 0 ), 00343 m_max( 0 ), 00344 m_send(NULL), 00345 m_recv(NULL) 00346 {} 00347 00348 CommAll::CommAll( ParallelMachine comm ) 00349 : m_comm( comm ), 00350 m_size( parallel_machine_size( comm ) ), 00351 m_rank( parallel_machine_rank( comm ) ), 00352 m_bound( 0 ), 00353 m_max( 0 ), 00354 m_send(NULL), 00355 m_recv(NULL) 00356 { 00357 m_send = CommBuffer::allocate( m_size , NULL ); 00358 00359 if ( NULL == m_send ) { 00360 std::string msg("stk::CommAll::CommAll FAILED malloc"); 00361 throw std::runtime_error(msg); 00362 } 00363 } 00364 00365 bool CommAll::allocate_buffers( const unsigned num_msg_bounds , 00366 const bool symmetric , 00367 const bool local_flag ) 00368 { 00369 const unsigned zero = 0 ; 00370 std::vector<unsigned> tmp( m_size , zero ); 00371 00372 for ( unsigned i = 0 ; i < m_size ; ++i ) { 00373 tmp[i] = m_send[i].size(); 00374 } 00375 00376 const unsigned * const send_size = & tmp[0] ; 00377 const unsigned * const recv_size = symmetric ? & tmp[0] : NULL ; 00378 00379 return allocate_buffers( m_comm, num_msg_bounds, 00380 send_size, recv_size, local_flag ); 00381 } 00382 00383 //---------------------------------------------------------------------- 00384 00385 void CommAll::reset_buffers() 00386 { 00387 if ( m_send ) { 00388 CommBuffer * m = m_send ; 00389 CommBuffer * const me = m + m_size ; 00390 for ( ; m != me ; ++m ) { m->reset(); } 00391 } 00392 if ( m_recv && 1 < m_size ) { 00393 CommBuffer * m = m_recv ; 00394 CommBuffer * const me = m + m_size ; 00395 for ( ; m != me ; ++m ) { m->reset(); } 00396 } 00397 } 00398 00399 //---------------------------------------------------------------------- 00400 00401 void CommAll::swap_send_recv() 00402 { 00403 if ( m_recv == NULL ) { 00404 // ERROR 00405 std::string 00406 msg("stk::CommAll::swap_send_recv(){ NULL recv buffers }" ); 00407 throw std::logic_error( msg ); 00408 } 00409 00410 CommBuffer * tmp_msg = m_send ; 00411 m_send = m_recv ; 00412 m_recv = tmp_msg ; 00413 } 00414 00415 //---------------------------------------------------------------------- 00416 00417 bool CommAll::allocate_buffers( ParallelMachine comm , 00418 const unsigned num_msg_bounds , 00419 const unsigned * const send_size , 00420 const unsigned * const recv_size , 00421 const bool local_flag ) 00422 { 00423 static const char method[] = "stk::CommAll::allocate_buffers" ; 00424 const unsigned uzero = 0 ; 00425 00426 CommBuffer::deallocate( m_size , m_send ); 00427 CommBuffer::deallocate( m_size , m_recv ); 00428 00429 m_comm = comm ; 00430 m_size = parallel_machine_size( comm ); 00431 m_rank = parallel_machine_rank( comm ); 00432 m_bound = num_msg_bounds ; 00433 00434 std::ostringstream msg ; 00435 00436 //-------------------------------- 00437 // Buffer allocation 00438 00439 { 00440 const bool send_none = NULL == send_size ; 00441 00442 std::vector<unsigned> tmp_send ; 00443 00444 if ( send_none ) { tmp_send.resize( m_size , uzero ); } 00445 00446 const unsigned * const send = send_none ? & tmp_send[0] : send_size ; 00447 00448 m_send = CommBuffer::allocate( m_size , send ); 00449 00450 if ( 1 < m_size ) { 00451 00452 std::vector<unsigned> tmp_recv ; 00453 00454 const bool recv_tbd = NULL == recv_size ; 00455 00456 if ( recv_tbd ) { // Had better be globally consistent. 00457 00458 tmp_recv.resize( m_size , uzero ); 00459 00460 unsigned * const r = & tmp_recv[0] ; 00461 00462 comm_sizes( m_comm , m_bound , m_max , send , r ); 00463 } 00464 00465 const unsigned * const recv = recv_tbd ? & tmp_recv[0] : recv_size ; 00466 00467 m_recv = CommBuffer::allocate( m_size , recv ); 00468 } 00469 else { 00470 m_recv = m_send ; 00471 } 00472 } 00473 00474 bool error_alloc = m_send == NULL || m_recv == NULL ; 00475 00476 //-------------------------------- 00477 // Propogation of error flag, input flag, and quick/cheap/approximate 00478 // verification of send and receive messages. 00479 // Is the number and total size of messages consistent? 00480 // Sum message counts and sizes for grouped processors. 00481 // Sent are positive and received are negative. 00482 // Should finish with all total counts of zero. 00483 00484 enum { NPSum = 7 }; 00485 enum { Length = 2 + 2 * NPSum }; 00486 00487 int local_result[ Length ]; 00488 int global_result[ Length ]; 00489 00490 Copy<Length>( local_result , 0 ); 00491 00492 local_result[ Length - 2 ] = error_alloc ; 00493 local_result[ Length - 1 ] = local_flag ; 00494 00495 if ( ! error_alloc ) { 00496 00497 const unsigned r = 2 * ( m_rank % NPSum ); 00498 00499 for ( unsigned i = 0 ; i < m_size ; ++i ) { 00500 const unsigned n_send = m_send[i].capacity(); 00501 const unsigned n_recv = m_recv[i].capacity(); 00502 00503 const unsigned s = 2 * ( i % NPSum ); 00504 00505 local_result[s] += n_send ? 1 : 0 ; 00506 local_result[s+1] += n_send ; 00507 00508 local_result[r] -= n_recv ? 1 : 0 ; 00509 local_result[r+1] -= n_recv ; 00510 } 00511 } 00512 00513 all_reduce_sum( m_comm , local_result , global_result , Length ); 00514 00515 bool global_flag ; 00516 00517 error_alloc = global_result[ Length - 2 ] ; 00518 global_flag = global_result[ Length - 1 ] ; 00519 00520 bool ok = true ; 00521 00522 for ( unsigned i = 0 ; ok && i < 2 * NPSum ; ++i ) { 00523 ok = 0 == global_result[i] ; 00524 } 00525 00526 if ( error_alloc || ! ok ) { 00527 msg << method << " ERROR:" ; 00528 if ( error_alloc ) { msg << " Failed memory allocation ," ; } 00529 if ( ! ok ) { msg << " Parallel inconsistent send/receive ," ; } 00530 throw std::runtime_error( msg.str() ); 00531 } 00532 00533 return global_flag ; 00534 } 00535 00536 //---------------------------------------------------------------------- 00537 00538 void CommAll::communicate() 00539 { 00540 static const char method[] = "stk::CommAll::communicate" ; 00541 00542 std::ostringstream msg ; 00543 00544 // Verify the send buffers have been filled, reset the buffer pointers 00545 00546 for ( unsigned i = 0 ; i < m_size ; ++i ) { 00547 00548 if ( m_send[i].remaining() ) { 00549 msg << method << " LOCAL[" << m_rank << "] ERROR: Send[" << i 00550 << "] Buffer not filled." ; 00551 throw std::underflow_error( msg.str() ); 00552 } 00553 /* 00554 m_send[i].reset(); 00555 */ 00556 m_recv[i].reset(); 00557 } 00558 00559 if ( 1 < m_size ) { 00560 bool ok ; 00561 00562 if ( m_bound < m_max ) { 00563 ok = all_to_all_dense( m_comm , m_send , m_recv , msg ); 00564 } 00565 else { 00566 ok = all_to_all_sparse( m_comm , m_send , m_recv , msg ); 00567 } 00568 00569 if ( ! ok ) { throw std::runtime_error( msg.str() ); } 00570 } 00571 } 00572 00573 //---------------------------------------------------------------------- 00574 //---------------------------------------------------------------------- 00575 00576 CommBroadcast::CommBroadcast( ParallelMachine comm , unsigned root_rank ) 00577 : m_comm( comm ), 00578 m_size( parallel_machine_size( comm ) ), 00579 m_rank( parallel_machine_rank( comm ) ), 00580 m_root_rank( root_rank ), 00581 m_buffer() 00582 {} 00583 00584 bool CommBroadcast::allocate_buffer( const bool local_flag ) 00585 { 00586 static const char method[] = "stk::CommBroadcast::allocate_buffer" ; 00587 00588 unsigned root_rank_min = m_root_rank ; 00589 unsigned root_rank_max = m_root_rank ; 00590 unsigned root_send_size = m_root_rank == m_rank ? m_buffer.size() : 0 ; 00591 unsigned flag = local_flag ; 00592 00593 all_reduce( m_comm , ReduceMin<1>( & root_rank_min ) & 00594 ReduceMax<1>( & root_rank_max ) & 00595 ReduceMax<1>( & root_send_size ) & 00596 ReduceBitOr<1>( & flag ) ); 00597 00598 if ( root_rank_min != root_rank_max ) { 00599 std::string msg ; 00600 msg.append( method ); 00601 msg.append( " FAILED: inconsistent root processor" ); 00602 throw std::runtime_error( msg ); 00603 } 00604 00605 m_buffer.m_beg = static_cast<CommBuffer::ucharp>( malloc( root_send_size ) ); 00606 m_buffer.m_ptr = m_buffer.m_beg ; 00607 m_buffer.m_end = m_buffer.m_beg + root_send_size ; 00608 00609 return flag ; 00610 } 00611 00612 CommBroadcast::~CommBroadcast() 00613 { 00614 try { 00615 if ( m_buffer.m_beg ) { free( static_cast<void*>( m_buffer.m_beg ) ); } 00616 } catch(...) {} 00617 m_buffer.m_beg = NULL ; 00618 m_buffer.m_ptr = NULL ; 00619 m_buffer.m_end = NULL ; 00620 } 00621 00622 CommBuffer & CommBroadcast::recv_buffer() 00623 { 00624 return m_buffer ; 00625 } 00626 00627 CommBuffer & CommBroadcast::send_buffer() 00628 { 00629 static const char method[] = "stk::CommBroadcast::send_buffer" ; 00630 00631 if ( m_root_rank != m_rank ) { 00632 std::string msg ; 00633 msg.append( method ); 00634 msg.append( " FAILED: is not root processor" ); 00635 throw std::runtime_error( msg ); 00636 } 00637 00638 return m_buffer ; 00639 } 00640 00641 void CommBroadcast::communicate() 00642 { 00643 #if defined( STK_HAS_MPI ) 00644 { 00645 const int count = m_buffer.capacity(); 00646 void * const buf = m_buffer.buffer(); 00647 00648 const int result = MPI_Bcast( buf, count, MPI_BYTE, m_root_rank, m_comm); 00649 00650 if ( MPI_SUCCESS != result ) { 00651 std::ostringstream msg ; 00652 msg << "stk::CommBroadcast::communicate ERROR : " 00653 << result << " == MPI_Bcast" ; 00654 throw std::runtime_error( msg.str() ); 00655 } 00656 } 00657 #endif 00658 00659 m_buffer.reset(); 00660 } 00661 00662 //---------------------------------------------------------------------- 00663 //---------------------------------------------------------------------- 00664 00665 CommGather::~CommGather() 00666 { 00667 try { 00668 free( static_cast<void*>( m_send.m_beg ) ); 00669 00670 if ( NULL != m_recv_count ) { free( static_cast<void*>( m_recv_count ) ); } 00671 00672 if ( NULL != m_recv ) { CommBuffer::deallocate( m_size , m_recv ); } 00673 } catch(...){} 00674 } 00675 00676 void CommGather::reset() 00677 { 00678 m_send.reset(); 00679 00680 if ( NULL != m_recv ) { 00681 for ( unsigned i = 0 ; i < m_size ; ++i ) { m_recv[i].reset(); } 00682 } 00683 } 00684 00685 CommBuffer & CommGather::recv_buffer( unsigned p ) 00686 { 00687 static CommBuffer empty ; 00688 00689 return m_size <= p ? empty : ( 00690 m_size <= 1 ? m_send : m_recv[p] ); 00691 } 00692 00693 //---------------------------------------------------------------------- 00694 00695 CommGather::CommGather( ParallelMachine comm , 00696 unsigned root_rank , unsigned send_size ) 00697 : m_comm( comm ), 00698 m_size( parallel_machine_size( comm ) ), 00699 m_rank( parallel_machine_rank( comm ) ), 00700 m_root_rank( root_rank ), 00701 m_send(), 00702 m_recv(NULL), 00703 m_recv_count(NULL), 00704 m_recv_displ(NULL) 00705 { 00706 m_send.m_beg = static_cast<CommBuffer::ucharp>( malloc( send_size ) ); 00707 m_send.m_ptr = m_send.m_beg ; 00708 m_send.m_end = m_send.m_beg + send_size ; 00709 00710 #if defined( STK_HAS_MPI ) 00711 00712 if ( 1 < m_size ) { 00713 00714 const bool is_root = m_rank == m_root_rank ; 00715 00716 if ( is_root ) { 00717 m_recv_count = static_cast<int*>( malloc(2*m_size*sizeof(int)) ); 00718 m_recv_displ = m_recv_count + m_size ; 00719 } 00720 00721 MPI_Gather( & send_size , 1 , MPI_INT , 00722 m_recv_count , 1 , MPI_INT , 00723 m_root_rank , m_comm ); 00724 00725 if ( is_root ) { 00726 m_recv = CommBuffer::allocate( m_size , 00727 reinterpret_cast<unsigned*>( m_recv_count ) ); 00728 00729 for ( unsigned i = 0 ; i < m_size ; ++i ) { 00730 m_recv_displ[i] = m_recv[i].m_beg - m_recv[0].m_beg ; 00731 } 00732 } 00733 } 00734 00735 #endif 00736 00737 } 00738 00739 00740 void CommGather::communicate() 00741 { 00742 #if defined( STK_HAS_MPI ) 00743 00744 if ( 1 < m_size ) { 00745 00746 const int send_count = m_send.capacity(); 00747 00748 void * const send_buf = m_send.buffer(); 00749 void * const recv_buf = m_rank == m_root_rank ? m_recv->buffer() : NULL ; 00750 00751 MPI_Gatherv( send_buf , send_count , MPI_BYTE , 00752 recv_buf , m_recv_count , m_recv_displ , MPI_BYTE , 00753 m_root_rank , m_comm ); 00754 } 00755 00756 #endif 00757 00758 reset(); 00759 } 00760 00761 //---------------------------------------------------------------------- 00762 //---------------------------------------------------------------------- 00763 00764 #if defined( STK_HAS_MPI ) 00765 00766 bool comm_dense_sizes( ParallelMachine comm , 00767 const unsigned * const send_size , 00768 unsigned * const recv_size , 00769 bool local_flag ) 00770 { 00771 static const char method[] = "stk::comm_dense_sizes" ; 00772 00773 const unsigned zero = 0 ; 00774 const unsigned p_size = parallel_machine_size( comm ); 00775 00776 std::vector<unsigned> send_buf( p_size * 2 , zero ); 00777 std::vector<unsigned> recv_buf( p_size * 2 , zero ); 00778 00779 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00780 const unsigned i2 = i * 2 ; 00781 send_buf[i2] = send_size[i] ; 00782 send_buf[i2+1] = local_flag ; 00783 } 00784 00785 { 00786 unsigned * const ps = & send_buf[0] ; 00787 unsigned * const pr = & recv_buf[0] ; 00788 const int result = 00789 MPI_Alltoall( ps , 2 , MPI_UNSIGNED , pr , 2 , MPI_UNSIGNED , comm ); 00790 00791 if ( MPI_SUCCESS != result ) { 00792 std::string msg ; 00793 msg.append( method ); 00794 msg.append( " FAILED: MPI_SUCCESS != MPI_Alltoall" ); 00795 throw std::runtime_error( msg ); 00796 } 00797 } 00798 00799 bool global_flag = false ; 00800 00801 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00802 const unsigned i2 = i * 2 ; 00803 recv_size[i] = recv_buf[i2] ; 00804 if ( recv_buf[i2+1] ) { global_flag = true ; } 00805 } 00806 00807 return global_flag ; 00808 } 00809 00810 //---------------------------------------------------------------------- 00811 00812 namespace { 00813 00814 extern "C" { 00815 00816 void sum_np_max_2_op( 00817 void * inv , void * outv , int * len , ParallelDatatype * ) 00818 { 00819 const int np = *len - 2 ; 00820 unsigned * ind = (unsigned *) inv ; 00821 unsigned * outd = (unsigned *) outv ; 00822 00823 // Sum all but the last two 00824 // the last two are maximum 00825 00826 for ( int i = 0 ; i < np ; ++i ) { 00827 *outd += *ind ; 00828 ++outd ; 00829 ++ind ; 00830 } 00831 if ( outd[0] < ind[0] ) { outd[0] = ind[0] ; } 00832 if ( outd[1] < ind[1] ) { outd[1] = ind[1] ; } 00833 } 00834 00835 } 00836 00837 } 00838 00839 bool comm_sizes( ParallelMachine comm , 00840 const unsigned num_msg_bound , 00841 unsigned & num_msg_maximum , 00842 const unsigned * const send_size , 00843 unsigned * const recv_size , 00844 bool local_flag ) 00845 { 00846 static const char method[] = "stk::comm_unknown_sizes" ; 00847 const unsigned uzero = 0 ; 00848 00849 static MPI_Op mpi_op = MPI_OP_NULL ; 00850 00851 if ( mpi_op == MPI_OP_NULL ) { 00852 // Is fully commutative 00853 MPI_Op_create( sum_np_max_2_op , 1 , & mpi_op ); 00854 } 00855 00856 const unsigned p_size = parallel_machine_size( comm ); 00857 const unsigned p_rank = parallel_machine_rank( comm ); 00858 00859 int result ; 00860 00861 std::ostringstream msg ; 00862 00863 num_msg_maximum = 0 ; 00864 00865 unsigned num_recv = 0 ; 00866 unsigned max_msg = 0 ; 00867 bool global_flag = false ; 00868 00869 { 00870 std::vector<unsigned> send_buf( p_size + 2 , uzero ); 00871 std::vector<unsigned> recv_buf( p_size + 2 , uzero ); 00872 00873 unsigned * const p_send = & send_buf[0] ; 00874 unsigned * const p_recv = & recv_buf[0] ; 00875 00876 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00877 recv_size[i] = 0 ; // Zero output 00878 if ( send_size[i] ) { 00879 send_buf[i] = 1 ; 00880 ++max_msg ; 00881 } 00882 } 00883 send_buf[p_size] = max_msg ; 00884 send_buf[p_size+1] = local_flag ; 00885 00886 result = MPI_Allreduce(p_send,p_recv,p_size+2,MPI_UNSIGNED,mpi_op,comm); 00887 00888 if ( result != MPI_SUCCESS ) { 00889 // PARALLEL ERROR 00890 msg << method << " ERROR: " << result << " == MPI_AllReduce" ; 00891 throw std::runtime_error( msg.str() ); 00892 } 00893 00894 num_recv = recv_buf[ p_rank ] ; 00895 max_msg = recv_buf[ p_size ] ; 00896 global_flag = recv_buf[ p_size + 1 ] ; 00897 00898 // max_msg is now the maximum send count, 00899 // Loop over receive counts to determine 00900 // if a receive count is larger. 00901 00902 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00903 if ( max_msg < recv_buf[i] ) { max_msg = recv_buf[i] ; } 00904 } 00905 } 00906 00907 num_msg_maximum = max_msg ; 00908 00909 if ( num_msg_bound < max_msg ) { 00910 // Dense, pay for an all-to-all 00911 00912 result = 00913 MPI_Alltoall( (void*) send_size , 1 , MPI_UNSIGNED , 00914 recv_size , 1 , MPI_UNSIGNED , comm ); 00915 00916 if ( MPI_SUCCESS != result ) { 00917 // LOCAL ERROR ? 00918 msg << method << " ERROR: " << result << " == MPI_Alltoall" ; 00919 throw std::runtime_error( msg.str() ); 00920 } 00921 } 00922 else if ( max_msg ) { 00923 // Sparse, just do point-to-point 00924 00925 const int mpi_tag = STK_MPI_TAG_SIZING ; 00926 00927 MPI_Request request_null = MPI_REQUEST_NULL ; 00928 std::vector<MPI_Request> request( num_recv , request_null ); 00929 std::vector<MPI_Status> status( num_recv ); 00930 std::vector<unsigned> buf( num_recv ); 00931 00932 // Post receives for point-to-point message sizes 00933 00934 for ( unsigned i = 0 ; i < num_recv ; ++i ) { 00935 unsigned * const p_buf = & buf[i] ; 00936 MPI_Request * const p_request = & request[i] ; 00937 result = MPI_Irecv( p_buf , 1 , MPI_UNSIGNED , 00938 MPI_ANY_SOURCE , mpi_tag , comm , p_request ); 00939 if ( MPI_SUCCESS != result ) { 00940 // LOCAL ERROR 00941 msg << method << " ERROR: " << result << " == MPI_Irecv" ; 00942 throw std::runtime_error( msg.str() ); 00943 } 00944 } 00945 00946 // Send the point-to-point message sizes, 00947 // rotate the sends in an attempt to balance the message traffic. 00948 00949 for ( unsigned i = 0 ; i < p_size ; ++i ) { 00950 int dst = ( i + p_rank ) % p_size ; 00951 unsigned value = send_size[dst] ; 00952 if ( value ) { 00953 result = MPI_Send( & value , 1 , MPI_UNSIGNED , dst , mpi_tag , comm ); 00954 if ( MPI_SUCCESS != result ) { 00955 // LOCAL ERROR 00956 msg << method << " ERROR: " << result << " == MPI_Send" ; 00957 throw std::runtime_error( msg.str() ); 00958 } 00959 } 00960 } 00961 00962 // Wait for all receives 00963 00964 { 00965 MPI_Request * const p_request = & request[0] ; 00966 MPI_Status * const p_status = & status[0] ; 00967 result = MPI_Waitall( num_recv , p_request , p_status ); 00968 } 00969 if ( MPI_SUCCESS != result ) { 00970 // LOCAL ERROR ? 00971 msg << method << " ERROR: " << result << " == MPI_Waitall" ; 00972 throw std::runtime_error( msg.str() ); 00973 } 00974 00975 // Set the receive message sizes 00976 00977 for ( unsigned i = 0 ; i < num_recv ; ++i ) { 00978 MPI_Status * const recv_status = & status[i] ; 00979 const int recv_proc = recv_status->MPI_SOURCE ; 00980 const int recv_tag = recv_status->MPI_TAG ; 00981 int recv_count = 0 ; 00982 00983 MPI_Get_count( recv_status , MPI_UNSIGNED , & recv_count ); 00984 00985 if ( recv_tag != mpi_tag || recv_count != 1 ) { 00986 msg << method << " ERROR: Received buffer mismatch " ; 00987 msg << "P" << p_rank << " <- P" << recv_proc ; 00988 msg << " " << 1 << " != " << recv_count ; 00989 throw std::runtime_error( msg.str() ); 00990 } 00991 00992 const unsigned r_size = buf[i] ; 00993 recv_size[ recv_proc ] = r_size ; 00994 } 00995 } 00996 00997 return global_flag ; 00998 } 00999 01000 //---------------------------------------------------------------------- 01001 //---------------------------------------------------------------------- 01002 01003 #else 01004 01005 01006 bool comm_sizes( ParallelMachine , 01007 const unsigned , 01008 unsigned & num_msg_maximum , 01009 const unsigned * const send_size , 01010 unsigned * const recv_size , 01011 bool local_flag ) 01012 { 01013 num_msg_maximum = send_size[0] ? 1 : 0 ; 01014 01015 recv_size[0] = send_size[0] ; 01016 01017 return local_flag ; 01018 } 01019 01020 bool comm_dense_sizes( ParallelMachine , 01021 const unsigned * const send_size , 01022 unsigned * const recv_size , 01023 bool local_flag ) 01024 { 01025 recv_size[0] = send_size[0] ; 01026 01027 return local_flag ; 01028 } 01029 01030 //---------------------------------------------------------------------- 01031 01032 #endif 01033 01034 } 01035