24 #include <graphlab/rpc/async_consensus.hpp>
28 size_t required_threads_in_done,
29 const dc_impl::dc_dist_object_base *attach)
30 :rmi(dc, this), attachedobj(attach),
31 last_calls_sent(0), last_calls_received(0),
32 numactive(required_threads_in_done),
33 ncpus(required_threads_in_done),
38 hastoken(dc.procid() == 0),
41 cur_token.total_calls_sent = 0;
42 cur_token.total_calls_received = 0;
48 last_calls_received = 0;
51 trying_to_sleep =
false;
52 critical = std::vector<char>(ncpus, 0);
53 sleeping = std::vector<char>(ncpus, 0);
54 hastoken = (rmi.
procid() == 0);
55 cur_token.total_calls_sent = 0;
56 cur_token.total_calls_received = 0;
68 trying_to_sleep.inc();
69 critical[cpuid] =
true;
76 critical[cpuid] =
false;
77 trying_to_sleep.dec();
84 critical[cpuid] =
false;
85 trying_to_sleep.dec();
104 if (numactive == 0) {
105 logstream(
LOG_INFO) << rmi.
procid() <<
": Termination Possible" << std::endl;
106 if (hastoken) pass_the_token();
108 sleeping[cpuid] =
true;
114 if (sleeping[cpuid] ==
false || done) {
120 critical[cpuid] =
false;
121 trying_to_sleep.dec();
130 if (trying_to_sleep > 0 || numactive < ncpus) {
132 size_t oldnumactive = numactive;
144 if (numactive < ncpus) {
147 for (
size_t i = 0;i < ncpus; ++i) {
148 numactive += sleeping[i];
154 if (oldnumactive == 0 && !done) {
164 if (critical[cpuhint]) {
166 size_t oldnumactive = numactive;
168 if (sleeping[cpuhint]) {
169 numactive += sleeping[cpuhint];
170 sleeping[cpuhint] = 0;
171 if (oldnumactive == 0 && !done) {
174 cond[cpuhint].signal();
180 void async_consensus::receive_the_token(token &tok) {
186 logstream(
LOG_INFO) << rmi.
procid() <<
": Token Received" << std::endl;
187 if (numactive == 0) {
193 void async_consensus::pass_the_token() {
198 if (cur_token.last_change == rmi.
procid() &&
199 cur_token.total_calls_received == cur_token.total_calls_sent) {
200 logstream(
LOG_INFO) <<
"Completed Token: "
201 << cur_token.total_calls_received <<
" "
202 << cur_token.total_calls_sent << std::endl;
216 if (numactive < ncpus) {
219 for (
size_t i = 0;i < ncpus; ++i) {
220 numactive += sleeping[i];
235 callsrecv = attachedobj->calls_received();
236 callssent = attachedobj->calls_sent();
243 if (callssent != last_calls_sent ||
244 callsrecv != last_calls_received) {
245 cur_token.total_calls_sent += callssent - last_calls_sent;
246 cur_token.total_calls_received += callsrecv - last_calls_received;
247 cur_token.last_change = rmi.
procid();
252 last_calls_sent = callssent;
253 last_calls_received = callsrecv;
258 << cur_token.total_calls_received <<
" "
259 << cur_token.total_calls_sent << std::endl;
262 &async_consensus::receive_the_token,