29 #include <graphlab/rpc/dc.hpp>
30 #include <graphlab/rpc/dc_dist_object.hpp>
31 #include <graphlab/rpc/distributed_event_log.hpp>
32 #include <graphlab/util/timer.hpp>
33 #include <graphlab/logger/assertions.hpp>
34 #include <graphlab/util/dense_bitset.hpp>
35 #include <graphlab/ui/metrics_server.hpp>
36 #include <graphlab/macros_def.hpp>
// Forward declarations of the three file-local JSON generators defined at
// the bottom of this file. Each one takes a string-to-string map of
// variables and returns a pair of strings; the definitions fill the pair
// with the literal "text/plain" and the generated text.
42 static std::pair<std::string, std::string>
43 metric_names_json(std::map<std::string, std::string>& vars);
45 static std::pair<std::string, std::string>
46 metric_aggregate_json(std::map<std::string, std::string>& vars);
48 static std::pair<std::string, std::string>
49 metric_by_machine_json(std::map<std::string, std::string>& vars);
/**
 * Converts a time value into the index of the log record that covers it.
 *
 * Records are 5 time units wide, so index k covers the half-open interval
 * [5k, 5k + 5).
 *
 * Converting an out-of-range double to an unsigned integer is undefined
 * behaviour, and the callers in this file feed this function with atof()
 * results taken from request variables. The result is therefore clamped:
 *   - negative and NaN inputs map to index 0;
 *   - inputs too large for size_t map to the largest size_t minus one, so
 *     that the `time_to_index(t) + 1` end index computed by the callers
 *     cannot wrap around to zero.
 *
 * \param t time value to convert
 * \return index of the record containing t, clamped as described above
 */
