GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
distributed_event_log.cpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 
25 #include <pthread.h>
26 #include <string>
27 #include <limits>
28 #include <cfloat>
29 #include <graphlab/rpc/dc.hpp>
30 #include <graphlab/rpc/dc_dist_object.hpp>
31 #include <graphlab/rpc/distributed_event_log.hpp>
32 #include <graphlab/util/timer.hpp>
33 #include <graphlab/logger/assertions.hpp>
34 #include <graphlab/util/dense_bitset.hpp>
35 #include <graphlab/ui/metrics_server.hpp>
36 #include <graphlab/macros_def.hpp>
37 
38 namespace graphlab {
39 
40 
// Forward declarations of the metrics server handlers. They are defined
// at the bottom of this file and registered in set_dc().
static std::pair<std::string, std::string> metric_names_json(
    std::map<std::string, std::string>& vars);

static std::pair<std::string, std::string> metric_aggregate_json(
    std::map<std::string, std::string>& vars);

static std::pair<std::string, std::string> metric_by_machine_json(
    std::map<std::string, std::string>& vars);
50 
51 
/**
 * Maps a time value onto the index of the record slot containing it,
 * i.e. floor(t / 5) for non-negative t.
 *
 * The time may originate from a request variable, so out-of-range
 * values are handled explicitly instead of being converted to size_t
 * (converting a negative, NaN or too-large double to an unsigned
 * integer is undefined behaviour):
 *  - negative values and NaN map to slot 0;
 *  - values beyond the representable range saturate at one below the
 *    largest size_t, so that callers which add 1 to obtain an exclusive
 *    end index do not wrap around to 0.
 */
static size_t time_to_index(double t) {
  // !(t > 0) is also true for NaN
  if (!(t > 0)) return 0;
  const double slot = t / 5;
  const size_t largest_slot = std::numeric_limits<size_t>::max() - 1;
  if (slot >= static_cast<double>(largest_slot)) return largest_slot;
  // slot is non-negative here, so truncation is identical to floor
  return static_cast<size_t>(slot);
}
55 
/**
 * Inverse of time_to_index(): returns the time at which record slot t
 * begins (each slot is 5 time units wide).
 */
static double index_to_time(size_t t) {
  const size_t slot_start = t * 5;
  return static_cast<double>(slot_start);
}
59 
60 
61 size_t distributed_event_logger::allocate_log_entry(log_group* group) {
62  log_entry_lock.lock();
63  size_t id = 0;
64  if (has_log_entry.first_zero_bit(id) == false) {
65  logger(LOG_FATAL, "More than 256 Log entries created. "
66  "New log entries cannot be created");
67  // does not return
68  }
69  logs[id] = group;
70  has_log_entry.set_bit(id);
71  log_entry_lock.unlock();
72  return id;
73 }
74 
75 event_log_thread_local_type* distributed_event_logger::get_thread_counter_ref() {
76  void* v = pthread_getspecific(key);
77  if (v == NULL) {
78  // allocate a new thread local entry
79  event_log_thread_local_type* entry = new event_log_thread_local_type;
80  // set all values to 0
81  for (size_t i = 0; i < MAX_LOG_SIZE; ++i) entry->values[i] = 0;
82  // cast and write it to v. We need it later.
83  // and set the thread local store
84  v = (void*)(entry);
85  pthread_setspecific(key, v);
86 
87  // register the key entry against the logger
88  thread_local_count_lock.lock();
89  // find an unused entry
90  size_t b = 0;
91  if (thread_local_count_slots.first_zero_bit(b) == false) {
92  logger(LOG_FATAL, "More than 1024 active threads. "
93  "Log counters cannot be created");
94  // does not return
95  }
96  entry->thlocal_slot = b;
97  thread_local_count[b] = entry;
98  thread_local_count_slots.set_bit(b);
99  thread_local_count_lock.unlock();
100  }
101 
102  event_log_thread_local_type* entry = (event_log_thread_local_type*)(v);
103  return entry;
104 }
105 
106 /**
107  * Receives the log information from each machine
108  */
109 void distributed_event_logger::rpc_collect_log(size_t srcproc, size_t record_ctr,
110  std::vector<double> srccounts) {
111  foreach(size_t log, has_log_entry) {
112  logs[log]->lock.lock();
113  // insert the new counts
114  size_t entryid = record_ctr;
115  logs[log]->earliest_modified_log =
116  std::min(entryid, logs[log]->earliest_modified_log);
117  logs[log]->machine_log_modified = true;
118  // resize all procs
119  for (procid_t p = 0; p < logs[log]->machine.size(); ++p) {
120  if (logs[log]->machine[p].size() < entryid + 1) {
121  double prevvalue = 0;
122  if (logs[log]->machine[p].size() > 0) {
123  prevvalue = logs[log]->machine[p].back().value;
124  }
125  logs[log]->machine[p].resize(entryid + 1, log_entry(prevvalue));
126  }
127  }
128  logs[log]->machine[srcproc][entryid].value = srccounts[log];
129  logs[log]->lock.unlock();
130  }
131 }
132 
133 void distributed_event_logger::collect_instantaneous_log() {
134  foreach(size_t log, has_log_entry) {
135  if (logs[log]->logtype == log_type::INSTANTANEOUS) {
136  logs[log]->lock.lock();
137  // for each log entry which is a callback entry
138  // call the callback to get the counts
139  if (logs[log]->is_callback_entry) {
140  logs[log]->sum_of_instantaneous_entries += logs[log]->callback();
141  ++logs[log]->count_of_instantaneous_entries;
142  }
143  else {
144  // sum it across all the threads
145  foreach(size_t thr, thread_local_count_slots) {
146  logs[log]->sum_of_instantaneous_entries += thread_local_count[thr]->values[log];
147  }
148  ++logs[log]->count_of_instantaneous_entries;
149  }
150  logs[log]->lock.unlock();
151  }
152  }
153 }
154 
155 /**
156  * Collects the machine level
157  * log entry. and sends it to machine 0
158  */
159 void distributed_event_logger::local_collect_log(size_t record_ctr) {
160  // put together an aggregate of all counters
161  std::vector<double> combined_counts(MAX_LOG_SIZE, 0);
162 
163  // for each thread and for each log entry which is
164  // not a callback entry. Accumulate the number of counts
165  //
166  foreach(size_t log, has_log_entry) {
167  logs[log]->lock.lock();
168  // cimulative entry. just add across all threads
169  if (logs[log]->logtype == log_type::CUMULATIVE) {
170  if (logs[log]->is_callback_entry) {
171  combined_counts[log] = logs[log]->callback();
172  } else {
173  foreach(size_t thr, thread_local_count_slots) {
174  size_t* current_thread_counts = thread_local_count[thr]->values;
175  combined_counts[log] += current_thread_counts[log];
176  }
177  }
178  }
179  else {
180  // take the average
181  if (logs[log]->count_of_instantaneous_entries > 0) {
182  combined_counts[log] = (double)logs[log]->sum_of_instantaneous_entries /
183  logs[log]->count_of_instantaneous_entries;
184  }
185  else {
186  combined_counts[log] = 0;
187  }
188  logs[log]->sum_of_instantaneous_entries = 0;
189  logs[log]->count_of_instantaneous_entries = 0;
190  }
191  logs[log]->lock.unlock();
192  }
193 
194  // send to machine 0
195  if (rmi->procid() != 0) {
196  rmi->control_call(0, &distributed_event_logger::rpc_collect_log,
197  (size_t)rmi->procid(), record_ctr, combined_counts);
198  }
199  else {
200  rpc_collect_log((size_t)0, record_ctr, combined_counts);
201  }
202 }
203 
204 // Called only by machine 0 to get the aggregate log
205 void distributed_event_logger::build_aggregate_log() {
206  ASSERT_EQ(rmi->procid(), 0);
207  foreach(size_t log, has_log_entry) {
208  logs[log]->lock.lock();
209  if (logs[log]->machine_log_modified) {
210  // what is the previous time the aggregate was computed?
211  // The sum takes the open interval (prevtime, current_time]
212  // thus the first time this is called, we may drop one entry
213  // if we let prevtime initialize at 0
214  size_t prevtime = logs[log]->earliest_modified_log;
215  size_t lasttime = prevtime + 1;
216  for (procid_t p = 0; p < logs[log]->machine.size(); ++p) {
217  lasttime = std::max(lasttime, logs[log]->machine[p].size());
218  }
219  // if it is a CUMULATIVE log, take the latest entry from each machine
220  // if it is an INSTANTANEOUS log, take the average of the last times.
221  if (logs[log]->aggregate.size() < lasttime) {
222  if (logs[log]->logtype == log_type::CUMULATIVE) {
223  double lastval = 0;
224  if (logs[log]->aggregate.size() > 0) {
225  lastval = logs[log]->aggregate.rbegin()->value;
226  }
227  logs[log]->aggregate.resize(lasttime, log_entry(lastval));
228  }
229  else {
230  logs[log]->aggregate.resize(lasttime);
231  }
232  }
233 
234  for (size_t t = prevtime; t < lasttime; ++t) {
235  double sum = 0;
236  for (procid_t p = 0; p < logs[log]->machine.size(); ++p) {
237  if (t < logs[log]->machine[p].size()) {
238  sum += logs[log]->machine[p][t].value;
239  }
240  }
241  logs[log]->aggregate[t].value = sum;
242  }
243  logs[log]->earliest_modified_log = (size_t)(-1);
244  logs[log]->machine_log_modified = false;
245  }
246  logs[log]->lock.unlock();
247  }
248 }
249 
250 void distributed_event_logger::periodic_timer() {
251  periodic_timer_lock.lock();
252  timer ti; ti.start();
253  int tick_ctr = 0;
254  int record_ctr = 0;
255 
256  int ticks_per_record = RECORD_FREQUENCY / TICK_FREQUENCY;
257 
258  while (!periodic_timer_stop){
259  collect_instantaneous_log();
260  if (tick_ctr % ticks_per_record == 0) {
261  local_collect_log(record_ctr);
262  ++record_ctr;
263  if (rmi->procid() == 0) build_aggregate_log();
264  }
265  // when is the next tick
266  ++tick_ctr;
267  int nexttick_time = tick_ctr * 1000 * TICK_FREQUENCY;
268  int nexttick_interval = nexttick_time - ti.current_time_millis();
269  // we lost a tick.
270  if (nexttick_interval < 10) continue;
271  periodic_timer_cond.timedwait_ms(periodic_timer_lock, nexttick_interval);
272  }
273  periodic_timer_lock.unlock();
274 }
275 
276 distributed_event_logger::distributed_event_logger():rmi(NULL) {
277  pthread_key_create(&key, NULL);
278  // clear the bit fields
279  has_log_entry.clear();
280  thread_local_count_slots.clear();
281  periodic_timer_stop = false;
282 }
283 
284 void distributed_event_logger::destroy_event_logger() {
285  // kill the tick thread
286  bool thread_was_started = false;
287  periodic_timer_lock.lock();
288  // if periodic_timer_stop is false, then
289  // thread was started. signal it and wait for it later to
290  // join
291  if (periodic_timer_stop == false) {
292  periodic_timer_stop = true;
293  thread_was_started = true;
294  periodic_timer_cond.signal();
295  }
296  periodic_timer_lock.unlock();
297  if (thread_was_started) tick_thread.join();
298  // make sure everyone has joined before I start freeing stuff
299  rmi->full_barrier();
300  delete rmi;
301  pthread_key_delete(key);
302  // here also free all the allocated memory!
303  foreach(size_t thr, thread_local_count_slots) {
304  if (thread_local_count[thr] != NULL) delete thread_local_count[thr];
305  }
306  foreach(size_t log, has_log_entry) {
307  if (logs[log] != NULL) delete logs[log];
308  }
309 
310 
311 }
312 
313 void distributed_event_logger::set_dc(distributed_control& dc) {
314  if (rmi == NULL) {
315  rmi = new dc_dist_object<distributed_event_logger>(dc, this);
316  // register a deletion callback since the distributed_event_logger
317  // will be destroyed only after main
318 
319  dc.register_deletion_callback(boost::bind(
320  &distributed_event_logger::destroy_event_logger,
321  this));
322 
323  dc.barrier();
324  // everyone starts the timer at the same time
325  // at the one distributed synchronization point we have
326  ti.start();
327  // procid 0 waits 0.2s to skew the local timer a little
328  // so everyone else's log has time to show up
329  if (rmi->procid() == 0) {
330  timer::sleep_ms(200);
331  }
332  periodic_timer_stop = false;
333  // spawn a thread for the tick
334  tick_thread.launch(boost::bind(&distributed_event_logger::periodic_timer,
335  this));
336 
337  // register the metric server callbacks
338  add_metric_server_callback("names.json", metric_names_json);
339  add_metric_server_callback("metrics_aggregate.json", metric_aggregate_json);
340  add_metric_server_callback("metrics_by_machine.json", metric_by_machine_json);
341  }
342 }
343 
344 size_t distributed_event_logger::create_log_entry(std::string name,
345  std::string units,
346  log_type::log_type_enum logtype) {
347  // look for an entry with the same name
348  bool has_existing = false;
349  size_t existingid = 0;
350  log_entry_lock.lock();
351  foreach(size_t log, has_log_entry) {
352  if (logs[log]->name == name) {
353  ASSERT_MSG(logs[log]->is_callback_entry == false,
354  "Cannot convert callback log to counter log");
355  has_existing = true;
356  existingid = log;
357  break;
358  }
359  }
360  log_entry_lock.unlock();
361  if (has_existing) return existingid;
362 
363  log_group* group = new log_group;
364  group->logtype = logtype;
365  group->name = name;
366  group->units = units;
367  group->callback = NULL;
368  group->is_callback_entry = false;
369  group->earliest_modified_log = 1;
370  group->machine_log_modified = false;
371  group->sum_of_instantaneous_entries = 0.0;
372  group->count_of_instantaneous_entries = 0;
373  // only allocate the machine vector on the root machine.
374  // no one else needs it
375  if (rmi->procid() == 0) {
376  group->machine.resize(rmi->numprocs());
377  }
378  // ok. get an ID
379  size_t id = allocate_log_entry(group);
380  // enforce that all machines are running this at the same time
381  rmi->barrier();
382  return id;
383 }
384 
385 size_t distributed_event_logger::create_callback_entry(std::string name,
386  std::string units,
387  boost::function<double(void)> callback,
388  log_type::log_type_enum logtype) {
389  bool has_existing = false;
390  size_t existingid = 0;
391  log_entry_lock.lock();
392  foreach(size_t log, has_log_entry) {
393  if (logs[log]->name == name) {
394  has_existing = true;
395  existingid = log;
396  break;
397  }
398  }
399  log_entry_lock.unlock();
400  if (has_existing) {
401  // ok... we have an existing entry. We may
402  // overwrite the callback if the callback is NULL
403  ASSERT_MSG(logs[existingid]->is_callback_entry == true,
404  "Cannot convert counter log to callback log");
405 
406  logs[existingid]->lock.lock();
407  ASSERT_MSG(logs[existingid]->callback == NULL,
408  "Cannot create another callback log entry with"
409  "the same name %s", name.c_str());
410  logs[existingid]->callback = callback;
411  logs[existingid]->lock.unlock();
412  return existingid;
413  }
414 
415  log_group* group = new log_group;
416  group->logtype = logtype;
417  group->name = name;
418  group->units = units;
419  group->earliest_modified_log = 0;
420  group->machine_log_modified = false;
421  group->callback = callback;
422  group->is_callback_entry = true;
423  group->sum_of_instantaneous_entries = 0.0;
424  group->count_of_instantaneous_entries = 0;
425 
426  // only allocate the machine vector on the root machine.
427  // no one else needs it
428  if (rmi->procid() == 0) {
429  group->machine.resize(rmi->numprocs());
430  }
431  // ok. get an ID
432  size_t id = allocate_log_entry(group);
433  // enforce that all machines are running this at the same time
434  rmi->barrier();
435  return id;
436 }
437 
438 void distributed_event_logger::thr_inc_log_entry(size_t entry, size_t value) {
439  event_log_thread_local_type* ev = get_thread_counter_ref();
440  ASSERT_LT(entry, MAX_LOG_SIZE);
441  ASSERT_EQ(logs[entry]->is_callback_entry, false);
442  ev->values[entry] += value;
443 }
444 
445 void distributed_event_logger::thr_dec_log_entry(size_t entry, size_t value) {
446  event_log_thread_local_type* ev = get_thread_counter_ref();
447  ASSERT_LT(entry, MAX_LOG_SIZE);
448  // does not work for cumulative logs
449  ASSERT_NE((int)logs[entry]->logtype, (int) log_type::CUMULATIVE);
450  ASSERT_EQ(logs[entry]->is_callback_entry, false);
451  ev->values[entry] -= value;
452 }
453 
454 
455 void distributed_event_logger::free_callback_entry(size_t entry) {
456  ASSERT_LT(entry, MAX_LOG_SIZE);
457  // does not work for cumulative logs
458  logs[entry]->lock.lock();
459  ASSERT_EQ(logs[entry]->is_callback_entry, true);
460  logs[entry]->callback = NULL;
461  logs[entry]->lock.unlock();
462 }
463 
464 distributed_event_logger& get_event_log() {
465  static distributed_event_logger dist_event_log;
466  return dist_event_log;
467 }
468 
469 
470 
471 
472 
473 /*
474  Used to process the names.json request
475 */
476 std::pair<std::string, std::string>
477 static metric_names_json(std::map<std::string, std::string>& vars) {
478  std::stringstream strm;
479  char *pname = getenv("_");
480  std::string progname;
481  if (pname != NULL) progname = pname;
482 
483 
484  distributed_event_logger& evlog = get_event_log();
485  log_group** logs = evlog.get_logs_ptr();
486  fixed_dense_bitset<MAX_LOG_SIZE>& has_log_entry = evlog.get_logs_bitset();
487 
488  strm << "{\n"
489  << " \"program_name\": \""<< progname << "\",\n"
490  << " \"time\": " << evlog.get_current_time() << ",\n"
491  << " \"metrics\": [\n";
492  // output the metrics
493  size_t nlogs = has_log_entry.popcount();
494 
495  size_t logcount = 0;
496  foreach(size_t log, has_log_entry) {
497 
498  logs[log]->lock.lock();
499  double rate_val = 0;
500  size_t len = logs[log]->aggregate.size();
501  if (len >= 1) {
502  double logtime = index_to_time(logs[log]->aggregate.size() - 1);
503  double logval = logs[log]->aggregate.rbegin()->value;
504  double prevtime = 0;
505  double prevval = 0;
506  if (logs[log]->aggregate.size() >= 2) {
507  prevtime = index_to_time(logs[log]->aggregate.size() - 2);
508  prevval = logs[log]->aggregate[len - 2].value;
509  }
510  if (logs[log]->logtype == log_type::CUMULATIVE) {
511  rate_val = (logval - prevval) / (logtime - prevtime);
512  }
513  else {
514  rate_val = logval;
515  }
516  }
517 
518  strm << " {\n"
519  << " \"id\":" << log << ",\n"
520  << " \"name\": \"" << logs[log]->name << "\",\n"
521  << " \"units\": \"" << logs[log]->units << "\",\n"
522  << " \"cumulative\": " << (int)(logs[log]->logtype) << ",\n"
523  << " \"rate_val\": " << rate_val << ",\n"
524  << " \"value\": " << ( logs[log]->aggregate.size() > 0 ?
525  logs[log]->aggregate.rbegin()->value
526  : 0 ) << "\n"
527  << " }\n";
528 
529  logs[log]->lock.unlock();
530  ++logcount;
531  if (logcount < nlogs) strm << ",";
532  }
533  strm << " ]\n"
534  << "}\n";
535 
536  return std::make_pair(std::string("text/plain"), strm.str());
537 }
538 
539 std::pair<std::string, std::string>
540 static metric_aggregate_json(std::map<std::string, std::string>& vars) {
541  double tstart = 0;
542  double tend = DBL_MAX;
543  bool rate = false;
544  std::string name;
545  // see what variables there are
546 
547  size_t idxstart = time_to_index(tstart);
548  size_t idxend = (size_t)(-1);
549  if (vars.count("name")) name = vars["name"];
550  if (vars.count("tstart")) {
551  tstart = atof(vars["tstart"].c_str());
552  idxstart = time_to_index(tstart);
553  }
554  if (vars.count("tend")) {
555  tend = atof(vars["tend"].c_str());
556  idxend = time_to_index(tend) + 1;
557  }
558  if (vars.count("rate")) rate = (atoi(vars["rate"].c_str()) != 0);
559  if (vars.count("tlast")) {
560  double tlast = atof(vars["tlast"].c_str());
561  tstart = get_event_log().get_current_time() - tlast;
562  tstart = tstart < 0.0 ? 0.0 : tstart;
563  tend = get_event_log().get_current_time();
564  idxstart = time_to_index(tstart);
565  idxend = time_to_index(tend) + 1;
566  }
567 
568  // name is not optional
569  name = trim(name);
570 
571  distributed_event_logger& evlog = get_event_log();
572  log_group** logs = evlog.get_logs_ptr();
573  fixed_dense_bitset<MAX_LOG_SIZE>& has_log_entry = evlog.get_logs_bitset();
574 
575  std::stringstream strm;
576 
577  size_t nlogs = has_log_entry.popcount();
578  size_t logcount = 0;
579 
580  // if name is empty, I should extract all metrics
581  bool extract_all = (name.length() == 0);
582 
583  // make a top level array
584 
585  strm << "[\n";
586  foreach(size_t log, has_log_entry) {
587 
588  if (logs[log]->name == name || extract_all) {
589 
590  logs[log]->lock.lock();
591  strm << " {\n"
592  << " \"id\":" << log << ",\n"
593  << " \"name\": \"" << logs[log]->name << "\",\n"
594  << " \"units\": \"" << logs[log]->units << "\",\n"
595  << " \"name\": \"" << logs[log]->name << "\",\n"
596  << " \"cumulative\": " << (int)(logs[log]->logtype) << ",\n"
597  << " \"record\": [";
598 
599  std::vector<std::pair<double, double> > output_entries;
600  // annoyingly, json does not let me put a trailing comma in the array.
601  // thus I need to first write it to a vector, before dumping it to json
602  size_t log_idxend = std::min(idxend, logs[log]->aggregate.size());
603  for (size_t i = idxstart; i < log_idxend ; ++i) {
604  double logtime = index_to_time(i);
605  double logval = logs[log]->aggregate[i].value;
606  // only cumulative logs can have rate
607  if (rate == 0 || logs[log]->logtype == log_type::INSTANTANEOUS) {
608  output_entries.push_back(std::make_pair(logtime, logval));
609  }
610  else {
611  double prevval = 0;
612  double prevtime = 0;
613  if (i > 0) {
614  prevtime = index_to_time(i - 1);
615  prevval = logs[log]->aggregate[i - 1].value;
616  }
617  double currate = 0;
618  // avoid divide by zero annoyances
619  if (logtime > prevtime) {
620  currate = (logval - prevval) / (logtime - prevtime);
621  }
622  output_entries.push_back(std::make_pair(logtime, currate));
623  }
624  }
625 
626  logs[log]->lock.unlock();
627  for (size_t i = 0 ;i < output_entries.size(); ++i) {
628  strm << " ["
629  << output_entries[i].first << ", "
630  << output_entries[i].second
631  << "] ";
632  // add a comma if this is not the last entry
633  if (i < output_entries.size() - 1) strm << ", ";
634  }
635  strm << "]\n"
636  << " }\n";
637 
638  // if I am not supposed to extract all, then I am done here.
639  if (!extract_all) break;
640  ++logcount;
641  if (logcount < nlogs) strm << ",\n";
642  }
643 
644  }
645 
646  strm << "]\n";
647  return std::make_pair(std::string("text/plain"), strm.str());
648 }
649 
650 
651 
652 
653 std::pair<std::string, std::string>
654 static metric_by_machine_json(std::map<std::string, std::string>& vars) {
655  double tstart = 0;
656  double tend = DBL_MAX;
657  bool rate = false;
658  std::string name;
659  size_t machine = 0;
660  bool has_machine_filter = false;
661  // see what variables there are
662  size_t idxstart = 0;
663  size_t idxend = (size_t)(-1);
664 
665  if (vars.count("name")) name = vars["name"];
666  if (vars.count("machine")) {
667  has_machine_filter = true;
668  machine = atoi(vars["machine"].c_str());
669  }
670  if (vars.count("tstart")) {
671  tstart = atof(vars["tstart"].c_str());
672  idxstart = time_to_index(tstart);
673  }
674  if (vars.count("tend")) {
675  tend = atof(vars["tend"].c_str());
676  idxend = time_to_index(tend) + 1;
677  }
678  if (vars.count("rate")) rate = (atoi(vars["rate"].c_str()) != 0);
679  if (vars.count("tlast")) {
680  double tlast = atof(vars["tlast"].c_str());
681  tstart = get_event_log().get_current_time() - tlast;
682  tstart = tstart < 0.0 ? 0.0 : tstart;
683  tend = get_event_log().get_current_time();
684  idxstart = time_to_index(tstart);
685  idxend = time_to_index(tend) + 1;
686  }
687 
688 
689  // name is not optional
690  name = trim(name);
691 
692  distributed_event_logger& evlog = get_event_log();
693  log_group** logs = evlog.get_logs_ptr();
694  fixed_dense_bitset<MAX_LOG_SIZE>& has_log_entry = evlog.get_logs_bitset();
695 
696  std::stringstream strm;
697 
698  size_t nlogs = has_log_entry.popcount();
699  size_t logcount = 0;
700 
701  // if name is empty, I should extract all metrics
702  bool extract_all = (name.length() == 0);
703 
704  // make a top level array
705 
706  strm << "[\n";
707  foreach(size_t log, has_log_entry) {
708  if (logs[log]->name == name || extract_all) {
709 
710  logs[log]->lock.lock();
711  strm << " {\n"
712  << " \"id\":" << log << ",\n"
713  << " \"name\": \"" << logs[log]->name << "\",\n"
714  << " \"units\": \"" << logs[log]->units << "\",\n"
715  << " \"cumulative\": " << (int)(logs[log]->logtype) << ",\n"
716  << " \"record\": ";
717 
718  std::vector<std::vector<std::pair<double, double> > > all_output_entries;
719  // annoyingly, json does not let me put a trailing comma in the array.
720  // thus I need to first write it to a vector, before dumping it to json
721  // and annoying 2 dimensional output arrays...
722  //
723  size_t p_start = 0;
724  size_t p_end = logs[log]->machine.size();
725  if (has_machine_filter) {
726  p_start = machine;
727  p_end = machine + 1;
728  }
729  for (size_t p = p_start; p < p_end; ++p) {
730  std::vector<log_entry>& current = logs[log]->machine[p];
731  std::vector<std::pair<double, double> > output_entries;
732 
733  size_t log_idxend = std::min(idxend, current.size());
734  for (size_t i = idxstart; i < log_idxend; ++i) {
735  double logtime = index_to_time(i);
736  double logval = current[i].value;
737 
738  if (logtime > tstart && logtime <= tend) {
739  // only cumulative logs can have rate
740  if (rate == 0 || logs[log]->logtype == log_type::INSTANTANEOUS) {
741  output_entries.push_back(std::make_pair(logtime, logval));
742  }
743  else {
744  double prevval = 0;
745  double prevtime = 0;
746  if (i > 0) {
747  prevtime = index_to_time(i - 1);
748  prevval = current[i - 1].value;
749  }
750  double currate = 0;
751  // avoid divide by zero annoyances
752  if (logtime > prevtime) {
753  currate = (logval - prevval) / (logtime - prevtime);
754  }
755  output_entries.push_back(std::make_pair(logtime, currate));
756  }
757  }
758  }
759  all_output_entries.push_back(output_entries);
760  }
761 
762  logs[log]->lock.unlock();
763  strm << "[ ";
764  for (size_t p = 0; p < all_output_entries.size(); ++p) {
765  std::vector<std::pair<double, double> >& output_entries = all_output_entries[p];
766  strm << "[ ";
767  for (size_t i = 0 ;i < output_entries.size(); ++i) {
768  strm << " ["
769  << output_entries[i].first << ", "
770  << output_entries[i].second
771  << "] ";
772  // add a comma if this is not the last entry
773  if (i < output_entries.size() - 1) strm << ", ";
774  }
775 
776  strm << "] ";
777  if (p < all_output_entries.size() - 1) strm << ", ";
778  }
779  strm << "]\n"
780  << " }\n";
781 
782 
783  // if I am not supposed to extract all, then I am done here.
784  if (!extract_all) break;
785  ++logcount;
786  if (logcount < nlogs) strm << ",\n";
787  }
788  }
789 
790  strm << "]\n";
791  return std::make_pair(std::string("text/plain"), strm.str());
792 }
793 
794 
795 
796 
797 } // namespace graphlab