GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
distributed_event_log.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_DISTRIBUTED_EVENT_LOG_HPP
25 #define GRAPHLAB_DISTRIBUTED_EVENT_LOG_HPP
26 #include <iostream>
27 #include <string>
28 #include <vector>
29 #include <boost/bind.hpp>
30 #include <boost/function.hpp>
31 #include <graphlab/parallel/pthread_tools.hpp>
32 #include <graphlab/parallel/atomic.hpp>
33 #include <graphlab/util/timer.hpp>
34 #include <graphlab/util/dense_bitset.hpp>
35 #include <graphlab/util/stl_util.hpp>
36 namespace graphlab {
37 
// Forward declarations.
// distributed_control is named by distributed_event_logger::set_dc().
// dc_dist_object is needed inside the logger class, while dc_dist_object
// itself must also be able to use the logger, so only declarations (not
// full definitions) can appear here.
class distributed_control;

template <typename T>
class dc_dist_object;
44 
45 
46 
// Maximum number of log entries. Bounds the logger's log table, its
// active-entry bitset, and every thread's counter array.
const size_t MAX_LOG_SIZE(256);
// Maximum number of threads that can be assigned a counter slot.
const size_t MAX_LOG_THREADS(1024);

// NOTE(review): presumably periods (in seconds) used by the logger's
// periodic timer thread for ticking and for recording — confirm against
// the implementation file.
const double TICK_FREQUENCY(0.5);
const double RECORD_FREQUENCY(5.0);
51 
52 
53 
54 /// A single entry in time
55 struct log_entry: public IS_POD_TYPE {
56  // The value at the time. If this is a CUMULATIVE entry, this
57  // will contain the total number of events since the start
58  double value;
59 
60  explicit log_entry(double value = 0): value(value) { }
61 };
62 
63 
/// How the samples of a log series are to be interpreted.
namespace log_type {
enum log_type_enum {
  /// Summing log values over time is not meaningful.
  INSTANTANEOUS = 0,
  /// Summing log values over time is meaningful.
  CUMULATIVE = 1
};
}  // namespace log_type
70 
71 /// Logging information for a particular log entry (say \#updates)
72 struct log_group{
73  mutex lock;
74 
75  /// name of the group
76  std::string name;
77 
78  /// unit of measurement
79  std::string units;
80 
81  /// Set to true if this is a callback entry
83 
84  /// The type of log. Instantaneous or Cumulative
85  log_type::log_type_enum logtype;
86 
87  boost::function<double(void)> callback;
88 
89  size_t sum_of_instantaneous_entries;
90  size_t count_of_instantaneous_entries;
91 
92  bool machine_log_modified;
93  size_t earliest_modified_log;
94 
95  /// machine[i] holds a vector of entries from machine i
96  std::vector<std::vector<log_entry> > machine;
97  /// aggregate holds vector of totals
98  std::vector<log_entry> aggregate;
99 };
100 
101 
102 /**
103  * This is the type that is held in the thread local store
104  */
106  /** The values written to by each thread.
107  * An array with max length MAX_LOG_SIZE
108  */
109  size_t values[MAX_LOG_SIZE];
110  size_t thlocal_slot;
111 
112  // These are used for time averaging instantaneous values
113 };
114 
115 
116 class distributed_event_logger {
117  private:
118  // a key to allow multiple threads, each to have their
119  // own counter. Decreases performance penalty of the
120  // the event logger.
121  pthread_key_t key;
122 
124 
125  // The array of logs. We can only have a maximum of MAX_LOG_SIZE logs
126  // This is only created on machine 0
127  log_group* logs[MAX_LOG_SIZE];
128  // this bit field is used to identify which log entries are active
129  fixed_dense_bitset<MAX_LOG_SIZE> has_log_entry;
130  mutex log_entry_lock;
131 
132  // A collection of slots, one for each thread, to hold
133  // the current thread's active log counter.
134  // Threads will write directly into here
135  // and a master timer will sum it all up periodically
136  event_log_thread_local_type* thread_local_count[MAX_LOG_THREADS];
137  // a bitset which lets me identify which slots in thread_local_counts
138  // are used.
139  fixed_dense_bitset<MAX_LOG_THREADS> thread_local_count_slots;
140  mutex thread_local_count_lock;
141 
142  // timer managing the frequency at which logs are transmitted to the root
143  timer ti;
144  thread tick_thread;
145 
146  size_t allocate_log_entry(log_group* group);
147  /**
148  * Returns a pointer to the current thread log counter
149  * creating one if one does not already exist.
150  */
151  event_log_thread_local_type* get_thread_counter_ref();
152 
153  /**
154  * Receives the log information from each machine
155  */
156  void rpc_collect_log(size_t srcproc, size_t record_ctr,
157  std::vector<double> srccounts);
158 
159  void collect_instantaneous_log();
160  /**
161  * Collects the machine level
162  * log entry. and sends it to machine 0
163  */
164  void local_collect_log(size_t record_ctr);
165 
166  // Called only by machine 0 to get the aggregate log
167  void build_aggregate_log();
168 
169  mutex periodic_timer_lock;
170  conditional periodic_timer_cond;
171  bool periodic_timer_stop;
172 
173  /** a new thread spawns here and sleeps for 5 seconds at a time
174  * when it wakes up it will insert log entries
175  */
176  void periodic_timer();
177  public:
178  distributed_event_logger();
179 
180  // called by the destruction of distributed_control
181  void destroy_event_logger();
182 
183 
184  /**
185  * Associates the event log with a DC object.
186  * Must be called by all machines simultaneously.
187  * Can be called more than once, but only the first call will have
188  * an effect.
189  */
190  void set_dc(distributed_control& dc);
191  /**
192  * Creates a new log entry with a given name and log type.
193  * Returns the ID of the log. Must be called by
194  * all machines simultaneously with the same settings.
195  * units is the unit of measurement.
196  */
197  size_t create_log_entry(std::string name, std::string units,
198  log_type::log_type_enum logtype);
199 
200  /**
201  * Creates a new callback log entry with a given name and log type.
202  * Returns the ID of the log. Must be called by
203  * all machines simultaneously with the same settings.
204  * units is the unit of measurement.
205  * Callback will be triggered periodically.
206  * Callback entries must be deleted once the callback goes
207  * out of scope.
208  */
209  size_t create_callback_entry(std::string name,
210  std::string units,
211  boost::function<double(void)> callback,
212  log_type::log_type_enum logtype);
213 
214  void free_callback_entry(size_t entry);
215 
216  /**
217  * Increments the value of a log entry
218  */
219  void thr_inc_log_entry(size_t entry, size_t value);
220 
221  /**
222  * Increments the value of a log entry
223  */
224  void thr_dec_log_entry(size_t entry, size_t value);
225 
226 
227  /// \cond GRAPHLAB_INTERNAL
228  inline double get_current_time() const {
229  return ti.current_time();
230  }
231 
232  inline log_group** get_logs_ptr() {
233  return logs;
234  }
235 
236  inline fixed_dense_bitset<MAX_LOG_SIZE>& get_logs_bitset() {
237  return has_log_entry;
238  }
239 
240  /// \endcond
241 
242 };
243 
244 
245 extern distributed_event_logger& get_event_log();
246 
247 
248 } // namespace graphlab
// Event-logging convenience macros. Each expansion already ends in a
// semicolon; a semicolon at the call site therefore adds an extra empty
// statement (which breaks an unbraced if/else).