static size_t time_to_index(double t) {
  const double idx = std::floor(t / 5);
  // Written as a negated comparison so that NaN also lands here.
  if (!(idx > 0)) return 0;
  const size_t largest = static_cast<size_t>(-1);
  // static_cast<double>(largest) rounds up to 2^64, the first value that no
  // longer fits into a 64-bit size_t.
  if (idx >= static_cast<double>(largest)) return largest - 1;
  return static_cast<size_t>(idx);
}
// Counterpart of time_to_index: takes a record index and returns a time as
// a double.
// NOTE(review): the body of this function is not visible in this listing.
// Since time_to_index divides by 5, it presumably multiplies the index by
// the same factor -- confirm against the complete file.
56 static double index_to_time(
size_t t) {
// Claims a free position in the has_log_entry bitset for a new log group.
// The whole operation runs with log_entry_lock held.
// NOTE(review): the declaration of `id`, the call that receives the error
// message below, any use of `group`, and the return statement are on lines
// missing from this listing.
61 size_t distributed_event_logger::allocate_log_entry(log_group* group) {
62 log_entry_lock.lock();
// first_zero_bit() is handed `id`; a false result is treated as "no free
// entry" and reported with the message below.
64 if (has_log_entry.first_zero_bit(
id) ==
false) {
66 "New log entries cannot be created");
// Mark position `id` as used before the lock is released.
70 has_log_entry.set_bit(
id);
71 log_entry_lock.unlock();
// Returns the calling thread's event_log_thread_local_type record, which is
// kept in pthread thread-specific storage under `key`.
// NOTE(review): the conditions that separate the "create a new record" path
// from the "reuse the stored record" path, the declaration of `b`, and the
// return statements are on lines missing from this listing.
75 event_log_thread_local_type* distributed_event_logger::get_thread_counter_ref() {
// Look up whatever pointer this thread has stored so far.
76 void* v = pthread_getspecific(key);
// Creation path: allocate a record and zero all MAX_LOG_SIZE counters.
79 event_log_thread_local_type* entry =
new event_log_thread_local_type;
81 for (
size_t i = 0; i < MAX_LOG_SIZE; ++i) entry->values[i] = 0;
// Store the pointer in the thread-specific slot.
85 pthread_setspecific(key, v);
// Register the record in the shared thread_local_count table so that other
// code in this class can iterate over all threads' counters. The table and
// its slot bitset are modified only while thread_local_count_lock is held.
88 thread_local_count_lock.lock();
// A false result from first_zero_bit() is treated as "no free slot" and
// reported with the message below.
91 if (thread_local_count_slots.first_zero_bit(b) ==
false) {
93 "Log counters cannot be created");
// Remember the slot inside the record, publish the record, mark the slot.
96 entry->thlocal_slot = b;
97 thread_local_count[b] = entry;
98 thread_local_count_slots.set_bit(b);
99 thread_local_count_lock.unlock();
// Reuse path: the stored void* is the record pointer.
102 event_log_thread_local_type* entry = (event_log_thread_local_type*)(v);
// Stores one machine's counter values for one record into the per-machine
// logs. local_collect_log calls it directly on process 0 and through
// rmi->control_call from every other process.
//
//   srcproc    - index into logs[log]->machine that receives the values
//   record_ctr - index of the record being written
//   srccounts  - values indexed by log id
//
// NOTE(review): neither srcproc nor srccounts.size() is validated in the
// visible lines before being used as an index -- confirm the callers
// guarantee both.
109 void distributed_event_logger::rpc_collect_log(
size_t srcproc,
size_t record_ctr,
110 std::vector<double> srccounts) {
// Visit every allocated log; each log is updated under its own lock.
111 foreach(
size_t log, has_log_entry) {
112 logs[log]->lock.lock();
114 size_t entryid = record_ctr;
// Lower the "earliest modified" mark to this record and flag the log as
// modified; build_aggregate_log reads both fields.
115 logs[log]->earliest_modified_log =
116 std::min(entryid, logs[log]->earliest_modified_log);
117 logs[log]->machine_log_modified =
true;
// Grow every machine's series so that index entryid exists. New slots are
// filled with that machine's last value, or 0 when the series is empty.
119 for (
procid_t p = 0; p < logs[log]->machine.size(); ++p) {
120 if (logs[log]->machine[p].size() < entryid + 1) {
121 double prevvalue = 0;
122 if (logs[log]->machine[p].size() > 0) {
123 prevvalue = logs[log]->machine[p].back().value;
125 logs[log]->machine[p].resize(entryid + 1, log_entry(prevvalue));
// Finally write the reported value for the sending machine.
128 logs[log]->machine[srcproc][entryid].value = srccounts[log];
129 logs[log]->lock.unlock();
// Takes one sample of every INSTANTANEOUS log and adds it to that log's
// running sum / sample count. periodic_timer calls this on every tick;
// local_collect_log later turns sum and count into an average.
133 void distributed_event_logger::collect_instantaneous_log() {
134 foreach(
size_t log, has_log_entry) {
135 if (logs[log]->logtype == log_type::INSTANTANEOUS) {
136 logs[log]->lock.lock();
// Callback entries: the sample is whatever the callback returns.
// NOTE(review): free_callback_entry sets `callback` to NULL while leaving
// is_callback_entry true, and no NULL check is visible here -- confirm the
// callback cannot be empty when it is invoked.
139 if (logs[log]->is_callback_entry) {
140 logs[log]->sum_of_instantaneous_entries += logs[log]->callback();
141 ++logs[log]->count_of_instantaneous_entries;
// Counter entries: the sample is the sum of all registered threads'
// counters for this log. (The branch keyword separating this from the
// callback case is on a line missing from this listing; presumably `else`.)
145 foreach(
size_t thr, thread_local_count_slots) {
146 logs[log]->sum_of_instantaneous_entries += thread_local_count[thr]->values[log];
148 ++logs[log]->count_of_instantaneous_entries;
150 logs[log]->lock.unlock();
// Computes this process's value for every log for record `record_ctr` and
// delivers the result to process 0.
//
//   record_ctr - index of the record being produced; passed on unchanged to
//                rpc_collect_log
159 void distributed_event_logger::local_collect_log(
size_t record_ctr) {
// One slot per possible log id, initialised to 0.
161 std::vector<double> combined_counts(MAX_LOG_SIZE, 0);
166 foreach(
size_t log, has_log_entry) {
167 logs[log]->lock.lock();
// CUMULATIVE logs: the value is the callback result for callback entries,
// or the sum of all registered threads' counters.
// NOTE(review): the callback is invoked without a visible NULL check, see
// free_callback_entry -- confirm.
169 if (logs[log]->logtype == log_type::CUMULATIVE) {
170 if (logs[log]->is_callback_entry) {
171 combined_counts[log] = logs[log]->callback();
173 foreach(
size_t thr, thread_local_count_slots) {
174 size_t* current_thread_counts = thread_local_count[thr]->values;
175 combined_counts[log] += current_thread_counts[log];
// Other logs: report the average of the samples gathered by
// collect_instantaneous_log, or 0 when no sample was taken, and then
// restart the accumulation.
181 if (logs[log]->count_of_instantaneous_entries > 0) {
182 combined_counts[log] = (double)logs[log]->sum_of_instantaneous_entries /
183 logs[log]->count_of_instantaneous_entries;
186 combined_counts[log] = 0;
188 logs[log]->sum_of_instantaneous_entries = 0;
189 logs[log]->count_of_instantaneous_entries = 0;
191 logs[log]->lock.unlock();
// Every process other than 0 ships its values to process 0 through the RMI
// object; process 0 calls the handler directly.
195 if (rmi->procid() != 0) {
196 rmi->control_call(0, &distributed_event_logger::rpc_collect_log,
197 (
size_t)rmi->procid(), record_ctr, combined_counts);
200 rpc_collect_log((
size_t)0, record_ctr, combined_counts);
// Recomputes the cross-machine aggregate series of every log whose
// per-machine data changed since the last call. Asserts that it is running
// on process 0.
205 void distributed_event_logger::build_aggregate_log() {
206 ASSERT_EQ(rmi->procid(), 0);
207 foreach(
size_t log, has_log_entry) {
208 logs[log]->lock.lock();
// Only logs flagged by rpc_collect_log need work.
209 if (logs[log]->machine_log_modified) {
// Recompute the range [prevtime, lasttime): from the earliest modified
// record up to the longest per-machine series, and at least one record.
214 size_t prevtime = logs[log]->earliest_modified_log;
215 size_t lasttime = prevtime + 1;
216 for (
procid_t p = 0; p < logs[log]->machine.size(); ++p) {
217 lasttime = std::max(lasttime, logs[log]->machine[p].size());
// Grow the aggregate series when it is too short. For CUMULATIVE logs the
// new slots start from the last aggregate value (`lastval`, declared on a
// line missing from this listing); otherwise they are default-constructed.
221 if (logs[log]->aggregate.size() < lasttime) {
222 if (logs[log]->logtype == log_type::CUMULATIVE) {
224 if (logs[log]->aggregate.size() > 0) {
225 lastval = logs[log]->aggregate.rbegin()->value;
227 logs[log]->aggregate.resize(lasttime, log_entry(lastval));
230 logs[log]->aggregate.resize(lasttime);
// aggregate[t] becomes the sum over all machines that have a record at t
// (`sum` is declared on a line missing from this listing).
234 for (
size_t t = prevtime; t < lasttime; ++t) {
236 for (
procid_t p = 0; p < logs[log]->machine.size(); ++p) {
237 if (t < logs[log]->machine[p].size()) {
238 sum += logs[log]->machine[p][t].value;
241 logs[log]->aggregate[t].value = sum;
// Reset the change tracking: (size_t)(-1) is the largest size_t, so the
// std::min in rpc_collect_log will replace it on the next update.
243 logs[log]->earliest_modified_log = (size_t)(-1);
244 logs[log]->machine_log_modified =
false;
246 logs[log]->lock.unlock();
// Body of the ticker thread launched by set_dc. Loops until
// periodic_timer_stop becomes true, sampling instantaneous logs on every
// tick and producing a record every `ticks_per_record` ticks.
// periodic_timer_lock is taken on entry and released on exit.
// NOTE(review): the declarations and updates of tick_ctr and record_ctr are
// on lines missing from this listing.
250 void distributed_event_logger::periodic_timer() {
251 periodic_timer_lock.lock();
// `ti` measures the time since the loop started; tick deadlines below are
// computed relative to it.
252 timer ti; ti.start();
256 int ticks_per_record = RECORD_FREQUENCY / TICK_FREQUENCY;
258 while (!periodic_timer_stop){
259 collect_instantaneous_log();
260 if (tick_ctr % ticks_per_record == 0) {
261 local_collect_log(record_ctr);
// Only process 0 maintains the aggregate series.
263 if (rmi->procid() == 0) build_aggregate_log();
// Deadline of the next tick in milliseconds since `ti` started, and the
// time left until then.
// NOTE(review): both values are held in `int`; whether
// tick_ctr * 1000 * TICK_FREQUENCY can overflow depends on operand types
// that are not visible here -- confirm for long-running processes.
267 int nexttick_time = tick_ctr * 1000 * TICK_FREQUENCY;
268 int nexttick_interval = nexttick_time - ti.current_time_millis();
// With less than 10 ms left, skip the wait and start the next iteration
// right away.
270 if (nexttick_interval < 10)
continue;
// Otherwise wait on the condition variable with a timeout;
// destroy_event_logger signals it to end the wait early.
271 periodic_timer_cond.timedwait_ms(periodic_timer_lock, nexttick_interval);
273 periodic_timer_lock.unlock();
// Constructs a logger that is not yet attached to a distributed_control
// (rmi is NULL until set_dc is called).
276 distributed_event_logger::distributed_event_logger():rmi(NULL) {
// Create the thread-specific-storage key used by get_thread_counter_ref.
// NULL means no destructor function is registered for the stored values.
277 pthread_key_create(&key, NULL);
// Start with no log entries and no per-thread counter slots in use.
279 has_log_entry.clear();
280 thread_local_count_slots.clear();
281 periodic_timer_stop =
false;
// Shuts the logger down: stops and joins the ticker thread, deletes the
// thread-specific-storage key and frees every per-thread record and log
// group. set_dc registers this function as a deletion callback.
284 void distributed_event_logger::destroy_event_logger() {
286 bool thread_was_started =
false;
// Ask the ticker thread to stop. The flag is changed under the same lock
// the ticker holds, and the condition is signalled so a sleeping ticker
// wakes up.
// NOTE(review): periodic_timer_stop is false from the constructor onwards,
// so this branch also runs when no thread was ever launched; whether
// joining an unlaunched tick_thread is harmless depends on the thread class
// and on any guard in lines missing from this listing -- confirm.
287 periodic_timer_lock.lock();
291 if (periodic_timer_stop ==
false) {
292 periodic_timer_stop =
true;
293 thread_was_started =
true;
294 periodic_timer_cond.signal();
// The lock is released before joining.
296 periodic_timer_lock.unlock();
297 if (thread_was_started) tick_thread.join();
301 pthread_key_delete(key);
// Free the per-thread counter records that were registered ...
303 foreach(
size_t thr, thread_local_count_slots) {
304 if (thread_local_count[thr] != NULL)
delete thread_local_count[thr];
// ... and every allocated log group.
306 foreach(
size_t log, has_log_entry) {
307 if (logs[log] != NULL)
delete logs[log];
// Attaches the logger to a distributed_control object and starts the ticker
// thread.
//
//   dc - the distributed_control used to build the RMI object and on which
//        the shutdown callback is registered
//
// NOTE(review): several lines of this function are missing from this
// listing, including the body of the procid() == 0 branch and the remaining
// arguments of the two boost::bind calls.
313 void distributed_event_logger::set_dc(distributed_control& dc) {
// Create the RMI object through which local_collect_log reaches process 0.
315 rmi =
new dc_dist_object<distributed_event_logger>(dc,
this);
// Have dc invoke destroy_event_logger via its deletion callback.
319 dc.register_deletion_callback(boost::bind(
320 &distributed_event_logger::destroy_event_logger,
329 if (rmi->procid() == 0) {
// Clear the stop flag and launch the thread that runs periodic_timer.
332 periodic_timer_stop =
false;
334 tick_thread.launch(boost::bind(&distributed_event_logger::periodic_timer,
// Creates a counter-style log entry, or returns the id of an existing entry
// with the same name.
//
//   name    - name the entry is looked up by
//   logtype - stored in the new group's logtype field
//
// A `units` value is also stored in the group below; the parameter that
// supplies it is on a line missing from this listing.
344 size_t distributed_event_logger::create_log_entry(std::string name,
346 log_type::log_type_enum logtype) {
// Look for an entry of the same name while holding log_entry_lock. An
// existing entry must not be a callback entry. (The statements recording
// the match in has_existing / existingid are not visible here.)
348 bool has_existing =
false;
349 size_t existingid = 0;
350 log_entry_lock.lock();
351 foreach(
size_t log, has_log_entry) {
352 if (logs[log]->name == name) {
353 ASSERT_MSG(logs[log]->is_callback_entry ==
false,
354 "Cannot convert callback log to counter log");
360 log_entry_lock.unlock();
361 if (has_existing)
return existingid;
// No match: build a fresh group with no callback and empty accumulators.
// NOTE(review): earliest_modified_log starts at 1 here but at 0 in
// create_callback_entry -- confirm the difference is intended.
363 log_group* group =
new log_group;
364 group->logtype = logtype;
366 group->units = units;
367 group->callback = NULL;
368 group->is_callback_entry =
false;
369 group->earliest_modified_log = 1;
370 group->machine_log_modified =
false;
371 group->sum_of_instantaneous_entries = 0.0;
372 group->count_of_instantaneous_entries = 0;
// Process 0 keeps one series per process.
375 if (rmi->procid() == 0) {
376 group->machine.resize(rmi->numprocs());
// Claim an id for the group.
379 size_t id = allocate_log_entry(group);
// Creates a callback-style log entry whose value is obtained by calling
// `callback`, or installs the callback on an existing entry of the same
// name.
//
//   name     - name the entry is looked up by
//   callback - function returning the current value as a double
//   logtype  - stored in the new group's logtype field
//
// A `units` value is also stored in the group below; the parameter that
// supplies it is on a line missing from this listing.
385 size_t distributed_event_logger::create_callback_entry(std::string name,
387 boost::function<
double(
void)> callback,
388 log_type::log_type_enum logtype) {
// Look for an entry of the same name while holding log_entry_lock. (The
// statements recording the match are not visible here.)
389 bool has_existing =
false;
390 size_t existingid = 0;
391 log_entry_lock.lock();
392 foreach(
size_t log, has_log_entry) {
393 if (logs[log]->name == name) {
399 log_entry_lock.unlock();
// Existing entry: it must already be a callback entry whose callback is
// currently NULL; the new callback is installed under the entry's lock.
// NOTE(review): the two adjacent string literals below concatenate to
// "...log entry withthe same name %s" -- a space is missing.
403 ASSERT_MSG(logs[existingid]->is_callback_entry ==
true,
404 "Cannot convert counter log to callback log");
406 logs[existingid]->lock.lock();
407 ASSERT_MSG(logs[existingid]->callback == NULL,
408 "Cannot create another callback log entry with"
409 "the same name %s", name.c_str());
410 logs[existingid]->callback = callback;
411 logs[existingid]->lock.unlock();
// No match: build a fresh callback group with empty accumulators.
415 log_group* group =
new log_group;
416 group->logtype = logtype;
418 group->units = units;
419 group->earliest_modified_log = 0;
420 group->machine_log_modified =
false;
421 group->callback = callback;
422 group->is_callback_entry =
true;
423 group->sum_of_instantaneous_entries = 0.0;
424 group->count_of_instantaneous_entries = 0;
// Process 0 keeps one series per process.
428 if (rmi->procid() == 0) {
429 group->machine.resize(rmi->numprocs());
// Claim an id for the group.
432 size_t id = allocate_log_entry(group);
// Adds `value` to the calling thread's counter for log `entry`.
//
//   entry - log id; asserted to be below MAX_LOG_SIZE and to refer to a
//           non-callback entry
//   value - amount to add
438 void distributed_event_logger::thr_inc_log_entry(
size_t entry,
size_t value) {
// Fetch this thread's counter record.
439 event_log_thread_local_type* ev = get_thread_counter_ref();
440 ASSERT_LT(entry, MAX_LOG_SIZE);
441 ASSERT_EQ(logs[entry]->is_callback_entry,
false);
442 ev->values[entry] += value;
// Subtracts `value` from the calling thread's counter for log `entry`.
//
//   entry - log id; asserted to be below MAX_LOG_SIZE, to refer to a
//           non-callback entry, and not to be a CUMULATIVE log
//   value - amount to subtract
//
// NOTE(review): local_collect_log reads the per-thread values through a
// size_t*, so they are unsigned; subtracting more than this thread's own
// counter holds wraps around -- confirm callers expect that.
445 void distributed_event_logger::thr_dec_log_entry(
size_t entry,
size_t value) {
446 event_log_thread_local_type* ev = get_thread_counter_ref();
447 ASSERT_LT(entry, MAX_LOG_SIZE);
// Decrementing is rejected for CUMULATIVE logs.
449 ASSERT_NE((
int)logs[entry]->logtype, (
int) log_type::CUMULATIVE);
450 ASSERT_EQ(logs[entry]->is_callback_entry,
false);
451 ev->values[entry] -= value;
// Detaches the callback from callback entry `entry` by setting it to NULL
// under the entry's lock. The entry itself stays allocated and keeps
// is_callback_entry == true.
//
//   entry - log id; asserted to be below MAX_LOG_SIZE and to refer to a
//           callback entry
455 void distributed_event_logger::free_callback_entry(
size_t entry) {
456 ASSERT_LT(entry, MAX_LOG_SIZE);
458 logs[entry]->lock.lock();
459 ASSERT_EQ(logs[entry]->is_callback_entry,
true);
460 logs[entry]->callback = NULL;
461 logs[entry]->lock.unlock();
// Returns a reference to the single distributed_event_logger instance,
// which is held in a function-local static and therefore constructed on
// the first call.
464 distributed_event_logger& get_event_log() {
465 static distributed_event_logger dist_event_log;
466 return dist_event_log;
// Generates a JSON-formatted summary of all logs: the program name, the
// current time and, per log, its id, name, units, log type, most recent
// aggregate value and rate of change.
//
//   vars - variable map; not read in the visible lines
//
// Returns the pair ("text/plain", generated text).
// NOTE(review): names and units are streamed between quotes without any
// escaping, so a quote or backslash in them would break the output --
// confirm such characters cannot occur.
476 std::pair<std::string, std::string>
477 static metric_names_json(std::map<std::string, std::string>& vars) {
478 std::stringstream strm;
// The program name is taken from the environment variable "_"; it stays
// empty when that variable is not set.
479 char *pname = getenv(
"_");
480 std::string progname;
481 if (pname != NULL) progname = pname;
// Fetch the log table and the bitset that marks which ids are in use.
484 distributed_event_logger& evlog = get_event_log();
485 log_group** logs = evlog.get_logs_ptr();
486 fixed_dense_bitset<MAX_LOG_SIZE>& has_log_entry = evlog.get_logs_bitset();
489 <<
" \"program_name\": \""<< progname <<
"\",\n"
490 <<
" \"time\": " << evlog.get_current_time() <<
",\n"
491 <<
" \"metrics\": [\n";
// nlogs is used below to decide whether a separator follows an entry.
493 size_t nlogs = has_log_entry.popcount();
496 foreach(
size_t log, has_log_entry) {
// Each log is read under its own lock.
498 logs[log]->lock.lock();
// Latest aggregate record: its time and value.
// NOTE(review): size() - 1 and rbegin() are only valid for a non-empty
// aggregate; a guard may exist on the lines missing from this listing (the
// "value" output below does test size() > 0) -- confirm.
500 size_t len = logs[log]->aggregate.size();
502 double logtime = index_to_time(logs[log]->aggregate.size() - 1);
503 double logval = logs[log]->aggregate.rbegin()->value;
// With at least two records, the previous one is the reference point for
// the rate.
506 if (logs[log]->aggregate.size() >= 2) {
507 prevtime = index_to_time(logs[log]->aggregate.size() - 2);
508 prevval = logs[log]->aggregate[len - 2].value;
// For CUMULATIVE logs the rate is the change in value per unit of time
// between the last two records.
510 if (logs[log]->logtype == log_type::CUMULATIVE) {
511 rate_val = (logval - prevval) / (logtime - prevtime);
519 <<
" \"id\":" << log <<
",\n"
520 <<
" \"name\": \"" << logs[log]->name <<
"\",\n"
521 <<
" \"units\": \"" << logs[log]->units <<
"\",\n"
522 <<
" \"cumulative\": " << (int)(logs[log]->logtype) <<
",\n"
523 <<
" \"rate_val\": " << rate_val <<
",\n"
524 <<
" \"value\": " << ( logs[log]->aggregate.size() > 0 ?
525 logs[log]->aggregate.rbegin()->value
529 logs[log]->lock.unlock();
// Separate entries with a comma, except after the last one (`logcount` is
// maintained on lines missing from this listing).
531 if (logcount < nlogs) strm <<
",";
// The generated text is returned with the content type "text/plain".
536 return std::make_pair(std::string(
"text/plain"), strm.str());
// Generates the aggregate (all machines combined) time series of one log,
// or of all logs, as JSON-formatted text.
//
// Variables read from `vars`:
//   name   - log to report; when empty or absent, all logs are reported
//   tstart - start time, converted to a start index with time_to_index
//   tend   - end time; the end index is time_to_index(tend) + 1
//   rate   - parsed with atoi; non-zero requests rates instead of raw values
//   tlast  - when present, overrides the window with the last `tlast` time
//            units up to the current time
//
// Returns the pair ("text/plain", generated text).
// NOTE(review): the declarations of tstart, name and rate are on lines
// missing from this listing.
539 std::pair<std::string, std::string>
540 static metric_aggregate_json(std::map<std::string, std::string>& vars) {
// Defaults: the end of the window is unbounded ((size_t)(-1) is the largest
// size_t).
542 double tend = DBL_MAX;
547 size_t idxstart = time_to_index(tstart);
548 size_t idxend = (size_t)(-1);
549 if (vars.count(
"name")) name = vars[
"name"];
550 if (vars.count(
"tstart")) {
551 tstart = atof(vars[
"tstart"].c_str());
552 idxstart = time_to_index(tstart);
554 if (vars.count(
"tend")) {
555 tend = atof(vars[
"tend"].c_str());
556 idxend = time_to_index(tend) + 1;
558 if (vars.count(
"rate")) rate = (atoi(vars[
"rate"].c_str()) != 0);
// "tlast" is handled last and therefore wins over tstart / tend: the window
// becomes [now - tlast, now], with the start clamped at 0.
559 if (vars.count(
"tlast")) {
560 double tlast = atof(vars[
"tlast"].c_str());
561 tstart = get_event_log().get_current_time() - tlast;
562 tstart = tstart < 0.0 ? 0.0 : tstart;
563 tend = get_event_log().get_current_time();
564 idxstart = time_to_index(tstart);
565 idxend = time_to_index(tend) + 1;
// Fetch the log table and the bitset that marks which ids are in use.
571 distributed_event_logger& evlog = get_event_log();
572 log_group** logs = evlog.get_logs_ptr();
573 fixed_dense_bitset<MAX_LOG_SIZE>& has_log_entry = evlog.get_logs_bitset();
575 std::stringstream strm;
577 size_t nlogs = has_log_entry.popcount();
// An empty name selects every log.
581 bool extract_all = (name.length() == 0);
586 foreach(
size_t log, has_log_entry) {
588 if (logs[log]->name == name || extract_all) {
// The log is read under its lock; the lock is released before the collected
// entries are written out.
// NOTE(review): the header below emits the "name" key twice (listing lines
// 593 and 595) -- the second one looks redundant.
590 logs[log]->lock.lock();
592 <<
" \"id\":" << log <<
",\n"
593 <<
" \"name\": \"" << logs[log]->name <<
"\",\n"
594 <<
" \"units\": \"" << logs[log]->units <<
"\",\n"
595 <<
" \"name\": \"" << logs[log]->name <<
"\",\n"
596 <<
" \"cumulative\": " << (int)(logs[log]->logtype) <<
",\n"
// (time, value) pairs gathered while the lock is held.
599 std::vector<std::pair<double, double> > output_entries;
// Never read past the end of this log's aggregate series.
602 size_t log_idxend = std::min(idxend, logs[log]->aggregate.size());
603 for (
size_t i = idxstart; i < log_idxend ; ++i) {
604 double logtime = index_to_time(i);
605 double logval = logs[log]->aggregate[i].value;
// Raw values are reported when no rate was requested or the log is
// INSTANTANEOUS.
607 if (rate == 0 || logs[log]->logtype == log_type::INSTANTANEOUS) {
608 output_entries.push_back(std::make_pair(logtime, logval));
// Otherwise the rate against the previous record is reported.
// NOTE(review): i - 1 requires i > 0; the guard is presumably on a line
// missing from this listing -- confirm.
614 prevtime = index_to_time(i - 1);
615 prevval = logs[log]->aggregate[i - 1].value;
// The division is performed only for a positive time difference.
619 if (logtime > prevtime) {
620 currate = (logval - prevval) / (logtime - prevtime);
622 output_entries.push_back(std::make_pair(logtime, currate));
626 logs[log]->lock.unlock();
// Write the collected pairs, comma-separated.
627 for (
size_t i = 0 ;i < output_entries.size(); ++i) {
629 << output_entries[i].first <<
", "
630 << output_entries[i].second
633 if (i < output_entries.size() - 1) strm <<
", ";
// A named request stops after the first matching log.
639 if (!extract_all)
break;
// Separator between logs (`logcount` is maintained on lines missing from
// this listing).
641 if (logcount < nlogs) strm <<
",\n";
// The generated text is returned with the content type "text/plain".
647 return std::make_pair(std::string(
"text/plain"), strm.str());
// Generates the per-machine time series of one log, or of all logs, as
// JSON-formatted text.
//
// Variables read from `vars`:
//   name    - log to report; when empty or absent, all logs are reported
//   machine - parsed with atoi; when present, enables the machine filter
//   tstart  - start time, converted to a start index with time_to_index
//   tend    - end time; the end index is time_to_index(tend) + 1
//   rate    - parsed with atoi; non-zero requests rates instead of raw values
//   tlast   - when present, overrides the window with the last `tlast` time
//             units up to the current time
//
// Returns the pair ("text/plain", generated text).
// NOTE(review): the declarations of tstart, name, rate, machine, idxstart
// and p_start are on lines missing from this listing.
653 std::pair<std::string, std::string>
654 static metric_by_machine_json(std::map<std::string, std::string>& vars) {
// Defaults: no machine filter, unbounded end of window.
656 double tend = DBL_MAX;
660 bool has_machine_filter =
false;
663 size_t idxend = (size_t)(-1);
665 if (vars.count(
"name")) name = vars[
"name"];
// NOTE(review): the parsed machine number is not range-checked in the
// visible lines -- confirm it is validated before it limits the machine
// loop below.
666 if (vars.count(
"machine")) {
667 has_machine_filter =
true;
668 machine = atoi(vars[
"machine"].c_str());
670 if (vars.count(
"tstart")) {
671 tstart = atof(vars[
"tstart"].c_str());
672 idxstart = time_to_index(tstart);
674 if (vars.count(
"tend")) {
675 tend = atof(vars[
"tend"].c_str());
676 idxend = time_to_index(tend) + 1;
678 if (vars.count(
"rate")) rate = (atoi(vars[
"rate"].c_str()) != 0);
// "tlast" is handled last and therefore wins over tstart / tend: the window
// becomes [now - tlast, now], with the start clamped at 0.
679 if (vars.count(
"tlast")) {
680 double tlast = atof(vars[
"tlast"].c_str());
681 tstart = get_event_log().get_current_time() - tlast;
682 tstart = tstart < 0.0 ? 0.0 : tstart;
683 tend = get_event_log().get_current_time();
684 idxstart = time_to_index(tstart);
685 idxend = time_to_index(tend) + 1;
// Fetch the log table and the bitset that marks which ids are in use.
692 distributed_event_logger& evlog = get_event_log();
693 log_group** logs = evlog.get_logs_ptr();
694 fixed_dense_bitset<MAX_LOG_SIZE>& has_log_entry = evlog.get_logs_bitset();
696 std::stringstream strm;
698 size_t nlogs = has_log_entry.popcount();
// An empty name selects every log.
702 bool extract_all = (name.length() == 0);
707 foreach(
size_t log, has_log_entry) {
708 if (logs[log]->name == name || extract_all) {
// The log is read under its lock; the lock is released before the collected
// entries are written out.
710 logs[log]->lock.lock();
712 <<
" \"id\":" << log <<
",\n"
713 <<
" \"name\": \"" << logs[log]->name <<
"\",\n"
714 <<
" \"units\": \"" << logs[log]->units <<
"\",\n"
715 <<
" \"cumulative\": " << (int)(logs[log]->logtype) <<
",\n"
// One vector of (time, value) pairs per reported machine.
718 std::vector<std::vector<std::pair<double, double> > > all_output_entries;
// By default every machine is visited; with a machine filter the range is
// narrowed (the narrowing itself is on lines missing from this listing).
724 size_t p_end = logs[log]->machine.size();
725 if (has_machine_filter) {
729 for (
size_t p = p_start; p < p_end; ++p) {
730 std::vector<log_entry>& current = logs[log]->machine[p];
731 std::vector<std::pair<double, double> > output_entries;
// Never read past the end of this machine's series.
733 size_t log_idxend = std::min(idxend, current.size());
734 for (
size_t i = idxstart; i < log_idxend; ++i) {
735 double logtime = index_to_time(i);
736 double logval = current[i].value;
// Keep only records whose time lies in the window (tstart, tend].
738 if (logtime > tstart && logtime <= tend) {
// Raw values are reported when no rate was requested or the log is
// INSTANTANEOUS.
740 if (rate == 0 || logs[log]->logtype == log_type::INSTANTANEOUS) {
741 output_entries.push_back(std::make_pair(logtime, logval));
// Otherwise the rate against the previous record is reported.
// NOTE(review): i - 1 requires i > 0; the guard is presumably on a line
// missing from this listing -- confirm.
747 prevtime = index_to_time(i - 1);
748 prevval = current[i - 1].value;
// The division is performed only for a positive time difference.
752 if (logtime > prevtime) {
753 currate = (logval - prevval) / (logtime - prevtime);
755 output_entries.push_back(std::make_pair(logtime, currate));
759 all_output_entries.push_back(output_entries);
762 logs[log]->lock.unlock();
// Write the collected pairs machine by machine, comma-separated at both
// levels.
764 for (
size_t p = 0; p < all_output_entries.size(); ++p) {
765 std::vector<std::pair<double, double> >& output_entries = all_output_entries[p];
767 for (
size_t i = 0 ;i < output_entries.size(); ++i) {
769 << output_entries[i].first <<
", "
770 << output_entries[i].second
773 if (i < output_entries.size() - 1) strm <<
", ";
777 if (p < all_output_entries.size() - 1) strm <<
", ";
// A named request stops after the first matching log.
784 if (!extract_all)
break;
// Separator between logs (`logcount` is maintained on lines missing from
// this listing).
786 if (logcount < nlogs) strm <<
",\n";
// The generated text is returned with the content type "text/plain".
791 return std::make_pair(std::string(
"text/plain"), strm.str());