GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
reply_increment_counter.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef REPLY_INCREMENT_COUNTER_HPP
25 #define REPLY_INCREMENT_COUNTER_HPP
#include <cstdlib>
#include <new>
#include <string>
#include <graphlab/parallel/atomic.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
29 
30 namespace graphlab {
31 
32 class distributed_control;
33 
34 namespace dc_impl {
35 /**
36 \ingroup rpc
37 \internal
38 A wrapper around a char array. This structure
39 is incapable of freeing itself and must be managed externally
40 */
41 struct blob {
42  /// Constructs a blob containing a pointer to a character array with length len
43  blob(char* c, size_t len):c(c),len(len) { };
44  blob():c(NULL), len(0){ };
45 
46  char *c; ///< stored pointer
47  size_t len; ///< stored length
48 
49 
50  /// serialize the char array
51  void save(oarchive& oarc) const {
52  oarc << len;
53  if (len > 0) serialize(oarc, c, len);
54  }
55 
56  /// deserializes a char array. If there is already a char array here, it will be freed
57  void load(iarchive& iarc) {
58  if (c) ::free(c);
59  c = NULL;
60  iarc >> len;
61  if (len > 0) {
62  c = (char*) malloc(len);
63  deserialize(iarc, c, len);
64  }
65  }
66 
67  /// Free the stored char array.
68  void free() {
69  if (c) {
70  ::free(c);
71  c = NULL;
72  len = 0;
73  }
74  }
75 };
76 
77 /**
78 \internal
79 \ingroup rpc
80 Defines a really useful function that performs an atomic
81 increment of a flag when called. This is useful for waiting
82 for a reply to a request
83 \note: usemutex = false probably does not work and should be deprecated.
84 \see reply_increment_counter
85 */
86 struct reply_ret_type{
87  atomic<size_t> flag;
88  blob val;
89  bool usemutex;
90  mutex mut;
91  conditional cond;
92  /**
93  * Constructs a reply object which waits for 'retcount' replies.
94  * usemutex should always be true
95  */
96  reply_ret_type(bool usemutex, size_t retcount = 1):flag(retcount),
97  usemutex(true) {
98  }
99 
100  ~reply_ret_type() { }
101 
102  /**
103  * Waits for all replies to complete. It is up to the
104  * reply implementation to decrement the counter.
105  */
106  inline void wait() {
107  if (usemutex) {
108  mut.lock();
109  while(flag.value != 0) cond.wait(mut);
110  mut.unlock();
111  }
112  else {
113  while(flag.value != 0) sched_yield();
114  }
115  }
116 };
117 
118 
119 
120 /**
121  * \internal
122  * \ingroup rpc
123  * Like reply_ret_type but can store a blob for each reply.
124  * \see stored_increment_counter
125  */
126 struct stored_ret_type{
127  atomic<size_t> flag;
128  std::map<procid_t, blob> val;
129  mutex mut;
130  conditional cond;
131  /**
132  * Constructs a reply object which waits for 'retcount' replies.
133  * usemutex should always be true
134  */
135  stored_ret_type(size_t retcount = 1):flag(retcount) {
136  }
137 
138  ~stored_ret_type() { }
139 
140  /**
141  * Waits for all replies to complete. It is up to the
142  * reply implementation to decrement the counter.
143  */
144  inline void wait() {
145  mut.lock();
146  while(flag.value != 0) cond.wait(mut);
147  mut.unlock();
148  }
149 };
150 
151 }
152 
153 
154 /**
155  * \internal
156  * \ingroup rpc
157  * A simple RPC call which converts ptr to a pointer to a reply_ret_type,
158  * stores the blob in it, and decrements its reply counter.
159  * \see reply_ret_type
160  */
161 void reply_increment_counter(distributed_control &dc, procid_t src,
162  size_t ptr, dc_impl::blob ret);
163 
164 /**
165  * \internal
166  * \ingroup rpc
167  * A simple RPC call which converts ptr to a pointer to a stored_ret_type,
168  * stores the blob in it, and decrements its reply counter.
169  * \see stored_ret_type
170  */
171 void stored_increment_counter(distributed_control &dc, procid_t src,
172  size_t ptr, dc_impl::blob ret);
173 
174 }
175 
176 #endif
177