GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
safe_circular_char_buffer.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef SAFE_CIRCULAR_CHAR_BUFFER_HPP
25 #define SAFE_CIRCULAR_CHAR_BUFFER_HPP
26 #include <graphlab/rpc/circular_char_buffer.hpp>
27 #include <graphlab/parallel/pthread_tools.hpp>
28 #include <graphlab/logger/assertions.hpp>
29 
30 namespace graphlab {
31 
32 /**
33 \ingroup util
34 A non-resizing circular char buffer
35 with thread-safe write operations and a single reader
36 */
38  public:
39  safe_circular_char_buffer(std::streamsize bufsize = 10485760 /*10 MB */);
40 
42 
43  /**
44  * Stops the buffer and signals any blocking calls.
45  */
46  void stop_reader();
47 
48 
49  /**
50  * Determine if the buffer is empty
51  */
52  bool empty() const;
53 
54  inline bool is_done() const {
55  return done;
56  }
57 
58  inline bool reader_is_blocked() const {
59  return iswaiting;
60  }
61  /**
62  * Get the total contents currently stored in the buffer.
63  */
64  std::streamsize size() const;
65 
66  /**
67  * Get the amount of free space reamining in the buffer
68  */
69  std::streamsize free_space() const;
70 
71  /** Gets the size of the buffer.
72  \note: The useable space is reserved_size() - 1 */
73  inline std::streamsize reserved_size() const {
74  return bufsize - 1;
75  }
76 
77 
78  /**
79  * Returns 0 if the write doesn't fit
80  *
81  * This function acquires the critical section
82  * to perform the write
83  */
84  std::streamsize write(const char* c, std::streamsize clen);
85 
86  /**
87  * Returns 0 if the write doesn't fit
88  *
89  * This does the same as write(), but does not acquire the critical
90  * section. The caller should ensure safety
91  */
92  std::streamsize write_unsafe(const char* c, std::streamsize clen);
93 
94 
95  /**
96  * Returns a pointer (through s) and a length of the read. This
97  * pointer is a direct pointer into the internal buffer of this
98  * datastructure. The pointer is valid as long as no other
99  * operations are performed on this structure. The length of the
100  * introspective_read may be less than the number of bytes
101  * requested. Multiple calls to introspective_read may be necessary
102  * to read all data in the buffer. If the function returns 0, the
103  * buffer is empty.
104  *
105  * No locks are acquired on this call.
106  */
107  std::streamsize introspective_read(char* &s, std::streamsize clen);
108 
109 
110  /**
111  * Same as introspective read. But blocks until there is something to read
112  * This function does not acquire a critical section.
113  */
114  std::streamsize blocking_introspective_read(char* &s,
115  std::streamsize clen);
116 
117 
118  void advance_head(const std::streamsize advance_len);
119 
120 
121  /** When begin critical section returns, it is
122  guaranteed that no other writer will be touching
123  the tail of the queue */
124  inline void begin_critical_section() {
125  mut.lock();
126  }
127 
128  /** Releases a critical section acquired by begin_critical_section */
129  inline void end_critical_section() {
130  mut.unlock();
131  }
132 
133  /** Releases a critical section acquired by begin_critical_section,
134  and signals the reader to begin reading if the reader is blocked */
136  cond.signal();
137  mut.unlock();
138  }
139 
140 
141  private:
142  char* buffer;
143  std::streamsize bufsize; // current size of the buffer
144 
145  /**
146  * points to the head of the queue. Reader reads from here
147  */
148  std::streamsize head;
149 
150  /**
151  * points to one past the end of the queue. writer writes to
152  * here. if tail == head, buffer must be empty
153  */
154  std::streamsize tail;
155 
156  mutex mut;
157  conditional cond;
158 
159  volatile bool done; // Once
160  volatile bool iswaiting;
161 };
162 
163 }
164 
165 #endif
166