GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
event_log.cpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #include <graphlab/util/event_log.hpp>
25 #include <graphlab/util/timer.hpp>
26 #include <graphlab/logger/assertions.hpp>
27 
28 #define EVENT_BAR_WIDTH 40
29 #define BAR_CHARACTER '#'
30 
31 namespace graphlab {
32 
33 static std::ofstream eventlog_file;
34 static mutex eventlog_file_mutex;
35 static bool eventlog_file_open = false;
36 
37 static timer event_timer;
38 static bool event_timer_started = false;
39 static mutex event_timer_mutex;
40 
41 
42 void event_log::initialize(std::ostream &ostrm,
43  size_t flush_interval_ms,
44  event_print_type event_print) {
45  m.lock();
46  out = &ostrm;
47  flush_interval = flush_interval_ms;
48  print_method = event_print;
49 
50  event_timer_mutex.lock();
51  if (event_timer_started == false) {
52  event_timer_started = true;
53  event_timer.start();
54  }
55  event_timer_mutex.unlock();
56  prevtime = event_timer.current_time_millis();
57 
58  cond.signal();
59  m.unlock();
60 
61  if (event_print == LOG_FILE) {
62  eventlog_file_mutex.lock();
63  if (!eventlog_file_open) {
64  eventlog_file_open = true;
65  eventlog_file.open("eventlog.txt");
66  }
67  out = &eventlog_file;
68  eventlog_file_mutex.unlock();
69  }
70 
71 }
72 
73 event_log::~event_log() {
74  finished = true;
75  m.lock();
76  cond.signal();
77  m.unlock();
78  printing_thread.join();
79  if (print_method != LOG_FILE) {
80  size_t pos;
81  if (hascounter.first_bit(pos)) {
82  do {
83  (*out) << descriptions[pos] << ":\t" << totalcounter[pos].value << " Events\n";
84  } while(hascounter.next_bit(pos));
85  }
86  }
87  else{
88  size_t pos;
89  if (hascounter.first_bit(pos)) {
90  do {
91  std::cout << descriptions[pos] << ":\t" << totalcounter[pos].value << " Events\n";
92  } while(hascounter.next_bit(pos));
93  }
94  }
95 }
96 
97 
98 void event_log::immediate_event(unsigned char eventid) {
99  m.lock();
100  immediate_events.push_back(std::make_pair(eventid, event_timer.current_time_millis()));
101  m.unlock();
102 }
103 
104 void event_log::close() {
105  out = NULL;
106  m.lock();
107  flush_interval = 0;
108  m.unlock();
109 }
110 
111 void event_log::add_event_type(unsigned char eventid,
112  std::string description) {
113  descriptions[eventid] = description;
114  max_desc_length = std::max(max_desc_length, description.length());
115  ASSERT_MSG(max_desc_length <= 30, "Event Description length must be <= 30 characters");
116  counters[eventid].value = 0;
117  hascounter.set_bit(eventid);
118 }
119 
120 void event_log::add_immediate_event_type(unsigned char eventid, std::string description) {
121  descriptions[eventid] = description;
122  max_desc_length = std::max(max_desc_length, description.length());
123  ASSERT_MSG(max_desc_length <= 30, "Event Description length must be <= 30 characters");
124  counters[eventid].value = 0;
125 }
126 
127 void event_log::flush() {
128  size_t pos;
129  if (!hascounter.first_bit(pos)) return;
130  double curtime = event_timer.current_time_millis();
131  double timegap = curtime - prevtime;
132  prevtime = curtime;
133 
134  if (hasevents == false && noeventctr == 1) return;
135 
136  bool found_events = false;
137  if (print_method == NUMBER) {
138  do {
139  size_t ctrval = counters[pos].exchange(0);
140  found_events = found_events || ctrval > 0;
141  (*out) << pos << ":\t" << curtime << "\t" << ctrval << "\t" << 1000 * ctrval / timegap << " /s\n";
142  } while(hascounter.next_bit(pos));
143  // flush immediate events
144  if (!immediate_events.empty()) {
145  std::vector<std::pair<unsigned char, size_t> > cur;
146  cur.swap(immediate_events);
147  for (size_t i = 0;i < cur.size(); ++i) {
148  (*out) << (size_t)cur[i].first << ":\t" << cur[i].second << "\t" << -1 << "\t" << 0 << " /s\n";
149  }
150  }
151  out->flush();
152  }
153  else if (print_method == DESCRIPTION) {
154  do {
155  size_t ctrval = counters[pos].exchange(0);
156  found_events = found_events || ctrval > 0;
157  (*out) << descriptions[pos] << ":\t" << curtime << "\t" << ctrval << "\t" << 1000 * ctrval / timegap << " /s\n";
158  } while(hascounter.next_bit(pos));
159  if (!immediate_events.empty()) {
160  std::vector<std::pair<unsigned char, size_t> > cur;
161  cur.swap(immediate_events);
162  for (size_t i = 0;i < cur.size(); ++i) {
163  (*out) << descriptions[cur[i].first] << ":\t" << cur[i].second << "\t" << -1 << "\t" << 0 << " /s\n";
164  }
165  }
166  out->flush();
167  }
168  else if (print_method == LOG_FILE) {
169  eventlog_file_mutex.lock();
170  do {
171  size_t ctrval = counters[pos].exchange(0);
172  found_events = found_events || ctrval > 0;
173  (*out) << descriptions[pos] << ":\t" << curtime << "\t" << ctrval << "\t" << 1000 * ctrval / timegap << "\n";
174  } while(hascounter.next_bit(pos));
175  if (!immediate_events.empty()) {
176  std::vector<std::pair<unsigned char, size_t> > cur;
177  cur.swap(immediate_events);
178  for (size_t i = 0;i < cur.size(); ++i) {
179  (*out) << descriptions[cur[i].first] << ":\t" << cur[i].second << "\t" << -1 << "\t" << 0 << " /s\n";
180  }
181  }
182  eventlog_file_mutex.unlock();
183  out->flush();
184  }
185  else if (print_method == RATE_BAR) {
186  (*out) << "Time: " << "+"<<timegap << "\t" << curtime << "\n";
187  char spacebuf[60];
188  char pbuf[61];
189  memset(spacebuf, ' ', EVENT_BAR_WIDTH);
190  memset(pbuf, BAR_CHARACTER, 60);
191  do {
192  size_t ctrval = counters[pos].exchange(0);
193  found_events = found_events || ctrval > 0;
194  maxcounter[pos] = std::max(maxcounter[pos], ctrval);
195  size_t barlen = 0;
196  size_t mc = maxcounter[pos];
197  if (mc > 0) barlen = ctrval * EVENT_BAR_WIDTH / mc;
198  if (barlen > EVENT_BAR_WIDTH) barlen = EVENT_BAR_WIDTH;
199 
200  pbuf[barlen] = '\0';
201  spacebuf[max_desc_length - descriptions[pos].length() + 1] = 0;
202  (*out) << descriptions[pos] << spacebuf << "|" << pbuf;
203  spacebuf[max_desc_length - descriptions[pos].length() + 1] =' ';
204  pbuf[barlen] = BAR_CHARACTER;
205  // now print the remaining spaces
206  spacebuf[EVENT_BAR_WIDTH - barlen] = '\0';
207  (*out) << spacebuf << "| " << ctrval << " : " << mc << " \n";
208  spacebuf[EVENT_BAR_WIDTH - barlen] = ' ';
209 
210  } while(hascounter.next_bit(pos));
211  out->flush();
212  }
213  if (found_events == false) {
214  ++noeventctr;
215  }
216  else {
217  noeventctr = 0;
218  }
219  hasevents = false;
220 
221 }
222 
223 void event_log::thread_loop() {
224  m.lock();
225  while(!finished) {
226  if (flush_interval == 0) {
227  cond.wait(m);
228  }
229  else {
230  m.unlock();
231  my_sleep_ms(flush_interval);
232  m.lock();
233  if (flush_interval > 0) flush();
234  }
235  }
236  m.unlock();
237 }
238 
239 } // namespace