/// Declares a size_t variable that will hold an event log ID.
#define DECLARE_EVENT(event_id) size_t event_id;

/// Associates the global event logger with the distributed_control `dc`.
#define INITIALIZE_EVENT_LOG(dc) graphlab::get_event_log().set_dc(dc);

/// Registers a CUMULATIVE log entry and stores its ID in event_id.
#define ADD_CUMULATIVE_EVENT(event_id, desc, units) \
  event_id = graphlab::get_event_log().create_log_entry(desc, units, graphlab::log_type::CUMULATIVE);

/// Registers an INSTANTANEOUS log entry and stores its ID in event_id.
#define ADD_INSTANTANEOUS_EVENT(event_id, desc, units) \
  event_id = graphlab::get_event_log().create_log_entry(desc, units, graphlab::log_type::INSTANTANEOUS);

/// Registers a CUMULATIVE callback entry and stores its ID in event_id.
#define ADD_CUMULATIVE_CALLBACK_EVENT(event_id, desc, units, callback) \
  event_id = graphlab::get_event_log().create_callback_entry(desc, units, callback, \
             graphlab::log_type::CUMULATIVE);

/// Registers an INSTANTANEOUS callback entry and stores its ID in event_id.
#define ADD_INSTANTANEOUS_CALLBACK_EVENT(event_id, desc, units, callback) \
  event_id = graphlab::get_event_log().create_callback_entry(desc, units, callback, \
             graphlab::log_type::INSTANTANEOUS);

/// Frees a callback entry previously registered under event_id.
#define FREE_CALLBACK_EVENT(event_id) \
  graphlab::get_event_log().free_callback_entry(event_id);

/// Adds / subtracts `amount` to / from the calling thread's counter for event_id.
#define INCREMENT_EVENT(event_id, amount) graphlab::get_event_log().thr_inc_log_entry(event_id, amount);
#define DECREMENT_EVENT(event_id, amount) graphlab::get_event_log().thr_dec_log_entry(event_id, amount);
274 
275 #endif