24 #ifndef GRAPHLAB_DISTRIBUTED_CHANDY_MISRA_HPP
25 #define GRAPHLAB_DISTRIBUTED_CHANDY_MISRA_HPP
27 #include <graphlab/rpc/dc_dist_object.hpp>
28 #include <graphlab/rpc/distributed_event_log.hpp>
29 #include <graphlab/engine/chandy_misra_interface.hpp>
30 #include <graphlab/logger/assertions.hpp>
31 #include <graphlab/parallel/pthread_tools.hpp>
32 #include <graphlab/graph/graph_basic_types.hpp>
33 #include <graphlab/macros_def.hpp>
40 template <
typename GraphType>
41 class distributed_chandy_misra :
public chandy_misra_interface<GraphType> {
43 typedef typename GraphType::local_vertex_type local_vertex_type;
44 typedef typename GraphType::local_edge_type local_edge_type;
47 typedef typename GraphType::lvid_type
lvid_type;
49 typedef distributed_chandy_misra<GraphType> dcm_type;
50 dc_dist_object<dcm_type> rmi;
53 boost::function<void(lvid_type)> callback;
54 boost::function<void(lvid_type)> hors_doeuvre_callback;
62 std::vector<unsigned char> forkset;
67 enum {OWNER_SOURCE = 0, OWNER_TARGET = 1};
68 inline unsigned char request_bit(
bool owner) {
69 return owner ? REQUEST_1 : REQUEST_0;
76 ACCEPTED_CANCELLATIONS = 2
84 unsigned char counter;
85 bool cancellation_sent;
88 std::vector<philosopher> philosopherset;
89 atomic<size_t> clean_fork_count;
102 inline void request_for_fork(
size_t forkid,
bool nextowner) {
103 __sync_fetch_and_or(&forkset[forkid], request_bit(nextowner));
106 inline bool fork_owner(
size_t forkid) {
107 return forkset[forkid] & OWNER_BIT;
110 inline bool fork_dirty(
size_t forkid) {
111 return !!(forkset[forkid] & DIRTY_BIT);
114 inline void dirty_fork(
size_t forkid) {
115 if ((forkset[forkid] & DIRTY_BIT) == 0) clean_fork_count.dec();
116 __sync_fetch_and_or(&forkset[forkid], DIRTY_BIT);
120 void compute_initial_fork_arrangement() {
121 for (
lvid_type i = 0;i < graph.num_local_vertices(); ++i) {
122 local_vertex_type lvertex(graph.l_vertex(i));
123 philosopherset[i].num_edges = lvertex.num_in_edges() +
124 lvertex.num_out_edges();
125 philosopherset[i].state = THINKING;
126 philosopherset[i].forks_acquired = 0;
127 philosopherset[i].counter = 0;
128 philosopherset[i].cancellation_sent =
false;
129 philosopherset[i].lockid =
false;
131 for (
lvid_type i = 0;i < graph.num_local_vertices(); ++i) {
132 local_vertex_type lvertex(graph.l_vertex(i));
133 foreach(local_edge_type edge, lvertex.in_edges()) {
134 if (edge.source().global_id() > edge.target().global_id()) {
135 forkset[edge.id()] = DIRTY_BIT | OWNER_TARGET;
136 philosopherset[edge.target().id()].forks_acquired++;
139 forkset[edge.id()] = DIRTY_BIT | OWNER_SOURCE;
140 philosopherset[edge.source().id()].forks_acquired++;
150 void try_acquire_edge_with_backoff(
lvid_type v1,
153 philosopherset[v2].lock.lock();
155 else if (!philosopherset[v2].lock.try_lock()) {
156 philosopherset[v1].lock.unlock();
157 philosopherset[v2].lock.lock();
158 philosopherset[v1].lock.lock();
178 inline bool advance_fork_state_on_lock(
size_t forkid,
181 unsigned char currentowner = forkset[forkid] & OWNER_BIT;
182 if (currentowner == OWNER_SOURCE) {
185 if (philosopherset[source].state != EATING &&
186 (forkset[forkid] & DIRTY_BIT) &&
187 (forkset[forkid] & REQUEST_1)) {
189 if (philosopherset[source].state != HORS_DOEUVRE) {
191 forkset[forkid] = OWNER_TARGET;
192 clean_fork_count.inc();
193 if (philosopherset[source].state == HUNGRY) {
194 forkset[forkid] |= REQUEST_0;
196 philosopherset[source].forks_acquired--;
197 philosopherset[target].forks_acquired++;
200 else if (philosopherset[source].cancellation_sent ==
false) {
202 philosopherset[source].cancellation_sent =
true;
203 bool lockid = philosopherset[source].lockid;
204 philosopherset[source].lock.unlock();
205 philosopherset[target].lock.unlock();
206 issue_cancellation_request_unlocked(source, lockid);
207 philosopherset[std::min(source, target)].lock.lock();
208 philosopherset[std::max(source, target)].lock.lock();
215 if (philosopherset[target].state != EATING &&
216 (forkset[forkid] & DIRTY_BIT) &&
217 (forkset[forkid] & REQUEST_0)) {
219 if (philosopherset[target].state != HORS_DOEUVRE) {
220 forkset[forkid] = OWNER_SOURCE;
221 clean_fork_count.inc();
222 if (philosopherset[target].state == HUNGRY) {
223 forkset[forkid] |= REQUEST_1;
225 philosopherset[source].forks_acquired++;
226 philosopherset[target].forks_acquired--;
229 else if (philosopherset[target].cancellation_sent ==
false) {
231 philosopherset[target].cancellation_sent =
true;
232 bool lockid = philosopherset[target].lockid;
233 philosopherset[source].lock.unlock();
234 philosopherset[target].lock.unlock();
235 issue_cancellation_request_unlocked(target, lockid);
236 philosopherset[std::min(source, target)].lock.lock();
237 philosopherset[std::max(source, target)].lock.lock();
257 void cancellation_request_unlocked(
lvid_type lvid,
procid_t requestor,
bool lockid) {
258 philosopherset[lvid].lock.lock();
260 if (philosopherset[lvid].lockid == lockid) {
261 if (philosopherset[lvid].counter > 0) {
264 ++philosopherset[lvid].counter;
265 bool lockid = philosopherset[lvid].lockid;
269 ": Cancellation accepted on " << gvid <<
270 "(" << (int)philosopherset[lvid].counter <<
")" << std::endl;
271 philosopherset[lvid].lock.unlock();
273 if (requestor != rmi.procid()) {
274 unsigned char pkey = rmi.dc().set_sequentialization_key(gvid % 254 + 1);
275 rmi.pod_call(requestor,
276 &dcm_type::rpc_cancellation_accept,
279 rmi.dc().set_sequentialization_key(pkey);
282 cancellation_accept_unlocked(lvid, lockid);
286 philosopherset[lvid].lock.unlock();
288 ": Cancellation on " << graph.global_vid(lvid) <<
289 " denied due to lock completion" << std::endl;
293 philosopherset[lvid].lock.unlock();
295 ": Cancellation on " << graph.global_vid(lvid) <<
296 " denied to invalid lock ID" << std::endl;
303 cancellation_request_unlocked(lvid, requestor, lockid);
306 void issue_cancellation_request_unlocked(
lvid_type lvid,
bool lockid) {
309 ": Requesting cancellation on " << graph.global_vid(lvid) << std::endl;
310 local_vertex_type lvertex(graph.l_vertex(lvid));
312 if (lvertex.owner() == rmi.procid()) {
313 cancellation_request_unlocked(lvid, rmi.procid(), lockid);
316 unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
317 rmi.pod_call(lvertex.owner(),
318 &dcm_type::rpc_cancellation_request,
322 rmi.dc().set_sequentialization_key(pkey);
338 cancellation_accept_unlocked(lvid, lockid);
341 void cancellation_accept_unlocked(
lvid_type p_id,
bool lockid) {
342 std::vector<lvid_type> retval;
343 philosopherset[p_id].lock.lock();
347 philosopherset[p_id].state = HUNGRY;
348 philosopherset[p_id].cancellation_sent =
false;
350 local_vertex_type lvertex(graph.l_vertex(p_id));
352 ": Cancellation accept received on " << lvertex.global_id() <<
" " <<
353 philosopherset[p_id].state << std::endl;
356 foreach(local_edge_type edge, lvertex.in_edges()) {
357 try_acquire_edge_with_backoff(edge.target().id(), edge.source().id());
358 if (philosopherset[p_id].state == HUNGRY) {
361 size_t edgeid = edge.id();
362 if (fork_owner(edgeid) == OWNER_TARGET && fork_dirty(edgeid)) {
364 if (advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id()) &&
365 philosopherset[other].state == HUNGRY &&
366 philosopherset[other].forks_acquired == philosopherset[other].num_edges) {
367 philosopherset[other].state = HORS_DOEUVRE;
368 philosopherset[other].cancellation_sent =
false;
370 retval.push_back(other);
373 philosopherset[edge.source().id()].lock.unlock();
376 philosopherset[edge.source().id()].lock.unlock();
382 foreach(local_edge_type edge, lvertex.out_edges()) {
384 try_acquire_edge_with_backoff(edge.source().id(), edge.target().id());
385 if (philosopherset[p_id].state == HUNGRY) {
387 size_t edgeid = edge.id();
388 if (fork_owner(edgeid) == OWNER_SOURCE && fork_dirty(edgeid)) {
389 if (advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id()) &&
390 philosopherset[other].state == HUNGRY &&
391 philosopherset[other].forks_acquired == philosopherset[other].num_edges) {
392 philosopherset[other].state = HORS_DOEUVRE;
393 philosopherset[other].cancellation_sent =
false;
395 retval.push_back(other);
398 philosopherset[edge.target().id()].lock.unlock();
401 philosopherset[edge.target().id()].lock.unlock();
406 if (philosopherset[p_id].state == HUNGRY &&
407 philosopherset[p_id].forks_acquired == philosopherset[p_id].num_edges) {
408 philosopherset[p_id].cancellation_sent =
false;
409 philosopherset[p_id].state = HORS_DOEUVRE;
410 retval.push_back(p_id);
413 philosopherset[p_id].lock.unlock();
415 enter_hors_doeuvre_unlocked(lvid);
435 void rpc_make_philosopher_hungry(
vertex_id_type gvid,
bool newlockid) {
438 ": Local HUNGRY Philosopher " << gvid << std::endl;
439 philosopherset[lvid].lock.lock();
442 philosopherset[lvid].state = HUNGRY;
445 philosopherset[lvid].lockid = newlockid;
447 philosopherset[lvid].lock.unlock();
449 local_philosopher_grabs_forks(lvid);
452 void local_philosopher_grabs_forks(
lvid_type p_id) {
453 philosopherset[p_id].lock.lock();
454 local_vertex_type lvertex(graph.l_vertex(p_id));
460 foreach(local_edge_type edge, lvertex.in_edges()) {
461 try_acquire_edge_with_backoff(edge.target().id(), edge.source().id());
462 if (philosopherset[p_id].state == HUNGRY) {
465 size_t edgeid = edge.id();
467 if (fork_owner(edgeid) == OWNER_SOURCE) {
468 request_for_fork(edgeid, OWNER_TARGET);
469 advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id());
471 philosopherset[edge.source().id()].lock.unlock();
474 philosopherset[edge.source().id()].lock.unlock();
479 foreach(local_edge_type edge, lvertex.out_edges()) {
481 try_acquire_edge_with_backoff(edge.source().id(), edge.target().id());
482 if (philosopherset[p_id].state == HUNGRY) {
483 size_t edgeid = edge.id();
486 if (fork_owner(edgeid) == OWNER_TARGET) {
487 request_for_fork(edgeid, OWNER_SOURCE);
488 advance_fork_state_on_lock(edgeid, edge.source().id(), edge.target().id());
490 philosopherset[edge.target().id()].lock.unlock();
493 philosopherset[edge.target().id()].lock.unlock();
498 bool enter_hors =
false;
499 if (philosopherset[p_id].state == HUNGRY &&
500 philosopherset[p_id].forks_acquired == philosopherset[p_id].num_edges) {
501 philosopherset[p_id].state = HORS_DOEUVRE;
502 philosopherset[p_id].cancellation_sent =
false;
505 philosopherset[p_id].lock.unlock();
506 if (enter_hors) enter_hors_doeuvre_unlocked(p_id);
516 void enter_hors_doeuvre_unlocked(
lvid_type p_id) {
519 ": Local HORS_DOEUVRE Philosopher " << graph.global_vid(p_id) << std::endl;
521 local_vertex_type lvertex(graph.l_vertex(p_id));
523 if (lvertex.owner() == rmi.procid()) {
524 signal_ready_unlocked(p_id, philosopherset[p_id].lockid);
527 unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
528 if (hors_doeuvre_callback != NULL) hors_doeuvre_callback(p_id);
529 rmi.pod_call(lvertex.owner(),
530 &dcm_type::rpc_signal_ready,
531 lvertex.global_id(), philosopherset[p_id].lockid);
532 rmi.dc().set_sequentialization_key(pkey);
548 void signal_ready_unlocked(
lvid_type lvid,
bool lockid) {
549 philosopherset[lvid].lock.lock();
550 if(!(philosopherset[lvid].state == (
int)HUNGRY ||
551 philosopherset[lvid].state == (
int)HORS_DOEUVRE)) {
553 ": Bad signal ready state!!!! : " << (int)philosopherset[lvid].state << std::endl;
555 " Lock IDs : " << (int)philosopherset[lvid].lockid <<
" " << (
int)lockid << std::endl;
557 ": BAD Global HORS_DOEUVRE " << graph.global_vid(lvid)
558 <<
"(" << (int)philosopherset[lvid].counter <<
")" << std::endl;
565 philosopherset[lvid].counter--;
568 ": Global HORS_DOEUVRE " << graph.global_vid(lvid)
569 <<
"(" << (int)philosopherset[lvid].counter <<
")" <<
" " << (
int)(philosopherset[lvid].state) << std::endl;
571 if(philosopherset[lvid].counter == 0) {
572 philosopherset[lvid].lock.unlock();
574 local_vertex_type lvertex(graph.l_vertex(lvid));
575 unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
576 rmi.pod_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
577 &dcm_type::rpc_set_eating, lvertex.global_id(), lockid);
578 set_eating(lvid, lockid);
579 rmi.dc().set_sequentialization_key(pkey);
582 philosopherset[lvid].lock.unlock();
589 signal_ready_unlocked(lvid, lockid);
592 void set_eating(
lvid_type lvid,
bool lockid) {
593 philosopherset[lvid].lock.lock();
596 ": EATING " << graph.global_vid(lvid)
597 <<
"(" << (int)philosopherset[lvid].counter <<
")" << std::endl;
601 philosopherset[lvid].state = EATING;
602 philosopherset[lvid].cancellation_sent =
false;
603 philosopherset[lvid].lock.unlock();
604 if (graph.l_vertex(lvid).owner() == rmi.procid()) {
606 ": CALLBACK " << graph.global_vid(lvid) << std::endl;
615 ": Receive Set EATING " << gvid << std::endl;
618 set_eating(lvid, lockid);
628 inline bool advance_fork_state_on_unlock(
size_t forkid,
632 unsigned char currentowner = forkset[forkid] & OWNER_BIT;
633 if (currentowner == OWNER_SOURCE) {
636 if ((forkset[forkid] & DIRTY_BIT) &&
637 (forkset[forkid] & REQUEST_1)) {
640 clean_fork_count.inc();
641 forkset[forkid] = OWNER_TARGET;
642 philosopherset[source].forks_acquired--;
643 philosopherset[target].forks_acquired++;
650 if ((forkset[forkid] & DIRTY_BIT) &&
651 (forkset[forkid] & REQUEST_0)) {
654 clean_fork_count.inc();
655 forkset[forkid] = OWNER_SOURCE;
656 philosopherset[source].forks_acquired++;
657 philosopherset[target].forks_acquired--;
668 void local_philosopher_stops_eating(
lvid_type p_id) {
669 std::vector<lvid_type> retval;
670 philosopherset[p_id].lock.lock();
671 if (philosopherset[p_id].state != EATING) {
672 std::cout << rmi.procid() <<
": " << p_id <<
"FAILED!! Cannot Stop Eating!" << std::endl;
676 local_vertex_type lvertex(graph.l_vertex(p_id));
678 foreach(local_edge_type edge, lvertex.in_edges()) {
679 dirty_fork(edge.id());
682 foreach(local_edge_type edge, lvertex.out_edges()) {
683 dirty_fork(edge.id());
687 philosopherset[p_id].state = THINKING;
688 philosopherset[p_id].counter = 0;
691 foreach(local_edge_type edge, lvertex.in_edges()) {
692 try_acquire_edge_with_backoff(edge.target().id(), edge.source().id());
694 if (philosopherset[p_id].state == THINKING) {
695 size_t edgeid = edge.id();
696 advance_fork_state_on_unlock(edgeid, edge.source().id(), edge.target().id());
697 if (philosopherset[other].state == HUNGRY &&
698 philosopherset[other].forks_acquired ==
699 philosopherset[other].num_edges) {
700 philosopherset[other].state = HORS_DOEUVRE;
701 philosopherset[other].cancellation_sent =
false;
703 retval.push_back(other);
705 philosopherset[other].lock.unlock();
708 philosopherset[other].lock.unlock();
713 foreach(local_edge_type edge, lvertex.out_edges()) {
714 try_acquire_edge_with_backoff(edge.source().id(), edge.target().id());
716 if (philosopherset[p_id].state == THINKING) {
717 size_t edgeid = edge.id();
718 advance_fork_state_on_unlock(edgeid, edge.source().id(), edge.target().id());
719 if (philosopherset[other].state == HUNGRY &&
720 philosopherset[other].forks_acquired ==
721 philosopherset[other].num_edges) {
722 philosopherset[other].state = HORS_DOEUVRE;
723 philosopherset[other].cancellation_sent =
false;
725 retval.push_back(other);
727 philosopherset[other].lock.unlock();
730 philosopherset[other].lock.unlock();
735 philosopherset[p_id].lock.unlock();
737 enter_hors_doeuvre_unlocked(lvid);
742 logstream(
LOG_DEBUG) << rmi.procid() <<
": Receive STOP eating on " << gvid << std::endl;
743 local_philosopher_stops_eating(graph.local_vid(gvid));
747 inline distributed_chandy_misra(distributed_control &dc,
749 boost::function<
void(
lvid_type)> callback,
750 boost::function<
void(
lvid_type)> hors_doeuvre_callback = NULL
755 hors_doeuvre_callback(hors_doeuvre_callback){
756 forkset.resize(graph.num_local_edges(), 0);
757 philosopherset.resize(graph.num_local_vertices());
758 compute_initial_fork_arrangement();
763 size_t num_clean_forks()
const {
764 return clean_fork_count.value;
767 void initialize_master_philosopher_as_hungry_locked(
lvid_type p_id,
769 philosopherset[p_id].lockid = lockid;
770 philosopherset[p_id].state = HUNGRY;
771 philosopherset[p_id].counter = graph.l_vertex(p_id).num_mirrors() + 1;
774 void make_philosopher_hungry(
lvid_type p_id) {
775 local_vertex_type lvertex(graph.l_vertex(p_id));
777 philosopherset[p_id].lock.lock();
779 bool newlockid = !philosopherset[p_id].lockid;
780 initialize_master_philosopher_as_hungry_locked(p_id, newlockid);
783 ": Global HUNGRY " << lvertex.global_id()
784 <<
"(" << (int)philosopherset[p_id].counter <<
")" << std::endl;
786 philosopherset[p_id].lock.unlock();
788 unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
789 rmi.pod_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
790 &dcm_type::rpc_make_philosopher_hungry, lvertex.global_id(), newlockid);
791 rmi.dc().set_sequentialization_key(pkey);
792 local_philosopher_grabs_forks(p_id);
797 void make_philosopher_hungry_per_replica(
lvid_type p_id) {
798 local_vertex_type lvertex(graph.l_vertex(p_id));
799 philosopherset[p_id].lock.lock();
802 if (lvertex.owner() == rmi.procid()) {
803 bool newlockid = !philosopherset[p_id].lockid;
804 initialize_master_philosopher_as_hungry_locked(p_id, newlockid);
807 ": Global HUNGRY " << lvertex.global_id()
808 <<
"(" << (int)philosopherset[p_id].counter <<
")" << std::endl;
811 bool newlockid = !philosopherset[p_id].lockid;
812 philosopherset[p_id].lockid = newlockid;
813 philosopherset[p_id].state = HUNGRY;
815 philosopherset[p_id].lock.unlock();
816 local_philosopher_grabs_forks(p_id);
820 void philosopher_stops_eating(
lvid_type p_id) {
821 local_vertex_type lvertex(graph.l_vertex(p_id));
824 ": Global STOP Eating " << lvertex.global_id() << std::endl;
826 philosopherset[p_id].lock.lock();
828 philosopherset[p_id].counter = 0;
829 philosopherset[p_id].lock.unlock();
830 unsigned char pkey = rmi.dc().set_sequentialization_key(lvertex.global_id() % 254 + 1);
831 rmi.pod_call(lvertex.mirrors().begin(), lvertex.mirrors().end(),
832 &dcm_type::rpc_philosopher_stops_eating, lvertex.global_id());
833 rmi.dc().set_sequentialization_key(pkey);
834 local_philosopher_stops_eating(p_id);
837 void philosopher_stops_eating_per_replica(
lvid_type p_id) {
839 ": Global STOP Eating " << graph.global_vid(p_id) << std::endl;
843 local_philosopher_stops_eating(p_id);
847 void no_locks_consistency_check() {
849 for (
size_t i = 0;i < forkset.size(); ++i) ASSERT_TRUE(fork_dirty(i));
851 for (
size_t i = 0;i < philosopherset.size(); ++i) ASSERT_TRUE(philosopherset[i].state == THINKING);
856 boost::unordered_set<size_t> eidset1;
857 boost::unordered_set<size_t> eidset2;
858 for (
lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
859 local_vertex_type lvertex(graph.l_vertex(v));
860 foreach(local_edge_type edge, lvertex.in_edges()) {
861 size_t edgeid = edge.id();
862 ASSERT_TRUE(eidset1.find(edgeid) == eidset1.end());
863 eidset1.insert(edgeid);
865 foreach(local_edge_type edge, lvertex.out_edges()) {
866 size_t edgeid = edge.id();
867 ASSERT_TRUE(eidset2.find(edgeid) == eidset2.end());
868 eidset2.insert(edgeid);
871 ASSERT_EQ(eidset1.size(), eidset2.size());
872 eidset1.clear(); eidset2.clear();
873 complete_consistency_check();
875 std::cout <<
"Philosophers\n";
876 std::cout <<
"------------\n";
877 for (
lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
878 local_vertex_type lvertex(graph.l_vertex(v));
879 std::cout << graph.global_vid(v) <<
": " << (int)philosopherset[v].state <<
" " <<
880 philosopherset[v].forks_acquired <<
" " << philosopherset[v].num_edges <<
" ";
881 if (philosopherset[v].forks_acquired == philosopherset[v].num_edges) std::cout <<
"---------------!";
883 std::cout <<
"\tin: ";
884 foreach(local_edge_type edge, lvertex.in_edges()) {
885 size_t edgeid = edge.id();
886 if (fork_dirty(forkset[edgeid])) std::cout << edgeid <<
":" << (int)forkset[edgeid] <<
" ";
888 std::cout <<
"\n\tout: ";
889 foreach(local_edge_type edge, lvertex.out_edges()) {
890 size_t edgeid = edge.id();
891 if (fork_dirty(forkset[edgeid])) std::cout << edgeid <<
":" << (int)forkset[edgeid] <<
" ";
897 void complete_consistency_check() {
898 for (
lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
899 local_vertex_type lvertex(graph.l_vertex(v));
902 size_t numowned_clean = 0;
903 foreach(local_edge_type edge, lvertex.in_edges()) {
904 size_t edgeid = edge.id();
905 if (fork_owner(edgeid) == OWNER_TARGET) {
907 if (!fork_dirty(edgeid)) numowned_clean++;
910 foreach(local_edge_type edge, lvertex.out_edges()) {
911 size_t edgeid = edge.id();
912 if (fork_owner(edgeid) == OWNER_SOURCE) {
914 if (!fork_dirty(edgeid)) numowned_clean++;
918 ASSERT_EQ(philosopherset[v].forks_acquired, numowned);
919 if (philosopherset[v].state == THINKING) {
920 ASSERT_EQ(numowned_clean, 0);
922 else if (philosopherset[v].state == HUNGRY) {
923 ASSERT_NE(philosopherset[v].num_edges, philosopherset[v].forks_acquired);
926 foreach(local_edge_type edge, lvertex.in_edges()) {
927 size_t edgeid = edge.id();
929 if (fork_owner(edgeid) == OWNER_SOURCE) {
930 if (philosopherset[edge.source()].state != EATING) {
931 if (fork_dirty(edgeid)) {
932 std::cout << (int)(forkset[edgeid]) <<
" "
933 << (int)philosopherset[edge.source()].state
934 <<
"->" << (int)philosopherset[edge.target()].state
936 ASSERT_FALSE(fork_dirty(edgeid));
939 ASSERT_NE(philosopherset[edge.source()].state, (int)THINKING);
942 foreach(local_edge_type edge, lvertex.out_edges()) {
943 size_t edgeid = edge.id();
944 if (fork_owner(edgeid) == OWNER_TARGET) {
945 if (philosopherset[edge.target()].state != EATING) {
946 if (fork_dirty(edgeid)) {
947 std::cout << (int)(forkset[edgeid]) <<
" "
948 << (int)philosopherset[edge.source()].state
950 << (int)philosopherset[edge.target()].state
952 ASSERT_FALSE(fork_dirty(edgeid));
955 ASSERT_NE(philosopherset[edge.target()].state, (int)THINKING);
960 else if (philosopherset[v].state == EATING) {
961 ASSERT_EQ(philosopherset[v].forks_acquired, philosopherset[v].num_edges);
969 #include <graphlab/macros_undef.hpp>