Added boost header
This commit is contained in:
177
test/external/boost/graph/distributed/detail/queue.ipp
vendored
Normal file
177
test/external/boost/graph/distributed/detail/queue.ipp
vendored
Normal file
@@ -0,0 +1,177 @@
|
||||
// Copyright (C) 2004-2006 The Trustees of Indiana University.
|
||||
|
||||
// Use, modification and distribution is subject to the Boost Software
|
||||
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
|
||||
// http://www.boost.org/LICENSE_1_0.txt)
|
||||
|
||||
// Authors: Douglas Gregor
|
||||
// Andrew Lumsdaine
|
||||
#include <boost/optional.hpp>
|
||||
#include <cassert>
|
||||
#include <boost/graph/parallel/algorithm.hpp>
|
||||
#include <boost/graph/parallel/process_group.hpp>
|
||||
#include <functional>
|
||||
#include <algorithm>
|
||||
#include <boost/graph/parallel/simple_trigger.hpp>
|
||||
|
||||
#ifndef BOOST_GRAPH_USE_MPI
|
||||
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
|
||||
#endif
|
||||
|
||||
namespace boost { namespace graph { namespace distributed {
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::
|
||||
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
|
||||
const Buffer& buffer, bool polling)
|
||||
: process_group(process_group, attach_distributed_object()),
|
||||
owner(owner),
|
||||
buffer(buffer),
|
||||
polling(polling)
|
||||
{
|
||||
if (!polling)
|
||||
outgoing_buffers.reset(
|
||||
new outgoing_buffers_t(num_processes(process_group)));
|
||||
|
||||
setup_triggers();
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::
|
||||
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
|
||||
const Buffer& buffer, const UnaryPredicate& pred,
|
||||
bool polling)
|
||||
: process_group(process_group, attach_distributed_object()),
|
||||
owner(owner),
|
||||
buffer(buffer),
|
||||
pred(pred),
|
||||
polling(polling)
|
||||
{
|
||||
if (!polling)
|
||||
outgoing_buffers.reset(
|
||||
new outgoing_buffers_t(num_processes(process_group)));
|
||||
|
||||
setup_triggers();
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::
|
||||
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
|
||||
const UnaryPredicate& pred, bool polling)
|
||||
: process_group(process_group, attach_distributed_object()),
|
||||
owner(owner),
|
||||
pred(pred),
|
||||
polling(polling)
|
||||
{
|
||||
if (!polling)
|
||||
outgoing_buffers.reset(
|
||||
new outgoing_buffers_t(num_processes(process_group)));
|
||||
|
||||
setup_triggers();
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
void
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
|
||||
{
|
||||
typename ProcessGroup::process_id_type dest = get(owner, x);
|
||||
if (outgoing_buffers)
|
||||
outgoing_buffers->at(dest).push_back(x);
|
||||
else if (dest == process_id(process_group))
|
||||
buffer.push(x);
|
||||
else
|
||||
send(process_group, get(owner, x), msg_push, x);
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
bool
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
|
||||
{
|
||||
/* Processes will stay here until the buffer is nonempty or
|
||||
synchronization with the other processes indicates that all local
|
||||
buffers are empty (and no messages are in transit).
|
||||
*/
|
||||
while (buffer.empty() && !do_synchronize()) ;
|
||||
|
||||
return buffer.empty();
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
|
||||
{
|
||||
empty();
|
||||
return buffer.size();
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
|
||||
{
|
||||
using boost::graph::parallel::simple_trigger;
|
||||
|
||||
simple_trigger(process_group, msg_push, this,
|
||||
&distributed_queue::handle_push);
|
||||
simple_trigger(process_group, msg_multipush, this,
|
||||
&distributed_queue::handle_multipush);
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
void
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::
|
||||
handle_push(int /*source*/, int /*tag*/, const value_type& value,
|
||||
trigger_receive_context)
|
||||
{
|
||||
if (pred(value)) buffer.push(value);
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
void
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::
|
||||
handle_multipush(int /*source*/, int /*tag*/,
|
||||
const std::vector<value_type>& values,
|
||||
trigger_receive_context)
|
||||
{
|
||||
for (std::size_t i = 0; i < values.size(); ++i)
|
||||
if (pred(values[i])) buffer.push(values[i]);
|
||||
}
|
||||
|
||||
template<BOOST_DISTRIBUTED_QUEUE_PARMS>
|
||||
bool
|
||||
BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
|
||||
{
|
||||
#ifdef PBGL_ACCOUNTING
|
||||
++num_synchronizations;
|
||||
#endif
|
||||
|
||||
using boost::parallel::all_reduce;
|
||||
using std::swap;
|
||||
|
||||
typedef typename ProcessGroup::process_id_type process_id_type;
|
||||
|
||||
if (outgoing_buffers) {
|
||||
// Transfer all of the push requests
|
||||
process_id_type id = process_id(process_group);
|
||||
process_id_type np = num_processes(process_group);
|
||||
for (process_id_type dest = 0; dest < np; ++dest) {
|
||||
outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
|
||||
std::size_t size = outgoing.size();
|
||||
if (size != 0) {
|
||||
if (dest != id) {
|
||||
send(process_group, dest, msg_multipush, outgoing);
|
||||
} else {
|
||||
for (std::size_t i = 0; i < size; ++i)
|
||||
buffer.push(outgoing[i]);
|
||||
}
|
||||
outgoing.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
synchronize(process_group);
|
||||
|
||||
unsigned local_size = buffer.size();
|
||||
unsigned global_size =
|
||||
all_reduce(process_group, local_size, std::plus<unsigned>());
|
||||
return global_size == 0;
|
||||
}
|
||||
|
||||
} } } // end namespace boost::graph::distributed
|
||||
Reference in New Issue
Block a user