24 #ifndef GRAPHLAB_BUFFERED_EXCHANGE_HPP
25 #define GRAPHLAB_BUFFERED_EXCHANGE_HPP
27 #include <graphlab/parallel/pthread_tools.hpp>
28 #include <graphlab/rpc/dc.hpp>
29 #include <graphlab/rpc/dc_dist_object.hpp>
30 #include <graphlab/util/mpi_tools.hpp>
33 #include <graphlab/macros_def.hpp>
// Batches values of type T per (sender thread, destination process) slot and
// ships each batch through the dc_dist_object RPC layer; batches received
// from other processes are queued in recv_buffers until recv() takes them.
// NOTE(review): the template header that introduces T is not visible in this
// view -- presumably `template<typename T>`; confirm.
41 class buffered_exchange {
// Container type used for one batch of received values.
43 typedef std::vector<T> buffer_type;
// One received batch. NOTE(review): the field declarations are not visible in
// this view; the methods of this class access `rec.buffer` and the
// constructor below initialises `proc`.
46 struct buffer_record {
// A fresh record starts with proc set to -1.
49 buffer_record() : proc(-1) { }
// RPC endpoint of this object. Being `mutable`, it may be modified from
// const member functions.
55 mutable dc_dist_object<buffered_exchange> rpc;
// Received batches: rpc_recv() appends at the back, recv() removes from the
// front.
57 std::deque< buffer_record > recv_buffers;
// One outgoing archive (`oarc`) plus insert counter (`numinserts`) per slot,
// indexed as thread_id * rpc.numprocs() + proc.
// NOTE(review): the definition of send_record is not visible in this view.
66 std::vector<send_record> send_buffers;
// One lock per entry of send_buffers, using the same index.
67 std::vector< mutex > send_locks;
// Number of sender slots per destination; send() asserts thread_id against it.
68 const size_t num_threads;
// Threshold compared against an archive's `off` in send() to decide when a
// slot is shipped (presumably a size in bytes -- confirm against oarchive).
69 const size_t max_buffer_size;
// Constructs the exchange.
//   dc               distributed control object; dc.numprocs() determines how
//                    many send slots each thread gets.
//   num_threads      number of sender threads (default 1).
//   max_buffer_size  threshold at which send() ships a slot
//                    (default 1024 * 1024).
// Allocates num_threads * dc.numprocs() send buffers and locks, then prepares
// every slot for its first batch.
// NOTE(review): the initializer for `rpc` and the end of the constructor body
// are not visible in this view.
76 buffered_exchange(distributed_control& dc,
77 const size_t num_threads = 1,
78 const size_t max_buffer_size = 1024 * 1024 ) :
80 send_buffers(num_threads * dc.numprocs()),
81 send_locks(num_threads * dc.numprocs()),
82 num_threads(num_threads),
83 max_buffer_size(max_buffer_size) {
85 for (
size_t i = 0;i < send_buffers.size(); ++i) {
// Open a split call targeting rpc_recv; its archive collects this slot's data.
87 send_buffers[i].oarc = rpc.split_call_begin(&buffered_exchange::rpc_recv);
88 send_buffers[i].numinserts = 0;
// Every archive starts with the id of this process; rpc_recv() reads it back
// first as the source process.
90 (*(send_buffers[i].oarc)) << rpc.procid();
// Destructor: cancels the split call that is still open for every send slot.
// NOTE(review): values that were queued with send() but never flushed are
// presumably dropped by the cancel -- confirm against split_call_cancel.
96 ~buffered_exchange() {
98 for (
size_t i = 0;i < send_buffers.size(); ++i) {
99 rpc.split_call_cancel(send_buffers[i].oarc);
// Queues `value` for delivery to process `proc`, using the send slot that
// belongs to thread `thread_id`.
//   proc       destination process; asserted to be < rpc.numprocs().
//   value      element serialized into the slot's output archive.
//   thread_id  sender slot selector; asserted to be < num_threads (default 0).
// The slot's lock is held while the value is serialized. Once the archive's
// `off` reaches max_buffer_size, the archive is exchanged for a fresh one via
// swap_buffer() and the full one is passed to rpc.split_call_end() after the
// lock has been released.
108 void send(
const procid_t proc,
const T& value,
const size_t thread_id = 0) {
109 ASSERT_LT(proc, rpc.numprocs());
110 ASSERT_LT(thread_id, num_threads);
// Slots are laid out thread-major: rpc.numprocs() consecutive entries per
// thread.
111 const size_t index = thread_id * rpc.numprocs() + proc;
112 ASSERT_LT(index, send_locks.size());
113 send_locks[index].lock();
115 (*(send_buffers[index].oarc)) << value;
116 ++send_buffers[index].numinserts;
118 if(send_buffers[index].oarc->off >= max_buffer_size) {
119 oarchive* prevarc = swap_buffer(index);
120 send_locks[index].unlock();
// The slot lock is already released here, so the RPC call does not run
// under it.
122 rpc.split_call_end(proc, prevarc);
// NOTE(review): this unlock presumably belongs to an `else` branch that is not
// visible in this view; otherwise the lock would be released twice on the
// path above.
124 send_locks[index].unlock();
// Ships every non-empty send slot owned by thread `thread_id`, one slot per
// destination process.
//   thread_id  selects the row of slots to flush.
// NOTE(review): thread_id is not checked against num_threads here, unlike in
// send(); an out-of-range value would index past send_buffers.
129 void partial_flush(
size_t thread_id) {
130 for(
procid_t proc = 0; proc < rpc.numprocs(); ++proc) {
131 const size_t index = thread_id * rpc.numprocs() + proc;
// Always true given the loop condition above.
132 ASSERT_LT(proc, rpc.numprocs());
// NOTE(review): numinserts is read before the slot lock is taken, whereas the
// loop over all slots later in this class checks it under the lock. If
// another thread empties the slot in between, an archive with zero elements
// is sent. Confirm this unlocked check is intentional.
133 if (send_buffers[index].numinserts > 0) {
134 send_locks[index].lock();
135 oarchive* prevarc = swap_buffer(index);
136 send_locks[index].unlock();
// Sent after the lock has been released.
138 rpc.split_call_end(proc, prevarc);
// Ships every non-empty send slot, across all threads and destinations.
// NOTE(review): the signature of the enclosing function is not visible in
// this view -- presumably the full flush counterpart of partial_flush().
144 for(
size_t i = 0; i < send_buffers.size(); ++i) {
// Recover the destination from the slot index (slots are laid out as
// thread_id * rpc.numprocs() + proc).
145 const procid_t proc = i % rpc.numprocs();
// Always true, since proc is a remainder modulo rpc.numprocs().
146 ASSERT_LT(proc, rpc.numprocs());
147 send_locks[i].lock();
// The emptiness check is made while the slot lock is held.
148 if (send_buffers[i].numinserts > 0) {
149 oarchive* prevarc = swap_buffer(i);
// Unlike send() and partial_flush(), the RPC call is issued before the slot
// lock is released.
151 rpc.split_call_end(proc, prevarc);
153 send_locks[i].unlock();
// Takes the oldest received batch out of recv_buffers, if there is one.
//   ret_proc    out: asserted below to be a valid process id; presumably set
//               to the batch's source process (the assignment is not visible
//               in this view).
//   ret_buffer  out: swapped with the batch's buffer, so it receives the
//               batch contents and its previous contents go away with the
//               popped record.
//   try_lock    default false; the visible code has a recv_lock.try_lock()
//               path, presumably taken when this flag is true.
// Returns a bool; the visible early exit returns false when recv_buffers is
// empty. NOTE(review): the statements that set and return `success`, and the
// matching unlock of recv_lock, are not visible in this view.
159 bool recv(
procid_t& ret_proc, buffer_type& ret_buffer,
160 const bool try_lock =
false) {
// NOTE(review): read_buffer is not used by any visible line of this function.
161 dc_impl::blob read_buffer;
162 bool has_lock =
false;
// Early exit without touching the lock when nothing is queued.
// NOTE(review): this reads recv_buffers before recv_lock is acquired.
164 if (recv_buffers.empty())
return false;
165 has_lock = recv_lock.try_lock();
170 bool success =
false;
172 if(!recv_buffers.empty()) {
174 buffer_record& rec = recv_buffers.front();
// Swap instead of copy: hands the received elements to the caller.
177 ret_buffer.swap(rec.buffer);
178 ASSERT_LT(ret_proc, rpc.numprocs());
179 recv_buffers.pop_front();
// Adds up the number of elements held by all batches currently queued in
// recv_buffers.
// NOTE(review): the declaration of `count`, any locking and the return
// statement are not visible in this view; presumably `count` is returned.
192 size_t size()
const {
// NOTE(review): this typedef is not used by any visible line of the function.
193 typedef typename std::deque< buffer_record >::const_iterator iterator;
196 foreach(
const buffer_record& rec, recv_buffers) {
197 count += rec.buffer.size();
203 bool empty()
const {
return recv_buffers.empty(); }
// RPC target of the split calls opened by this class: decodes one incoming
// batch and appends it to recv_buffers.
//   len  length of the serialized message starting at w.ptr.
//   w    wild_pointer whose `ptr` addresses the message.
// Message layout as read here: the sender's process id first, and a size_t
// element count in the last sizeof(size_t) bytes (written by swap_buffer()).
// NOTE(review): the declaration of `tmp`, the body of the element loop, any
// locking of recv_buffers and the store of src_proc into the record are not
// visible in this view.
207 void rpc_recv(
size_t len, wild_pointer w) {
209 iarchive iarc(reinterpret_cast<const char*>(w.ptr), len);
211 procid_t src_proc; iarc >> src_proc;
212 ASSERT_LT(src_proc, rpc.numprocs());
// A second archive positioned on the trailing size_t of the message.
// NOTE(review): len is not checked to be at least sizeof(size_t) before this
// pointer arithmetic; the constructor's second argument is not visible here.
215 iarchive numel_iarc(reinterpret_cast<const char*>(w.ptr) + len -
sizeof(
size_t),
217 size_t numel; numel_iarc >> numel;
// One iteration per element announced by the sender.
220 for (
size_t i = 0;i < numel; ++i) {
// Append an empty record, then swap the decoded elements into it so the
// data is not copied.
225 recv_buffers.push_back(buffer_record());
226 buffer_record& rec = recv_buffers.back();
228 rec.buffer.swap(tmp);
// Replaces the archive of send slot `index` with a fresh one and finalizes
// the old archive so it can be sent.
//   index  slot in send_buffers.
// Every visible caller holds send_locks[index] around this call and passes
// the returned pointer to rpc.split_call_end().
// NOTE(review): the return statement is not visible in this view; presumably
// the old archive (held in `swaparc` after the swap) is returned.
234 oarchive* swap_buffer(
size_t index) {
235 oarchive* swaparc = rpc.split_call_begin(&buffered_exchange::rpc_recv);
// Pre-size the new archive to 1.2 times the flush threshold. The product is
// a double and is converted implicitly to expand_buf's parameter type.
236 swaparc->expand_buf(max_buffer_size * 1.2);
// After this swap the slot owns the fresh archive and swaparc refers to the
// filled one.
237 std::swap(send_buffers[index].oarc, swaparc);
// The element count goes last in the filled archive; rpc_recv() reads it
// from the final sizeof(size_t) bytes of the message.
239 (*swaparc) << (size_t)(send_buffers[index].numinserts);
243 send_buffers[index].numinserts = 0;
// The fresh archive starts with this process's id, as in the constructor.
245 (*(send_buffers[index].oarc)) << rpc.procid();
254 #include <graphlab/macros_undef.hpp>