GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
buffered_exchange.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_BUFFERED_EXCHANGE_HPP
25 #define GRAPHLAB_BUFFERED_EXCHANGE_HPP
26 
27 #include <graphlab/parallel/pthread_tools.hpp>
28 #include <graphlab/rpc/dc.hpp>
29 #include <graphlab/rpc/dc_dist_object.hpp>
30 #include <graphlab/util/mpi_tools.hpp>
31 
32 
33 #include <graphlab/macros_def.hpp>
34 namespace graphlab {
35 
36  /**
37  * \ingroup rpc
38  * \internal
39  */
40  template<typename T>
41  class buffered_exchange {
42  public:
43  typedef std::vector<T> buffer_type;
44 
45  private:
46  struct buffer_record {
47  procid_t proc;
48  buffer_type buffer;
49  buffer_record() : proc(-1) { }
50  }; // end of buffer record
51 
52 
53 
54  /** The rpc interface for this class */
55  mutable dc_dist_object<buffered_exchange> rpc;
56 
57  std::deque< buffer_record > recv_buffers;
58  mutex recv_lock;
59 
60 
61  struct send_record {
62  oarchive* oarc;
63  size_t numinserts;
64  };
65 
66  std::vector<send_record> send_buffers;
67  std::vector< mutex > send_locks;
68  const size_t num_threads;
69  const size_t max_buffer_size;
70 
71 
72  // typedef boost::function<void (const T& tref)> handler_type;
73  // handler_type recv_handler;
74 
75  public:
76  buffered_exchange(distributed_control& dc,
77  const size_t num_threads = 1,
78  const size_t max_buffer_size = 1024 * 1024 /* 1MB */) :
79  rpc(dc, this),
80  send_buffers(num_threads * dc.numprocs()),
81  send_locks(num_threads * dc.numprocs()),
82  num_threads(num_threads),
83  max_buffer_size(max_buffer_size) {
84  //
85  for (size_t i = 0;i < send_buffers.size(); ++i) {
86  // initialize the split call
87  send_buffers[i].oarc = rpc.split_call_begin(&buffered_exchange::rpc_recv);
88  send_buffers[i].numinserts = 0;
89  // begin by writing the src proc.
90  (*(send_buffers[i].oarc)) << rpc.procid();
91  }
92  rpc.barrier();
93  }
94 
95 
96  ~buffered_exchange() {
97  // clear the send buffers
98  for (size_t i = 0;i < send_buffers.size(); ++i) {
99  rpc.split_call_cancel(send_buffers[i].oarc);
100  }
101  }
102  // buffered_exchange(distributed_control& dc, handler_type recv_handler,
103  // size_t buffer_size = 1000) :
104  // rpc(dc, this), send_buffers(dc.numprocs()), send_locks(dc.numprocs()),
105  // max_buffer_size(buffer_size), recv_handler(recv_handler) { rpc.barrier(); }
106 
107 
108  void send(const procid_t proc, const T& value, const size_t thread_id = 0) {
109  ASSERT_LT(proc, rpc.numprocs());
110  ASSERT_LT(thread_id, num_threads);
111  const size_t index = thread_id * rpc.numprocs() + proc;
112  ASSERT_LT(index, send_locks.size());
113  send_locks[index].lock();
114 
115  (*(send_buffers[index].oarc)) << value;
116  ++send_buffers[index].numinserts;
117 
118  if(send_buffers[index].oarc->off >= max_buffer_size) {
119  oarchive* prevarc = swap_buffer(index);
120  send_locks[index].unlock();
121  // complete the send
122  rpc.split_call_end(proc, prevarc);
123  } else {
124  send_locks[index].unlock();
125  }
126  } // end of send
127 
128 
129  void partial_flush(size_t thread_id) {
130  for(procid_t proc = 0; proc < rpc.numprocs(); ++proc) {
131  const size_t index = thread_id * rpc.numprocs() + proc;
132  ASSERT_LT(proc, rpc.numprocs());
133  if (send_buffers[index].numinserts > 0) {
134  send_locks[index].lock();
135  oarchive* prevarc = swap_buffer(index);
136  send_locks[index].unlock();
137  // complete the send
138  rpc.split_call_end(proc, prevarc);
139  }
140  }
141  }
142 
143  void flush() {
144  for(size_t i = 0; i < send_buffers.size(); ++i) {
145  const procid_t proc = i % rpc.numprocs();
146  ASSERT_LT(proc, rpc.numprocs());
147  send_locks[i].lock();
148  if (send_buffers[i].numinserts > 0) {
149  oarchive* prevarc = swap_buffer(i);
150  // complete the send
151  rpc.split_call_end(proc, prevarc);
152  }
153  send_locks[i].unlock();
154  }
155  rpc.full_barrier();
156  } // end of flush
157 
158 
159  bool recv(procid_t& ret_proc, buffer_type& ret_buffer,
160  const bool try_lock = false) {
161  dc_impl::blob read_buffer;
162  bool has_lock = false;
163  if(try_lock) {
164  if (recv_buffers.empty()) return false;
165  has_lock = recv_lock.try_lock();
166  } else {
167  recv_lock.lock();
168  has_lock = true;
169  }
170  bool success = false;
171  if(has_lock) {
172  if(!recv_buffers.empty()) {
173  success = true;
174  buffer_record& rec = recv_buffers.front();
175  // read the record
176  ret_proc = rec.proc;
177  ret_buffer.swap(rec.buffer);
178  ASSERT_LT(ret_proc, rpc.numprocs());
179  recv_buffers.pop_front();
180  }
181  recv_lock.unlock();
182  }
183 
184  return success;
185  } // end of recv
186 
187 
188 
189  /**
190  * Returns the number of elements to recv
191  */
192  size_t size() const {
193  typedef typename std::deque< buffer_record >::const_iterator iterator;
194  recv_lock.lock();
195  size_t count = 0;
196  foreach(const buffer_record& rec, recv_buffers) {
197  count += rec.buffer.size();
198  }
199  recv_lock.unlock();
200  return count;
201  } // end of size
202 
203  bool empty() const { return recv_buffers.empty(); }
204 
205  void clear() { }
206  private:
207  void rpc_recv(size_t len, wild_pointer w) {
208  buffer_type tmp;
209  iarchive iarc(reinterpret_cast<const char*>(w.ptr), len);
210  // first desrialize the source process
211  procid_t src_proc; iarc >> src_proc;
212  ASSERT_LT(src_proc, rpc.numprocs());
213  // create an iarchive which just points to the last size_t bytes
214  // to get the number of elements
215  iarchive numel_iarc(reinterpret_cast<const char*>(w.ptr) + len - sizeof(size_t),
216  sizeof(size_t));
217  size_t numel; numel_iarc >> numel;
218  //std::cout << "Receiving: " << numel << "\n";
219  tmp.resize(numel);
220  for (size_t i = 0;i < numel; ++i) {
221  iarc >> tmp[i];
222  }
223 
224  recv_lock.lock();
225  recv_buffers.push_back(buffer_record());
226  buffer_record& rec = recv_buffers.back();
227  rec.proc = src_proc;
228  rec.buffer.swap(tmp);
229  recv_lock.unlock();
230  } // end of rpc rcv
231 
232 
233  // create a new buffer for send_buffer[index], returning the old buffer
234  oarchive* swap_buffer(size_t index) {
235  oarchive* swaparc = rpc.split_call_begin(&buffered_exchange::rpc_recv);
236  swaparc->expand_buf(max_buffer_size * 1.2);
237  std::swap(send_buffers[index].oarc, swaparc);
238  // write the length at the end of the buffere are returning
239  (*swaparc) << (size_t)(send_buffers[index].numinserts);
240 
241  //std::cout << "Sending : " << (send_buffers[index].numinserts)<< "\n";
242  // reset the insertion count
243  send_buffers[index].numinserts = 0;
244  // write the current procid into the new buffer
245  (*(send_buffers[index].oarc)) << rpc.procid();
246  return swaparc;
247  }
248 
249 
250  }; // end of buffered exchange
251 
252 
253 }; // end of graphlab namespace
254 #include <graphlab/macros_undef.hpp>
255 
256 #endif
257 
258