GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
distributed_chandy_misra.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_DISTRIBUTED_CHANDY_MISRA_HPP
25 #define GRAPHLAB_DISTRIBUTED_CHANDY_MISRA_HPP
26 #include <vector>
27 #include <graphlab/rpc/dc_dist_object.hpp>
28 #include <graphlab/rpc/distributed_event_log.hpp>
29 #include <graphlab/engine/chandy_misra_interface.hpp>
30 #include <graphlab/logger/assertions.hpp>
31 #include <graphlab/parallel/pthread_tools.hpp>
32 #include <graphlab/graph/graph_basic_types.hpp>
33 #include <graphlab/macros_def.hpp>
34 namespace graphlab {
35 
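/**
 * \internal
 * Chandy-Misra style dining-philosophers lock manager over the local part of
 * a distributed graph: every local vertex is a philosopher and every local
 * edge carries one fork shared by its two endpoints.
 */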
40 template <typename GraphType>
41 class distributed_chandy_misra : public chandy_misra_interface<GraphType> {
42  public:
43  typedef typename GraphType::local_vertex_type local_vertex_type;
44  typedef typename GraphType::local_edge_type local_edge_type;
45 
46  typedef typename GraphType::vertex_id_type vertex_id_type;
47  typedef typename GraphType::lvid_type lvid_type;
48 
49  typedef distributed_chandy_misra<GraphType> dcm_type;
50  dc_dist_object<dcm_type> rmi;
51  GraphType& graph;
52 
53  boost::function<void(lvid_type)> callback;
54  boost::function<void(lvid_type)> hors_doeuvre_callback;
/*
 * Fork state: one byte per local edge, indexed by the edge id.
 *   bit 0 : current owner of the fork (0 = source vertex, 1 = target vertex)
 *   bit 1 : 0 = clean, 1 = dirty
 *   bit 2 : request placed by owner 0 (the source side)
 *   bit 3 : request placed by owner 1 (the target side)
 */
std::vector<unsigned char> forkset;

// Masks selecting the individual bits of a fork byte.
enum {
  OWNER_BIT = 1 << 0,
  DIRTY_BIT = 1 << 1,
  REQUEST_0 = 1 << 2,
  REQUEST_1 = 1 << 3
};

// The two possible values of the owner bit.
enum {
  OWNER_SOURCE = 0,
  OWNER_TARGET = 1
};
68  inline unsigned char request_bit(bool owner) {
69  return owner ? REQUEST_1 : REQUEST_0;
70  }
71 
72 
// Identifiers of event counters. Within this file they only appear in
// commented-out PERMANENT_ACCUMULATE_DIST_EVENT calls.
enum {
  COLLISIONS,               // = 0
  CANCELLATIONS,            // = 1
  ACCEPTED_CANCELLATIONS    // = 2
};
78 
79  struct philosopher {
80  vertex_id_type num_edges;
81  vertex_id_type forks_acquired;
82  simple_spinlock lock;
83  unsigned char state;
84  unsigned char counter;
85  bool cancellation_sent;
86  bool lockid;
87  };
88  std::vector<philosopher> philosopherset;
89  atomic<size_t> clean_fork_count;
90 
/*
 * States a philosopher moves through:
 * THINKING -> HUNGRY -> HORS_DOEUVRE -> EATING -> THINKING
 * (a cancellation moves HORS_DOEUVRE back to HUNGRY).
 */
enum {
  THINKING = 0,
  HUNGRY,         // = 1
  HORS_DOEUVRE,   // = 2
  EATING          // = 3
};
100 
101  /** Places a request for the fork. Requires fork to be locked */
102  inline void request_for_fork(size_t forkid, bool nextowner) {
103  __sync_fetch_and_or(&forkset[forkid], request_bit(nextowner));
104  }
105 
106  inline bool fork_owner(size_t forkid) {
107  return forkset[forkid] & OWNER_BIT;
108  }
109 
110  inline bool fork_dirty(size_t forkid) {
111  return !!(forkset[forkid] & DIRTY_BIT);
112  }
113 
114  inline void dirty_fork(size_t forkid) {
115  if ((forkset[forkid] & DIRTY_BIT) == 0) clean_fork_count.dec();
116  __sync_fetch_and_or(&forkset[forkid], DIRTY_BIT);
117  }
118 
119 
120  void compute_initial_fork_arrangement() {
121  for (lvid_type i = 0;i < graph.num_local_vertices(); ++i) {
122  local_vertex_type lvertex(graph.l_vertex(i));
123  philosopherset[i].num_edges = lvertex.num_in_edges() +
124  lvertex.num_out_edges();
125  philosopherset[i].state = THINKING;
126  philosopherset[i].forks_acquired = 0;
127  philosopherset[i].counter = 0;
128  philosopherset[i].cancellation_sent = false;
129  philosopherset[i].lockid = false;
130  }
131  for (lvid_type i = 0;i < graph.num_local_vertices(); ++i) {
132  local_vertex_type lvertex(graph.l_vertex(i));
133  foreach(local_edge_type edge, lvertex.in_edges()) {
134  if (edge.source().global_id() > edge.target().global_id()) {
135  forkset[edge.id()] = DIRTY_BIT | OWNER_TARGET;
136  philosopherset[edge.target().id()].forks_acquired++;
137  }
138  else {
139  forkset[edge.id()] = DIRTY_BIT | OWNER_SOURCE;
140  philosopherset[edge.source().id()].forks_acquired++;
141  }
142  }
143  }
144  }
145 
146  /**
147  * We already have v1, we want to acquire v2.
148  * When this function returns, both v1 and v2 locks are acquired
149  */
150  void try_acquire_edge_with_backoff(lvid_type v1,
151  lvid_type v2) {
152  if (v1 < v2) {
153  philosopherset[v2].lock.lock();
154  }
155  else if (!philosopherset[v2].lock.try_lock()) {
156  philosopherset[v1].lock.unlock();
157  philosopherset[v2].lock.lock();
158  philosopherset[v1].lock.lock();
159  }
160  }
161 
162 /****************************************************************************
163  * Tries to move a requested fork
164  *
165  * Pseudocode:
166  * If current owner is hungry and fork is clean
167  * Ignore
168  * ElseIf current owner is Thinking
169  * Relinquish fork immediately and clear the request flag
170  * ElseIf current owner is hors_doeuvre and fork is clean
171  * Ignore
172  * ElseIf current owner is hors_doeuvre and fork is dirty
173  * Send cancellation message
174  * Set cancelsent
175  * End
176  * Return true if changes were made
177  ***************************************************************************/
178  inline bool advance_fork_state_on_lock(size_t forkid,
179  lvid_type source,
180  lvid_type target) {
181  unsigned char currentowner = forkset[forkid] & OWNER_BIT;
182  if (currentowner == OWNER_SOURCE) {
183  // if the current owner is not eating, and the
184  // fork is dirty and other side has placed a request
185  if (philosopherset[source].state != EATING &&
186  (forkset[forkid] & DIRTY_BIT) &&
187  (forkset[forkid] & REQUEST_1)) {
188 
189  if (philosopherset[source].state != HORS_DOEUVRE) {
190  // change the owner and clean the fork)
191  forkset[forkid] = OWNER_TARGET;
192  clean_fork_count.inc();
193  if (philosopherset[source].state == HUNGRY) {
194  forkset[forkid] |= REQUEST_0;
195  }
196  philosopherset[source].forks_acquired--;
197  philosopherset[target].forks_acquired++;
198  return true;
199  }
200  else if (philosopherset[source].cancellation_sent == false) {
201  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, CANCELLATIONS, 1);
202  philosopherset[source].cancellation_sent = true;
203  bool lockid = philosopherset[source].lockid;
204  philosopherset[source].lock.unlock();
205  philosopherset[target].lock.unlock();
206  issue_cancellation_request_unlocked(source, lockid);
207  philosopherset[std::min(source, target)].lock.lock();
208  philosopherset[std::max(source, target)].lock.lock();
209  }
210  }
211  }
212  else {
213  // if the current owner is not eating, and the
214  // fork is dirty and other side has placed a request
215  if (philosopherset[target].state != EATING &&
216  (forkset[forkid] & DIRTY_BIT) &&
217  (forkset[forkid] & REQUEST_0)) {
218  // change the owner and clean the fork)
219  if (philosopherset[target].state != HORS_DOEUVRE) {
220  forkset[forkid] = OWNER_SOURCE;
221  clean_fork_count.inc();
222  if (philosopherset[target].state == HUNGRY) {
223  forkset[forkid] |= REQUEST_1;
224  }
225  philosopherset[source].forks_acquired++;
226  philosopherset[target].forks_acquired--;
227  return true;
228  }
229  else if (philosopherset[target].cancellation_sent == false) {
230  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, CANCELLATIONS, 1);
231  philosopherset[target].cancellation_sent = true;
232  bool lockid = philosopherset[target].lockid;
233  philosopherset[source].lock.unlock();
234  philosopherset[target].lock.unlock();
235  issue_cancellation_request_unlocked(target, lockid);
236  philosopherset[std::min(source, target)].lock.lock();
237  philosopherset[std::max(source, target)].lock.lock();
238  }
239  }
240  }
241  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, COLLISIONS, 1);
242  return false;
243  }
244 
245 
246 /****************************************************************************
247  * Performs a cancellation on a vertex.
248  *
249  * If lockIds do not match, ignore
250  * If counter == 0 ignore
251  * Otherwise, counter++ and reply cancellation accept.
252  * Unfortunately, I cannot perform a local call here even if I am the
253  * owner since this may produce a lock cycle. Irregardless of whether
254  * the owner is local or not, this must be performed by a remote call
255  ***************************************************************************/
256 
257  void cancellation_request_unlocked(lvid_type lvid, procid_t requestor, bool lockid) {
258  philosopherset[lvid].lock.lock();
259 
260  if (philosopherset[lvid].lockid == lockid) {
261  if (philosopherset[lvid].counter > 0) {
262  /*ASSERT_TRUE(philosopherset[lvid].state == HORS_DOEUVRE ||
263  philosopherset[lvid].state == HUNGRY);*/
264  ++philosopherset[lvid].counter;
265  bool lockid = philosopherset[lvid].lockid;
266  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, ACCEPTED_CANCELLATIONS, 1);
267  vertex_id_type gvid = graph.global_vid(lvid);
268  logstream(LOG_DEBUG) << rmi.procid() <<
269  ": Cancellation accepted on " << gvid <<
270  "(" << (int)philosopherset[lvid].counter << ")" << std::endl;
271  philosopherset[lvid].lock.unlock();
272 
273  if (requestor != rmi.procid()) {
274  unsigned char pkey = rmi.dc().set_sequentialization_key(gvid % 254 + 1);
275  rmi.pod_call(requestor,
276  &dcm_type::rpc_cancellation_accept,
277  gvid,
278  lockid);
279  rmi.dc().set_sequentialization_key(pkey);
280  }
281  else {
282  cancellation_accept_unlocked(lvid, lockid);
283  }
284  }
285  else {
286  philosopherset[lvid].lock.unlock();
287  logstream(LOG_DEBUG) << rmi.procid() <<
288  ": Cancellation on " << graph.global_vid(lvid) <<
289  " denied due to lock completion" << std::endl;
290  }
291  }
292  else {
293  philosopherset[lvid].lock.unlock();
294  logstream(LOG_DEBUG) << rmi.procid() <<
295  ": Cancellation on " << graph.global_vid(lvid) <<
296  " denied to invalid lock ID" << std::endl;
297  }
298 
299  }
300 
301  void rpc_cancellation_request(vertex_id_type gvid, procid_t requestor, bool lockid) {
302  lvid_type lvid = graph.local_vid(gvid);
303  cancellation_request_unlocked(lvid, requestor, lockid);
304  }
305 
306  void issue_cancellation_request_unlocked(lvid_type lvid, bool lockid) {
307  // signal the master
308  logstream(LOG_DEBUG) << rmi.procid() <<
309  ": Requesting cancellation on " << graph.global_vid(lvid) << std::endl;
310  local_vertex_type lvertex(graph.l_vertex(lvid));
311 
312  if (lvertex.owner() == rmi.procid()) {
313  cancellation_request_unlocked(lvid, rmi.procid(), lockid);
314  }
315  else {
316  unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
317  rmi.pod_call(lvertex.owner(),
318  &dcm_type::rpc_cancellation_request,
319  lvertex.global_id(),
320  rmi.procid(),
321  lockid);
322  rmi.dc().set_sequentialization_key(pkey);
323 
324  }
325  }
326 
327 
328 /****************************************************************************
329  * Accepts a cancellation on a vertex.
330  *
331  * Pseudocode
332  * Change back to Hungry
333  * Releases all dirty forks
334  ****************************************************************************/
335 
336  void rpc_cancellation_accept(vertex_id_type gvid, bool lockid) {
337  lvid_type lvid = graph.local_vid(gvid);
338  cancellation_accept_unlocked(lvid, lockid);
339  }
340 
341  void cancellation_accept_unlocked(lvid_type p_id, bool lockid) {
342  std::vector<lvid_type> retval;
343  philosopherset[p_id].lock.lock();
344  //philosopher is now hungry!
345  /*ASSERT_EQ (lockid, philosopherset[p_id].lockid);
346  ASSERT_EQ((int)philosopherset[p_id].state, (int)HORS_DOEUVRE); */
347  philosopherset[p_id].state = HUNGRY;
348  philosopherset[p_id].cancellation_sent = false;
349 
350  local_vertex_type lvertex(graph.l_vertex(p_id));
351  logstream(LOG_DEBUG) << rmi.procid() <<
352  ": Cancellation accept received on " << lvertex.global_id() << " " <<
353  philosopherset[p_id].state << std::endl;
354 
355  // for each fork I own, try to give it away
356  foreach(local_edge_type edge, lvertex.in_edges()) {
357  try_acquire_edge_with_backoff(edge.target().id(), edge.source().id());
358  if (philosopherset[p_id].state == HUNGRY) {
359  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
360  lvid_type other = edge.source().id();
361  size_t edgeid = edge.id();
362  if (fork_owner(edgeid) == OWNER_TARGET && fork_dirty(edgeid)) {
363 
364  if (advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id()) &&
365  philosopherset[other].state == HUNGRY &&
366  philosopherset[other].forks_acquired == philosopherset[other].num_edges) {
367  philosopherset[other].state = HORS_DOEUVRE;
368  philosopherset[other].cancellation_sent = false;
369  // signal eating on other
370  retval.push_back(other);
371  }
372  }
373  philosopherset[edge.source().id()].lock.unlock();
374  }
375  else {
376  philosopherset[edge.source().id()].lock.unlock();
377  break;
378  }
379 
380  }
381  //std::cout << "out edges: " << std::endl;
382  foreach(local_edge_type edge, lvertex.out_edges()) {
383  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
384  try_acquire_edge_with_backoff(edge.source().id(), edge.target().id());
385  if (philosopherset[p_id].state == HUNGRY) {
386  lvid_type other = edge.target().id();
387  size_t edgeid = edge.id();
388  if (fork_owner(edgeid) == OWNER_SOURCE && fork_dirty(edgeid)) {
389  if (advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id()) &&
390  philosopherset[other].state == HUNGRY &&
391  philosopherset[other].forks_acquired == philosopherset[other].num_edges) {
392  philosopherset[other].state = HORS_DOEUVRE;
393  philosopherset[other].cancellation_sent = false;
394  // signal eating on other
395  retval.push_back(other);
396  }
397  }
398  philosopherset[edge.target().id()].lock.unlock();
399  }
400  else {
401  philosopherset[edge.target().id()].lock.unlock();
402  break;
403  }
404  }
405 
406  if (philosopherset[p_id].state == HUNGRY &&
407  philosopherset[p_id].forks_acquired == philosopherset[p_id].num_edges) {
408  philosopherset[p_id].cancellation_sent = false;
409  philosopherset[p_id].state = HORS_DOEUVRE;
410  retval.push_back(p_id);
411  }
412 
413  philosopherset[p_id].lock.unlock();
414  foreach(lvid_type lvid, retval) {
415  enter_hors_doeuvre_unlocked(lvid);
416  }
417 
418  }
419 
420 /****************************************************************************
421  * Make Philosopher Hungry.
422  *
423  * Pseudocode:
424  * Set Philosopher to Hungry
425  * For all edges adjacent to v with forks it does not own:
426  * Send request for fork to neighboring vertex
427  *
428  * Conditions:
429  * Must be Thinking
430  * New lock ID must not be the same as the old lock ID
431  *
432  * Possible Immediate Transitions:
433  * Current vertex may enter HORS_DOEUVRE
434  ***************************************************************************/
435  void rpc_make_philosopher_hungry(vertex_id_type gvid, bool newlockid) {
436  lvid_type lvid = graph.local_vid(gvid);
437  logstream(LOG_DEBUG) << rmi.procid() <<
438  ": Local HUNGRY Philosopher " << gvid << std::endl;
439  philosopherset[lvid].lock.lock();
440 
441  //ASSERT_EQ((int)philosopherset[lvid].state, (int)THINKING);
442  philosopherset[lvid].state = HUNGRY;
443 
444 // ASSERT_NE(philosopherset[lvid].lockid, newlockid);
445  philosopherset[lvid].lockid = newlockid;
446 
447  philosopherset[lvid].lock.unlock();
448 
449  local_philosopher_grabs_forks(lvid);
450  }
451 
452  void local_philosopher_grabs_forks(lvid_type p_id) {
453  philosopherset[p_id].lock.lock();
454  local_vertex_type lvertex(graph.l_vertex(p_id));
455  //philosopher is now hungry!
456 // now try to get all the forks. lock one edge at a time
457  // using the backoff strategy
458  //std::cout << "vertex " << p_id << std::endl;
459  //std::cout << "in edges: " << std::endl;
460  foreach(local_edge_type edge, lvertex.in_edges()) {
461  try_acquire_edge_with_backoff(edge.target().id(), edge.source().id());
462  if (philosopherset[p_id].state == HUNGRY) {
463 
464  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
465  size_t edgeid = edge.id();
466  // if fork is owned by other edge, try to take it
467  if (fork_owner(edgeid) == OWNER_SOURCE) {
468  request_for_fork(edgeid, OWNER_TARGET);
469  advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id());
470  }
471  philosopherset[edge.source().id()].lock.unlock();
472  }
473  else {
474  philosopherset[edge.source().id()].lock.unlock();
475  break;
476  }
477  }
478  //std::cout << "out edges: " << std::endl;
479  foreach(local_edge_type edge, lvertex.out_edges()) {
480  //std::cout << "\t" << graph.edge_id(edge) << ": " << edge.source() << "->" << edge.target() << std::endl;
481  try_acquire_edge_with_backoff(edge.source().id(), edge.target().id());
482  if (philosopherset[p_id].state == HUNGRY) {
483  size_t edgeid = edge.id();
484 
485  // if fork is owned by other edge, try to take it
486  if (fork_owner(edgeid) == OWNER_TARGET) {
487  request_for_fork(edgeid, OWNER_SOURCE);
488  advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id());
489  }
490  philosopherset[edge.target().id()].lock.unlock();
491  }
492  else {
493  philosopherset[edge.target().id()].lock.unlock();
494  break;
495  }
496  }
497 
498  bool enter_hors = false;
499  if (philosopherset[p_id].state == HUNGRY &&
500  philosopherset[p_id].forks_acquired == philosopherset[p_id].num_edges) {
501  philosopherset[p_id].state = HORS_DOEUVRE;
502  philosopherset[p_id].cancellation_sent = false;
503  enter_hors = true;
504  }
505  philosopherset[p_id].lock.unlock();
506  if (enter_hors) enter_hors_doeuvre_unlocked(p_id);
507  }
508 
509 /************************************************************************
510  *
511  * Called when a vertex may be ready to enter hors dourre
512  * Locks must be maintained. HORS_DOEUVRE must be set prior
513  * to entering this function .
514  *
515  ***********************************************************************/
516  void enter_hors_doeuvre_unlocked(lvid_type p_id) {
517  // if I got all forks I can eat
518  logstream(LOG_DEBUG) << rmi.procid() <<
519  ": Local HORS_DOEUVRE Philosopher " << graph.global_vid(p_id) << std::endl;
520  // signal the master
521  local_vertex_type lvertex(graph.l_vertex(p_id));
522 
523  if (lvertex.owner() == rmi.procid()) {
524  signal_ready_unlocked(p_id, philosopherset[p_id].lockid);
525  }
526  else {
527  unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
528  if (hors_doeuvre_callback != NULL) hors_doeuvre_callback(p_id);
529  rmi.pod_call(lvertex.owner(),
530  &dcm_type::rpc_signal_ready,
531  lvertex.global_id(), philosopherset[p_id].lockid);
532  rmi.dc().set_sequentialization_key(pkey);
533  }
534  }
535 
536 /************************************************************************
537  *
538  * Called when a vertex enters HORS_DOEUVRE. Locks must be maintained.
539  *
540  * Conditions:
541  * vertex must be in HUNGRY or HORS_DOEUVRE
542  * lock IDs must match
543  *
544  * Possible Immediate Transitions
545  * If counter == 0, transit to EATING
546  ***********************************************************************/
547 
548  void signal_ready_unlocked(lvid_type lvid, bool lockid) {
549  philosopherset[lvid].lock.lock();
550  if(!(philosopherset[lvid].state == (int)HUNGRY ||
551  philosopherset[lvid].state == (int)HORS_DOEUVRE)) {
552  logstream(LOG_ERROR) << rmi.procid() <<
553  ": Bad signal ready state!!!! : " << (int)philosopherset[lvid].state << std::endl;
554  logstream(LOG_ERROR) << rmi.procid() <<
555  " Lock IDs : " << (int)philosopherset[lvid].lockid << " " << (int)lockid << std::endl;
556  logstream(LOG_ERROR) << rmi.procid() <<
557  ": BAD Global HORS_DOEUVRE " << graph.global_vid(lvid)
558  << "(" << (int)philosopherset[lvid].counter << ")" << std::endl;
559 
560  /* ASSERT_TRUE(philosopherset[lvid].state == (int)HUNGRY ||
561  philosopherset[lvid].state == (int)HORS_DOEUVRE);*/
562  }
563 
564 // ASSERT_EQ(philosopherset[lvid].lockid, lockid);
565  philosopherset[lvid].counter--;
566 
567  logstream(LOG_DEBUG) << rmi.procid() <<
568  ": Global HORS_DOEUVRE " << graph.global_vid(lvid)
569  << "(" << (int)philosopherset[lvid].counter << ")" << " " << (int)(philosopherset[lvid].state) << std::endl;
570 
571  if(philosopherset[lvid].counter == 0) {
572  philosopherset[lvid].lock.unlock();
573  // broadcast EATING
574  local_vertex_type lvertex(graph.l_vertex(lvid));
575  unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
576  rmi.pod_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
577  &dcm_type::rpc_set_eating, lvertex.global_id(), lockid);
578  set_eating(lvid, lockid);
579  rmi.dc().set_sequentialization_key(pkey);
580  }
581  else {
582  philosopherset[lvid].lock.unlock();
583  }
584  }
585 
586 
587  void rpc_signal_ready(vertex_id_type gvid, bool lockid) {
588  lvid_type lvid = graph.local_vid(gvid);
589  signal_ready_unlocked(lvid, lockid);
590  }
591 
592  void set_eating(lvid_type lvid, bool lockid) {
593  philosopherset[lvid].lock.lock();
594 
595  logstream(LOG_DEBUG) << rmi.procid() <<
596  ": EATING " << graph.global_vid(lvid)
597  << "(" << (int)philosopherset[lvid].counter << ")" << std::endl;
598 
599 // ASSERT_EQ((int)philosopherset[lvid].state, (int)HORS_DOEUVRE);
600 // ASSERT_EQ(philosopherset[lvid].lockid, lockid);
601  philosopherset[lvid].state = EATING;
602  philosopherset[lvid].cancellation_sent = false;
603  philosopherset[lvid].lock.unlock();
604  if (graph.l_vertex(lvid).owner() == rmi.procid()) {
605  logstream(LOG_DEBUG) << rmi.procid() <<
606  ": CALLBACK " << graph.global_vid(lvid) << std::endl;
607 
608  callback(lvid);
609  }
610  }
611 
612  void rpc_set_eating(vertex_id_type gvid, bool lockid) {
613 
614  logstream(LOG_DEBUG) << rmi.procid() <<
615  ": Receive Set EATING " << gvid << std::endl;
616 
617  lvid_type lvid = graph.local_vid(gvid);
618  set_eating(lvid, lockid);
619  }
620 /************************************************************************
621  *
622  * Called when a vertex stops eating
623  *
624  ***********************************************************************/
625 
626 
627 
628  inline bool advance_fork_state_on_unlock(size_t forkid,
629  lvid_type source,
630  lvid_type target) {
631 
632  unsigned char currentowner = forkset[forkid] & OWNER_BIT;
633  if (currentowner == OWNER_SOURCE) {
634  // if the current owner is not eating, and the
635  // fork is dirty and other side has placed a request
636  if ((forkset[forkid] & DIRTY_BIT) &&
637  (forkset[forkid] & REQUEST_1)) {
638  // change the owner and clean the fork)
639  // keep my request bit if any
640  clean_fork_count.inc();
641  forkset[forkid] = OWNER_TARGET;
642  philosopherset[source].forks_acquired--;
643  philosopherset[target].forks_acquired++;
644  return true;
645  }
646  }
647  else {
648  // if the current owner is not eating, and the
649  // fork is dirty and other side has placed a request
650  if ((forkset[forkid] & DIRTY_BIT) &&
651  (forkset[forkid] & REQUEST_0)) {
652  // change the owner and clean the fork)
653  // keep my request bit if any
654  clean_fork_count.inc();
655  forkset[forkid] = OWNER_SOURCE;
656  philosopherset[source].forks_acquired++;
657  philosopherset[target].forks_acquired--;
658  return true;
659  }
660  }
661  return false;
662  }
663 
664 
665 
666 
667 
668  void local_philosopher_stops_eating(lvid_type p_id) {
669  std::vector<lvid_type> retval;
670  philosopherset[p_id].lock.lock();
671  if (philosopherset[p_id].state != EATING) {
672  std::cout << rmi.procid() << ": " << p_id << "FAILED!! Cannot Stop Eating!" << std::endl;
673 // ASSERT_EQ((int)philosopherset[p_id].state, (int)EATING);
674  }
675 
676  local_vertex_type lvertex(graph.l_vertex(p_id));
677  // now forks are dirty
678  foreach(local_edge_type edge, lvertex.in_edges()) {
679  dirty_fork(edge.id());
680  }
681 
682  foreach(local_edge_type edge, lvertex.out_edges()) {
683  dirty_fork(edge.id());
684  }
685 
686 
687  philosopherset[p_id].state = THINKING;
688  philosopherset[p_id].counter = 0;
689 
690  // now forks are dirty
691  foreach(local_edge_type edge, lvertex.in_edges()) {
692  try_acquire_edge_with_backoff(edge.target().id(), edge.source().id());
693  lvid_type other = edge.source().id();
694  if (philosopherset[p_id].state == THINKING) {
695  size_t edgeid = edge.id();
696  advance_fork_state_on_unlock(edgeid, edge.source().id(), edge.target().id());
697  if (philosopherset[other].state == HUNGRY &&
698  philosopherset[other].forks_acquired ==
699  philosopherset[other].num_edges) {
700  philosopherset[other].state = HORS_DOEUVRE;
701  philosopherset[other].cancellation_sent = false;
702  // signal eating on other
703  retval.push_back(other);
704  }
705  philosopherset[other].lock.unlock();
706  }
707  else {
708  philosopherset[other].lock.unlock();
709  break;
710  }
711  }
712 
713  foreach(local_edge_type edge, lvertex.out_edges()) {
714  try_acquire_edge_with_backoff(edge.source().id(), edge.target().id());
715  lvid_type other = edge.target().id();
716  if (philosopherset[p_id].state == THINKING) {
717  size_t edgeid = edge.id();
718  advance_fork_state_on_unlock(edgeid, edge.source().id(), edge.target().id());
719  if (philosopherset[other].state == HUNGRY &&
720  philosopherset[other].forks_acquired ==
721  philosopherset[other].num_edges) {
722  philosopherset[other].state = HORS_DOEUVRE;
723  philosopherset[other].cancellation_sent = false;
724  // signal eating on other
725  retval.push_back(other);
726  }
727  philosopherset[other].lock.unlock();
728  }
729  else {
730  philosopherset[other].lock.unlock();
731  break;
732  }
733  }
734 
735  philosopherset[p_id].lock.unlock();
736  foreach(lvid_type lvid, retval) {
737  enter_hors_doeuvre_unlocked(lvid);
738  }
739  }
740 
741  void rpc_philosopher_stops_eating(vertex_id_type gvid) {
742  logstream(LOG_DEBUG) << rmi.procid() << ": Receive STOP eating on " << gvid << std::endl;
743  local_philosopher_stops_eating(graph.local_vid(gvid));
744  }
745 
746  public:
747  inline distributed_chandy_misra(distributed_control &dc,
748  GraphType &graph,
749  boost::function<void(lvid_type)> callback,
750  boost::function<void(lvid_type)> hors_doeuvre_callback = NULL
751  ):
752  rmi(dc, this),
753  graph(graph),
754  callback(callback),
755  hors_doeuvre_callback(hors_doeuvre_callback){
756  forkset.resize(graph.num_local_edges(), 0);
757  philosopherset.resize(graph.num_local_vertices());
758  compute_initial_fork_arrangement();
759 
760  rmi.barrier();
761  }
762 
763  size_t num_clean_forks() const {
764  return clean_fork_count.value;
765  }
766 
767  void initialize_master_philosopher_as_hungry_locked(lvid_type p_id,
768  bool lockid) {
769  philosopherset[p_id].lockid = lockid;
770  philosopherset[p_id].state = HUNGRY;
771  philosopherset[p_id].counter = graph.l_vertex(p_id).num_mirrors() + 1;
772  }
773 
774  void make_philosopher_hungry(lvid_type p_id) {
775  local_vertex_type lvertex(graph.l_vertex(p_id));
776 // ASSERT_EQ(rec.get_owner(), rmi.procid());
777  philosopherset[p_id].lock.lock();
778 // ASSERT_EQ((int)philosopherset[p_id].state, (int)THINKING);
779  bool newlockid = !philosopherset[p_id].lockid;
780  initialize_master_philosopher_as_hungry_locked(p_id, newlockid);
781 
782  logstream(LOG_DEBUG) << rmi.procid() <<
783  ": Global HUNGRY " << lvertex.global_id()
784  << "(" << (int)philosopherset[p_id].counter << ")" << std::endl;
785 
786  philosopherset[p_id].lock.unlock();
787 
788  unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
789  rmi.pod_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
790  &dcm_type::rpc_make_philosopher_hungry, lvertex.global_id(), newlockid);
791  rmi.dc().set_sequentialization_key(pkey);
792  local_philosopher_grabs_forks(p_id);
793  }
794 
795 
796 
797  void make_philosopher_hungry_per_replica(lvid_type p_id) {
798  local_vertex_type lvertex(graph.l_vertex(p_id));
799  philosopherset[p_id].lock.lock();
800 // ASSERT_EQ((int)philosopherset[p_id].state, (int)THINKING);
801 
802  if (lvertex.owner() == rmi.procid()) {
803  bool newlockid = !philosopherset[p_id].lockid;
804  initialize_master_philosopher_as_hungry_locked(p_id, newlockid);
805 
806  logstream(LOG_DEBUG) << rmi.procid() <<
807  ": Global HUNGRY " << lvertex.global_id()
808  << "(" << (int)philosopherset[p_id].counter << ")" << std::endl;
809  }
810  else {
811  bool newlockid = !philosopherset[p_id].lockid;
812  philosopherset[p_id].lockid = newlockid;
813  philosopherset[p_id].state = HUNGRY;
814  }
815  philosopherset[p_id].lock.unlock();
816  local_philosopher_grabs_forks(p_id);
817  }
818 
819 
820  void philosopher_stops_eating(lvid_type p_id) {
821  local_vertex_type lvertex(graph.l_vertex(p_id));
822 
823  logstream(LOG_DEBUG) << rmi.procid() <<
824  ": Global STOP Eating " << lvertex.global_id() << std::endl;
825 
826  philosopherset[p_id].lock.lock();
827 // ASSERT_EQ(philosopherset[p_id].state, (int)EATING);
828  philosopherset[p_id].counter = 0;
829  philosopherset[p_id].lock.unlock();
830  unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
831  rmi.pod_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
832  &dcm_type::rpc_philosopher_stops_eating, lvertex.global_id());
833  rmi.dc().set_sequentialization_key(pkey);
834  local_philosopher_stops_eating(p_id);
835  }
836 
837  void philosopher_stops_eating_per_replica(lvid_type p_id) {
838  logstream(LOG_DEBUG) << rmi.procid() <<
839  ": Global STOP Eating " << graph.global_vid(p_id) << std::endl;
840 
841 // ASSERT_EQ(philosopherset[p_id].state, (int)EATING);
842 
843  local_philosopher_stops_eating(p_id);
844  }
845 
846 
847  void no_locks_consistency_check() {
848  // make sure all forks are dirty
849  for (size_t i = 0;i < forkset.size(); ++i) ASSERT_TRUE(fork_dirty(i));
850  // all philosophers are THINKING
851  for (size_t i = 0;i < philosopherset.size(); ++i) ASSERT_TRUE(philosopherset[i].state == THINKING);
852  }
853 
854  void print_out() {
855 
856  boost::unordered_set<size_t> eidset1;
857  boost::unordered_set<size_t> eidset2;
858  for (lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
859  local_vertex_type lvertex(graph.l_vertex(v));
860  foreach(local_edge_type edge, lvertex.in_edges()) {
861  size_t edgeid = edge.id();
862  ASSERT_TRUE(eidset1.find(edgeid) == eidset1.end());
863  eidset1.insert(edgeid);
864  }
865  foreach(local_edge_type edge, lvertex.out_edges()) {
866  size_t edgeid = edge.id();
867  ASSERT_TRUE(eidset2.find(edgeid) == eidset2.end());
868  eidset2.insert(edgeid);
869  }
870  }
871  ASSERT_EQ(eidset1.size(), eidset2.size());
872  eidset1.clear(); eidset2.clear();
873  complete_consistency_check();
874 
875  std::cout << "Philosophers\n";
876  std::cout << "------------\n";
877  for (lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
878  local_vertex_type lvertex(graph.l_vertex(v));
879  std::cout << graph.global_vid(v) << ": " << (int)philosopherset[v].state << " " <<
880  philosopherset[v].forks_acquired << " " << philosopherset[v].num_edges << " ";
881  if (philosopherset[v].forks_acquired == philosopherset[v].num_edges) std::cout << "---------------!";
882  std::cout << "\n";
883  std::cout << "\tin: ";
884  foreach(local_edge_type edge, lvertex.in_edges()) {
885  size_t edgeid = edge.id();
886  if (fork_dirty(forkset[edgeid])) std::cout << edgeid << ":" << (int)forkset[edgeid] << " ";
887  }
888  std::cout << "\n\tout: ";
889  foreach(local_edge_type edge, lvertex.out_edges()) {
890  size_t edgeid = edge.id();
891  if (fork_dirty(forkset[edgeid])) std::cout << edgeid << ":" << (int)forkset[edgeid] << " ";
892  }
893  std::cout << "\n";
894  }
895  }
896 
897  void complete_consistency_check() {
898  for (lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
899  local_vertex_type lvertex(graph.l_vertex(v));
900  // count the number of forks I own
901  size_t numowned = 0;
902  size_t numowned_clean = 0;
903  foreach(local_edge_type edge, lvertex.in_edges()) {
904  size_t edgeid = edge.id();
905  if (fork_owner(edgeid) == OWNER_TARGET) {
906  numowned++;
907  if (!fork_dirty(edgeid)) numowned_clean++;
908  }
909  }
910  foreach(local_edge_type edge, lvertex.out_edges()) {
911  size_t edgeid = edge.id();
912  if (fork_owner(edgeid) == OWNER_SOURCE) {
913  numowned++;
914  if (!fork_dirty(edgeid)) numowned_clean++;
915  }
916  }
917 
918  ASSERT_EQ(philosopherset[v].forks_acquired, numowned);
919  if (philosopherset[v].state == THINKING) {
920  ASSERT_EQ(numowned_clean, 0);
921  }
922  else if (philosopherset[v].state == HUNGRY) {
923  ASSERT_NE(philosopherset[v].num_edges, philosopherset[v].forks_acquired);
924  // any fork I am unable to acquire. Must be clean, and the other person
925  // must be eating or hungry
926  foreach(local_edge_type edge, lvertex.in_edges()) {
927  size_t edgeid = edge.id();
928  // not owned
929  if (fork_owner(edgeid) == OWNER_SOURCE) {
930  if (philosopherset[edge.source()].state != EATING) {
931  if (fork_dirty(edgeid)) {
932  std::cout << (int)(forkset[edgeid]) << " "
933  << (int)philosopherset[edge.source()].state
934  << "->" << (int)philosopherset[edge.target()].state
935  << std::endl;
936  ASSERT_FALSE(fork_dirty(edgeid));
937  }
938  }
939  ASSERT_NE(philosopherset[edge.source()].state, (int)THINKING);
940  }
941  }
942  foreach(local_edge_type edge, lvertex.out_edges()) {
943  size_t edgeid = edge.id();
944  if (fork_owner(edgeid) == OWNER_TARGET) {
945  if (philosopherset[edge.target()].state != EATING) {
946  if (fork_dirty(edgeid)) {
947  std::cout << (int)(forkset[edgeid]) << " "
948  << (int)philosopherset[edge.source()].state
949  << "->"
950  << (int)philosopherset[edge.target()].state
951  << std::endl;
952  ASSERT_FALSE(fork_dirty(edgeid));
953  }
954  }
955  ASSERT_NE(philosopherset[edge.target()].state, (int)THINKING);
956  }
957  }
958 
959  }
960  else if (philosopherset[v].state == EATING) {
961  ASSERT_EQ(philosopherset[v].forks_acquired, philosopherset[v].num_edges);
962  }
963  }
964  }
965 };
966 
967 }
968 
969 #include <graphlab/macros_undef.hpp>
970 
971 #endif